英文:
Is it possible to multiplex several channels into one?
问题
这个想法是在一个切片中有可变数量的通道,将接收到的每个值通过它们推送到一个单独的通道,并在输入通道的最后一个关闭时关闭此输出通道。像这样,但是对于多于两个通道的数量:
func multiplex(cin1, cin2, cout chan int) {
n := 2
for {
select {
case v, ok := <-cin1:
if ok {
cout <- v
} else {
n -= 1
}
case v, ok := <-cin2:
if ok {
cout <- v
} else {
n -= 1
}
}
if n == 0 {
close(cout)
break
}
}
}
上面的代码避免了忙碌循环,因为没有default
情况,这是好的(编辑:看起来存在",ok"使得选择语句非阻塞,循环仍然忙碌。但是为了示例的完整性,可以将代码视为阻塞)。是否可以使用任意数量的输入通道实现相同类型的功能?显然,可以通过将切片逐对减少到单个通道来实现,但如果可能的话,我更感兴趣的是更简单的解决方案。
英文:
The idea is to have a variable number of channels in a slice, push each value received through them into a single channel, and close this output channel once the last one of the input channels is closed. Something like this, but for a number of channels more than two:
func multiplex(cin1, cin2, cout chan int) {
n := 2
for {
select {
case v, ok := <-cin1:
if ok {
cout <- v
} else {
n -= 1
}
case v, ok := <-cin2:
if ok {
cout <- v
} else {
n -= 1
}
}
if n == 0 {
close(cout)
break
}
}
}
The above code avoids busy looping since there is no default
case, which is good (EDIT: it looks like the presence of ", ok" makes the select statement non-blocking and the loop is busy after all. But for the sake of the example, think of the code as if it would block). Could the same kind of functionality also be achieved with an arbitrary number of input channels? Obviously, this could be done by reducing the slice pairwise to a single channel, but I would be more interested in a simpler solution, if possible.
答案1
得分: 28
我相信这段代码可以实现你想要的功能。我已经更改了函数签名,以便清楚地表明输入和输出只应用于单向通信。请注意添加了一个 sync.WaitGroup
,你需要一种方式让所有的输入信号都完成,这很容易实现。
func combine(inputs []<-chan int, output chan<- int) {
var group sync.WaitGroup
for i := range inputs {
group.Add(1)
go func(input <-chan int) {
for val := range input {
output <- val
}
group.Done()
} (inputs[i])
}
go func() {
group.Wait()
close(output)
} ()
}
英文:
I believe this snippet does what you're looking for. I've changed the signature so that it's clear that the inputs and output should only be used for communication in one direction. Note the addition of a sync.WaitGroup
, you need some way for all of the inputs to signal that they have completed, and this is pretty easy.
func combine(inputs []<-chan int, output chan<- int) {
var group sync.WaitGroup
for i := range inputs {
group.Add(1)
go func(input <-chan int) {
for val := range input {
output <- val
}
group.Done()
} (inputs[i])
}
go func() {
group.Wait()
close(output)
} ()
}
答案2
得分: 3
首选的解决方案是“重新构造,使您不需要一个通道切片”的非答案。重新构造通常可以利用多个goroutine可以发送到单个通道的特性。因此,不要让每个源都发送到单独的通道,然后处理从一堆通道接收的问题,只需创建一个通道,让所有源都发送到该通道。
Go语言没有提供从通道切片接收的功能。这是一个经常被问到的问题,虽然刚才给出的解决方案是首选的,但也有其他编程方法。我认为您在原始问题中提到的“减少切片成对”的解决方案是一种二分法的解决方案。只要您有一个将两个通道多路复用为一个的解决方案,这个方法就可以正常工作。您的示例代码非常接近工作状态。
您只需要一个小技巧就可以使您的示例代码正常工作。在减少n的地方,添加一行将通道变量设置为nil的代码。例如,我修改了代码如下:
case v, ok := <-cin1:
if ok {
cout <- v
} else {
n--
cin1 = nil
}
case v, ok := <-cin2:
if ok {
cout <- v
} else {
n--
cin2 = nil
}
}
这个解决方案可以实现您想要的功能,并且不会忙等待。
因此,下面是一个完整的示例,将此解决方案整合到一个函数中,该函数将多路复用一个切片:
package main
import (
"fmt"
"time"
)
func multiplex(cin []chan int, cout chan int) {
var cin0, cin1 chan int
switch len(cin) {
case 2:
cin1 = cin[1]
fallthrough
case 1:
cin0 = cin[0]
case 0:
default:
cin0 = make(chan int)
cin1 = make(chan int)
half := len(cin) / 2
go multiplex(cin[:half], cin0)
go multiplex(cin[half:], cin1)
}
for cin0 != nil || cin1 != nil {
select {
case v, ok := <-cin0:
if ok {
cout <- v
} else {
cin0 = nil
}
case v, ok := <-cin1:
if ok {
cout <- v
} else {
cin1 = nil
}
}
}
close(cout)
}
func main() {
cin := []chan int{
make(chan int),
make(chan int),
make(chan int),
}
cout := make(chan int)
for i, c := range cin {
go func(x int, cx chan int) {
for i := 1; i <= 3; i++ {
time.Sleep(100 * time.Millisecond)
cx <- x*10 + i
}
close(cx)
}(i, c)
}
go multiplex(cin, cout)
for v := range cout {
fmt.Println("main gets", v)
}
}
英文:
Edit: added pairwise reduction example code and reordered parts of the answer.
The preferred solution is the non-answer of "restructure so that you don't have a slice of channels." The restructuring can often make use of the feature that multiple goroutines can send to a single channel. So instead of having each of your sources send on separate channels and then having to deal with receiving from a bunch of channels, just create one channel and let all the sources send on that channel.
Go does not offer a feature to receive from a slice of channels. It's a frequently asked question, and while the solution just given is preferred, there are ways to program it. A solution I thought you were suggesting in your original question by saying "reducing the slice pairwise" is a binary divide and conquer solution. This works just fine, as long as you have a solution for multiplexing two channels into one. Your example code for this is very close to working.
You're just missing one little trick to make your example code work. Where you decrement n, add a line to set the channel variable to nil. For example, I made the code read
case v, ok := <-cin1:
if ok {
cout <- v
} else {
n--
cin1 = nil
}
case v, ok := <-cin2:
if ok {
cout <- v
} else {
n--
cin2 = nil
}
}
This solution does what you want and is not busy waiting.
So then, a full example incorporating this solution into a function that multiplexes a slice:
package main
import (
"fmt"
"time"
)
func multiplex(cin []chan int, cout chan int) {
var cin0, cin1 chan int
switch len(cin) {
case 2:
cin1 = cin[1]
fallthrough
case 1:
cin0 = cin[0]
case 0:
default:
cin0 = make(chan int)
cin1 = make(chan int)
half := len(cin) / 2
go multiplex(cin[:half], cin0)
go multiplex(cin[half:], cin1)
}
for cin0 != nil || cin1 != nil {
select {
case v, ok := <-cin0:
if ok {
cout <- v
} else {
cin0 = nil
}
case v, ok := <-cin1:
if ok {
cout <- v
} else {
cin1 = nil
}
}
}
close(cout)
}
func main() {
cin := []chan int{
make(chan int),
make(chan int),
make(chan int),
}
cout := make(chan int)
for i, c := range cin {
go func(x int, cx chan int) {
for i := 1; i <= 3; i++ {
time.Sleep(100 * time.Millisecond)
cx <- x*10 + i
}
close(cx)
}(i, c)
}
go multiplex(cin, cout)
for v := range cout {
fmt.Println("main gets", v)
}
}
答案3
得分: 0
使用goroutines,我生成了这个。这是你想要的吗?
package main
import (
"fmt"
)
func multiplex(cin []chan int, cout chan int) {
n := len(cin)
for _, ch := range cin {
go func(src chan int) {
for {
v, ok := <-src
if ok {
cout <- v
} else {
n-- // 有点危险。也许使用一个通道来避免遗漏的递减
if n == 0 {
close(cout)
}
break
}
}
}(ch)
}
}
// 用于测试multiplex的主函数
func main() {
cin := make([]chan int, 3)
cin[0] = make(chan int, 2)
cin[1] = make(chan int, 2)
cin[2] = make(chan int, 2)
cout := make(chan int, 2)
multiplex(cin, cout)
cin[1] <- 1
cin[0] <- 2
cin[2] <- 3
cin[1] <- 4
cin[0] <- 5
close(cin[1])
close(cin[0])
close(cin[2])
for {
v, ok := <-cout
if ok {
fmt.Println(v)
} else {
break
}
}
}
编辑:参考资料:
http://golang.org/ref/spec#Receive_operator
http://golang.org/ref/spec#Close
英文:
Using goroutines I produced this. Is it what you want ?
package main
import (
"fmt"
)
func multiplex(cin []chan int, cout chan int) {
n := len(cin)
for _, ch := range cin {
go func(src chan int) {
for {
v, ok := <-src
if ok {
cout <- v
} else {
n-- // a little dangerous. Maybe use a channel to avoid missed decrements
if n == 0 {
close(cout)
}
break
}
}
}(ch)
}
}
// a main to test the multiplex
func main() {
cin := make([]chan int, 3)
cin[0] = make(chan int, 2)
cin[1] = make(chan int, 2)
cin[2] = make(chan int, 2)
cout := make(chan int, 2)
multiplex(cin, cout)
cin[1] <- 1
cin[0] <- 2
cin[2] <- 3
cin[1] <- 4
cin[0] <- 5
close(cin[1])
close(cin[0])
close(cin[2])
for {
v, ok := <-cout
if ok {
fmt.Println(v)
} else {
break
}
}
}
EDIT : References :
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论