如何在由未知数量的goroutine提供数据的通道上进行阻塞(和加入)?

huangapple go评论116阅读模式
英文:

How can I block (and join) on a channel fed by an unknown number of goroutines?

问题

我有一个递归函数。根据它接收到的数据,该函数将使用不同的值调用自身,因此递归的参数个数和深度是未知的:每次调用可能会调用自身零次或多次。该函数可以返回任意数量的值。

我想通过使用goroutine和channel来并行化它。inner的每次递归都在自己的goroutine中运行,并在通道上发送一个值。外部函数处理这些值。

  1. func outer(response []int) {
  2. results := make([]int)
  3. resultsChannel := make(chan int)
  4. inner := func(...) {
  5. resultsChannel <- «一些结果»;
  6. // 在新的goroutine中进行递归。
  7. for _, recursionArgument := range «一些计算得到的数据» {
  8. go inner(recursionArgument)
  9. }
  10. }
  11. go inner(«初始值»);
  12. for {
  13. result := <- resultsChannel
  14. results = append(results, result)
  15. // 帮助!我该如何决定何时退出循环?
  16. }
  17. return results
  18. }

问题出在如何跳出结果通道循环。由于递归的“形状”(参数个数和深度未知),我无法说“在n个事件之后结束”,也无法发送一个特殊值。

如何检测所有递归都已完成并从outer函数返回?有没有更好的方法来解决这个问题?

英文:

I have a recursive function. The function will call itself with various different values depending on the data it gets, so the arity and depth of recursion is not known: each call may call itself zero or more times. The function may return any number of values.

I want to parallelise it by getting goroutines and channels involved. Each recursion of inner runs in its own goroutine, and sends back a value on the channel. The outer function deals with those values.

  1. func outer(response []int) {
  2. results := make([]int)
  3. resultsChannel := make(chan int)
  4. inner := func(...) {
  5. resultsChannel &lt;- &#171;some result&#187;;
  6. // Recurse in a new goroutine.
  7. for _, recursionArgument in &#171;some calculated data&#187; {
  8. go inner(recursionArgument)
  9. }
  10. }
  11. go inner(&#171;initial values&#187;);
  12. for {
  13. result := &lt;- resultsChannel
  14. results = append(results, result)
  15. // HELP! How do I decide when to break?
  16. }
  17. return results
  18. }

The problem comes with escaping the results channel loop. Because of the 'shape' of the recursion (unknown arity and depth) I can't say "finish after n events" and I can't send a sentinel value.

How do I detect when all my recursions have happened and return from outer? Is there a better way to approach this?

答案1

得分: 3

你可以使用sync.WaitGroup来管理你所创建的goroutine集合:在每个新的goroutine生成之前调用Add(1),在每个goroutine完成时调用Done。代码示例如下:

  1. var wg sync.WaitGroup
  2. inner := func(...) {
  3. ...
  4. // 在一个新的goroutine中递归调用。
  5. for _, recursionArgument := range «一些计算得到的数据» {
  6. wg.Add(1)
  7. go inner(recursionArgument)
  8. }
  9. ...
  10. wg.Done()
  11. }
  12. wg.Add(1)
  13. go inner(«初始值»)
  14. 现在等待`wg`将告诉你所有的goroutine何时完成
  15. 如果你从一个channel中读取结果判断没有更多结果的明显方法是关闭该channel你可以通过另一个goroutine来实现这一点
  16. go func() {
  17. wg.Wait()
  18. close(resultsChannel)
  19. }()
  20. 现在你可以简单地使用`range`遍历`resultsChannel`来读取所有的结果
英文:

You can use a sync.WaitGroup to manage the collection of goroutines you spawn: call Add(1) before spawning each new goroutine, and Done when each goroutine completes. So something like this:

  1. var wg sync.WaitGroup
  2. inner := func(...) {
  3. ...
  4. // Recurse in a new goroutine.
  5. for _, recursionArgument := range &#171;some calculated data&#187; {
  6. wg.Add(1)
  7. go inner(recursionArgument)
  8. }
  9. ...
  10. wg.Done()
  11. }
  12. wg.Add(1)
  13. go inner(&#171;initial values&#187;)

Now waiting on wg will tell you when all the goroutines have completed.

If you are reading the results from a channel, the obvious way to tell when there are no more results is by closing the channel. You can achieve this through another goroutine to do this for us:

  1. go func() {
  2. wg.Wait()
  3. close(resultsChannel)
  4. }()

You should now be able to simply range over resultsChannel to read all the results.

huangapple
  • 本文由 发表于 2013年12月3日 20:13:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/20350915.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定