在Go语言中,通道和Go协程的同步是一个常见的问题。

huangapple go评论69阅读模式
英文:

Issue with channel / go routine synchronization in Go

问题

这是一个简单的示例程序,展示了我想要使其正常工作的基本架构/流程。我该如何使所有的数字和"end"消息都打印出来?我尝试在各个地方加入close语句,但要么不起作用,要么会出现尝试关闭已关闭通道的恐慌错误...

package main

import (
	"fmt"
	"time"
)

func main() {
	d := make(chan uint)

	go bar(d)

	c1 := make(chan uint)
	c2 := make(chan uint)
	c3 := make(chan uint)

	go foo(c1, d)
	go foo(c2, d)
	go foo(c3, d)

	c1 <- 1
	c2 <- 2
	c3 <- 3

	c1 <- 4
	c2 <- 5
	c3 <- 6

	c1 <- 7
	c2 <- 8
	c3 <- 9
}

func foo(c chan uint, d chan uint) {
	fmt.Println("foo start")

	for stuff := range c {
		time.Sleep(1)
		d <- stuff * 2
	}

	fmt.Println("foo end")
}

func bar(d chan uint) {
	fmt.Println("bar start")

	for stuff := range d {
		fmt.Printf("bar received %d\n", stuff)
	}

	fmt.Println("bar end")
}

我得到的输出结果如下所示。请注意,最后一组数字和"end"输出缺失了。

foo start
bar start
foo start
foo start
bar received 6
bar received 2
bar received 4
bar received 12
bar received 8
bar received 10

在我的实际程序中,每个"foo"函数都执行过滤和一系列复杂的字符串正则表达式操作。而我需要"bar"函数,因为它负责根据时间戳重新排序,并进行序列化打印,以避免输出交错。

英文:

Here is a small example program with the basic architecture/flow that I am trying to get working. How do I get all the numbers and "end" messages to print out? I have tried putting close statements here and there, but it either doesn't work, or I get panics about trying to close an already closed channel...

package main

import (
	&quot;fmt&quot;
	&quot;time&quot;
)

func main() {
	d := make(chan uint)
	
	go bar(d)

	c1 := make(chan uint)
	c2 := make(chan uint)
	c3 := make(chan uint)

	go foo(c1, d)
	go foo(c2, d)
	go foo(c3, d)
	
	c1 &lt;- 1
	c2 &lt;- 2
	c3 &lt;- 3
	
	c1 &lt;- 4
	c2 &lt;- 5
	c3 &lt;- 6
	
	c1 &lt;- 7
	c2 &lt;- 8
	c3 &lt;- 9
}

func foo(c chan uint, d chan uint) {
	fmt.Println(&quot;foo start&quot;)
	
	for stuff := range c {
		time.Sleep(1)
		d &lt;- stuff * 2
	}
	
	fmt.Println(&quot;foo end&quot;)
}

func bar(d chan uint) {
	fmt.Println(&quot;bar start&quot;)
	
	for stuff := range d {
		fmt.Printf(&quot;bar received %d\n&quot;, stuff)
	}
	
	fmt.Println(&quot;bar end&quot;)
}

The output I am getting looks like this. Notice the last set of numbers and the "end" outputs are missing.

foo start
bar start
foo start
foo start
bar received 6
bar received 2
bar received 4
bar received 12
bar received 8
bar received 10

In my actual program, each "foo" function is doing filtering and a bunch of heavy string regexp stuff. And I need the "bar" function, because it has the job of reordering based on a timestamp, and serializing printing, so output doesn't get interlaced.

答案1

得分: 4

你的程序在所有goroutine完成之前就退出了。你需要等待foobar的goroutine都完成后才能从main函数返回。

通常的做法是使用sync.WaitGroup来实现,但是由于main函数不是d通道的生产者,所以你需要确保在关闭d通道之前,所有对该通道的发送操作都已经完成,可以使用第二个WaitGroup(或等效的方法)来实现。

var (
    fooWG sync.WaitGroup
    barWG sync.WaitGroup
)

func main() {
    d := make(chan uint)

    barWG.Add(1)
    go bar(d)

    c1 := make(chan uint)
    c2 := make(chan uint)
    c3 := make(chan uint)

    fooWG.Add(3)
    go foo(c1, d)
    go foo(c2, d)
    go foo(c3, d)

    c1 <- 1
    c2 <- 2
    c3 <- 3

    c1 <- 4
    c2 <- 5
    c3 <- 6

    c1 <- 7
    c2 <- 8
    c3 <- 9

    // 关闭通道以使foo的goroutine退出
    close(c1)
    close(c2)
    close(c3)
    fooWG.Wait()

    // 所有foo都完成了,所以可以安全地关闭d通道并等待bar
    close(d)
    barWG.Wait()
}

func foo(c chan uint, d chan uint) {
    defer fooWG.Done()
    fmt.Println("foo start")

    for stuff := range c {
        time.Sleep(1)
        d <- stuff * 2
    }

    fmt.Println("foo end")
}

func bar(d chan uint) {
    defer barWG.Done()
    fmt.Println("bar start")

    for stuff := range d {
        fmt.Printf("bar received %d\n", stuff)
    }

    fmt.Println("bar end")
}
英文:

Your program is exiting before all goroutines are done. You need to wait for both the foo and bar goroutines to finish before returning from main.

The usual way of doing this is by using a sync.WaitGroup, but since main isn't the producer for the d channel, you will have to ensure that all sends on that channel are finished before closing that with a second WaitGroup (or equivalent).

var (
fooWG sync.WaitGroup
barWG sync.WaitGroup
)
func main() {
d := make(chan uint)
barWG.Add(1)
go bar(d)
c1 := make(chan uint)
c2 := make(chan uint)
c3 := make(chan uint)
fooWG.Add(3)
go foo(c1, d)
go foo(c2, d)
go foo(c3, d)
c1 &lt;- 1
c2 &lt;- 2
c3 &lt;- 3
c1 &lt;- 4
c2 &lt;- 5
c3 &lt;- 6
c1 &lt;- 7
c2 &lt;- 8
c3 &lt;- 9
// close the channels so the foo goroutines can exit
close(c1)
close(c2)
close(c3)
fooWG.Wait()
// all foo are done, so it&#39;s safe to close d and wait for bar
close(d)
barWG.Wait()
}
func foo(c chan uint, d chan uint) {
defer fooWG.Done()
fmt.Println(&quot;foo start&quot;)
for stuff := range c {
time.Sleep(1)
d &lt;- stuff * 2
}
fmt.Println(&quot;foo end&quot;)
}
func bar(d chan uint) {
defer barWG.Done()
fmt.Println(&quot;bar start&quot;)
for stuff := range d {
fmt.Printf(&quot;bar received %d\n&quot;, stuff)
}
fmt.Println(&quot;bar end&quot;)
}

答案2

得分: 1

JimB的答案确实有效,但在代码中添加了比实际需要的更多的复杂性。一个简单的完整通道就足以同步这段代码的完成。

此外,使用通道同步,time.Sleep(1)命令不再需要用于功能:

package main

import (
    "fmt"
    "time"
)

func main() {
    d := make(chan uint)
    complete := make(chan bool)
    go bar(d, complete)

    c1 := make(chan uint)
    c2 := make(chan uint)
    c3 := make(chan uint)

    go foo(c1, d)
    go foo(c2, d)
    go foo(c3, d)

    c1 <- 1
    c2 <- 2
    c3 <- 3

    c1 <- 4
    c2 <- 5
    c3 <- 6

    c1 <- 7
    c2 <- 8
    c3 <- 9

    //如果你知道输入的数量,可以计数以确保完成
    for i := 0; i < 9; i++ {
        <-complete
    }

    //清理资源,避免内存泄漏
    close(c1)
    close(c2)
    close(c3)
    close(d)

    //验证bar是否正确完成并关闭
    <-complete
    close(complete)
}

func foo(c chan uint, d chan uint) {
    fmt.Println("foo start")

    for stuff := range c {
        time.Sleep(1) //程序功能不需要此行
        d <- stuff * 2
    }

    fmt.Println("foo end")
}

func bar(d chan uint, cmp chan bool) {
    fmt.Println("bar start")

    for stuff := range d {
        fmt.Printf("bar received %d\n", stuff)
        cmp <- true
    }

    fmt.Println("bar end")

    //验证cmp是否可以关闭(所有输出都完成,d已关闭)
    cmp <- true
}
英文:

JimB's answer definitely works, but it's adding more complexity than is actually needed in the code. A simple complete channel would suffice to synchronize this code though completion.

Also, with channel synchronization, the time.Sleep(1) command is no longer needed for functionality:

package main
import (
&quot;fmt&quot;
&quot;time&quot;
)
func main() {
d := make(chan uint)
complete := make(chan bool)
go bar(d, complete)
c1 := make(chan uint)
c2 := make(chan uint)
c3 := make(chan uint)
go foo(c1, d)
go foo(c2, d)
go foo(c3, d)
c1 &lt;- 1
c2 &lt;- 2
c3 &lt;- 3
c1 &lt;- 4
c2 &lt;- 5
c3 &lt;- 6
c1 &lt;- 7
c2 &lt;- 8
c3 &lt;- 9
//If you know the number of inputs, count them to ensure completion
for i:=0; i &lt; 9; i++{
&lt;-complete
}
//Clean up after yourself, to keep away the memory leaks
close(c1)
close(c2)
close(c3)
close(d)
//Verify bar is done and closed correctly
&lt;-complete
close(complete)
}
func foo(c chan uint, d chan uint) {
fmt.Println(&quot;foo start&quot;)
for stuff := range c {
time.Sleep(1)      //Not needed for the program to function
d &lt;- stuff * 2
}
fmt.Println(&quot;foo end&quot;)
}
func bar(d chan uint, cmp chan bool) {
fmt.Println(&quot;bar start&quot;)
for stuff := range d {
fmt.Printf(&quot;bar received %d\n&quot;, stuff)
cmp &lt;- true
}
fmt.Println(&quot;bar end&quot;)
//verify that cmp can be closed (all output is done, and d is closed)
cmp &lt;- true
}

huangapple
  • 本文由 发表于 2015年8月28日 22:18:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/32273218.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定