Concurrent handler is blocking
Question
We found that one mqtt.MessageHandler is not working properly. In the handler, we filter the incoming messages and then pass each valid event to a function for processing. The function is implemented as follows:
func processEvent(i models.Foo) (string, error) {
	var wg sync.WaitGroup
	quit := make(chan bool)
	errc := make(chan error)
	done := make(chan error)
	err := func1()
	if err != nil {
		return err
	}
	switch strings.ToUpper(i.Status) {
	case "OK":
		wg.Add(1)
		go func() {
			defer wg.Done()
			err = longTimeTask1()
			ch := done
			if err != nil {
				log.Error("%s", err.Error())
				ch = errc
			}
			select {
			case ch <- err:
				return
			case <-quit:
				return
			}
		}()
		wg.Add(1)
		go func() {
			defer wg.Done()
			err = longTimeTask2()
			ch := done
			if err != nil {
				ch = errc
			}
			select {
			case ch <- err:
				return
			case <-quit:
				return
			}
		}()
		result := "processed"
		count := 0
		for {
			select {
			case err := <-errc:
				close(quit)
				log.Info("event: %s, %s", "", err.Error())
				return "", err
			case <-done:
				count++
				if count == 4 { // why 4???
					return result, nil
				}
			}
		}
		wg.Wait()
		if err != nil {
			log.Info("event: %s, %s", result, err.Error())
			return result, err
		}
		close(quit)
		close(errc)
		close(done)
		return result, nil
	default:
		return "", nil
	}
	return "", nil
}
I understand that it is trying to synchronize longTimeTask1() and longTimeTask2(), but the code is too complex for me to follow. What is the purpose of count and count == 4? Why are the channels closed at the end? According to the code hint, wg.Wait() is also unreachable.
This function used to work well. Recently, however, longTimeTask1() or longTimeTask2() may return an error, which breaks the code: the function seems to block completely. Could you please help me understand the code, find the potential issues, and refactor this part?
Answer 1
Score: 3
Judging by count, the code expects to receive four messages from the done channel. However, it can produce at most two such messages, one from each of the two goroutines, so that's a bug.
Also, if either goroutine returns an error, it will not write to the done channel, so that's another bug.
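The first bug can be reproduced in isolation. The following is a minimal sketch, not the original handler: `receiveAll` is a hypothetical helper, and the timeout exists only so that the demonstration terminates, whereas the loop in the question has no such escape.

```go
package main

import (
	"fmt"
	"time"
)

// receiveAll (hypothetical helper) starts `senders` goroutines that each send
// once on done, then waits until `want` messages arrive or timeoutMs elapses.
// It reports whether all expected messages were received.
func receiveAll(senders, want, timeoutMs int) bool {
	done := make(chan error)
	quit := make(chan struct{})
	defer close(quit) // release any sender that is still blocked

	for i := 0; i < senders; i++ {
		go func() {
			select {
			case done <- nil:
			case <-quit:
			}
		}()
	}

	timeout := time.After(time.Duration(timeoutMs) * time.Millisecond)
	for count := 0; count < want; {
		select {
		case <-done:
			count++
		case <-timeout:
			// Added for the demo only; without it this loop waits forever.
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(receiveAll(2, 2, 100)) // true: two senders, two messages expected
	fmt.Println(receiveAll(2, 4, 100)) // false: the third and fourth messages never arrive
}
```

With two senders and a target of four, the receive loop ends only because of the added timeout, which matches the blocking behavior described in the question.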
A corrected version of the loop could be:
...
result := "processed"
count := 0
for {
	select {
	case err := <-errc:
		close(quit) // Tell the goroutines to terminate
		log.Info("event: %s, %s", "", err.Error())
		wg.Wait() // Wait for them to finish
		return "", err
	case <-done:
		count++
		if count == 2 {
			wg.Wait()
			return result, nil
		}
	}
}
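To try this approach end to end, here is a self-contained sketch with stub tasks. `runBoth`, `succeed`, and `fail` are hypothetical names standing in for the "OK" branch of processEvent and the two long-running tasks. The sketch also gives each goroutine its own err variable, because the goroutines in the question assign to the shared outer err, which is a data race.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// succeed and fail are stub tasks used only for this illustration.
func succeed() error { return nil }
func fail() error    { return errors.New("task failed") }

// runBoth (hypothetical) runs two tasks concurrently and returns "processed"
// only if both succeed; otherwise it returns the first error it receives.
func runBoth(task1, task2 func() error) (string, error) {
	var wg sync.WaitGroup
	quit := make(chan struct{})
	errc := make(chan error)
	done := make(chan struct{})

	for _, task := range []func() error{task1, task2} {
		wg.Add(1)
		go func(task func() error) {
			defer wg.Done()
			// err is local to this goroutine, so nothing is shared.
			if err := task(); err != nil {
				select {
				case errc <- err:
				case <-quit:
				}
				return
			}
			select {
			case done <- struct{}{}:
			case <-quit:
			}
		}(task)
	}

	// Expect exactly one done message per goroutine.
	for count := 0; count < 2; {
		select {
		case err := <-errc:
			close(quit) // tell the other goroutine to stop waiting
			wg.Wait()   // wait for both goroutines to finish
			return "", err
		case <-done:
			count++
		}
	}
	wg.Wait()
	return "processed", nil
}

func main() {
	res, err := runBoth(succeed, succeed)
	fmt.Printf("%q %v\n", res, err) // "processed" <nil>

	res, err = runBoth(succeed, fail)
	fmt.Printf("%q %v\n", res, err) // "" task failed
}
```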
Answer 2
Score: 2
This is exactly the sort of fork-and-join concurrency that the [`errgroup`](https://pkg.go.dev/golang.org/x/sync/errgroup) package was designed for:
func processEvent(ctx context.Context, i models.Foo) (string, error) {
	err := func1()
	if err != nil {
		return "", err
	}
	g, ctx := errgroup.WithContext(ctx)
	if strings.ToUpper(i.Status) != "OK" {
		return "", nil
	}
	g.Go(func() error { return longTimeTask1(ctx) })
	g.Go(func() error { return longTimeTask2(ctx) })
	if err := g.Wait(); err != nil {
		log.Printf("event: %v", err)
		return "", err
	}
	return "processed", nil
}
(https://play.golang.org/p/JNMKftQTLGs)
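If adding the golang.org/x/sync dependency is not an option, similar fork-and-join behavior can be approximated with the standard library alone. The sketch below is not errgroup itself: `forkJoin`, `processBoth`, and the stub tasks are hypothetical names, and the code only assumes, as the answer does, that the tasks accept a context.Context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// forkJoin (hypothetical) runs every task concurrently, cancels the shared
// context on the first error, waits for all tasks to return, and then returns
// that first error (or nil if every task succeeded).
func forkJoin(ctx context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var (
		wg       sync.WaitGroup
		once     sync.Once
		firstErr error
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func(context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				// Only the first failure is recorded; it also cancels the rest.
				once.Do(func() {
					firstErr = err
					cancel()
				})
			}
		}(task)
	}
	wg.Wait()
	return firstErr
}

// Stub tasks for the demonstration.
func quick(ctx context.Context) error   { return nil }
func failing(ctx context.Context) error { return errors.New("task failed") }
func waitsForCancel(ctx context.Context) error {
	<-ctx.Done() // returns only once another task has failed
	return ctx.Err()
}

// processBoth (hypothetical) mirrors the shape of processEvent's "OK" branch.
func processBoth(task1, task2 func(context.Context) error) (string, error) {
	if err := forkJoin(context.Background(), task1, task2); err != nil {
		return "", err
	}
	return "processed", nil
}

func main() {
	res, err := processBoth(quick, quick)
	fmt.Printf("%q %v\n", res, err) // "processed" <nil>

	res, err = processBoth(failing, waitsForCancel)
	fmt.Printf("%q %v\n", res, err) // "" task failed
}
```

As in the errgroup version, cancellation only helps if the long-running tasks actually check the context they are given.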