Process output of arbitrary number of goroutines as they finish
Question
WaitGroups are used to wait for all goroutines to finish before continuing execution, but how do you process their outputs as they finish?
This method works okay
c := make(chan string)
rc := 0
for _, url := range urls {
	rc++
	go func(url string) {
		data := get(url)
		c <- data
	}(url)
}
for i := 0; i < rc; i++ {
	data := <-c
	// ... process data ...
}
It stops working, however, when you have to launch goroutines from goroutines:
for _, url := range urls {
	go func(url string) {
		data := get(url)
		urls := get_urls(data)
		for _, url := range urls {
			go func(url string) {
				data := get(url)
				c <- data
			}(url)
		}
	}(url)
}
This time we would have to put rc++ inside a goroutine, which is a data race. What do?
Answer 1 (score: 1)
There are plenty of ways to solve this, but the most idiomatic (thus probably "best") is as colm.anseo suggested in a comment: have the "process output" routine use a ranged for loop, and have the generating routines use a sync.WaitGroup counter with a closing goroutine, like this:
// result channel:
c := make(chan string)
// wait-group:
var wg sync.WaitGroup
// run over the URLs
for _, url := range urls {
	wg.Add(1) // count up another start()
	go start(&wg, url, c)
}
go func() {
	// wait for all start()s to say they are done
	wg.Wait()
	// and now close c
	close(c)
}()
for data := range c {
	// deal with data
}
Now we can see how each start function works:
// start() loads up a set of URLs from a URL,
// then spins off worker goroutines that read from
// each of those URLs, sending data to channel c.
// When all of its subsidiary goroutines have finished,
// start() signals that it is done.
func start(wg *sync.WaitGroup, url string, c chan string) {
	defer wg.Done()
	data := get(url)
	urls := get_urls(data)
	var subWG sync.WaitGroup
	for _, url := range urls {
		subWG.Add(1)
		go func(url string) {
			defer subWG.Done()
			data := get(url)
			c <- data
		}(url)
	}
	subWG.Wait()
}
Function start can of course be inlined, as you did originally; I wrote it this way for clarity.
(In general, rather than spinning off some mystery number of goroutines per URL, you really want a "limited number of workers" pattern here, i.e., a worker pool. There are lots of examples of this on the web.)
Answer 2 (score: 1)
When you have an indeterminate number of results, it's best to let the workers and a worker-manager goroutine manage the results channel and close it when all results are complete. This avoids messy counting/mutex logic.
So to convert your first example:
c := make(chan string)
var wg sync.WaitGroup // used by manager goroutine to determine finish

for _, url := range urls {
	wg.Add(1) // about to start worker
	go func(url string) {
		defer wg.Done() // worker is complete
		data := get(url)
		c <- data
	}(url)
}

// manager goroutine
go func() {
	wg.Wait() // all workers are done ...
	close(c)  // ... so signal this via channel close
}()

// results collection is then very simple
for data := range c {
	fmt.Println(data)
}
This design applies identically to your situation where the workers create more goroutines. But, as @torek noted, an extra sync.WaitGroup is needed:
c := make(chan string)
var wg sync.WaitGroup

for _, url := range urls {
	wg.Add(1) // new worker
	go func(url string) {
		defer wg.Done() // worker done
		data := get(url)
		urls := get_urls(data)
		var wg2 sync.WaitGroup
		for _, url := range urls {
			wg2.Add(1)
			go func(url string) {
				defer wg2.Done()
				data := get(url)
				c <- data
			}(url)
		}
		wg2.Wait()
	}(url)
}

// manager goroutine
go func() {
	wg.Wait() // all workers are done ...
	close(c)  // ... so signal this via channel close
}()

for data := range c {
	fmt.Println(data)
}