How to collect values from a channel into a slice in Go?


Question

Suppose I have a helper function helper(n int) which returns a slice of integers of variable length. I would like to run helper(n) in parallel for various values of n and collect the output in one big slice. My first attempt at this is the following:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	out := make([]int, 0)
	ch := make(chan int)
	go func() {
		for i := range ch {
			out = append(out, i)
		}
	}()
	g := new(errgroup.Group)
	for n := 2; n <= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch <- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)
	// time.Sleep(time.Second)
	fmt.Println(out) // should have the same elements as [0 1 0 1 2]
}

func helper(n int) []int {
	out := make([]int, 0)
	for i := 0; i < n; i++ {
		out = append(out, i)
	}
	return out
}
```

However, if I run this example, I do not get all five expected values. Instead I get:

```
[0 1 0 1]
```

(If I uncomment the time.Sleep and add the "time" import, I do get all five values, [0 1 2 0 1], but this is not an acceptable solution.)

It seems that the problem is that out is being updated in a separate goroutine, but main prints it before that goroutine has finished updating.

One thing that would work is using a buffered channel of size 5:

```go
func main() {
	ch := make(chan int, 5)
	g := new(errgroup.Group)
	for n := 2; n <= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch <- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)
	out := make([]int, 0)
	for i := range ch {
		out = append(out, i)
	}
	fmt.Println(out) // should have the same elements as [0 1 0 1 2]
}
```

In this simplified example I know what the size of the output should be, but in my actual application it is not known a priori. Essentially, what I would like is an 'infinite' buffer such that sending to the channel never blocks, or a more idiomatic way to achieve the same thing. I've read https://blog.golang.org/pipelines but wasn't able to find a close match to my use case. Any ideas?

Answer 1

Score: 2

In the version below, main blocks in its range loop over ch until ch is closed.

ch is closed at the end of the goroutine that is responsible for sending to it, once all of the errgroup workers have returned. Because the sends happen in other goroutines while main is receiving, there is no need for a buffered channel.

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	ch := make(chan int)
	// Producer side: start the workers, wait for them, then close ch.
	go func() {
		g := new(errgroup.Group)
		for n := 2; n <= 3; n++ {
			n := n
			g.Go(func() error {
				for _, i := range helper(n) {
					ch <- i
				}
				return nil
			})
		}
		if err := g.Wait(); err != nil {
			panic(err)
		}
		close(ch)
	}()
	// Receiver side: main drains ch until it is closed.
	out := make([]int, 0)
	for i := range ch {
		out = append(out, i)
	}
	fmt.Println(out) // should have the same elements as [0 1 0 1 2]
}

func helper(n int) []int {
	out := make([]int, 0)
	for i := 0; i < n; i++ {
		out = append(out, i)
	}
	return out
}
```

Here is a fixed version of the first program from the question. It is more convoluted, but it demonstrates the use of sync.WaitGroup.

```go
package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	out := make([]int, 0)
	ch := make(chan int)
	var wg sync.WaitGroup
	wg.Add(1)
	// Collector goroutine; wg.Done signals that it has drained ch.
	go func() {
		defer wg.Done()
		for i := range ch {
			out = append(out, i)
		}
	}()
	g := new(errgroup.Group)
	for n := 2; n <= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch <- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)
	wg.Wait()        // wait for the collector before reading out
	fmt.Println(out) // should have the same elements as [0 1 0 1 2]
}

func helper(n int) []int {
	out := make([]int, 0)
	for i := 0; i < n; i++ {
		out = append(out, i)
	}
	return out
}
```

huangapple
  • Published on June 30, 2021 at 21:08:54
  • If you repost this article, please keep its link: https://go.coder-hub.com/68195431.html