英文:
A channel multiplexer
问题
注意 - 我是Go的新手。
我写了一个多路复用器,应该将一个通道数组的输出合并成一个通道。欢迎提出建设性的批评。
func Mux(channels []chan big.Int) chan big.Int {
// 计算通道的数量,每个通道关闭时计数减一,当计数为零时关闭输出通道。
n := len(channels)
// 输出通道。
ch := make(chan big.Int, n)
// 为每个通道创建一个goroutine。
for _, c := range channels {
go func() {
// 将数据从通道中取出并发送到输出通道。
for x := range c {
ch <- x
}
// 通道关闭。
n -= 1
// 如果所有通道都关闭了,则关闭输出通道。
if n == 0 {
close(ch)
}
}()
}
return ch
}
我用以下代码进行测试:
func fromTo(f, t int) chan big.Int {
ch := make(chan big.Int)
go func() {
for i := f; i < t; i++ {
fmt.Println("Feed:", i)
ch <- *big.NewInt(int64(i))
}
close(ch)
}()
return ch
}
func testMux() {
r := make([]chan big.Int, 10)
for i := 0; i < 10; i++ {
r[i] = fromTo(i*10, i*10+10)
}
all := Mux(r)
// 输出结果。
for l := range all {
fmt.Println(l)
}
}
但是我的输出结果非常奇怪:
Feed: 0
Feed: 10
Feed: 20
Feed: 30
Feed: 40
Feed: 50
Feed: 60
Feed: 70
Feed: 80
Feed: 90
Feed: 91
Feed: 92
Feed: 93
Feed: 94
Feed: 95
Feed: 96
Feed: 97
Feed: 98
Feed: 99
{false [90]}
{false [91]}
{false [92]}
{false [93]}
{false [94]}
{false [95]}
{false [96]}
{false [97]}
{false [98]}
{false [99]}
所以我的问题是:
- 我在Mux函数中做错了什么?
- 为什么我只从输出通道中获取到最后10个元素?
- 为什么输入的顺序看起来这么奇怪?(每个输入通道的第一个元素,然后是最后一个通道的所有元素,然后没有了)
- 有更好的方法吗?
我希望所有的输入通道都有相等的权利来获取输出通道的元素 - 也就是说,我不能先获取一个通道的所有输出,然后再获取下一个通道的所有输出等等。
对于对此感兴趣的人 - 这是修复后的最终代码,以及正确(可能)使用sync.WaitGroup
:
import (
"math/big"
"sync"
)
/*
将多个通道多路复用成一个通道。
*/
func Mux(channels []chan big.Int) chan big.Int {
// 计算通道的数量,每个通道关闭时计数减一,当计数为零时关闭输出通道。
var wg sync.WaitGroup
wg.Add(len(channels))
// 输出通道。
ch := make(chan big.Int, len(channels))
// 为每个通道创建一个goroutine。
for _, c := range channels {
go func(c <-chan big.Int) {
// 将数据从通道中取出并发送到输出通道。
for x := range c {
ch <- x
}
// 通道关闭。
wg.Done()
}(c)
}
// 在所有goroutine完成后关闭通道。
go func() {
// 等待所有goroutine完成。
wg.Wait()
// 关闭通道。
close(ch)
}()
return ch
}
英文:
Note - newbie in Go.
I've written a multiplexer that should merge the outputs of an array of channels into one. Happy with constructive criticism.
func Mux(channels []chan big.Int) chan big.Int {
// Count down as each channel closes. When hits zero - close ch.
n := len(channels)
// The channel to output to.
ch := make(chan big.Int, n)
// Make one go per channel.
for _, c := range channels {
go func() {
// Pump it.
for x := range c {
ch <- x
}
// It closed.
n -= 1
// Close output if all closed now.
if n == 0 {
close(ch)
}
}()
}
return ch
}
I am testing it with:
func fromTo(f, t int) chan big.Int {
ch := make(chan big.Int)
go func() {
for i := f; i < t; i++ {
fmt.Println("Feed:", i)
ch <- *big.NewInt(int64(i))
}
close(ch)
}()
return ch
}
func testMux() {
r := make([]chan big.Int, 10)
for i := 0; i < 10; i++ {
r[i] = fromTo(i*10, i*10+10)
}
all := Mux(r)
// Roll them out.
for l := range all {
fmt.Println(l)
}
}
but my output is very strange:
Feed: 0
Feed: 10
Feed: 20
Feed: 30
Feed: 40
Feed: 50
Feed: 60
Feed: 70
Feed: 80
Feed: 90
Feed: 91
Feed: 92
Feed: 93
Feed: 94
Feed: 95
Feed: 96
Feed: 97
Feed: 98
Feed: 99
{false [90]}
{false [91]}
{false [92]}
{false [93]}
{false [94]}
{false [95]}
{false [96]}
{false [97]}
{false [98]}
{false [99]}
So to my questions:
- Is there something I am doing wrong in Mux?
- Why am I only getting the last 10 from my output channel?
- Why does the feeding look so strange? (1st of each input channel, all of the last channel and then nothing)
- Is there a better way of doing this?
I need all of the input channels to have equal rights to the output channel - i.e. I cannot have all of the output from one channel and then all from the next etc.
For anyone interested - this was the final code after the fix and the correct (presumably) use of sync.WaitGroup
import (
"math/big"
"sync"
)
/*
Multiplex a number of channels into one.
*/
func Mux(channels []chan big.Int) chan big.Int {
// Count down as each channel closes. When hits zero - close ch.
var wg sync.WaitGroup
wg.Add(len(channels))
// The channel to output to.
ch := make(chan big.Int, len(channels))
// Make one go per channel.
for _, c := range channels {
go func(c <-chan big.Int) {
// Pump it.
for x := range c {
ch <- x
}
// It closed.
wg.Done()
}(c)
}
// Close the channel when the pumping is finished.
go func() {
// Wait for everyone to be done.
wg.Wait()
// Close.
close(ch)
}()
return ch
}
答案1
得分: 3
你的Mux
函数生成的每个goroutine都从同一个通道中获取数据,因为c
在循环的每次迭代中都会更新,它们不仅仅捕获了c
的值。如果你像下面这样将通道传递给goroutine,你将会得到预期的结果:
for _, c := range channels {
go func(c <-chan big.Int) {
...
}(c)
}
你可以在这里测试这个修改。
另一个可能的问题是你对n
变量的处理:如果你的GOMAXPROCS != 1
,可能会有两个goroutine同时尝试更新它。使用sync.WaitGroup
类型会更安全地等待goroutine完成。
英文:
Each of your goroutines spawned from Mux
ends up pulling from the same channel, since c
gets updated on each iteration of the loop – they don't just capture the value of c
. You will get the expected results if you pass the channel to the goroutine like so:
for _, c := range channels {
go func(c <-chan big.Int) {
...
}(c)
}
You can test this modification here.
One other possible problem is your handling of the n
variable: if you're running with GOMAXPROCS != 1
, you could have two goroutines trying to update it at once. The sync.WaitGroup
type would be a safer way to wait for goroutines to complete.
答案2
得分: 2
我知道有点晚了,但我写了一个实现类似于这个的通用Multiplex函数的包。它使用反射包中的"select"调用来确保高效和平衡的多路复用,而无需锁定或等待组。
- 代码:https://github.com/eapache/channels
- 文档:https://godoc.org/github.com/eapache/channels
英文:
A bit after the fact, I know, but I wrote a package which implements a generic Multiplex function similar to this one. It uses the "select" call in the reflection package to ensure efficient and balanced multiplexing without any need for a lock or wait group.
- Code: https://github.com/eapache/channels
- Documentation: https://godoc.org/github.com/eapache/channels
答案3
得分: 0
为了解决在使用range
语句时重新赋值的问题,可以按照James Hentridge的回答,使用一种惯用的方法是将一个局部变量赋值给待处理的值:
for _, c := range channels {
c := c
go func() {
...
}()
}
英文:
To build on James Hentridge answer, an idiomatic way to handle the re-assignement problem when using the range
statement is to assign a local variable to the value at stake:
for _, c := range channels {
c := c
go func() {
...
}()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论