Closing a Go channel, and syncing a go routine
Question
I'm unable to terminate my WaitGroup in Go, and consequently I can't exit the range loop. Can anybody tell me why? Or is there a better way of limiting the number of goroutines while still being able to exit when the channel is closed?
Most examples I have seen relate to a channel with a fixed, statically known length, but this channel is dynamically resized as a result of other processes.
The print statement ("done !") in the example shows that testValProducer runs the right number of times, but the code never reaches "--EXIT --", which means wg.Wait is still blocking somehow.
type TestValContainer chan string

func StartFunc() {
	testValContainer := make(TestValContainer)
	go func() { testValContainer <- "string val 1" }()
	go func() { testValContainer <- "string val 2" }()
	go func() { testValContainer <- "string val 3" }()
	go func() { testValContainer <- "string val 4" }()
	go func() { testValContainer <- "string val 5" }()
	go func() { testValContainer <- "string val 6" }()
	go func() { testValContainer <- "string val 7" }()
	wg := sync.WaitGroup{}
	// limit the number of worker goroutines
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			v := i
			fmt.Printf("launching %v", i)
			for str := range testValContainer {
				testValProducer(str, &wg)
			}
			fmt.Println(v, "--EXIT --") // never called
		}()
	}
	wg.Wait()
	close(testValContainer)
}

func get(url string) {
	http.Get(url)
	ch <- getUnvisited()
}

func testValProducer(testStr string, wg *sync.WaitGroup) {
	doSomething(testStr)
	fmt.Println("done !") // called
	wg.Done()             // NO EFFECT??
}
Answer 1
Score: 0
In your example you have two errors:
- You are calling wg.Done inside the loop in each worker goroutine rather than at the end of the worker goroutine (right before it completes). The calls to wg.Done must be matched one-to-one with the wg.Add(1) calls.
- With that fixed, there is a deadlock: the main goroutine is waiting for the workers to complete, while the workers are waiting for the main goroutine to close the input channel.
The logic will be cleaner and easier to understand if you separate the producer side from the consumer side more clearly. Run a separate goroutine for each side. Example:
// Producer side (only write and close allowed).
go func() {
	testValContainer <- "string val 1"
	testValContainer <- "string val 2"
	testValContainer <- "string val 3"
	testValContainer <- "string val 4"
	testValContainer <- "string val 5"
	testValContainer <- "string val 6"
	testValContainer <- "string val 7"
	close(testValContainer) // Signals that production is done.
}()
// Consumer side (only read allowed).
for i := 0; i < 3; i++ {
	wg.Add(1)
	// Pass i as an argument so each worker gets its own copy; before
	// Go 1.22 a closure would share one loop variable across iterations.
	go func(v int) {
		defer wg.Done() // exactly one Done per Add, run when the worker exits
		fmt.Printf("launching %v\n", v)
		for str := range testValContainer {
			doSomething(str)
		}
		fmt.Println(v, "--EXIT --")
	}(i)
}
wg.Wait()
If the items are being produced from some other source, potentially a collection of goroutines, you should still have either: 1) a separate goroutine or piece of logic somewhere that oversees that production and calls close once it's done, or 2) make your main goroutine wait for the production side to complete (e.g. with a WaitGroup waiting for the producer goroutines) and close the channel before waiting for the consumption side.
If you think about it, no matter how you arrange the logic you need to have some "side-channel" way of detecting, in one single synchronised place, that there are no more messages being produced. Otherwise you can never know when the channel should be closed.
In other words, you can't wait for the range loops on the consumer side to complete in order to trigger the close, as this leads to a catch-22.
Answer 2
Score: 0
I might do something like this; it keeps everything easy to follow. I define a struct that uses a buffered channel as a semaphore to control the number of active goroutines, and that lets me read values from the data channel as they come in.
package main

import (
	"fmt"
	"sync"
)

type TestValContainer struct {
	wg   sync.WaitGroup
	sema chan struct{}
	data chan int
}

func doSomething(number int) {
	fmt.Println(number)
}

func main() {
	// The semaphore lets at most 10 goroutines hold a slot at a time.
	tvc := TestValContainer{
		sema: make(chan struct{}, 10),
		data: make(chan int),
	}
	for i := 0; i <= 100; i++ {
		tvc.wg.Add(1)
		go func(i int) {
			tvc.sema <- struct{}{} // acquire a slot
			defer func() {
				<-tvc.sema // release the slot
				tvc.wg.Done()
			}()
			tvc.data <- i
		}(i)
	}
	// Wait in the background so that waiting and closing the channel
	// don't block the for loop below.
	go func() {
		tvc.wg.Wait()
		close(tvc.data)
	}()
	// Get the channel results.
	for res := range tvc.data {
		doSomething(res)
	}
}