select vs multiple concurrent coroutines receiving on different channels : Is there a difference in logic or in performance?

huangapple go评论80阅读模式
英文:

select vs multiple concurrent coroutines receiving on different channels : Is there a difference in logic or in performance?

问题

选择与同时接收不同通道的多个协程:逻辑或性能上有区别吗?

我的问题更多地涉及Go中“扇入”方案的实现。在我看来,使用“select”方案在通道数量任意大(通道数组较大)的情况下不起作用。

请参见下面的示例中的receive()和receive2()函数。

receive2()函数是否过于复杂?是否过度设计?

为什么选择语法被认为更符合惯用法?

package main

import (
	"fmt"
	"time"
)

func main() {

	var ch1 = make(chan int)
	var ch2 = make(chan int)

	send(ch1, ch2)
	//receive(ch1, ch2)
	receive2(ch1, ch2)

	time.Sleep(3 * time.Second)

}

func send(ch1 chan int, ch2 chan int) {
	go func() {
		for i := 0; i < 10; i++ {
			ch1 <- i
		}
		close(ch1)
	}()
	go func() {
		for i := 0; i < 10; i++ {
			ch2 <- i
		}
		close(ch2)
	}()
}

func receive(ch1 chan int, ch2 chan int) {
	go func() {
		for item := range ch1 {
			fmt.Printf("1: %d\n", item)
		}
	}()

	go func() {
		for item := range ch2 {
			fmt.Printf("2: %d\n", item)
		}
	}()
}

func receive2(ch1 chan int, ch2 chan int) {
	for {
		select {
		case x, ok := <-ch1:
			fmt.Println("ch1", x, ok)
			if !ok {
				ch1 = nil
			}
		case x, ok := <-ch2:
			fmt.Println("ch2", x, ok)
			if !ok {
				ch2 = nil
			}
		}

		if ch1 == nil && ch2 == nil {
			break
		}
	}
}

英文:

Select vs multiple concurrent coroutines receiving on different channels : Is there a difference in logic or in performance ?

My question is more generally about the implementation of an "fan-in" scheme in Go. It seems to me that the scheme using "select" does not work in the case of an arbitrarily large amount of channels (large array of channels).

See receive() and receive2() in the example hereunder.

Is the receive2() function overcomplicated? Overkill?

Why is the select formulation regarded as more idiomatic ?

package main
import (
&quot;fmt&quot;
&quot;time&quot;
)
func main() {
var ch1 = make(chan int)
var ch2 = make(chan int)
send(ch1, ch2)
//receive(ch1, ch2)
receive2(ch1, ch2)
time.Sleep(3 * time.Second)
}
func send(ch1 chan int, ch2 chan int) {
go func() {
for i := 0; i &lt; 10; i++ {
ch1 &lt;- i
}
close(ch1)
}()
go func() {
for i := 0; i &lt; 10; i++ {
ch2 &lt;- i
}
close(ch2)
}()
}
func receive(ch1 chan int, ch2 chan int) {
go func() {
for item := range ch1 {
fmt.Printf(&quot;1: %d\n&quot;, item)
}
}()
go func() {
for item := range ch2 {
fmt.Printf(&quot;2: %d\n&quot;, item)
}
}()
}
func receive2(ch1 chan int, ch2 chan int) {
for {
select {
case x, ok := &lt;-ch1:
fmt.Println(&quot;ch1&quot;, x, ok)
if !ok {
ch1 = nil
}
case x, ok := &lt;-ch2:
fmt.Println(&quot;ch2&quot;, x, ok)
if !ok {
ch2 = nil
}
}
if ch1 == nil &amp;&amp; ch2 == nil {
break
}
}
}

答案1

得分: 2

你的实现不是一个 fan-in。fan-in 将多个通道的结果收集到一个通道中。类似这样,你可以在单个 out 通道上接收所有的值。

func fanIn(in ...chan int) chan int {
    out := make(chan int)
    for _, c := range in {
        go func(i chan int) {
            for v := range i {
                out <- v
            }
        }(c)
    }
    return out
}

在你的实现中,第一个 receive() 只是并发地从 N 个通道中接收,彼此独立。它可以重写为接受可变参数,并且看起来更像一个真正的 fan-in:

func receive(cs ...chan int) {
    for i, cN := range cs {
        go func(i int, c chan int) {
            for item := range c {
                fmt.Printf("%d: %d\n", i, item)
            }
        }(i, cN)
    }
}

receive2() 实际上是顺序执行的,不会扩展,因为你需要为每个通道编写一个 case,并将它们全部用 && 连接在一起,以确定何时退出循环。&& 可以用 sync.WaitGroup 重写,但你仍然每次迭代只处理一个项目(在准备好接收的 case 中随机选择)。

英文:

Your implementation is not a fan-in. The fan-in collects the results from multiple channels into one channel. Something like this, where you can receive all values on the single out channel.

func fanIn(in ...chan int) chan int {
out := make(chan int)
for _, c := range in {
go func(i chan int) {
for v := range i {
out &lt;- v
}
}(c)
}
return out
}

In your implementation, the first receive() is simply receiving concurrently from N channels, independent of each other. It can be rewritten to accept a vararg and can look more like an actual fan-in:

func receive(cs ...chan int) {
for i, cN := range cs {
go func(i int, c chan int) {
for item := range c {
fmt.Printf(&quot;%d: %d\n&quot;, i, item)
}
}(i, cN)
}
}

The receive2() instead is essentially sequential and won't scale, since you would have to write one case for each channel, and &amp;&amp; them all together to know when to break the loop. The &amp;&amp; may be rewritten with a sync.WaitGroup but you'll still process only one item at each iteration (at random, among the cases that are ready to receive).

huangapple
  • 本文由 发表于 2021年11月16日 00:10:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/69977449.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定