Why do all tasks run in the first goroutine?

Question

I'm implementing the pipeline fan-in/fan-out pattern, but I don't understand why the code behaves the way it does. Could someone please explain?

My first version. All tasks run in the first goroutine:
https://go.dev/play/p/X6DqEuj86cZ

package main

import "fmt"

func main() {
	var rdNums []int
	for i := 0; i < 1000; i++ {
		rdNums = append(rdNums, i)
	}

	pl := generatePipeline(rdNums)
	// Four workers all read from the same pipeline channel.
	c1 := fanOut(pl, "1")
	c2 := fanOut(pl, "2")
	c3 := fanOut(pl, "3")
	c4 := fanOut(pl, "4")

	c := fanIn(c1, c2, c3, c4)
	sum := 0
	for i := range c {
		sum += i
	}

	fmt.Println(sum)
}

// generatePipeline sends every number on a buffered channel, then closes it.
func generatePipeline(arrNums []int) <-chan int {
	pl := make(chan int, 100)
	go func() {
		for _, n := range arrNums {
			pl <- n
		}

		close(pl)
	}()

	return pl
}

// fanOut squares each value it receives and sends it on an unbuffered channel.
func fanOut(in <-chan int, name string) <-chan int {
	out := make(chan int)
	go func() {
		for v := range in {
			fmt.Printf("Push square of %d to channel %s \n", v*v, name)
			out <- v * v
		}

		close(out)
	}()

	return out
}

// fanIn reads the input channels one after another in a single goroutine.
func fanIn(inputChan ...<-chan int) <-chan int {
	in := make(chan int)

	go func() {
		for _, c := range inputChan {
			for v := range c {
				in <- v
			}
		}

		close(in)
	}()

	return in
}

My second version. The tasks are spread roughly evenly across the goroutines:
https://go.dev/play/p/7vg8X6KgUrp

package main

import (
	"fmt"
	"time"
)

func main() {
	var rdNums []int
	for i := 0; i < 1000; i++ {
		rdNums = append(rdNums, i)
	}

	pl := createQueue(rdNums)
	// Five workers read directly from the shared queue.
	for i := 0; i < 5; i++ {
		go process(pl, fmt.Sprintf("worker%d", i))
	}

	// Crude wait so the workers have time to finish before main exits.
	time.Sleep(1 * time.Minute)
}

// createQueue sends every number on an unbuffered channel, then closes it.
func createQueue(arrNums []int) <-chan int {
	pl := make(chan int)
	go func() {
		for _, n := range arrNums {
			pl <- n
		}

		close(pl)
	}()

	return pl
}

// process consumes values until the queue is closed and reports its count.
func process(in <-chan int, name string) {
	count := 0
	go func() {
		for v := range in {
			fmt.Printf("Push square of %d to channel %s \n", v*v, name)
			count++
		}

		fmt.Printf("Process %s success, total number receive %d\n", name, count)
	}()
}

I'd really like to understand why there is this difference. Thanks.

Answer 1

Score: 4

Your first fan-in implementation is broken.

    go func() {
        for _, c := range inputChan {
            for v := range c {
                in <- v
            }
        }
    }()

The goroutine above takes the first channel in inputChan and reads from it until that channel is closed, which only happens once the generator is done. Four goroutines are sending to four unbuffered channels, so while the first channel is being drained, each of the other three can take at most one value from the generator before it blocks trying to send on its own channel. Once the first channel is closed, the remaining three goroutines can deliver their pending values, and the program terminates.
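To see this distribution without relying on the printed output, here is a minimal sketch that keeps the question's sequential fan-in but counts how many values each worker handles. The names `generate`, `worker`, `sequentialFanIn`, `run`, and the `counts` slice are illustrative assumptions, not part of the original code.

```go
package main

import "fmt"

// generate sends 0..n-1 on a buffered channel and then closes it,
// mirroring generatePipeline from the question.
func generate(n int) <-chan int {
	pl := make(chan int, 100)
	go func() {
		defer close(pl)
		for i := 0; i < n; i++ {
			pl <- i
		}
	}()
	return pl
}

// worker squares values from in and records how many it handled in *count.
// Only this goroutine writes *count; it is read after out has been closed.
func worker(in <-chan int, count *int) <-chan int {
	out := make(chan int) // unbuffered: every send waits for a reader
	go func() {
		defer close(out)
		for v := range in {
			*count++
			out <- v * v
		}
	}()
	return out
}

// sequentialFanIn drains its inputs one after another, as in the question.
func sequentialFanIn(inputs ...<-chan int) <-chan int {
	merged := make(chan int)
	go func() {
		defer close(merged)
		for _, c := range inputs {
			for v := range c {
				merged <- v
			}
		}
	}()
	return merged
}

// run wires up four workers and returns the sum and the per-worker counts.
func run(n int) (int, []int) {
	counts := make([]int, 4)
	pl := generate(n)
	outs := make([]<-chan int, len(counts))
	for i := range counts {
		outs[i] = worker(pl, &counts[i])
	}
	sum := 0
	for v := range sequentialFanIn(outs...) {
		sum += v
	}
	return sum, counts
}

func main() {
	sum, counts := run(1000)
	fmt.Println("sum:", sum)
	fmt.Println("values handled per worker:", counts)
}
```

In this sketch the first worker ends up with almost all of the values, while every later worker handles at most one, because its single pending send is not received until the fan-in reaches its channel.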

The problem can be fixed by moving the inner loop that reads from an input channel into its own goroutine, one per channel, with all of them sending to a common output channel.
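One possible way to write that fix is sketched below. It assumes a `sync.WaitGroup` is used to decide when the common output channel can be closed, which the answer does not specify. The helper names `generate`, `square`, and `sumSquares` are made up for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// fanIn merges all inputs into one channel. Each input gets its own
// forwarding goroutine, so no input has to wait for another to close.
func fanIn(inputs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(inputs))
	for _, c := range inputs {
		go func(c <-chan int) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	// Close out only after every forwarder has finished sending.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// generate sends 0..n-1 on a buffered channel and then closes it.
func generate(n int) <-chan int {
	pl := make(chan int, 100)
	go func() {
		defer close(pl)
		for i := 0; i < n; i++ {
			pl <- i
		}
	}()
	return pl
}

// square is a fan-out worker: it squares each value read from in.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for v := range in {
			out <- v * v
		}
	}()
	return out
}

// sumSquares runs the whole pipeline with the given number of workers.
func sumSquares(n, workers int) int {
	pl := generate(n)
	outs := make([]<-chan int, workers)
	for i := range outs {
		outs[i] = square(pl)
	}
	sum := 0
	for v := range fanIn(outs...) {
		sum += v
	}
	return sum
}

func main() {
	fmt.Println(sumSquares(1000, 4)) // prints 332833500
}
```

With this version the merged values arrive in no particular order, which does not matter here because only their sum is used.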

huangapple
  • Posted on July 6, 2023 at 18:31:35
  • If you repost this article, please keep the link: https://go.coder-hub.com/76627904.html