How to collect values from a channel into a slice in Go?

huangapple go评论94阅读模式
英文:

How to collect values from a channel into a slice in Go?

问题

假设我有一个辅助函数 helper(n int),它返回一个长度可变的整数切片。我想要并行运行 helper(n),并收集输出结果到一个大的切片中。我首次尝试的代码如下:

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	out := make([]int, 0)
	ch := make(chan int)

	go func() {
		for i := range ch {
			out = append(out, i)
		}
	}()

	g := new(errgroup.Group)
	for n := 2; n <= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch <- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)

	// time.Sleep(time.Second)
	fmt.Println(out) // 应该包含与 [0 1 0 1 2] 相同的元素
}

func helper(n int) []int {
	out := make([]int, 0)
	for i := 0; i < n; i++ {
		out = append(out, i)
	}
	return out
}

然而,如果我运行这个示例,我得到的结果不是预期的全部5个值,而是:

[0 1 0 1]

(如果我取消注释 time.Sleep,我会得到所有五个值 [0 1 2 0 1],但这不是一个可接受的解决方案)。

看起来问题在于 out 在一个 goroutine 中被更新,但是 main 函数在更新完成之前就返回了。

一个可行的解决方案是使用大小为5的缓冲通道:

func main() {
	ch := make(chan int, 5)

	g := new(errgroup.Group)
	for n := 2; n <= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch <- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)

	out := make([]int, 0)
	for i := range ch {
		out = append(out, i)
	}

	fmt.Println(out) // 应该包含与 [0 1 0 1 2] 相同的元素
}

然而,在这个简化的示例中,我知道输出的大小是多少,但在实际应用中,我事先是不知道的。实际上,我希望有一个“无限”缓冲区,使得向通道发送数据永远不会阻塞,或者有一种更符合惯用方式的方法来实现相同的效果;我阅读了 https://blog.golang.org/pipelines,但没有找到与我的用例非常匹配的内容。有什么想法吗?

英文:

Suppose I have a helper function helper(n int) which returns a slice of integers of variable length. I would like to run helper(n) in parallel for various values of n and collect the output in one big slice. My first attempt at this is the following:

package main

import (
	&quot;fmt&quot;

	&quot;golang.org/x/sync/errgroup&quot;
)

func main() {
	out := make([]int, 0)
	ch := make(chan int)

	go func() {
		for i := range ch {
			out = append(out, i)
		}
	}()

	g := new(errgroup.Group)
	for n := 2; n &lt;= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch &lt;- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)

	// time.Sleep(time.Second)
	fmt.Println(out) // should have the same elements as [0 1 0 1 2]
}

func helper(n int) []int {
	out := make([]int, 0)
	for i := 0; i &lt; n; i++ {
		out = append(out, i)
	}
	return out
}

However, if I run this example I do not get all 5 expected values, instead I get

[0 1 0 1]

(If I uncomment the time.Sleep I do get all five values, [0 1 2 0 1], but this is not an acceptable solution).

It seems that the problem with this is that out is being updated in a goroutine, but the main function returns before it is done updating.

One thing that would work is using a buffered channel of size 5:

func main() {
	ch := make(chan int, 5)

	g := new(errgroup.Group)
	for n := 2; n &lt;= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch &lt;- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)

	out := make([]int, 0)
	for i := range ch {
		out = append(out, i)
	}

	fmt.Println(out) // should have the same elements as [0 1 0 1 2]
}

However, although in this simplified example I know what the size of the output should be, in my actual application this is not known a priori. Essentially what I would like is an 'infinite' buffer such that sending to the channel never blocks, or a more idiomatic way to achieve the same thing; I've read https://blog.golang.org/pipelines but wasn't able to find a close match to my use case. Any ideas?

答案1

得分: 2

在这个代码版本中,执行被阻塞,直到ch被关闭。

ch总是在负责向ch推送数据的例程结束时关闭。因为程序在例程中向ch推送数据,所以不需要使用缓冲通道。

以下是修复后的第一个代码版本,它虽然复杂,但演示了sync.WaitGroup的用法。

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

func main() {
	out := make([]int, 0)
	ch := make(chan int)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := range ch {
			out = append(out, i)
		}
	}()

	g := new(errgroup.Group)
	for n := 2; n <= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch <- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)

	wg.Wait()
	// time.Sleep(time.Second)
	fmt.Println(out) // 应该与 [0 1 0 1 2] 有相同的元素
}

func helper(n int) []int {
	out := make([]int, 0)
	for i := 0; i < n; i++ {
		out = append(out, i)
	}
	return out
}

希望能对你有所帮助!

英文:

In this version of the code, the execution is blocked until ch is closed.

ch is always closed at the end of a routine that is responsible to push into ch. Because the program pushes to ch in a routine, it is not needed to use a buffered channel.

package main

import (
	&quot;fmt&quot;

	&quot;golang.org/x/sync/errgroup&quot;
)

func main() {
	ch := make(chan int)

	go func() {
		g := new(errgroup.Group)
		for n := 2; n &lt;= 3; n++ {
			n := n
			g.Go(func() error {
				for _, i := range helper(n) {
					ch &lt;- i
				}
				return nil
			})
		}
		if err := g.Wait(); err != nil {
			panic(err)
		}
		close(ch)
	}()

	out := make([]int, 0)
	for i := range ch {
		out = append(out, i)
	}

	fmt.Println(out) // should have the same elements as [0 1 0 1 2]
}

func helper(n int) []int {
	out := make([]int, 0)
	for i := 0; i &lt; n; i++ {
		out = append(out, i)
	}
	return out
}

Here is the fixed version of the first code, it is convoluted but demonstrates the usage of sync.WaitGroup.

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;

	&quot;golang.org/x/sync/errgroup&quot;
)

func main() {
	out := make([]int, 0)
	ch := make(chan int)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := range ch {
			out = append(out, i)
		}
	}()

	g := new(errgroup.Group)
	for n := 2; n &lt;= 3; n++ {
		n := n
		g.Go(func() error {
			for _, i := range helper(n) {
				ch &lt;- i
			}
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		panic(err)
	}
	close(ch)

	wg.Wait()
	// time.Sleep(time.Second)
	fmt.Println(out) // should have the same elements as [0 1 0 1 2]
}

func helper(n int) []int {
	out := make([]int, 0)
	for i := 0; i &lt; n; i++ {
		out = append(out, i)
	}
	return out
}

huangapple
  • 本文由 发表于 2021年6月30日 21:08:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/68195431.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定