Process output of an arbitrary number of goroutines as they finish

Question

WaitGroups are used to wait for all goroutines to finish before continuing execution, but how do you process their outputs as they finish?

This approach works fine:

c := make(chan string)
rc := 0
for _, url := range urls {
	rc++
	go func(url string) {
		data := get(url)
		c <- data
	}(url)
}
for i := 0; i < rc; i++ {
	data := <-c
	// process data
}

However, it stops working when goroutines have to start further goroutines:

for _, url := range urls {
	go func(url string) {
		data := get(url)
		urls := get_urls(data)
		for _, url := range urls {
			go func(url string) {
				data := get(url)
				c <- data
			}(url)
		}
	}(url)
}

This time rc++ would have to go inside the goroutine, which is a data race and results in undefined behavior. What should I do?

Answer 1

Score: 1

There are plenty of ways to solve this, but the most idiomatic (thus probably "best") is as colm.anseo suggested in a comment: have the "process output" routine use a for-range loop over the channel, and have the generating routines use a sync.WaitGroup counter with a separate goroutine that closes the channel, like this:

// result channel:
c := make(chan string)

// wait-group:
var wg sync.WaitGroup

// run over the URLs
for _, url := range urls {
    wg.Add(1) // count up another start()
    go start(&wg, url, c)
}
go func() {
    // wait for all start()s to say they are done
    wg.Wait()
    // and now close c
    close(c)
}()
for data := range c {
    // deal with data
}

Now we can see how each start function works:

// start() loads up a set of URLs from a URL,
// then spins off worker goroutines that read from
// each of those URLs, sending data to channel c.
// When all of its subsidiary goroutines have finished,
// start() signals that it is done.
func start(wg *sync.WaitGroup, url string, c chan string) {
    defer wg.Done()
    data := get(url)
    urls := get_urls(data)
    var subWG sync.WaitGroup
    for _, url := range urls {
        subWG.Add(1)
        go func(url string) {
            defer subWG.Done()
            data := get(url)
            c <- data
        }(url)
    }
    subWG.Wait()
}

Function start can of course be inlined, as you did originally; I wrote it this way for clarity.
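
For reference, here is a self-contained sketch of the pattern above with the network calls stubbed out. The bodies of get and getURLs (standing in for the question's get and get_urls) are assumptions made only so the program runs, and collect is a hypothetical wrapper name, not something from the question.

```go
package main

import (
	"fmt"
	"sync"
)

// get is a stand-in for the question's get(); it only fakes a fetch.
func get(url string) string { return "data(" + url + ")" }

// getURLs is a stand-in for the question's get_urls(); it pretends
// that every page links to two further URLs.
func getURLs(data string) []string {
	return []string{data + "/a", data + "/b"}
}

// start fetches url, fans out one goroutine per discovered URL, and
// reports completion on wg only after all of its children have finished.
func start(wg *sync.WaitGroup, url string, c chan<- string) {
	defer wg.Done()
	var subWG sync.WaitGroup
	for _, u := range getURLs(get(url)) {
		subWG.Add(1)
		go func(u string) {
			defer subWG.Done()
			c <- get(u)
		}(u)
	}
	subWG.Wait()
}

// collect runs start for every URL and gathers results as they arrive.
func collect(urls []string) []string {
	c := make(chan string)
	var wg sync.WaitGroup
	for _, url := range urls {
		wg.Add(1)
		go start(&wg, url, c)
	}
	// Close c once every start() has reported that it is done.
	go func() {
		wg.Wait()
		close(c)
	}()
	var results []string
	for data := range c {
		results = append(results, data)
	}
	return results
}

func main() {
	results := collect([]string{"x", "y"})
	// Arrival order is nondeterministic, so only the count is stable.
	fmt.Println(len(results))
}
```

Because each start() waits on its own subWG before calling wg.Done(), the outer counter cannot reach zero while any child is still sending, so the channel is never closed too early.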

(In general, rather than spinning off some mystery number of goroutines per URL, you really want a "limited number of workers" pattern here, i.e., a worker pool. There are lots of examples of this on the web.)
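
A minimal sketch of such a worker pool might look like the following. It only covers the flat case (a known list of URLs, one result each); workers that discover more URLs would need additional bookkeeping. The names fetchAll and numWorkers and the stubbed get are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// get is a stand-in for the question's get(); an assumption for this demo.
func get(url string) string { return "data(" + url + ")" }

// fetchAll processes urls with a fixed number of workers and returns the
// results in arrival order. numWorkers must be at least 1.
func fetchAll(urls []string, numWorkers int) []string {
	jobs := make(chan string)
	results := make(chan string)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for url := range jobs { // loop ends when jobs is closed
				results <- get(url)
			}
		}()
	}

	// Feed the jobs from a separate goroutine so that the collector
	// below can drain results while jobs are still being handed out.
	go func() {
		for _, url := range urls {
			jobs <- url
		}
		close(jobs)
	}()

	// Close results once every worker has returned.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []string
	for r := range results {
		out = append(out, r)
	}
	return out
}

func main() {
	out := fetchAll([]string{"a", "b", "c", "d", "e"}, 2)
	fmt.Println(len(out)) // 5
}
```

The number of goroutines doing fetches is bounded by numWorkers regardless of how many URLs are queued, which is the main point of the pattern.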

Answer 2

Score: 1

When you have an indeterminate number of results, it's best to let the workers and a worker-manager goroutine manage the results channel and close it when all results are complete. This avoids messy counting/mutex logic.

So to convert your first example:

c := make(chan string)

var wg sync.WaitGroup // used by manager goroutine to determine finish

for _, url := range urls {
	wg.Add(1) // about to start worker
	go func(url string) {
		defer wg.Done() // worker is complete
		data := get(url)
		c <- data
	}(url)
}

// manager goroutine
go func() {
	wg.Wait() // all workers are done ...
	close(c) // ... so signal this via channel close
}()

// results collection is then very simple
for data := range c {
	fmt.Println(data)
}

This design applies identically to your situation where the workers create more goroutines. But, as @torek noted, an extra sync.WaitGroup is needed:

c := make(chan string)

var wg sync.WaitGroup

for _, url := range urls {
	wg.Add(1) // new worker
	go func(url string) {
		defer wg.Done() // worker done
		data := get(url)
		urls := get_urls(data)
		var wg2 sync.WaitGroup
		for _, url := range urls {
			wg2.Add(1)
			go func(url string) {
				defer wg2.Done()
				data := get(url)
				c <- data
			}(url)
		}
		wg2.Wait()
	}(url)
}

// manager goroutine
go func() {
	wg.Wait() // all workers are done ...
	close(c)  // ... so signal this via channel close
}()

for data := range c {
	fmt.Println(data)
}
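
One possible variation, which is not part of the answer above: the nested goroutines can share the outer WaitGroup, provided each child is registered with wg.Add(1) before its parent calls wg.Done(). The sync package documents that Add with a positive delta may be called at any time while the counter is greater than zero, and the parent keeps it above zero until it returns. The function name crawl and the stubbed get and getURLs are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// get and getURLs are stand-ins for the question's get() and get_urls().
func get(url string) string { return "data(" + url + ")" }

func getURLs(data string) []string {
	return []string{data + "/a", data + "/b"}
}

// crawl fetches every URL, follows the links found on each page, and
// returns the second-level results in arrival order.
func crawl(urls []string) []string {
	c := make(chan string)
	var wg sync.WaitGroup

	for _, url := range urls {
		wg.Add(1) // registered before the closing goroutine calls Wait
		go func(url string) {
			defer wg.Done()
			for _, child := range getURLs(get(url)) {
				// The parent has not called Done yet, so the counter
				// is still above zero and this Add cannot race with
				// Wait returning.
				wg.Add(1)
				go func(child string) {
					defer wg.Done()
					c <- get(child)
				}(child)
			}
		}(url)
	}

	// Close c once parents and children have all finished.
	go func() {
		wg.Wait()
		close(c)
	}()

	var out []string
	for data := range c {
		out = append(out, data)
	}
	return out
}

func main() {
	fmt.Println(len(crawl([]string{"x", "y"})))
}
```

The trade-off is that a parent no longer waits for its own children, so this only fits when nothing needs to happen in the parent after they finish.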

huangapple
  • Published on September 12, 2021, 09:12:58
  • When reposting, please keep the link to this article: https://go.coder-hub.com/69147764.html