英文:
How do I close a channel multiple goroutines are sending on?
问题
我正在尝试并行计算。程序设计为每个工作协程将已解决的谜题的“片段”发送回等待接收和组装来自工作协程发送的所有内容的控制器协程。
在Go语言中,关闭单个通道的惯用方式是什么?我不能在每个协程中调用close函数关闭通道,因为这样可能会在关闭的通道上发送数据。同样,无法预先确定哪个协程会先完成。在这种情况下,是否需要使用sync.WaitGroup?
英文:
I am attempting to do some computation in parallel. The program is designed so that each worker goroutine sends "pieces" of a solved puzzle back to the controller goroutine that waits to receive and assembles everything sent from the worker routines.
What is the idomatic Go for closing the single channel? I cannot call close on the channel in each goroutine because then I could possibly send on a closed channel. Likewise, there is no way to predetermine which goroutine will finish first. Is a sync.WaitGroup necessary here?
答案1
得分: 9
这是一个使用sync.WaitGroup
的示例,可以实现你所需的功能。
这个示例接受一个较长的整数列表,然后通过将N个并行工作器分配给相等大小的输入数据块来将它们全部相加。可以在go playground上运行此示例:
package main
import (
"fmt"
"sync"
)
const WorkerCount = 10
func main() {
// 一些要操作的输入数据
// 每个工作器都获得相等的份额来处理
data := make([]int, WorkerCount*10)
for i := range data {
data[i] = i
}
// 求和所有的条目
result := sum(data)
fmt.Printf("Sum: %d\n", result)
}
// sum通过将操作委托给并行操作在输入数据的子切片上的工作器来将给定列表中的数字相加。
func sum(data []int) int {
var sum int
result := make(chan int)
defer close(result)
// 从工作器累积结果
go func() {
for {
select {
case value := <-result:
sum += value
}
}
}()
// WaitGroup将跟踪所有工作器的完成情况。
wg := new(sync.WaitGroup)
wg.Add(WorkerCount)
// 将工作分配给工作器的数量。
chunkSize := len(data) / WorkerCount
// 启动工作器。
for i := 0; i < WorkerCount; i++ {
go func(i int) {
offset := i * chunkSize
worker(result, data[offset:offset+chunkSize])
wg.Done()
}(i)
}
// 等待所有工作器完成,然后返回结果。
wg.Wait()
return sum
}
// worker对给定列表中的数字求和。
func worker(result chan int, data []int) {
var sum int
for _, v := range data {
sum += v
}
result <- sum
}
英文:
Here is an example using the sync.WaitGroup
to do what you are looking for,
This example accepts a lenghty list of integers, then sums them all up by handing N parallel workers an equal-sized chunk of the input data. It can be run on go playground:
package main
import (
"fmt"
"sync"
)
const WorkerCount = 10
func main() {
// Some input data to operate on.
// Each worker gets an equal share to work on.
data := make([]int, WorkerCount*10)
for i := range data {
data[i] = i
}
// Sum all the entries.
result := sum(data)
fmt.Printf("Sum: %d\n", result)
}
// sum adds up the numbers in the given list, by having the operation delegated
// to workers operating in parallel on sub-slices of the input data.
func sum(data []int) int {
var sum int
result := make(chan int)
defer close(result)
// Accumulate results from workers.
go func() {
for {
select {
case value := <-result:
sum += value
}
}
}()
// The WaitGroup will track completion of all our workers.
wg := new(sync.WaitGroup)
wg.Add(WorkerCount)
// Divide the work up over the number of workers.
chunkSize := len(data) / WorkerCount
// Spawn workers.
for i := 0; i < WorkerCount; i++ {
go func(i int) {
offset := i * chunkSize
worker(result, data[offset:offset+chunkSize])
wg.Done()
}(i)
}
// Wait for all workers to finish, before returning the result.
wg.Wait()
return sum
}
// worker sums up the numbers in the given list.
func worker(result chan int, data []int) {
var sum int
for _, v := range data {
sum += v
}
result <- sum
}
答案2
得分: 4
是的,这是sync.WaitGroup的一个完美使用案例。
你的另一个选择是为每个goroutine使用1个通道,并且有一个多路复用的goroutine,从每个通道中输入到一个单独的通道中。但是这样会很快变得难以管理,所以我会选择使用sync.WaitGroup。
英文:
Yes, This is a perfect use case for sync.WaitGroup.
Your other option is to use 1 channel per goroutine and one multiplexer goroutine that feeds from each channel into a single channel. But that would get unwieldy fast so I'd just go with a sync.WaitGroup.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论