如何编写更好的双通道选择器

huangapple go评论79阅读模式
英文:

How to write a better two channel select

问题

在以下代码中,有两个通道A和B,它们包含工作任务。在实际代码中,它们是不同的结构体。工作者需要在退出之前排空这两个通道。工作者需要处理来自这两个通道的信息。两个select语句可以工作,但是它们非常笨拙。如果我添加default:使它们非阻塞,那么代码将无法排空通道。有没有更好的编写select语句的方法?

现在,如果通道A没有工作任务,那么通道B也不会被处理。这是另一个要解决的问题,但不是我主要关注的问题。

以下是用于测试以下代码的playground

package main

import (
	"fmt"
	"time"
)

const (
	fillCount  = 10 // 每个输入通道中的元素数量
	numWorkers = 3  // 消费者数量
)

func Wait() {
	time.Sleep(2000 * time.Millisecond)
}

func fillChannel(work chan string, name string) {
	for i := 0; i < fillCount; i++ {
		work <- fmt.Sprintf("%s%d", name, i)
	}
	close(work) // 我们完成了
}

func doWork(id int, ch1 chan string, ch2 chan string, done chan bool) {
	fmt.Println("Running worker", id)
	defer fmt.Println("Ending worker", id)

	for ch1Open, ch2Open := true, true; ch1Open && ch2Open; {
		cnt1 := len(ch1)
		cnt2 := len(ch2)

		if ch1Open {
			select {
			case str, more := <-ch1:
				if more {
					fmt.Printf("%d: ch1(%d) %s\n", id, cnt1, str)
				} else {
					fmt.Printf("%d: ch1 closed\n", id)
					ch1Open = false
				}
			}
		}

		if ch2Open {
			select {
			case str, more := <-ch2:
				if more {
					fmt.Printf("%d: ch2(%d) %s\n", id, cnt2, str)
				} else {
					fmt.Printf("%d: ch2 closed\n", id)
					ch2Open = false
				}
			}
		}
	}
	done <- true
}

func main() {

	a := make(chan string, 2) // 一个小的通道
	b := make(chan string, 5) // 一个较大的通道

	// 生成工作任务
	go fillChannel(a, "A")
	go fillChannel(b, "B")

	// 启动消费者
	done := make(chan bool)

	for i := 0; i < numWorkers; i++ {
		go doWork(i, a, b, done)
	}

	// 等待goroutine完成
	for i := 0; i < numWorkers; i++ {
		<-done
	}
	fmt.Println("All workers done.")

	Wait() // 如果没有这个,工作者的延迟打印语句将不会刷新
}
英文:

In the following code there are two channels A & B that contain work, in the real code they are different structures, the workers need to drain both channels before quitting. The workers need the information coming in from both channels. The two select statements work but it's very clumsy. If I add default: to make them non-blocking then the code fails to drain the channels. Is there a better way of writing the selects?

Right now if channel A has no work then channel B does not get serviced either. Another problem to solve, but not my main concern.

playground for testing following code:

package main
import (
&quot;fmt&quot;
&quot;time&quot;
)
const (
fillCount  = 10 // number of elements in each input channel
numWorkers = 3  // number of consumers.
)
func Wait() {
time.Sleep(2000 * time.Millisecond)
}
func fillChannel(work chan string, name string) {
for i := 0; i &lt; fillCount; i++ {
work &lt;- fmt.Sprintf(&quot;%s%d&quot;, name, i)
}
close(work) // we&#39;re finished
}
func doWork(id int, ch1 chan string, ch2 chan string, done chan bool) {
fmt.Println(&quot;Running worker&quot;, id)
defer fmt.Println(&quot;Ending worker&quot;, id)
for ch1Open, ch2Open := true, true; ch1Open &amp;&amp; ch2Open; {
cnt1 := len(ch1)
cnt2 := len(ch2)
if ch1Open {
select {
case str, more := &lt;-ch1:
if more {
fmt.Printf(&quot;%d: ch1(%d) %s\n&quot;, id, cnt1, str)
} else {
fmt.Printf(&quot;%d: ch1 closed\n&quot;, id)
ch1Open = false
}
}
}
if ch2Open {
select {
case str, more := &lt;-ch2:
if more {
fmt.Printf(&quot;%d: ch2(%d) %s\n&quot;, id, cnt2, str)
} else {
fmt.Printf(&quot;%d: ch2 closed\n&quot;, id)
ch2Open = false
}
}
}
}
done &lt;- true
}
func main() {
a := make(chan string, 2) // a small channel
b := make(chan string, 5) // a bigger channel
// generate work
go fillChannel(a, &quot;A&quot;)
go fillChannel(b, &quot;B&quot;)
// launch the consumers
done := make(chan bool)
for i := 0; i &lt; numWorkers; i++ {
go doWork(i, a, b, done)
}
// wait for the goroutines to finish.
for i := 0; i &lt; numWorkers; i++ {
&lt;-done
}
fmt.Println(&quot;All workers done.&quot;)
Wait() // without this the defered prints from the workers doesn&#39;t flush
}

答案1

得分: 8

在循环中选择两个通道。当一个通道关闭时,将通道变量设置为nil,使得该通道上的接收操作不可用。当两个通道都为nil时,退出循环。

package main

import (
    "fmt"
    "time"
)

const (
    fillCount  = 10 // 每个输入通道中的元素数量
    numWorkers = 3  // 消费者数量
)

func fillChannel(work chan string, name string) {
    for i := 0; i < fillCount; i++ {
        work <- fmt.Sprintf("%s%d", name, i)
    }
    close(work) // 我们完成了
}

func doWork(id int, ch1 chan string, ch2 chan string, done chan bool) {
    fmt.Println("Running worker", id)
    for ch1 != nil || ch2 != nil {
        select {
        case str, ok := <-ch1:
            if ok {
                fmt.Printf("%d: ch1(%d) %s\n", id, len(ch1), str)
            } else {
                ch1 = nil
                fmt.Printf("%d: ch1 closed\n", id)
            }

        case str, ok := <-ch2:
            if ok {
                fmt.Printf("%d: ch2(%d) %s\n", id, len(ch2), str)
            } else {
                ch2 = nil
                fmt.Printf("%d: ch2 closed\n", id)
            }

        }
    }
    fmt.Println("Ending worker", id)
    done <- true
}

func main() {

    a := make(chan string, 2) // 一个小的通道
    b := make(chan string, 5) // 一个较大的通道

    // 生成任务
    go fillChannel(a, "A")
    go fillChannel(b, "B")

    // 启动消费者
    done := make(chan bool)

    for i := 0; i < numWorkers; i++ {
        go doWork(i, a, b, done)
    }

    // 等待goroutine完成
    for i := 0; i < numWorkers; i++ {
        <-done
    }
    fmt.Println("所有工作已完成。")
}
英文:

Select on both channels in a loop. When a channel is closed, set the channel variable to nil to make receive on that channel not ready. Break out of the loop when both channels are nil.

http://play.golang.org/p/9gRY1yKqJ9

package main
import (
&quot;fmt&quot;
&quot;time&quot;
)
const (
fillCount  = 10 // number of elements in each input channel
numWorkers = 3  // number of consumers.
)
func fillChannel(work chan string, name string) {
for i := 0; i &lt; fillCount; i++ {
work &lt;- fmt.Sprintf(&quot;%s%d&quot;, name, i)
}
close(work) // we&#39;re finished
}
func doWork(id int, ch1 chan string, ch2 chan string, done chan bool) {
fmt.Println(&quot;Running worker&quot;, id)
for ch1 != nil || ch2 != nil {
select {
case str, ok := &lt;-ch1:
if ok {
fmt.Printf(&quot;%d: ch1(%d) %s\n&quot;, id, len(ch1), str)
} else {
ch1 = nil
fmt.Printf(&quot;%d: ch1 closed\n&quot;, id)
}
case str, ok := &lt;-ch2:
if ok {
fmt.Printf(&quot;%d: ch2(%d) %s\n&quot;, id, len(ch2), str)
} else {
ch2 = nil
fmt.Printf(&quot;%d: ch2 closed\n&quot;, id)
}
}
}
fmt.Println(&quot;Ending worker&quot;, id)
done &lt;- true
}
func main() {
a := make(chan string, 2) // a small channel
b := make(chan string, 5) // a bigger channel
// generate work
go fillChannel(a, &quot;A&quot;)
go fillChannel(b, &quot;B&quot;)
// launch the consumers
done := make(chan bool)
for i := 0; i &lt; numWorkers; i++ {
go doWork(i, a, b, done)
}
// wait for the goroutines to finish.
for i := 0; i &lt; numWorkers; i++ {
&lt;-done
}
fmt.Println(&quot;All workers done.&quot;)
}

huangapple
  • 本文由 发表于 2014年9月18日 23:53:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/25917250.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定