Go - How to know when an output channel is done
Question
I tried to follow Rob Pike's example from the talk 'Concurrency is not parallelism' and did something like this:
I'm starting many goroutines as workers that read from an input channel, perform some processing, and then send the result through the output channel.
Then I start another goroutine that reads data from some source and sends it to the workers through their input channel.
Lastly, I want to iterate over all of the results in the output channel and do something with them.
The problem is that because the work is split between the workers, I don't know when all of them have finished, so I can't tell when to stop asking the output channel for more results and let my program end properly.
What is the best practice for knowing when the workers have finished sending results to an output channel?
Answer 1
Score: 4
I personally like to use a sync.WaitGroup for that. A WaitGroup is a synchronized counter that has three methods: Wait(), Done() and Add(). What you do is increment the WaitGroup's counter once per worker, pass a pointer to it to the workers, and have them call Done() when they're done. Then you just block on the WaitGroup on the other end and close the output channel when they're all done, causing the output processor to exit.
Basically:
```go
// create the wait group
wg := sync.WaitGroup{}
// this is the output channel
outchan := make(chan whatever)
// start the workers
for i := 0; i < N; i++ {
	wg.Add(1) // increment the WaitGroup's counter by one
	// the worker pushes data onto the output channel and calls wg.Done() when done
	go work(&wg, outchan)
}
// this is our "waiter": it blocks until all workers are done, then closes the channel
go func() {
	wg.Wait()
	close(outchan)
}()
// this loop exits automatically once outchan is closed
for item := range outchan {
	workWithIt(item)
}
// TADA!
```
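A self-contained, runnable version of the pattern above, offered as a sketch: `worker`, `processAll` and the squaring step are hypothetical stand-ins for the real work, and an input channel plus a feeder goroutine are added to match the setup described in the question. Closing the input channel is what ends each worker's `range` loop; the waiter goroutine then closes the output channel.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// worker is a hypothetical processing step: it squares every value received
// on in and sends the result on out. It calls wg.Done once in is closed and drained.
func worker(in <-chan int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for v := range in {
		out <- v * v
	}
}

// processAll fans inputs out to numWorkers workers and collects every result.
func processAll(inputs []int, numWorkers int) []int {
	in := make(chan int)
	out := make(chan int)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1) // Add before starting the goroutine, never inside it
		go worker(in, out, &wg)
	}

	// Feeder: sends the source data, then closes in so the workers' loops end.
	go func() {
		for _, v := range inputs {
			in <- v
		}
		close(in)
	}()

	// Waiter: closes out only after every worker has called Done.
	go func() {
		wg.Wait()
		close(out)
	}()

	var results []int
	for r := range out { // exits once out is closed
		results = append(results, r)
	}
	sort.Ints(results) // arrival order is nondeterministic
	return results
}

func main() {
	fmt.Println(processAll([]int{1, 2, 3, 4}, 3)) // [1 4 9 16]
}
```

Calling `wg.Add` before the `go` statement matters: if each goroutine called `Add` itself, `wg.Wait` could run first, see a zero counter, and close the output channel too early.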
Answer 2
Score: 0
First, let me clarify your terminology: a misunderstanding about the ends of channels could cause problems later. You ask about "output channels" and "input channels". There is no such thing; there are only channels.
Every channel has two ends: the output (writing) end and the input (reading) end. I will assume that is what you meant.
Now to answer your question.
Take the simplest case: you have only one sender goroutine writing to a channel, only one worker goroutine reading from the other end, and the channel has zero buffering. The sender goroutine blocks on each write until that item has been consumed. Typically this happens quickly the first time. Once the first item has passed to the worker, the worker is busy and the sender has to wait before the second item can be passed over. So a ping-pong effect follows: either the writer or the reader is busy, but not both. The goroutines are concurrent in the sense described by Rob Pike, but not always actually executing in parallel.
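A minimal sketch of the zero-buffering case, assuming a single hypothetical worker goroutine: the non-blocking `select` in `trySend` (a helper introduced here for illustration) shows that a send on an unbuffered channel cannot complete while nobody is receiving, and the plain send only returns once the worker has taken the item.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // the send would have had to block
	}
}

func main() {
	ch := make(chan int) // unbuffered: zero buffering

	// No receiver is waiting yet, so an unbuffered send cannot complete.
	fmt.Println(trySend(ch, 1)) // false

	// A hypothetical worker receives a single item and reports it back.
	got := make(chan int)
	go func() { got <- <-ch }()

	ch <- 2            // blocks until the worker has taken the item
	fmt.Println(<-got) // 2
}
```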
In the case where you have many worker goroutines reading from the channel (its input end is shared by all of them), the sender can initially distribute one item to each worker, but then it has to wait while they work (similar to the ping-pong case described above). Finally, once the sender has sent all the items, it has finished its work. However, the readers may not yet have finished theirs. Sometimes we care that the sender finishes early, and sometimes we don't. Knowing when the readers have finished is most easily done with a WaitGroup (see Not_a_Golfer's answer and my answer to a related question).
There is a slightly more complex alternative: you can use a return channel to signal completion instead of a WaitGroup. This isn't hard to do, but a WaitGroup is preferable in this case because it is simpler.
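A sketch of the return-channel alternative, assuming each hypothetical worker sends one value on a shared `done` channel when its input is exhausted; a waiter goroutine counts exactly `numWorkers` signals before closing the output channel. The doubling step and the name `sumWithDoneChannel` are illustrative only.

```go
package main

import "fmt"

// worker doubles each value (hypothetical processing) and, instead of
// calling WaitGroup.Done, signals on done once in has been closed and drained.
func worker(in <-chan int, out chan<- int, done chan<- struct{}) {
	for v := range in {
		out <- 2 * v
	}
	done <- struct{}{}
}

func sumWithDoneChannel(inputs []int, numWorkers int) int {
	in := make(chan int)
	out := make(chan int)
	done := make(chan struct{})

	for i := 0; i < numWorkers; i++ {
		go worker(in, out, done)
	}

	// Feeder: send all inputs, then close in.
	go func() {
		for _, v := range inputs {
			in <- v
		}
		close(in)
	}()

	// Waiter: collect one completion signal per worker, then close out.
	go func() {
		for i := 0; i < numWorkers; i++ {
			<-done
		}
		close(out)
	}()

	sum := 0
	for r := range out {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(sumWithDoneChannel([]int{1, 2, 3}, 2)) // 12
}
```

The waiter has to know the worker count up front and count the signals by hand, which is essentially what a WaitGroup does for you.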
If instead the channel contained a buffer, the sender would send its last item sooner. In the limiting case, where the channel has one buffer slot per worker, the sender can complete very quickly and then, potentially, get on with something else. (Any more buffering than this would be wasteful.)
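A sketch of that limiting case, under the simplifying assumption that there are no more items than workers: with one buffer slot per worker, the sender can queue everything and finish before a single worker has started. `runBuffered` is a hypothetical name, and summing stands in for real processing.

```go
package main

import (
	"fmt"
	"sync"
)

// runBuffered queues items on a channel with one buffer slot per worker
// before any worker exists, then lets the workers drain it. It assumes
// len(items) <= numWorkers; with more items the sender would block forever,
// because nothing is receiving yet.
func runBuffered(items []int, numWorkers int) (queued, total int) {
	ch := make(chan int, numWorkers) // one buffer slot per worker

	// The sender runs to completion without waiting for any receiver.
	for _, v := range items {
		ch <- v
	}
	close(ch) // buffered items remain receivable after close
	queued = len(ch)

	var wg sync.WaitGroup
	var mu sync.Mutex
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for v := range ch {
				mu.Lock()
				total += v
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return queued, total
}

func main() {
	fmt.Println(runBuffered([]int{10, 20, 30}, 3)) // 3 60
}
```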
This decoupling of the sender allows a fully asynchronous pattern of behaviour, beloved of people using other technology stacks (Node.js and the JVM spring to mind). Unlike them, Go doesn't require you to do this, but you have the choice.
Back in the early '90s, as a side-effect of work on the Bulk Synchronous Parallelism (BSP) strategy, Leslie Valiant proved that sometimes very simple synchronisation strategies can be cheap. The crucial factor is that there must be enough parallel slackness (a.k.a. excess parallelism) to keep the processor cores busy. That means there must be enough other work to do that it really doesn't matter if any particular goroutine is blocked for a period of time.
Curiously, this can mean that working with smaller numbers of goroutines might require more care than working with larger numbers.
Understanding the impact of excess parallelism is useful: it is often not necessary to put extra effort into making everything asynchronous if the network as a whole has excess parallelism, because the CPU cores would be busy either way.
Therefore, although it is useful to know how to wait until your sender has completed, a larger application may not need you to be concerned in the same way.
As a final footnote, a WaitGroup is a barrier in the sense used in BSP. By combining barriers and channels, you are making use of both BSP and CSP.
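To illustrate the barrier remark, here is a minimal sketch of BSP-style supersteps written in Go; `superstep` and the squaring function are hypothetical. Goroutines communicate their results over a channel (the CSP part), and `wg.Wait()` is the barrier that separates one superstep from the next (the BSP part).

```go
package main

import (
	"fmt"
	"sync"
)

// superstep applies f to every element in its own goroutine. Each goroutine
// sends its result on a channel; wg.Wait() then acts as the barrier, after
// which the channel can be closed and drained safely.
func superstep(xs []int, f func(int) int) int {
	msgs := make(chan int, len(xs)) // buffered so no sender blocks before the barrier
	var wg sync.WaitGroup
	for _, x := range xs {
		wg.Add(1)
		go func(x int) {
			defer wg.Done()
			msgs <- f(x)
		}(x)
	}
	wg.Wait()   // barrier: every goroutine in this superstep has finished
	close(msgs) // no more sends can happen after the barrier
	sum := 0
	for m := range msgs {
		sum += m
	}
	return sum
}

func main() {
	square := func(x int) int { return x * x }
	first := superstep([]int{1, 2, 3}, square)       // 1 + 4 + 9 = 14
	second := superstep([]int{first, first}, square) // 196 + 196 = 392
	fmt.Println(first, second)                       // 14 392
}
```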
Answer 3
Score: 0
```go
package main

import (
	"fmt"
	"log"
	"sync"
)

// Z is a sentinel: once every producer has finished, a pointer to it is sent
// on sc to tell the collector to stop.
var Z = "Z"

func Loop() {
	sc := make(chan *string)
	ss := make([]string, 0)
	done := make(chan struct{}, 1)
	go func() {
		//1 QUERY
		slice1 := []string{"a", "b", "c"}
		//2 WG INIT
		var wg1 sync.WaitGroup
		wg1.Add(len(slice1))
		//3 LOOP->
		loopSlice1(slice1, sc, &wg1)
		//7 WG WAIT<-
		wg1.Wait()
		sc <- &Z           // all results sent: tell the collector to stop
		done <- struct{}{} // then let Loop continue
	}()
	go func() {
		for {
			cc := <-sc
			// log.Printf replaces log.Infof, which the standard log package lacks.
			log.Printf("<-sc %s", *cc)
			// Compare pointers, not values, so a result that happens to equal
			// "Z" cannot be mistaken for the sentinel.
			if cc == &Z {
				break
			}
			ss = append(ss, *cc)
		}
	}()
	<-done
	log.Printf("FUN: %#v", ss)
}

func loopSlice1(slice1 []string, sc chan *string, wg1 *sync.WaitGroup) {
	for i, x := range slice1 {
		//4 GO
		go func(n int, v string) {
			//5 WG DONE
			defer wg1.Done()
			//6 DOING
			//[1 QUERY
			slice2 := []string{"X", "Y", "Z"}
			//[2 WG INIT
			var wg2 sync.WaitGroup
			wg2.Add(len(slice2))
			//[3 LOOP ->
			loopSlice2(n, v, slice2, sc, &wg2)
			//[7 WG WAIT <-
			wg2.Wait()
		}(i, x)
	}
}

func loopSlice2(n1 int, v1 string, slice2 []string, sc chan *string, wg2 *sync.WaitGroup) {
	for j, y := range slice2 {
		//[4 GO
		go func(n2 int, v2 string) {
			//[5 WG DONE
			defer wg2.Done()
			//[6 DOING
			r := fmt.Sprintf("%v%v %v,%v", n1, n2, v1, v2)
			sc <- &r
		}(j, y)
	}
}

func main() {
	Loop()
}
```
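For comparison, here is a sketch (not the poster's code) of the same nested fan-out using the close-and-range technique from the first answer instead of a sentinel value. `collectNested` is a hypothetical name; it flattens the two levels of goroutines into one and uses a single WaitGroup to track every producer, so the results channel can be closed once they have all finished.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collectNested starts one goroutine per (outer, inner) pair, tracks all of
// them with one WaitGroup, and closes results when they finish, so the
// collector can simply range over the channel.
func collectNested(outer, inner []string) []string {
	results := make(chan string)
	var wg sync.WaitGroup
	for i, x := range outer {
		for j, y := range inner {
			wg.Add(1)
			go func(n1, n2 int, v1, v2 string) {
				defer wg.Done()
				results <- fmt.Sprintf("%v%v %v,%v", n1, n2, v1, v2)
			}(i, j, x, y)
		}
	}

	// Waiter: close results once every producer is done.
	go func() {
		wg.Wait()
		close(results)
	}()

	var ss []string
	for r := range results {
		ss = append(ss, r)
	}
	sort.Strings(ss) // goroutines finish in no particular order
	return ss
}

func main() {
	ss := collectNested([]string{"a", "b", "c"}, []string{"X", "Y", "Z"})
	fmt.Println(len(ss), ss[0]) // 9 00 a,X
}
```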