

Reading two channels with unknown number of items







I'm trying to generate numbers and send them into a channel that another function reads. That function does something with each number and returns two channels, one for results and one for errors. I don't know how many errors there will be, so out and errorCh each have a deferred close to signal that there are no more items.

In this example I have a channel that I send numbers into, and I pass that channel to a function which returns two channels, out and errorCh. At the end I read the values from out and errorCh.

I defer close the in channel so that, when no more values are sent, the downstream run function knows there are no more numbers to process. Doesn't that mean run completes once it gets the close signal from in? That would also close out and errorCh, so the select statement should see that those two channels are closed. Am I not closing one of the channels properly? Why is there a deadlock?


package main

import (
	"errors"
	"fmt"
)

func run(in <-chan int) (chan int, chan error) {
	out := make(chan int)
	errorCh := make(chan error)

	go func() {
		defer close(out)
		defer close(errorCh)

		for i := range in {
			if i%2 == 0 {
				out <- i
			} else {
				errorCh <- errors.New("we don't like odd numbers")
			}
		}
	}()

	return out, errorCh
}

func main() {
	in := make(chan int)
	out := make(chan int)
	errors := make(chan error)

	// generate numbers to send to the "in" channel
	go func(in chan int) {
		defer close(in)
		for i := 0; i < 10; i++ {
			fmt.Println("  input", i)
			in <- i
		}
	}(in)

	// run function reads from the "in" channel, does something and returns
	// the "out" channel and "errorCh"
	go func(in chan int) {
		out, errors = run(in)
	}(in)

	// read from the "out" channel and "errorCh" until all of the numbers
	// in the "in" channel have been run
	for {
		select {
		case i, ok := <-out:
			if !ok { // out channel is closed
				return // Done
			}
			fmt.Println("done", i)
		case err, ok := <-errors:
			if !ok {
				return
			}
			if err != nil {
				fmt.Println("error", err)
			}
		}
	}
}


Score: 3





Deadlocks are pretty interesting, and there are multiple reasons why a deadlock might occur. Two of them are:

  1. You are reading from a channel that has no data yet, because nothing has written to it.

  2. You are writing to a channel while no one is there to read the data.

In your case, the out channel in the main function and the out channel in the run function are entirely different channels. You do try to replace main's channels with out, errors = run(in), but that assignment happens in a separate goroutine, so it only helps if it takes effect before main starts reading.

Later in the main function, at case i, ok := <-out:, you try to read from the out channel. Remember that the two goroutines are not using the same out channel, so main can end up waiting on <-out for a channel that no out <- i will ever write to, as mentioned by @zerkms.

So this falls into the category of reading from a channel before it is written to. Meanwhile, the writing side is waiting for some goroutine to read. The reader is waiting for a writer and the writer is waiting for a reader, which is a typical deadlock condition.

So, to be precise: whether or not you count this as a race, your code deadlocks.

I presume you are trying to create a producer-consumer pattern. You can use the following template for that.
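The mismatch between the two out channels can be observed without hanging the program. The following is only a sketch: receivedWithin is a hypothetical helper (not part of the question's code) that gives up after a timeout instead of blocking forever, and mainOut and workerOut are stand-ins for main's channel and the one created inside run.

```go
package main

import (
	"fmt"
	"time"
)

// receivedWithin reports whether a value arrives on ch before the timeout.
// Hypothetical helper, used here only to avoid blocking forever.
func receivedWithin(ch <-chan int, timeout time.Duration) bool {
	select {
	case <-ch:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	mainOut := make(chan int)   // like main's out: nobody ever sends on it
	workerOut := make(chan int) // like run's out: a goroutine sends on it

	go func() { workerOut <- 2 }()

	fmt.Println(receivedWithin(mainOut, 50*time.Millisecond)) // false
	fmt.Println(receivedWithin(workerOut, time.Second))       // true
}
```

In the question there is no timeout, so the read on main's own channel simply never completes.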

package main

import (
	"errors"
	"fmt"
)

func Consume(in, out chan int, err chan error) {
	defer close(out)
	defer close(err)
	for i := range in {
		if i%2 == 0 {
			out <- i
		} else {
			err <- errors.New("we don't like odd numbers")
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	err := make(chan error)
	go func(in chan int) {
		defer close(in)
		for i := 0; i < 10; i++ {
			fmt.Println("  input", i)
			in <- i
		}
	}(in)

	go Consume(in, out, err)

	for {
		select {
		case i, ok := <-out:
			if !ok {
				return
			}
			fmt.Printf("Processed Data %d\n", i)
		case er, ok := <-err:
			if !ok {
				return
			}
			if er != nil {
				fmt.Println("error:", er)
			}
		}
	}
}

Notice how I define the channels in main and pass them along instead of redefining them.


Score: 1





There are a few things here you have to fix.

This is causing @zerkms's race condition:

    go func(in chan int) {
        out, errors = run(in)
    }(in)

out and errors must be set to the results of run(in) before you start selecting on those channels below, so you don't want to run it in a goroutine. You created them with make at the beginning of main, and then created them again with make in run.

    out, errors = run(in)

This makes main's make calls for the channels out and errors wasteful, since run is going to create new ones.

    var out chan int
    var errors chan error

    go func(in chan int) {
        // ...
    }(in)

Passing the channel into the function like this is a good way to make the scope explicit. For complex programs I avoid defining nontrivial goroutine functions inline so that they don't share scope.
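Putting these points together, a corrected single-worker version could look like the sketch below. The process wrapper and the name errCh are assumptions introduced here so the result can be returned and checked; run keeps the question's behavior but returns receive-only channels.

```go
package main

import (
	"errors"
	"fmt"
)

// run starts one goroutine that sends even numbers on out and an error
// for every odd number, then closes both channels when in is drained.
func run(in <-chan int) (<-chan int, <-chan error) {
	out := make(chan int)
	errCh := make(chan error)
	go func() {
		defer close(out)
		defer close(errCh)
		for i := range in {
			if i%2 == 0 {
				out <- i
			} else {
				errCh <- errors.New("we don't like odd numbers")
			}
		}
	}()
	return out, errCh
}

// process is a hypothetical wrapper: it feeds 0..n-1 through run and
// collects everything that comes back.
func process(n int) (evens []int, errs []error) {
	in := make(chan int)
	go func(in chan<- int) {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}(in)

	// Called directly, not in a goroutine, so both channels are set
	// before the select below ever looks at them.
	out, errCh := run(in)

	for {
		select {
		case i, ok := <-out:
			if !ok {
				return evens, errs
			}
			evens = append(evens, i)
		case err, ok := <-errCh:
			if !ok {
				return evens, errs
			}
			errs = append(errs, err)
		}
	}
}

func main() {
	evens, errs := process(10)
	fmt.Println(evens, len(errs)) // [0 2 4 6 8] 5
}
```

Returning on the first closed channel is only safe here because there is a single worker and the channels are unbuffered, so every send has already been received by the time either close runs.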

Other than that, your code will run, but it's missing a key component of parallel processing: you aren't doing anything in parallel. You only have one worker goroutine, the one run creates.

As a workload, determining an integer's parity is a good conceptual stand-in for easily parallelized work. But you're processing work one item at a time - in reality, channels are useful in situations when multiple items of work can be processed at once.

If you have multiple workers, you can't close the out and error channels in each worker. No worker knows whether the others are done processing. In fact, none of your goroutines currently know whether all workers are done. You know that in is closed, but if there are multiple workers, this does not mean all of in's messages are finished processing.

This is where sync.WaitGroup comes in. With a sync.WaitGroup you can count the number of workers you've spawned, then wait for them all to be done. This allows you to close out and errors after all workers are done.

First we'll rewrite the worker so that it can be invoked many times. Our worker is essentially the same, but instead of closing out and errs, we signal that we are complete with wg.Done().

func worker(wg *sync.WaitGroup, in <-chan int, out chan<- int, errs chan<- error) {
	defer wg.Done()
	for i := range in {
		if i%2 == 0 {
			out <- i
		} else {
			errs <- fmt.Errorf("we don't like odd numbers (%d)", i)
		}
	}
}

Now we need to create some workers:

    var in = make(chan int)
    var out = make(chan int)
    var errs = make(chan error)

    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1) // register each worker before starting it
        go worker(&wg, in, out, errs)
    }

And a goroutine to wait for the wait group before closing out and errs:

    go func() {
        wg.Wait()
        close(out)
        close(errs)
    }()

We also need to adjust this behavior for both channels:

    case i, ok := <-out:
        if !ok { // out channel is closed
            return // Done
        }

First, a correction:

    if !ok { // out channel is closed

!ok means that the channel is closed and empty.

With one worker, closing out or errors is an equivalent signal that the program is done, and you know there will be no more sends on the channel because the one worker is exiting. But with many workers, we can't be sure all messages from all channels have been received. If the loop returns as soon as one of out and errors is empty and closed, the program can end before the other channel's messages are received.

Instead, use a neat feature of Go: a receive from a nil channel blocks forever, so select never picks a case that reads from a nil channel. As the channels close and become empty, we can set them to nil. When they're both nil, we're done.

    for errs != nil || out != nil {
        select {
        case i, ok := <-out:
            if !ok { // out channel is closed and empty
                out = nil
                continue
            }
            fmt.Println("done", i)
        case err, ok := <-errs:
            if !ok {
                errs = nil
                continue
            }
            if err != nil {
                fmt.Println("error", err)
            }
        }
    }

Here's the whole thing.
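Assembled from the snippets above, the complete program might look like this sketch. The processAll wrapper, its n and workers parameters, the failures slice, and the final sort are assumptions added here so that the result is returned and easy to check; with several workers the arrival order is not deterministic.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

func worker(wg *sync.WaitGroup, in <-chan int, out chan<- int, errs chan<- error) {
	defer wg.Done()
	for i := range in {
		if i%2 == 0 {
			out <- i
		} else {
			errs <- fmt.Errorf("we don't like odd numbers (%d)", i)
		}
	}
}

// processAll is a hypothetical wrapper: it sends 0..n-1 to the given
// number of workers and collects everything they produce.
func processAll(n, workers int) (evens []int, failures []error) {
	in := make(chan int)
	out := make(chan int)
	errs := make(chan error)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go worker(&wg, in, out, errs)
	}

	// Producer: closing in lets every worker's range loop finish.
	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()

	// Closer: only after all workers are done is it safe to close.
	go func() {
		wg.Wait()
		close(out)
		close(errs)
	}()

	// A nil channel is never selected, so each channel drops out of the
	// select once it is closed and empty.
	for out != nil || errs != nil {
		select {
		case i, ok := <-out:
			if !ok {
				out = nil
				continue
			}
			evens = append(evens, i)
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}
			failures = append(failures, err)
		}
	}

	sort.Ints(evens) // arrival order varies between runs
	return evens, failures
}

func main() {
	evens, failures := processAll(10, 4)
	fmt.Println(evens, len(failures)) // [0 2 4 6 8] 5
}
```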

Using wait groups and channels together like this is a common practice. Typically I find it simpler and more useful to have a single return type consisting of input item, result, and error instead of having two return channels, one for results and one for errors.

type workResult struct {
	err    error
	parity bool
	input  int
}

Often when parallelizing work it's useful to be able to associate the input with the error. Then you don't have to do any selecting either, and collecting results looks more like a simple loop:

for res := range results {
	if res.err != nil {
		// handle error ...
	} else {
		// do whatever with results
	}
}
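As a rough sketch of that single-channel variant, assuming that parity is true for even inputs and that the processAll wrapper and its counters exist only for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

type workResult struct {
	err    error
	parity bool // assumption: true when input is even
	input  int
}

func worker(wg *sync.WaitGroup, in <-chan int, results chan<- workResult) {
	defer wg.Done()
	for i := range in {
		res := workResult{input: i, parity: i%2 == 0}
		if !res.parity {
			res.err = fmt.Errorf("we don't like odd numbers (%d)", i)
		}
		results <- res
	}
}

// processAll is a hypothetical wrapper that counts successes and failures.
func processAll(n, workers int) (succeeded, failed int) {
	in := make(chan int)
	results := make(chan workResult)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go worker(&wg, in, results)
	}

	go func() {
		defer close(in)
		for i := 0; i < n; i++ {
			in <- i
		}
	}()

	// One channel to close, so a plain range loop replaces the select.
	go func() {
		wg.Wait()
		close(results)
	}()

	for res := range results {
		if res.err != nil {
			failed++
		} else {
			succeeded++
		}
	}
	return succeeded, failed
}

func main() {
	fmt.Println(processAll(10, 3)) // 5 5
}
```

Because every result carries its input, an error can be reported together with the item that caused it, whatever order the workers finish in.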

When you're handling multiple errors you might consider using https://pkg.go.dev/github.com/hashicorp/go-multierror which is a useful way to collect and coalesce errors into one final value while still allowing successful work to continue.
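If you would rather avoid an extra dependency, the standard library's errors.Join (available since Go 1.20) covers the simple case of coalescing several errors into one value. This sketch uses errors.Join in place of go-multierror, and checkAll is a hypothetical helper used only for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

// checkAll validates every input and returns all failures as one error.
// Hypothetical helper; it keeps going after a failure instead of stopping.
func checkAll(inputs []int) error {
	var errs []error
	for _, i := range inputs {
		if i%2 != 0 {
			errs = append(errs, fmt.Errorf("we don't like odd numbers (%d)", i))
		}
	}
	return errors.Join(errs...) // nil when no errors were collected
}

func main() {
	fmt.Println(checkAll([]int{0, 2, 4}) == nil) // true
	fmt.Println(checkAll([]int{1, 2, 3}))
}
```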

  • Posted on March 29, 2023 at 05:24:53
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75871205.html



