并发处理程序正在阻塞。

huangapple go评论91阅读模式
英文:

Concurrent handler is blocking

问题

我们发现一个 mqtt.MessageHandler 没有正常工作。在处理程序中,我们将过滤传入的消息,然后将有效的事件传递给一个函数进行处理。该函数的实现如下:

func processEvent(i models.Foo) (string, error) {
    var wg sync.WaitGroup
    quit := make(chan bool)
    errc := make(chan error)
    done := make(chan error)

    err := func1()
    if err != nil {
        return err
    }

    switch strings.ToUpper(i.Status) {
    case "OK":
        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask1()
            ch := done
            if err != nil {
                log.Error("%s", err.Error())
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        wg.Add(1)
        go func() {
            defer wg.Done()
            err = longTimeTask2()
            ch := done
            if err != nil {
                ch = errc
            }
            select {
            case ch <- err:
                return
            case <-quit:
                return
            }
        }()

        result := "processed"
        count := 0
        for {
            select {
            case err := <-errc:
                close(quit)
                log.Info("event: %s, %s", "", err.Error())
                return "", err
            case <-done:
                count++
                if count == 4 { // 为什么是4???
                    return result, nil
                }
            }
        }

        wg.Wait()

        if err != nil {
            log.Info("event: %s, %s", result, err.Error())
            return result, err
        }
        close(quit)
        close(errc)
        close(done)
        return result, nil
    default:
        return "", nil
    }

    return "", nil
}

我理解了,它试图同步 longTimeTask1()longTimeTask2()。但是对我来说,这段代码相当复杂,我不太理解 count 和 count == 4 的目的是什么?为什么最后要关闭通道?代码提示中的 wg.Wait() 是无法到达的。

在这个函数之前,它工作得很好。但最近 longTimeTask1()longTimeTask2() 可能会返回一些错误,导致代码中断,这个函数似乎完全被阻塞了。你能帮我理解这段代码,并找出潜在的问题,重构这部分吗?

英文:

We found one mqtt.MessageHandler is not working properly. In the handler, we will filter the coming message then pass the valid event to one func to process. The func is implemented as below:

func processEvent(i models.Foo) (string, error) {
var wg sync.WaitGroup
quit := make(chan bool)
errc := make(chan error)
done := make(chan error)
err := func1()
if err != nil {
return err
}
switch strings.ToUpper(i.Status) {
case &quot;OK&quot;:
wg.Add(1)
go func() {
defer wg.Done()
err = longTimeTask1()
ch := done
if err != nil {
log.Error(&quot;%s&quot;, err.Error())
ch = errc
}
select {
case ch &lt;- err:
return
case &lt;-quit:
return
}
}()
wg.Add(1)
go func() {
defer wg.Done()
err = longTimeTask2()
ch := done
if err != nil {
ch = errc
}
select {
case ch &lt;- err:
return
case &lt;-quit:
return
}
}()
result := &quot;processed&quot;
count := 0
for {
select {
case err := &lt;-errc:
close(quit)
log.Info(&quot;event: %s, %s&quot;, &quot;&quot;, err.Error())
return &quot;&quot;, err
case &lt;-done:
count++
if count == 4 { // why 4???
return result, nil
}
}
}
wg.Wait()
if err != nil {
log.Info(&quot;event: %s, %s&quot;, result, err.Error())
return result, err
}
close(quit)
close(errc)
close(done)
return result, nil
default:
return &quot;&quot;, nil
}
return &quot;&quot;, nil
}

I understand, it's trying to sync the longTimeTask1() and longTimeTask2(). But it's quite complex for me to understand. What's the purpose of count and count == 4? Why the close at the last? The wg.Wait() is unreachable by the code hint.
Before this func is working well. but recently longTimeTask1() or longTimeTask2() might return some error, which breaks the code, this func seems is blocked totally. Could you please help me understand the code and find the potential issues and refactor this part?

答案1

得分: 3

看起来,代码中的count变量似乎期望从done通道接收到四条消息。然而,这段代码最多只能从两个goroutine中产生两条这样的消息,所以这是一个bug。

另外,如果任何一个goroutine返回错误,它将不会写入done通道,这也是另一个bug。

另一种编写代码的方式可以是:

...
result := "processed"
for {
    select {
       case err := <-errc:
          close(quit) // 告诉goroutine终止
          log.Info("event: %s, %s", "", err.Error())
          wg.Wait() // 等待它们完成
          return "", err
  
       case <-done:
          count++
          if count == 2 {
              wg.Wait()
              return result, nil
          }    
}
英文:

Looking at count, it appears like the code is expecting to receive four messages from the done channel. However, this code can produce at most two such messages from the two goroutines, so that's a bug.

Also, if any of the goroutines returns an error, it will not write to the done channel, so that's another bug.

Another way to write this could be:

...
result := &quot;processed&quot;
for {
select {
case err := &lt;-errc:
close(quit) // Tell the goroutines to terminate
log.Info(&quot;event: %s, %s&quot;, &quot;&quot;, err.Error())
wg.Wait() // Wait for them to finish
return &quot;&quot;, err
case &lt;-done:
count++
if count == 2 {
wg.Wait()
return result, nil
}    
}
</details>
# 答案2
**得分**: 2
这正是[`errgroup`](https://pkg.go.dev/golang.org/x/sync/errgroup)包设计用来处理的分叉和合并并发的情况:
```go
func processEvent(ctx context.Context, i models.Foo) (string, error) {
err := func1()
if err != nil {
return "", err
}
g, ctx := errgroup.WithContext(ctx)
if strings.ToUpper(i.Status) != "OK" {
return "", nil
}
g.Go(func() error { return longTimeTask1(ctx) })
g.Go(func() error { return longTimeTask2(ctx) })
if err := g.Wait(); err != nil {
log.Printf("event: %v", err)
return "", err
}
return "processed", nil
}

(https://play.golang.org/p/JNMKftQTLGs)

英文:

This is exactly the sort of fork-and-join concurrency that the errgroup package was designed for:

func processEvent(ctx context.Context, i models.Foo) (string, error) {
	err := func1()
	if err != nil {
		return &quot;&quot;, err
	}

	g, ctx := errgroup.WithContext(ctx)

	if strings.ToUpper(i.Status) != &quot;OK&quot; {
		return &quot;&quot;, nil
	}

	g.Go(func() error { return longTimeTask1(ctx) })
	g.Go(func() error { return longTimeTask2(ctx) })

	if err := g.Wait(); err != nil {
		log.Printf(&quot;event: %v&quot;, err)
		return &quot;&quot;, err
	}
	return &quot;processed&quot;, nil
}

(https://play.golang.org/p/JNMKftQTLGs)

huangapple
  • 本文由 发表于 2021年9月23日 21:55:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/69301457.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定