英文:
Issue with channel / go routine synchronization in Go
问题
这是一个简单的示例程序,展示了我想要使其正常工作的基本架构/流程。我该如何使所有的数字和"end"消息都打印出来?我尝试在各个地方加入close语句,但要么不起作用,要么会出现尝试关闭已关闭通道的恐慌错误...
package main
import (
"fmt"
"time"
)
func main() {
d := make(chan uint)
go bar(d)
c1 := make(chan uint)
c2 := make(chan uint)
c3 := make(chan uint)
go foo(c1, d)
go foo(c2, d)
go foo(c3, d)
c1 <- 1
c2 <- 2
c3 <- 3
c1 <- 4
c2 <- 5
c3 <- 6
c1 <- 7
c2 <- 8
c3 <- 9
}
func foo(c chan uint, d chan uint) {
fmt.Println("foo start")
for stuff := range c {
time.Sleep(1)
d <- stuff * 2
}
fmt.Println("foo end")
}
func bar(d chan uint) {
fmt.Println("bar start")
for stuff := range d {
fmt.Printf("bar received %d\n", stuff)
}
fmt.Println("bar end")
}
我得到的输出结果如下所示。请注意,最后一组数字和"end"输出缺失了。
foo start
bar start
foo start
foo start
bar received 6
bar received 2
bar received 4
bar received 12
bar received 8
bar received 10
在我的实际程序中,每个"foo"函数都执行过滤和一系列复杂的字符串正则表达式操作。而我需要"bar"函数,因为它负责根据时间戳重新排序,并进行序列化打印,以避免输出交错。
英文:
Here is a small example program with the basic architecture/flow that I am trying to get working. How do I get all the numbers and "end" messages to print out? I have tried putting close statements here and there, but it either doesn't work, or I get panics about trying to close an already closed channel...
package main
import (
"fmt"
"time"
)
func main() {
d := make(chan uint)
go bar(d)
c1 := make(chan uint)
c2 := make(chan uint)
c3 := make(chan uint)
go foo(c1, d)
go foo(c2, d)
go foo(c3, d)
c1 <- 1
c2 <- 2
c3 <- 3
c1 <- 4
c2 <- 5
c3 <- 6
c1 <- 7
c2 <- 8
c3 <- 9
}
func foo(c chan uint, d chan uint) {
fmt.Println("foo start")
for stuff := range c {
time.Sleep(1)
d <- stuff * 2
}
fmt.Println("foo end")
}
func bar(d chan uint) {
fmt.Println("bar start")
for stuff := range d {
fmt.Printf("bar received %d\n", stuff)
}
fmt.Println("bar end")
}
The output I am getting looks like this. Notice the last set of numbers and the "end" outputs are missing.
foo start
bar start
foo start
foo start
bar received 6
bar received 2
bar received 4
bar received 12
bar received 8
bar received 10
In my actual program, each "foo" function is doing filtering and a bunch of heavy string regexp stuff. And I need the "bar" function, because it has the job of reordering based on a timestamp, and serializing printing, so output doesn't get interlaced.
答案1
得分: 4
你的程序在所有goroutine完成之前就退出了。你需要等待foo
和bar
的goroutine都完成后才能从main
函数返回。
通常的做法是使用sync.WaitGroup
来实现,但是由于main
函数不是d
通道的生产者,所以你需要确保在关闭d
通道之前,所有对该通道的发送操作都已经完成,可以使用第二个WaitGroup
(或等效的方法)来实现。
var (
fooWG sync.WaitGroup
barWG sync.WaitGroup
)
func main() {
d := make(chan uint)
barWG.Add(1)
go bar(d)
c1 := make(chan uint)
c2 := make(chan uint)
c3 := make(chan uint)
fooWG.Add(3)
go foo(c1, d)
go foo(c2, d)
go foo(c3, d)
c1 <- 1
c2 <- 2
c3 <- 3
c1 <- 4
c2 <- 5
c3 <- 6
c1 <- 7
c2 <- 8
c3 <- 9
// 关闭通道以使foo的goroutine退出
close(c1)
close(c2)
close(c3)
fooWG.Wait()
// 所有foo都完成了,所以可以安全地关闭d通道并等待bar
close(d)
barWG.Wait()
}
func foo(c chan uint, d chan uint) {
defer fooWG.Done()
fmt.Println("foo start")
for stuff := range c {
time.Sleep(1)
d <- stuff * 2
}
fmt.Println("foo end")
}
func bar(d chan uint) {
defer barWG.Done()
fmt.Println("bar start")
for stuff := range d {
fmt.Printf("bar received %d\n", stuff)
}
fmt.Println("bar end")
}
英文:
Your program is exiting before all goroutines are done. You need to wait for both the foo
and bar
goroutines to finish before returning from main
.
The usual way of doing this is by using a sync.WaitGroup
, but since main
isn't the producer for the d
channel, you will have to ensure that all sends on that channel are finished before closing that with a second WaitGroup (or equivalent).
var (
fooWG sync.WaitGroup
barWG sync.WaitGroup
)
func main() {
d := make(chan uint)
barWG.Add(1)
go bar(d)
c1 := make(chan uint)
c2 := make(chan uint)
c3 := make(chan uint)
fooWG.Add(3)
go foo(c1, d)
go foo(c2, d)
go foo(c3, d)
c1 <- 1
c2 <- 2
c3 <- 3
c1 <- 4
c2 <- 5
c3 <- 6
c1 <- 7
c2 <- 8
c3 <- 9
// close the channels so the foo goroutines can exit
close(c1)
close(c2)
close(c3)
fooWG.Wait()
// all foo are done, so it's safe to close d and wait for bar
close(d)
barWG.Wait()
}
func foo(c chan uint, d chan uint) {
defer fooWG.Done()
fmt.Println("foo start")
for stuff := range c {
time.Sleep(1)
d <- stuff * 2
}
fmt.Println("foo end")
}
func bar(d chan uint) {
defer barWG.Done()
fmt.Println("bar start")
for stuff := range d {
fmt.Printf("bar received %d\n", stuff)
}
fmt.Println("bar end")
}
答案2
得分: 1
JimB的答案确实有效,但在代码中添加了比实际需要的更多的复杂性。一个简单的完整通道就足以同步这段代码的完成。
此外,使用通道同步,time.Sleep(1)
命令不再需要用于功能:
package main
import (
"fmt"
"time"
)
func main() {
d := make(chan uint)
complete := make(chan bool)
go bar(d, complete)
c1 := make(chan uint)
c2 := make(chan uint)
c3 := make(chan uint)
go foo(c1, d)
go foo(c2, d)
go foo(c3, d)
c1 <- 1
c2 <- 2
c3 <- 3
c1 <- 4
c2 <- 5
c3 <- 6
c1 <- 7
c2 <- 8
c3 <- 9
//如果你知道输入的数量,可以计数以确保完成
for i := 0; i < 9; i++ {
<-complete
}
//清理资源,避免内存泄漏
close(c1)
close(c2)
close(c3)
close(d)
//验证bar是否正确完成并关闭
<-complete
close(complete)
}
func foo(c chan uint, d chan uint) {
fmt.Println("foo start")
for stuff := range c {
time.Sleep(1) //程序功能不需要此行
d <- stuff * 2
}
fmt.Println("foo end")
}
func bar(d chan uint, cmp chan bool) {
fmt.Println("bar start")
for stuff := range d {
fmt.Printf("bar received %d\n", stuff)
cmp <- true
}
fmt.Println("bar end")
//验证cmp是否可以关闭(所有输出都完成,d已关闭)
cmp <- true
}
英文:
JimB's answer definitely works, but it's adding more complexity than is actually needed in the code. A simple complete channel would suffice to synchronize this code though completion.
Also, with channel synchronization, the time.Sleep(1)
command is no longer needed for functionality:
package main
import (
"fmt"
"time"
)
func main() {
d := make(chan uint)
complete := make(chan bool)
go bar(d, complete)
c1 := make(chan uint)
c2 := make(chan uint)
c3 := make(chan uint)
go foo(c1, d)
go foo(c2, d)
go foo(c3, d)
c1 <- 1
c2 <- 2
c3 <- 3
c1 <- 4
c2 <- 5
c3 <- 6
c1 <- 7
c2 <- 8
c3 <- 9
//If you know the number of inputs, count them to ensure completion
for i:=0; i < 9; i++{
<-complete
}
//Clean up after yourself, to keep away the memory leaks
close(c1)
close(c2)
close(c3)
close(d)
//Verify bar is done and closed correctly
<-complete
close(complete)
}
func foo(c chan uint, d chan uint) {
fmt.Println("foo start")
for stuff := range c {
time.Sleep(1) //Not needed for the program to function
d <- stuff * 2
}
fmt.Println("foo end")
}
func bar(d chan uint, cmp chan bool) {
fmt.Println("bar start")
for stuff := range d {
fmt.Printf("bar received %d\n", stuff)
cmp <- true
}
fmt.Println("bar end")
//verify that cmp can be closed (all output is done, and d is closed)
cmp <- true
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论