读取两个具有未知项目数量的通道

huangapple go评论89阅读模式
英文:

Reading two channels with unknown number of items

问题

尝试生成数字并将其发送到另一个函数将读取并执行某些操作,然后根据是否出现错误返回两个通道。我不知道会有多少错误,所以out和errorCh都有一个延迟关闭来表示没有更多的项目。

在这个示例中,我有一个通道,我将数字发送到该通道,然后将该通道传递给一个函数,该函数返回两个通道,即out和errorCh。然后在最后,我从out和errorCh中读取值。

我延迟关闭了in通道,所以当没有更多的值发送到下游运行函数时,它将知道没有更多的数字需要处理。这意味着在从in接收到关闭通道信号后,运行函数将完成?这也会关闭out和errorCh,所以select语句会接收到这两个通道已关闭的信号。我是否没有正确关闭其中一个通道?为什么会出现死锁?

链接:https://go.dev/play/p/hKSkfk0VQ2d

package main

import (
	"errors"
	"fmt"
)

func run(in <-chan int) (chan int, chan error) {
	out := make(chan int)
	errorCh := make(chan error)

	go func() {
		defer close(out)
		defer close(errorCh)

		for i := range in {
			if i%2 == 0 {
				out <- i
			} else {
				errorCh <- errors.New("we don't like odd numbers")
			}
		}
	}()
	return out, errorCh
}

func main() {
	in := make(chan int)
	out := make(chan int)
	errors := make(chan error)

	// 生成要发送到"in"通道的数字
	go func(in chan int) {
		defer close(in)
		for i := 0; i < 10; i++ {
			fmt.Println("  input", i)
			in <- i
		}
	}(in)

	// run函数从"in"通道读取数据,执行某些操作,并返回"out"通道和"errorCh"
	go func(in chan int) {
		out, errors = run(in)
	}(in)

	// 从"out"通道和"errorCh"读取,直到所有"in"通道中的数字都被处理完毕
	for {
		select {
		case i, ok := <-out:
			if !ok { // out通道已关闭
				return // 完成
			}
			fmt.Println("done", i)
		case err, ok := <-errors:
			if !ok {
				return
			}
			if err != nil {
				fmt.Println(err)
			}
		}
	}
}
英文:

Trying to generate numbers and send them into a channel another function will read then do something and return two channels based on if there was an error or not. I don't know how many errors there will be so the out and errorCh have a defer close to signal no more items.

In this example I have a channel that I send numbers into and then pass that channel to a function which returns two channels, out and errorCh. Then at the end I read the values from out and errorCh.

I defer close the in channel so when there are no more values sent in the downstream run function will know there are no more numbers to process. That means the run function will complete after it gets the close channel signal from in? Which would also close the out and errorCh so the select statement would get the signal those two channels are closed. Am I not closing one of the channels properly? Why is there a deadlock?

https://go.dev/play/p/hKSkfk0VQ2d


package main
import (
&quot;errors&quot;
&quot;fmt&quot;
)
func run(in &lt;-chan int) (chan int, chan error) {
out := make(chan int)
errorCh := make(chan error)
go func() {
defer close(out)
defer close(errorCh)
for i := range in {
if i%2 == 0 {
out &lt;- i
} else {
errorCh &lt;- errors.New(&quot;we don&#39;t like odd numbers&quot;)
}
}
}()
return out, errorCh
}
func main() {
in := make(chan int)
out := make(chan int)
errors := make(chan error)
// generate numbers to send to the &quot;in&quot; channel
go func(in chan int) {
defer close(in)
for i := 0; i &lt; 10; i++ {
fmt.Println(&quot;  input&quot;, i)
in &lt;- i
}
}(in)
// run function reads from the &quot;in&quot; channel, does something and returns
// the &quot;out&quot; channel and &quot;errorCh&quot;
go func(in chan int) {
out, errors = run(in)
}(in)
// read from the &quot;out&quot; channel and &quot;errorCh&quot; until all of the numbers
// in the &quot;in&quot; channel have been run
for {
select {
case i, ok := &lt;-out:
if !ok { // out channel is closed
return // Done
}
fmt.Println(&quot;done&quot;, i)
case err, ok := &lt;-errors:
if !ok {
return
}
if err != nil {
fmt.Println(err)
}
}
}
}

答案1

得分: 3

死锁非常有趣,死锁可能发生的原因有多种。其中两个原因是:

  1. 如果你正在从一个还没有准备好放入数据或数据尚未写入的通道中读取数据。

  2. 如果你正在向一个通道中写入数据,但在写入数据时没有其他人在那里读取该数据。

在你的情况下,你在main函数中提到的out通道和你在run函数中提到的out通道是完全不同的。尽管你试图从out, errors = run(in)中复制值,但不用说,只有当run函数中的out <- i被执行时,这才会成功。

现在在main函数中的case i, ok := <-out:处,你正在尝试从out通道中读取数据。现在让我再次提醒你,你在两个goroutine中没有使用相同的out通道,所以Go运行时会在out <- i发生之前尝试从<-out读取值,就像@zerkms所提到的那样。

因此,这属于在写入之前从通道中读取的情况。另一方面,写入部分也在等待某个goroutine来读取。这意味着读取部分正在等待写入,而写入部分正在等待读取,这是一个典型的死锁条件。

因此,准确地说,你的代码可能没有竞争条件,但存在死锁

我猜你正在尝试创建一个生产者消费者模式。请参考以下模板来实现这样的模式。

package main

import (
	"errors"
	"fmt"
)

func Consume(in, out chan int, err chan error) {
	defer close(out)
	defer close(err)
	for i := range in {
		if i%2 == 0 {
			out <- i
		} else {
			err <- errors.New("we don't like odd numbers")
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	err := make(chan error)
	go func(in chan int) {
		defer close(in)
		for i := 0; i < 10; i++ {
			fmt.Println("  input", i)
			in <- i
		}
	}(in)

	go Consume(in, out, err)

	for {
		select {
		case i, ok := <-out:
			if !ok {
				return
			}
			fmt.Printf("Processed Data %d\n", i)
		case er, ok := <-err:
			if !ok {
				return
			}
			if er != nil {
				fmt.Println(er)
			}
		}
	}
}

请注意,我是如何在主函数中定义通道并将其传递而不是重新定义通道的。

英文:

Deadlocks are pretty interesting and there are multiple reasons why deadlock might occur. Two of those are

  1. If you are reading from a channel which is not ready to put data or where the data is not yet written to.

  2. If you are writing into a channel but no one is there at the time of writing to read that data.

In you case, the out channel that you have mentioned in the main function and out channel that you have mentioned in run function are entirely different. Although you are trying to copy the value from out, errors = run(in) but needless to say that this would be successful only when out &lt;- i gets executed in the run function.

Now later in the main function at case i, ok := &lt;-out: you are trying to read form the out channel. Now let me remind you again that you haven't use the same out channel in both the go routines so Go runtime simply tries to read the value from &lt;-out even before out &lt;- i can occur as mentioned by @zerkms.

So this falls in to the category of Reading from the channel before it is written. On the other hand the writing part is also waiting for some goroutine to read as well. Which means reading part is waiting for writing and writing part is waiting for reading, a typical deadlock condition.

So to be precise, Your code may not have race but it has deadlocks

I presume you are trying to create a producer consumer pattern. Take a help form the following template to do such.


package main

import (
	&quot;errors&quot;
	&quot;fmt&quot;
)

func Consume(in, out chan int, err chan error) {
	defer close(out)
	defer close(err)
	for i := range in {
		if i%2 == 0 {
			out &lt;- i
		} else {
			err &lt;- errors.New(&quot;we don&#39;t like odd numbers&quot;)
		}
	}
}

func main() {
	in := make(chan int)
	out := make(chan int)
	err := make(chan error)
	go func(in chan int) {
		defer close(in)
		for i := 0; i &lt; 10; i++ {
			fmt.Println(&quot;  input&quot;, i)
			in &lt;- i
		}
	}(in)

	go Consume(in, out, err)

	for {
		select {
		case i, ok := &lt;-out:
			if !ok {
				return
			}
			fmt.Printf(&quot;Processed Data %d\n&quot;, i)
		case er, ok := &lt;-err:
			if !ok {
				return
			}
			if er != nil {
				fmt.Println(er)
			}
		}
	}
}

Notice how I am defining the channels in the main and passing those along instead of redefining the channel

答案2

得分: 1

这里有几个你需要修复的问题。

这段代码导致了@zerkms的竞态条件:

    go func(in chan int) {
        out, errors = run(in)
    }(in)

在你开始在下面的select语句中对这些通道进行选择之前,out, errors必须被设置为run(in)的结果,所以你不希望在一个goroutine中运行它。你在main的开头使用make创建了它们,然后在run中重新使用了make

    out, errors = run(in)

这使得main中对通道outerrorsmake变得多余,因为run将会创建新的通道。

	var out chan int
	var errors chan error
    go func(in chan int) {
     ... 

像这样将通道传递给函数是一种明确作用域的好方法。对于复杂的程序,我避免内联定义非试验性的goroutine函数,以免共享作用域。

除此之外,你的代码将会运行,但是它缺少并行处理的关键组件:你没有进行任何并行操作。你只有一个工作goroutine,即run创建的那个。

作为一个工作负载,确定一个整数的奇偶性是一个很好的概念替代,可以很容易地进行并行化处理。但是你正在逐个处理工作项 - 实际上,通道在多个工作项可以同时处理的情况下才有用。

如果你有多个工作goroutine,你不能在每个工作goroutine中关闭outerror通道。没有一个工作goroutine知道其他工作goroutine是否已经完成处理。实际上,你的goroutine目前都不知道所有的工作goroutine是否已经完成。你知道in已经关闭,但如果有多个工作goroutine,这并不意味着所有in的消息都已经完成处理。

这就是sync.WaitGroup的作用。使用sync.WaitGroup,你可以计算你已经创建的工作goroutine的数量,然后等待它们全部完成。这样可以在所有工作goroutine完成后关闭outerrors

首先,我们将重写工作goroutine,使其能够被多次调用。我们的工作goroutine本质上是相同的,但是我们不再关闭outerrs,而是使用wg.Done()来表示我们已经完成。

func worker(wg *sync.WaitGroup, in <-chan int, out chan<- int, errs chan<- error) {
	defer wg.Done()
	for i := range in {
		if i%2 == 0 {
			out <- i
		} else {
			errs <- fmt.Errorf("we don't like odd numbers (%d)", i)
		}
	}
}

现在我们需要创建一些工作goroutine。

	var in = make(chan int)
	var out = make(chan int)
	var errs = make(chan error)

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go worker(&wg, in, out, errs)
	}

还有一个goroutine用于等待等待组完成后关闭outerrs

	go func() {
		wg.Wait()
		close(errs)
		close(out)
	}()

我们还需要调整这两个通道的行为:

        case i, ok := <-out:
            if !ok { // out通道已关闭
                return // 完成
            }

首先,进行一个更正:

            if !ok { // out通道已关闭

!ok表示通道已关闭且为空。

对于一个工作goroutine,关闭outerrors都是表示程序完成的等效信号,你知道不会再有更多的发送操作,因为这个工作goroutine正在退出。但是对于多个工作goroutine,我们不能确定所有通道的所有消息都已经接收完毕。如果outerrors中的一个为空且已关闭,程序将在另一个通道的消息被接收之前结束。

相反,使用Go的一个巧妙特性:select不会从一个空的通道中读取。因此,当通道关闭并变为空时,我们可以将它们设置为nil。当它们都是nil时,我们完成了。

	for errs != nil || out != nil {
		select {
		case i, ok := <-out:
			if !ok { // out通道已关闭
				out = nil
			}
			fmt.Println("完成", i)
		case err, ok := <-errs:
			if !ok {
				errs = nil
			}
			if err != nil {
				fmt.Println(err)
			}
		}
	}

这是完整的代码

像这样同时使用等待组和通道是一种常见的做法。通常,我发现使用一个返回类型,其中包含输入项、结果和错误,而不是使用两个返回通道(一个用于结果,一个用于错误),更简单和更有用。

type workResult struct {
   err error
   parity bool
   input int
}

在并行化工作时,通常可以将输入与错误关联起来是很有用的。然后,你就不需要进行任何select操作,收集结果看起来更简单:

for res := range results {
   if res.err != nil {  
      // 处理错误...
   } else {
      // 对结果进行操作
   }
}

当处理多个错误时,你可以考虑使用https://pkg.go.dev/github.com/hashicorp/go-multierror,它是一种将错误收集和合并为一个最终值的有用方法,同时仍然允许成功的工作继续进行。

英文:

There's a few things here you have to fix.

This is causing @zerkms's race condition:

    go func(in chan int) {
out, errors = run(in)
}(in)

out, errors must be set to the results of run(in) before you start selecting on those channels below, so you don't want to run it in a goroutine. You maked them at the beginning of main, and then remaked them in run.

    out, errors = run(in)

this makes main's makeing of channels out and errors wasteful, since run is going to create new ones.

	var out chan int
var errors chan error
    go func(in chan int) {
... 

Passing the channel into the function like this is a good way to make the scope explicit. For complex programs I avoid defining nontriial goroutine functions inline sos that it doesn't share scope.

Other than that, your code will run, but it's missing a key component of parallel processing: you aren't doing anything in parallel. You only have one worker goroutine, the one run creates.

As a workload, determining an integer's parity is a good conceptual stand-in for easily parallelized work. But you're processing work one item at a time - in reality, channels are useful in situations when multiple items of work can be processed at once.

If you have multiple workers, you can't close the out and error channels in each worker. No worker knows whether the others are done processing. In fact, none of your goroutines currently know whether all workers are done. You know that in is closed, but if there are multiple workers, this does not mean all of in's messages are finished processing.

This is where sync.WaitGroup comes in. With a sync.WaitGroup you can count the number of workers you've spawned, then wait for them all to be done. This allows you to close out and errors after all workers are done.

First we'll rewrite the worker to be able to be invoked many times. Our worker is essentially the same, but instead of closing out and errs, we
signal we are complete with wg.Done().

func worker(wg *sync.WaitGroup, in &lt;-chan int, out chan&lt;- int, errs chan&lt;- error) {
defer wg.Done()
for i := range in {
if i%2 == 0 {
out &lt;- i
} else {
errs &lt;- fmt.Errorf(&quot;we don&#39;t like odd numbers (%d)&quot;, i)
}
}
}

Now we need to create some workers

	var in = make(chan int)
var out = make(chan int)
var errs = make(chan error)
var wg sync.WaitGroup
for i := 0; i &lt; 10; i++ {
wg.Add(1)
go worker(&amp;wg, in, out, errs)
}

And a goroutine to wait for the wait group before closing out and errs

	go func() {
wg.Wait()
close(errs)
close(out)
}()

We also need to adjust this behavior for both channels:

        case i, ok := &lt;-out:
if !ok { // out channel is closed
return // Done
}

first, a correction:

            if !ok { // out channel is closed

!ok means that the channel is closed and empty.

With one worker, closing out or errors are equivalent signals that the program is done, and you know there will be no more sends on the channel because the one worker is exiting. But with many workers, we can't be sure all messages from all channels have been .If one of out and errors are empty and closed, the program ends before the other channel's messages are received.

Instead, use a neat feature of Go: select won't read from a nil channel. So as the channels close and become empty, we can set them to nil . When they're both nil, we're done.

	for errs != nil || out != nil {
select {
case i, ok := &lt;-out:
if !ok { // out channel is closed
out = nil
}
fmt.Println(&quot;done&quot;, i)
case err, ok := &lt;-errs:
if !ok {
errs = nil
}
if err != nil {
fmt.Println(err)
}
}
}

Here's the whole thing.

Using wait groups and channels together like this is a common practice. Typically I find it simpler and more useful to have a return type consisting of input item, reuslt, and error instead of having two return channels, one for results and one for errors.

type workResult struct {
err error
parity bool
input int
}

Often when parallelizing work it's useful to be able to associate the input with the error. Then you don't have to do any selecting either and collecting results looks more like a simple:

for res := range results {
if res.err != nil {  
// handle error ... 
} else {
// do whatever with results
}
}

When you're handling multiple errors you might consider using https://pkg.go.dev/github.com/hashicorp/go-multierror which is a useful way to collect and coalesce errors into one final value while still allowing successful work to continue.

huangapple
  • 本文由 发表于 2023年3月29日 05:24:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/75871205.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定