Goroutines and channels to send a response

huangapple

Question

I have the following code. I have a list to go through, and I need to do something with each value from that list, so I thought of using goroutines, but I need to cap the number of goroutines running at once. In each goroutine I make a call that returns a response and an error. When the error is not nil, I need to terminate all the goroutines and return an HTTP response; if there is no error, I also need to terminate the goroutines and return an HTTP response.

With only a few values it works fine, but with many values I have a problem: after I call cancel, there are still goroutines trying to send to the response channel, which is already closed, and I keep getting errors like:

> goroutine 36 [chan send]:

type response struct {
    value string
}

func Testing() []response {
    fakeValues := getFakeValues()

    maxParallel := 25
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    if len(fakeValues) < maxParallel {
        maxParallel = len(fakeValues)
    }

    type responseChannel struct {
        Response response
        Err      error
    }

    reqChan := make(chan string) // make this an unbuffered channel
    resChan := make(chan responseChannel)

    wg := &sync.WaitGroup{}
    wg.Add(maxParallel)
    for i := 0; i < maxParallel; i++ {
        go func(ctx context.Context, ch chan string, resChan chan responseChannel) {
            for {
                select {
                case val := <-ch:
                    resp, err := getFakeResult(val)
                    resChan <- responseChannel{
                        Response: resp,
                        Err:      err,
                    }

                case <-ctx.Done():
                    wg.Done()
                    return
                }
            }
        }(ctx, reqChan, resChan)
    }

    go func() {
        for _, body := range fakeValues {
            reqChan <- body
        }

        close(reqChan)
        cancel()
    }()

    go func() {
        wg.Wait()
        close(resChan)
    }()

    var hasErr error
    response := make([]response, 0, len(fakeValues))
    for res := range resChan {
        if res.Err != nil {
            hasErr = res.Err
            cancel()
            break
        }

        response = append(response, res.Response)
    }

    if hasErr != nil {
        // return responses.ErrorResponse(hasErr) // returns http response
    }

    // return responses.Accepted(response, nil) // returns http response
    return nil
}

func getFakeValues() []string {
    return []string{"a"}
}

func getFakeResult(val string) (response, error) {
    if val == "" {
        return response{}, fmt.Errorf("ooh noh:%s", val)
    }

    return response{
        value: val,
    }, nil
}

Answer 1

Score: 1

The workers end up blocked on sending to resChan because it is not buffered and, after an error, nothing reads from it anymore.
You can either make resChan buffered, with a capacity of at least maxParallel, or check whether the context was canceled, e.g. change the resChan <- send to:

select {
case resChan <- responseChannel{
    Response: resp,
    Err:      err,
}:
case <-ctx.Done():
}

Answer 2

Score: 1

There are two main problems with your solution:

First, if your fakeValues slice has more items than maxParallel+1, your program will block on this part:

for _, body := range fakeValues {
    reqChan <- body
}

How does this happen? As you start putting values into reqChan, each started goroutine reads one value from reqChan and tries to write its response to resChan. But since nothing is reading from resChan yet, each goroutine blocks there (on the write to resChan). Eventually, once every goroutine is blocked, nothing reads from reqChan either, and you cannot put any more values into it (apart from one buffered value).

Second, you are passing the context to your goroutines, but you are not doing anything with it. You can use the ctx.Done() channel to get a signal to exit the goroutine. Something like this:

go func(ctx context.Context, ch chan string, resChan chan responseChannel) {
    for {
        select {
        case val := <-ch:
            resp, err := getFakeResult(val)
            resChan <- responseChannel{
                Response: resp,
                Err:      err,
            }

        case <-ctx.Done():
            return
        }
    }
}(ctx, reqChan, resChan)
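
One detail of this loop is worth a separate look: once ch is closed, `case val := <-ch` is always ready and yields the zero value, so the worker would call getFakeResult("") and, with the question's getFakeResult, report an error that no real input caused. The two-value receive form tells the cases apart. The sketch below uses a hypothetical drain helper to show the behavior in isolation.

```go
package main

import "fmt"

// drain reads from ch until it is closed and returns everything received.
// The two-value receive distinguishes a real value from the zero value
// that a closed channel yields.
func drain(ch <-chan string) []string {
	var got []string
	for {
		val, ok := <-ch
		if !ok {
			return got // channel closed and empty: stop instead of processing ""
		}
		got = append(got, val)
	}
}

func main() {
	ch := make(chan string, 2)
	ch <- "a"
	ch <- "b"
	close(ch)

	fmt.Println(drain(ch)) // prints "[a b]"

	val, ok := <-ch
	fmt.Printf("%q %v\n", val, ok) // prints `"" false`
}
```

Inside a select, the same check applies: `case val, ok := <-ch:` followed by returning from the worker when ok is false.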

Now, to tie everything together so that there are no deadlocks, no race conditions, and no situations where values are not processed, a few other changes need to be made. I've posted the entire code below.

func Testing() []response {
    fakeValues := getFakeValues()

    maxParallel := 25
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    if len(fakeValues) < maxParallel {
        maxParallel = len(fakeValues)
    }

    type responseChannel struct {
        Response response
        Err      error
    }

    reqChan := make(chan string) // make this an unbuffered channel
    resChan := make(chan responseChannel)

    wg := &sync.WaitGroup{}
    wg.Add(maxParallel)
    for i := 0; i < maxParallel; i++ {
        go func(ctx context.Context, ch chan string, resChan chan responseChannel) {
            for {
                select {
                case val := <-ch:
                    resp, err := getFakeResult(val)
                    resChan <- responseChannel{
                        Response: resp,
                        Err:      err,
                    }

                case <-ctx.Done():
                    wg.Done()
                    return
                }
            }
            wg.Done()
        }(ctx, reqChan, resChan)
    }

    go func() {
        for _, body := range fakeValues {
            reqChan <- body
        }

        close(reqChan)
        // putting cancel here so that it can terminate all goroutines when all values are read from reqChan
        cancel()
    }()

    go func() {
        wg.Wait()
        close(resChan)
    }()

    var hasErr error
    response := make([]response, 0, len(fakeValues))
    for res := range resChan {
        if res.Err != nil {
            hasErr = res.Err
            cancel()
            break
        }

        response = append(response, res.Response)
    }

    if hasErr != nil {
        return responses.ErrorResponse(hasErr) // returns http response
    }

    return responses.Accepted(response, nil) // returns http response
}

In short, the changes are:

  • reqChan is an unbuffered channel, which avoids the case where values sitting in a buffer are never processed because the goroutines reading from it have already exited.
  • The worker goroutines have been changed to exit both when an error happens and when there is no more data from reqChan to process. wg.Done() is executed when the context is canceled, to ensure that resChan is eventually closed.
  • A separate goroutine puts the data into reqChan without blocking the program, closes it afterward, and cancels the context.

huangapple
  • Posted on April 13, 2022 at 05:03:21
  • When reposting, please keep the link to this article: https://go.coder-hub.com/71849083.html