如何监听N个频道?(动态选择语句)

huangapple go评论108阅读模式
英文:

how to listen to N channels? (dynamic select statement)

问题

要实现执行N个goroutine的无限循环,可以使用以下代码:

在接收到消息后,它将启动一个新的goroutine,并且会一直执行下去。

numChans := 2
var chans = []chan string{}

for i := 0; i < numChans; i++ {
    tmp := make(chan string)
    chans = append(chans, tmp)
    go DoStuff(tmp, i+1)
}

for {
    for _, ch := range chans {
        select {
        case msg := <-ch:
            fmt.Println("received", msg)
            go DoStuff(ch, 1)
        }
    }
}

这段代码中,我们首先创建了一个包含N个通道的切片chans,并在每个通道上启动一个goroutine。然后,我们使用嵌套的for循环和select语句来循环遍历每个通道,并在接收到消息时执行相应的操作。

希望这可以帮助到你!

英文:

to start an endless loop of executing two goroutines, I can use the code below:

after receiving the msg it will start a new goroutine and go on for ever.

c1 := make(chan string)
c2 := make(chan string)

go DoStuff(c1, 5)
go DoStuff(c2, 2)
    
for ; true;  {
    select {
    case msg1 := &lt;-c1:
        fmt.Println(&quot;received &quot;, msg1)
        go DoStuff(c1, 1)
    case msg2 := &lt;-c2:
        fmt.Println(&quot;received &quot;, msg2)
        go DoStuff(c2, 9)
    }
}

I would now like to have the same behavior for N goroutines, but how will the select statement look in that case?

This is the code bit I have started with, but I am confused how to code the select statement

numChans := 2

//I keep the channels in this slice, and want to &quot;loop&quot; over them in the select statemnt
var chans = [] chan string{}
	
for i:=0;i&lt;numChans;i++{
    tmp := make(chan string);
    chans = append(chans, tmp);
    go DoStuff(tmp, i + 1)

//How shall the select statment be coded for this case?  
for ; true;  {
    select {
    case msg1 := &lt;-c1:
        fmt.Println(&quot;received &quot;, msg1)
        go DoStuff(c1, 1)
    case msg2 := &lt;-c2:
        fmt.Println(&quot;received &quot;, msg2)
        go DoStuff(c2, 9)
    }
}

答案1

得分: 181

你可以使用reflect包中的Select函数来实现这个功能:

> func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
>
> Select函数执行由cases列表描述的select操作。与Go的select语句类似,它会阻塞,直到至少有一个case可以执行,然后进行均匀的伪随机选择,并执行该case。它返回所选case的索引,如果该case是接收操作,则返回接收到的值和一个布尔值,指示该值是否对应于通道上的发送(而不是因为通道关闭而接收到的零值)。

你需要传入一个SelectCase结构体的数组,该结构体标识要选择的通道、操作的方向以及在发送操作的情况下要发送的值。

你可以像这样进行操作:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// 如果通道未关闭,ok将为true。
ch := chans[chosen]
msg := value.String()

你可以在这里尝试一个更详细的示例:http://play.golang.org/p/8zwvSk4kjx

英文:

You can do this using the Select function from the reflect package:

> func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)
>
> Select executes a select operation described by the list of cases. Like
> the Go select statement, it blocks until at least one of the cases can
> proceed, makes a uniform pseudo-random choice, and then executes that
> case. It returns the index of the chosen case and, if that case was a
> receive operation, the value received and a boolean indicating whether
> the value corresponds to a send on the channel (as opposed to a zero
> value received because the channel is closed).

You pass in an array of SelectCase structs that identify the channel to select on, the direction of the operation, and a value to send in the case of a send operation.

So you could do something like this:

cases := make([]reflect.SelectCase, len(chans))
for i, ch := range chans {
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
}
chosen, value, ok := reflect.Select(cases)
// ok will be true if the channel has not been closed.
ch := chans[chosen]
msg := value.String()

You can experiment with a more fleshed out example here: http://play.golang.org/p/8zwvSk4kjx

答案2

得分: 128

你可以通过将每个通道包装在一个goroutine中来实现这一点,该goroutine将消息“转发”到一个共享的“聚合”通道。例如:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg <- msg
    }
  }(ch)
}

select {
case msg := <-agg:
    fmt.Println("received ", msg)
}

如果你需要知道消息来自哪个通道,你可以在将其转发到聚合通道之前,将其包装在一个带有任何额外信息的结构体中。

在我的(有限的)测试中,这种方法在性能上远远优于使用reflect包:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect	       1	5265109013 ns/op
BenchmarkGoSelect	          20	  81911344 ns/op
ok  	command-line-arguments	9.463s

基准代码在这里

英文:

You can accomplish this by wrapping each channel in a goroutine which "forwards" messages to a shared "aggregate" channel. For example:

agg := make(chan string)
for _, ch := range chans {
  go func(c chan string) {
    for msg := range c {
      agg &lt;- msg
    }
  }(ch)
}

select {
case msg &lt;- agg:
    fmt.Println(&quot;received &quot;, msg)
}

If you need to know which channel the message originated from, you could wrap it in a struct with any extra information before forwarding it to the aggregate channel.

In my (limited) testing, this method greatly out performs using the reflect package:

$ go test dynamic_select_test.go -test.bench=.
...
BenchmarkReflectSelect	       1	5265109013 ns/op
BenchmarkGoSelect	          20	  81911344 ns/op
ok  	command-line-arguments	9.463s

Benchmark code here

答案3

得分: 31

为了进一步扩展之前答案中的一些评论并提供更清晰的比较,以下是迄今为止提出的两种方法的示例,给定相同的输入:要读取的通道片段和要调用的每个值的函数,该函数还需要知道该值来自哪个通道。

这两种方法之间有三个主要区别:

  • 复杂性。尽管部分是读者偏好,但我认为通道方法更符合惯用法,更直观和可读。

  • 性能。在我的Xeon amd64系统上,goroutines+通道的性能比反射解决方案高出大约两个数量级(一般来说,Go中的反射通常较慢,只有在绝对需要时才应使用)。当然,如果在处理结果的函数或将值写入输入通道的过程中存在任何显着延迟,这种性能差异很容易变得无关紧要。

  • 阻塞/缓冲语义。这取决于用例的重要性。通常情况下,要么不重要,要么goroutine合并解决方案中的轻微额外缓冲可能有助于吞吐量。但是,如果希望具有只有一个写入者解除阻塞并且在任何其他写入者解除阻塞之前完全处理其值的语义,那么只能使用反射解决方案来实现。

请注意,如果不需要发送通道的“id”或者源通道永远不会关闭,则可以简化这两种方法。

Goroutine合并通道:

// Process1对从任何`chans`通道接收到的每个值调用`fn`函数。`fn`的参数是通道的索引和字符串值。Process1在所有通道关闭后返回。
func Process1(chans []<-chan string, fn func(int, string)) {
    // 设置
    type item struct {
        int    // 来自哪个通道的索引
        string // 实际的字符串项
    }
    merged := make(chan item)
    var wg sync.WaitGroup
    wg.Add(len(chans))
    for i, c := range chans {
        go func(i int, c <-chan string) {
            // 在我们甚至不知道是否可以写入`merged`之前,从`c`读取和缓冲一个单独的项。
            //
            // Go没有提供一种像这样做的方法:
            //     merged <- (<-c)
            // 原子地,我们延迟从`c`读取
            // 直到我们可以写入`merged`。从
            // `c`的读取将始终首先发生(根据需要阻塞),
            // 然后我们在`merged`上阻塞(使用
            // 上面或下面的语法都没有区别)。
            for s := range c {
                merged <- item{i, s}
            }
            // 如果/当此输入通道关闭时,我们只需停止
            // 向合并通道写入,并通过WaitGroup
            // 让它知道有一个更少的活动通道。
            wg.Done()
        }(i, c)
    }
    // 一个额外的goroutine来监视所有合并goroutine是否完成,然后关闭合并通道。
    go func() {
        wg.Wait()
        close(merged)
    }()

    // “select-like”循环
    for i := range merged {
        // 处理每个值
        fn(i.int, i.string)
    }
}

反射选择:

// Process2与Process1完全相同,只是它使用反射包从输入通道中选择和读取,这保证只有一个值“在飞行”(即当调用`fn`时,只有一个通道上的单个发送将成功,其余的将被阻塞)。它大约比Process1慢两个数量级(如果输入值之间存在显着延迟或者如果`fn`运行时间显着,则仍然可以忽略不计)。
func Process2(chans []<-chan string, fn func(int, string)) {
    // 设置
    cases := make([]reflect.SelectCase, len(chans))
    // `ids`将cases中的索引映射到原始`chans`索引。
    ids := make([]int, len(chans))
    for i, c := range chans {
        cases[i] = reflect.SelectCase{
            Dir:  reflect.SelectRecv,
            Chan: reflect.ValueOf(c),
        }
        ids[i] = i
    }

    // 选择循环
    for len(cases) > 0 {
        // 与合并goroutine的区别在于
        // `v`是任何工作线程发送的唯一值“在飞行”。
        // 所有其他工作线程都被阻塞
        // 尝试发送他们计算的单个值
        // 而goroutine版本从每个工作线程读取/缓冲一个额外的值。
        i, v, ok := reflect.Select(cases)
        if !ok {
            // 通道cases[i]已关闭,从我们的cases切片中删除它,并更新我们的ids映射。
            cases = append(cases[:i], cases[i+1:]...)
            ids = append(ids[:i], ids[i+1:]...)
            continue
        }

        // 处理每个值
        fn(ids[i], v.String())
    }
}

完整代码(在Go playground上)

英文:

To expand on some comments on previous answers and to provide a clearer comparison here is an example of both approaches presented so far given the same input, a slice of channels to read from and a function to call for each value which also need to know which channel the value came from.

There are three main differences between the approaches:

  • Complexity. Although it may partially be a reader preference I find the channel approach more idiomatic, straight-forward, and readable.

  • Performance. On my Xeon amd64 system the goroutines+channels out performs the reflect solution by about two orders of magnitude (in general reflection in Go is often slower and should only be used when absolutely required). Of course, if there is any significant delay in either the function processing the results or in the writing of values to the input channels this performance difference can easily become insignificant.

  • Blocking/buffering semantics. The importantance of this depends on the use case. Most often it either won't matter or the slight extra buffering in the goroutine merging solution may be helpful for throughput. However, if it is desirable to have the semantics that only a single writer is unblocked and it's value fully handled before any other writer is unblocked, then that can only be achieved with the reflect solution.

Note, both approaches can be simplified if either the "id" of the sending channel isn't required or if the source channels will never be closed.

Goroutine merging channel:

// Process1 calls `fn` for each value received from any of the `chans`
// channels. The arguments to `fn` are the index of the channel the
// value came from and the string value. Process1 returns once all the
// channels are closed.
func Process1(chans []&lt;-chan string, fn func(int, string)) {
	// Setup
	type item struct {
		int    // index of which channel this came from
		string // the actual string item
	}
	merged := make(chan item)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for i, c := range chans {
		go func(i int, c &lt;-chan string) {
			// Reads and buffers a single item from `c` before
			// we even know if we can write to `merged`.
			//
			// Go doesn&#39;t provide a way to do something like:
			//     merged &lt;- (&lt;-c)
			// atomically, where we delay the read from `c`
			// until we can write to `merged`. The read from
			// `c` will always happen first (blocking as
			// required) and then we block on `merged` (with
			// either the above or the below syntax making
			// no difference).
			for s := range c {
				merged &lt;- item{i, s}
			}
			// If/when this input channel is closed we just stop
			// writing to the merged channel and via the WaitGroup
			// let it be known there is one fewer channel active.
			wg.Done()
		}(i, c)
	}
	// One extra goroutine to watch for all the merging goroutines to
	// be finished and then close the merged channel.
	go func() {
		wg.Wait()
		close(merged)
	}()

	// &quot;select-like&quot; loop
	for i := range merged {
		// Process each value
		fn(i.int, i.string)
	}
}

Reflection select:

// Process2 is identical to Process1 except that it uses the reflect
// package to select and read from the input channels which guarantees
// there is only one value &quot;in-flight&quot; (i.e. when `fn` is called only
// a single send on a single channel will have succeeded, the rest will
// be blocked). It is approximately two orders of magnitude slower than
// Process1 (which is still insignificant if their is a significant
// delay between incoming values or if `fn` runs for a significant
// time).
func Process2(chans []&lt;-chan string, fn func(int, string)) {
	// Setup
	cases := make([]reflect.SelectCase, len(chans))
	// `ids` maps the index within cases to the original `chans` index.
	ids := make([]int, len(chans))
	for i, c := range chans {
		cases[i] = reflect.SelectCase{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(c),
		}
		ids[i] = i
	}

	// Select loop
	for len(cases) &gt; 0 {
		// A difference here from the merging goroutines is
		// that `v` is the only value &quot;in-flight&quot; that any of
		// the workers have sent. All other workers are blocked
		// trying to send the single value they have calculated
		// where-as the goroutine version reads/buffers a single
		// extra value from each worker.
		i, v, ok := reflect.Select(cases)
		if !ok {
			// Channel cases[i] has been closed, remove it
			// from our slice of cases and update our ids
			// mapping as well.
			cases = append(cases[:i], cases[i+1:]...)
			ids = append(ids[:i], ids[i+1:]...)
			continue
		}

		// Process each value
		fn(ids[i], v.String())
	}
}

[Full code on the Go playground.]

答案4

得分: 6

我们实际上对这个主题进行了一些研究,并找到了最佳解决方案。我们一度使用了reflect.Select,它是解决这个问题的一个很好的方案。它比每个通道一个goroutine更轻量级且操作简单。但不幸的是,它并不真正支持大量的通道,而我们的情况正是如此,所以我们找到了一些有趣的东西,并写了一篇博文介绍它:https://cyolo.io/blog/how-we-enabled-dynamic-channel-selection-at-scale-in-go/

我将总结一下那里写的内容:
我们静态地创建了一批select..case语句,针对每个指数的二次幂结果,从2的0次方到2的32次方,同时还有一个函数,用于路由到不同的case并通过一个聚合通道来汇总结果。

这是一个这样批处理的示例:

func select4(ctx context.Context, chanz []chan interface{}, res chan *r, r *r, i int) {
	select {
	case r.v, r.ok = <-chanz[0]:
		r.i = i + 0
		res <- r
	case r.v, r.ok = <-chanz[1]:
		r.i = i + 1
		res <- r
	case r.v, r.ok = <-chanz[2]:
		r.i = i + 2
		res <- r
	case r.v, r.ok = <-chanz[3]:
		r.i = i + 3
		res <- r
	case <-ctx.Done():
		break
	}
}

以及使用这些select..case批处理来聚合任意数量通道的第一个结果的逻辑:

	for i < len(channels) {
		l = len(channels) - i
		switch {
		case l > 31 && maxBatchSize >= 32:
			go select32(ctx, channels[i:i+32], agg, rPool.Get().(*r), i)
			i += 32
		case l > 15 && maxBatchSize >= 16:
			go select16(ctx, channels[i:i+16], agg, rPool.Get().(*r), i)
			i += 16
		case l > 7 && maxBatchSize >= 8:
			go select8(ctx, channels[i:i+8], agg, rPool.Get().(*r), i)
			i += 8
		case l > 3 && maxBatchSize >= 4:
			go select4(ctx, channels[i:i+4], agg, rPool.Get().(*r), i)
			i += 4
		case l > 1 && maxBatchSize >= 2:
			go select2(ctx, channels[i:i+2], agg, rPool.Get().(*r), i)
			i += 2
		case l > 0:
			go select1(ctx, channels[i], agg, rPool.Get().(*r), i)
			i += 1
		}
	}
英文:

We actually made some research about this subject and found the best solution. We used reflect.Select for a while and it is a great solution for the problem. It is much lighter than a goroutine per channel and simple to operate. But unfortunately, it doesn't really support a massive amount of channels which is our case so we found something interesting and wrote a blog post about it: https://cyolo.io/blog/how-we-enabled-dynamic-channel-selection-at-scale-in-go/

I'll summarize what is written there:
We statically created batches of select..case statements for every result of the power of two of exponent up to 32 along with a function that routes to the different cases and aggregates the results through an aggregate channel.

An example of such a batch:

func select4(ctx context.Context, chanz []chan interface{}, res chan *r, r *r, i int) {
	select {
	case r.v, r.ok = &lt;-chanz[0]:
		r.i = i + 0
		res &lt;- r
	case r.v, r.ok = &lt;-chanz[1]:
		r.i = i + 1
		res &lt;- r
	case r.v, r.ok = &lt;-chanz[2]:
		r.i = i + 2
		res &lt;- r
	case r.v, r.ok = &lt;-chanz[3]:
		r.i = i + 3
		res &lt;- r
	case &lt;-ctx.Done():
		break
	}
}

And the logic of aggregating the first result from any number of channels using these kinds of select..case batches:

	for i &lt; len(channels) {
		l = len(channels) - i
		switch {
		case l &gt; 31 &amp;&amp; maxBatchSize &gt;= 32:
			go select32(ctx, channels[i:i+32], agg, rPool.Get().(*r), i)
			i += 32
		case l &gt; 15 &amp;&amp; maxBatchSize &gt;= 16:
			go select16(ctx, channels[i:i+16], agg, rPool.Get().(*r), i)
			i += 16
		case l &gt; 7 &amp;&amp; maxBatchSize &gt;= 8:
			go select8(ctx, channels[i:i+8], agg, rPool.Get().(*r), i)
			i += 8
		case l &gt; 3 &amp;&amp; maxBatchSize &gt;= 4:
			go select4(ctx, channels[i:i+4], agg, rPool.Get().(*r), i)
			i += 4
		case l &gt; 1 &amp;&amp; maxBatchSize &gt;= 2:
			go select2(ctx, channels[i:i+2], agg, rPool.Get().(*r), i)
			i += 2
		case l &gt; 0:
			go select1(ctx, channels[i], agg, rPool.Get().(*r), i)
			i += 1
		}
	}

</details>



# 答案5
**得分**: 3

可能更简单的选择是

不要使用一个通道数组而是将一个通道作为参数传递给在单独的goroutine上运行的函数并在一个消费者goroutine中监听该通道

这样可以在监听器中只选择一个通道从而实现简单的选择操作并避免创建新的goroutine来聚合来自多个通道的消息

<details>
<summary>英文:</summary>

Possibly simpler option:

Instead of having an array of channels, why not pass just one channel as a parameter to the functions being run on separate goroutines, and then listen to the channel in a consumer goroutine?

This allows you to select on just one channel in your listener, making for a simple select, and avoiding creation of new goroutines to aggregate messages from multiple channels?

</details>



# 答案6
**得分**: 0

根据James Henstridge的答案我编写了这个通用的go >=1.18`Select`函数它接受一个上下文和一个通道切片并返回所选的通道

```go
func Select[T any](ctx context.Context, chs []chan T) (int, T, error) {
	var zeroT T
	cases := make([]reflect.SelectCase, len(chs)+1)
	for i, ch := range chs {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}
	cases[len(chs)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
	// 如果通道未关闭,则ok为true。
	chosen, value, ok := reflect.Select(cases)
	if !ok {
		if ctx.Err() != nil {
			return -1, zeroT, ctx.Err()
		}
		return chosen, zeroT, errors.New("通道已关闭")
	}
	if ret, ok := value.Interface().(T); ok {
		return chosen, ret, nil
	}
	return chosen, zeroT, errors.New("值转换失败")
}

以下是如何使用它的示例:

func TestSelect(t *testing.T) {
	c1 := make(chan int)
	c2 := make(chan int)
	c3 := make(chan int)
	chs := []chan int{c1, c2, c3}
	go func() {
		time.Sleep(time.Second)
		//close(c2)
		c2 <- 42
	}()
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)

	chosen, val, err := Select(ctx, chs)

	assert.Equal(t, 1, chosen)
	assert.Equal(t, 42, val)
	assert.NoError(t, err)
}
英文:

Based on the answer of James Henstridge,
I made this generic (go >=1.18) Select function that takes a context and a slice of channels and returns the selected one:

func Select[T any](ctx context.Context, chs []chan T) (int, T, error) {
	var zeroT T
	cases := make([]reflect.SelectCase, len(chs)+1)
	for i, ch := range chs {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}
	cases[len(chs)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done())}
	// ok will be true if the channel has not been closed.
	chosen, value, ok := reflect.Select(cases)
	if !ok {
		if ctx.Err() != nil {
			return -1, zeroT, ctx.Err()
		}
		return chosen, zeroT, errors.New(&quot;channel closed&quot;)
	}
	if ret, ok := value.Interface().(T); ok {
		return chosen, ret, nil
	}
	return chosen, zeroT, errors.New(&quot;failed to cast value&quot;)
}

Here is an example on how to use it:

func TestSelect(t *testing.T) {
	c1 := make(chan int)
	c2 := make(chan int)
	c3 := make(chan int)
	chs := []chan int{c1, c2, c3}
	go func() {
		time.Sleep(time.Second)
		//close(c2)
		c2 &lt;- 42
	}()
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)

	chosen, val, err := Select(ctx, chs)

	assert.Equal(t, 1, chosen)
	assert.Equal(t, 42, val)
	assert.NoError(t, err)
}

答案7

得分: -1

为什么假设有人发送事件,这种方法不起作用?

func main() {
    numChans := 2
    var chans = []chan string{}

    for i := 0; i < numChans; i++ {
        tmp := make(chan string)
        chans = append(chans, tmp)
    }

    for true {
        for i, c := range chans {
            select {
            case x := <-c:
                fmt.Printf("received %d \n", i)
                go DoShit(x, i)
            default:
                continue
            }
        }
    }
}

这段代码的作用是创建了两个字符串类型的通道,并在一个无限循环中监听这些通道。当从通道中接收到数据时,会打印接收到的数据,并使用go关键字在后台启动一个DoShit函数的goroutine来处理接收到的数据。如果没有从通道中接收到数据,则继续下一个循环。

英文:

Why this approach wouldn't work assuming that somebody is sending events?

func main() {
numChans := 2
var chans = []chan string{}
for i := 0; i &lt; numChans; i++ {
tmp := make(chan string)
chans = append(chans, tmp)
}
for true {
for i, c := range chans {
select {
case x = &lt;-c:
fmt.Printf(&quot;received %d \n&quot;, i)
go DoShit(x, i)
default: continue
}
}
}
}

huangapple
  • 本文由 发表于 2013年11月15日 10:18:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/19992334.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定