一个通道复用器

huangapple go评论124阅读模式
英文:

A channel multiplexer

问题

注意 - 我是Go的新手。

我写了一个多路复用器,应该将一个通道数组的输出合并成一个通道。欢迎提出建设性的批评。

  1. func Mux(channels []chan big.Int) chan big.Int {
  2. // 计算通道的数量,每个通道关闭时计数减一,当计数为零时关闭输出通道。
  3. n := len(channels)
  4. // 输出通道。
  5. ch := make(chan big.Int, n)
  6. // 为每个通道创建一个goroutine。
  7. for _, c := range channels {
  8. go func() {
  9. // 将数据从通道中取出并发送到输出通道。
  10. for x := range c {
  11. ch <- x
  12. }
  13. // 通道关闭。
  14. n -= 1
  15. // 如果所有通道都关闭了,则关闭输出通道。
  16. if n == 0 {
  17. close(ch)
  18. }
  19. }()
  20. }
  21. return ch
  22. }

我用以下代码进行测试:

  1. func fromTo(f, t int) chan big.Int {
  2. ch := make(chan big.Int)
  3. go func() {
  4. for i := f; i < t; i++ {
  5. fmt.Println("Feed:", i)
  6. ch <- *big.NewInt(int64(i))
  7. }
  8. close(ch)
  9. }()
  10. return ch
  11. }
  12. func testMux() {
  13. r := make([]chan big.Int, 10)
  14. for i := 0; i < 10; i++ {
  15. r[i] = fromTo(i*10, i*10+10)
  16. }
  17. all := Mux(r)
  18. // 输出结果。
  19. for l := range all {
  20. fmt.Println(l)
  21. }
  22. }

但是我的输出结果非常奇怪:

  1. Feed: 0
  2. Feed: 10
  3. Feed: 20
  4. Feed: 30
  5. Feed: 40
  6. Feed: 50
  7. Feed: 60
  8. Feed: 70
  9. Feed: 80
  10. Feed: 90
  11. Feed: 91
  12. Feed: 92
  13. Feed: 93
  14. Feed: 94
  15. Feed: 95
  16. Feed: 96
  17. Feed: 97
  18. Feed: 98
  19. Feed: 99
  20. {false [90]}
  21. {false [91]}
  22. {false [92]}
  23. {false [93]}
  24. {false [94]}
  25. {false [95]}
  26. {false [96]}
  27. {false [97]}
  28. {false [98]}
  29. {false [99]}

所以我的问题是:

  • 我在Mux函数中做错了什么?
  • 为什么我只从输出通道中获取到最后10个元素?
  • 为什么输入的顺序看起来这么奇怪?(每个输入通道的第一个元素,然后是最后一个通道的所有元素,然后没有了)
  • 有更好的方法吗?

我希望所有的输入通道都有相等的权利来获取输出通道的元素 - 也就是说,我不能先获取一个通道的所有输出,然后再获取下一个通道的所有输出等等。

对于对此感兴趣的人 - 这是修复后的最终代码,以及正确(可能)使用sync.WaitGroup

  1. import (
  2. "math/big"
  3. "sync"
  4. )
  5. /*
  6. 将多个通道多路复用成一个通道。
  7. */
  8. func Mux(channels []chan big.Int) chan big.Int {
  9. // 计算通道的数量,每个通道关闭时计数减一,当计数为零时关闭输出通道。
  10. var wg sync.WaitGroup
  11. wg.Add(len(channels))
  12. // 输出通道。
  13. ch := make(chan big.Int, len(channels))
  14. // 为每个通道创建一个goroutine。
  15. for _, c := range channels {
  16. go func(c <-chan big.Int) {
  17. // 将数据从通道中取出并发送到输出通道。
  18. for x := range c {
  19. ch <- x
  20. }
  21. // 通道关闭。
  22. wg.Done()
  23. }(c)
  24. }
  25. // 在所有goroutine完成后关闭通道。
  26. go func() {
  27. // 等待所有goroutine完成。
  28. wg.Wait()
  29. // 关闭通道。
  30. close(ch)
  31. }()
  32. return ch
  33. }
英文:

Note - newbie in Go.

I've written a multiplexer that should merge the outputs of an array of channels into one. Happy with constructive criticism.

  1. func Mux(channels []chan big.Int) chan big.Int {
  2. // Count down as each channel closes. When hits zero - close ch.
  3. n := len(channels)
  4. // The channel to output to.
  5. ch := make(chan big.Int, n)
  6. // Make one go per channel.
  7. for _, c := range channels {
  8. go func() {
  9. // Pump it.
  10. for x := range c {
  11. ch &lt;- x
  12. }
  13. // It closed.
  14. n -= 1
  15. // Close output if all closed now.
  16. if n == 0 {
  17. close(ch)
  18. }
  19. }()
  20. }
  21. return ch
  22. }

I am testing it with:

  1. func fromTo(f, t int) chan big.Int {
  2. ch := make(chan big.Int)
  3. go func() {
  4. for i := f; i &lt; t; i++ {
  5. fmt.Println(&quot;Feed:&quot;, i)
  6. ch &lt;- *big.NewInt(int64(i))
  7. }
  8. close(ch)
  9. }()
  10. return ch
  11. }
  12. func testMux() {
  13. r := make([]chan big.Int, 10)
  14. for i := 0; i &lt; 10; i++ {
  15. r[i] = fromTo(i*10, i*10+10)
  16. }
  17. all := Mux(r)
  18. // Roll them out.
  19. for l := range all {
  20. fmt.Println(l)
  21. }
  22. }

but my output is very strange:

  1. Feed: 0
  2. Feed: 10
  3. Feed: 20
  4. Feed: 30
  5. Feed: 40
  6. Feed: 50
  7. Feed: 60
  8. Feed: 70
  9. Feed: 80
  10. Feed: 90
  11. Feed: 91
  12. Feed: 92
  13. Feed: 93
  14. Feed: 94
  15. Feed: 95
  16. Feed: 96
  17. Feed: 97
  18. Feed: 98
  19. Feed: 99
  20. {false [90]}
  21. {false [91]}
  22. {false [92]}
  23. {false [93]}
  24. {false [94]}
  25. {false [95]}
  26. {false [96]}
  27. {false [97]}
  28. {false [98]}
  29. {false [99]}

So to my questions:

  • Is there something I am doing wrong in Mux?
  • Why am I only getting the last 10 from my output channel?
  • Why does the feeding look so strange? (1st of each input channel, all of the last channel and then nothing)
  • Is there a better way of doing this?

I need all of the input channels to have equal rights to the output channel - i.e. I cannot have all of the output from one channel and then all from the next etc.


For anyone interested - this was the final code after the fix and the correct (presumably) use of sync.WaitGroup

  1. import (
  2. &quot;math/big&quot;
  3. &quot;sync&quot;
  4. )
  5. /*
  6. Multiplex a number of channels into one.
  7. */
  8. func Mux(channels []chan big.Int) chan big.Int {
  9. // Count down as each channel closes. When hits zero - close ch.
  10. var wg sync.WaitGroup
  11. wg.Add(len(channels))
  12. // The channel to output to.
  13. ch := make(chan big.Int, len(channels))
  14. // Make one go per channel.
  15. for _, c := range channels {
  16. go func(c &lt;-chan big.Int) {
  17. // Pump it.
  18. for x := range c {
  19. ch &lt;- x
  20. }
  21. // It closed.
  22. wg.Done()
  23. }(c)
  24. }
  25. // Close the channel when the pumping is finished.
  26. go func() {
  27. // Wait for everyone to be done.
  28. wg.Wait()
  29. // Close.
  30. close(ch)
  31. }()
  32. return ch
  33. }

答案1

得分: 3

你的Mux函数生成的每个goroutine都从同一个通道中获取数据,因为c在循环的每次迭代中都会更新,它们不仅仅捕获了c的值。如果你像下面这样将通道传递给goroutine,你将会得到预期的结果:

  1. for _, c := range channels {
  2. go func(c <-chan big.Int) {
  3. ...
  4. }(c)
  5. }

你可以在这里测试这个修改。

另一个可能的问题是你对n变量的处理:如果你的GOMAXPROCS != 1,可能会有两个goroutine同时尝试更新它。使用sync.WaitGroup类型会更安全地等待goroutine完成。

英文:

Each of your goroutines spawned from Mux ends up pulling from the same channel, since c gets updated on each iteration of the loop &ndash; they don't just capture the value of c. You will get the expected results if you pass the channel to the goroutine like so:

  1. for _, c := range channels {
  2. go func(c &lt;-chan big.Int) {
  3. ...
  4. }(c)
  5. }

You can test this modification here.

One other possible problem is your handling of the n variable: if you're running with GOMAXPROCS != 1, you could have two goroutines trying to update it at once. The sync.WaitGroup type would be a safer way to wait for goroutines to complete.

答案2

得分: 2

我知道有点晚了,但我写了一个实现类似于这个的通用Multiplex函数的包。它使用反射包中的"select"调用来确保高效和平衡的多路复用,而无需锁定或等待组。

  • 代码:https://github.com/eapache/channels
  • 文档:https://godoc.org/github.com/eapache/channels
英文:

A bit after the fact, I know, but I wrote a package which implements a generic Multiplex function similar to this one. It uses the "select" call in the reflection package to ensure efficient and balanced multiplexing without any need for a lock or wait group.

答案3

得分: 0

为了解决在使用range语句时重新赋值的问题,可以按照James Hentridge的回答,使用一种惯用的方法是将一个局部变量赋值给待处理的值:

  1. for _, c := range channels {
  2. c := c
  3. go func() {
  4. ...
  5. }()
  6. }
英文:

To build on James Hentridge answer, an idiomatic way to handle the re-assignement problem when using the range statement is to assign a local variable to the value at stake:

  1. for _, c := range channels {
  2. c := c
  3. go func() {
  4. ...
  5. }()
  6. }

huangapple
  • 本文由 发表于 2013年10月5日 07:44:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/19192377.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定