避免死锁的生产者消费者模型

huangapple go评论85阅读模式
英文:

Go producer consumer avoiding deadlock

问题

我有一个用Go语言编写的消费者和生产者的代码。虽然我已经在这里提问了代码审查的问题here,并且这个想法的很大一部分是从这个线程here中得出的,这是在playground中的code

  • 这段代码有多个生产者和消费者共享同一个通道。
  • 这段代码有一个错误处理机制,如果任何一个工作器(生产者或消费者)出错,那么所有的工作器都应该停止。

我担心的是当所有的消费者都关闭但生产者仍然向共享通道添加数据时可能发生死锁的情况。为了"缓解"这个问题,我在添加数据到数据队列之前添加了一个上下文检查,具体是在go playground中的第85行。

然而,如果在第85行中生产者检查context.Done(),然后取消context导致所有的消费者关闭,然后生产者尝试将数据插入队列,是否仍然可能发生死锁?

如果是这样,如何缓解这个问题。

以下是重新发布的代码:

package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	a1 := []int{1, 2, 3, 4, 5}
	a2 := []int{5, 4, 3, 1, 1}
	a3 := []int{6, 7, 8, 9}
	a4 := []int{1, 2, 3, 4, 5}
	a5 := []int{5, 4, 3, 1, 1}
	a6 := []int{6, 7, 18, 9}
	arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}

	ctx, cancel := context.WithCancel(context.Background())
	ch1 := read(ctx, arrayOfArray)

	messageCh := make(chan int)
	errCh := make(chan error)

	producerWg := &sync.WaitGroup{}
	for i := 0; i < 3; i++ {
		producerWg.Add(1)
		producer(ctx, producerWg, ch1, messageCh, errCh)
	}

	consumerWg := &sync.WaitGroup{}
	for i := 0; i < 3; i++ {
		consumerWg.Add(1)
		consumer(ctx, consumerWg, messageCh, errCh)
	}

	firstError := handleAllErrors(ctx, cancel, errCh)

	producerWg.Wait()
	close(messageCh)

	consumerWg.Wait()
	close(errCh)

	fmt.Println(<-firstError)
}

func read(ctx context.Context, arrayOfArray [][]int) <-chan []int {
	ch := make(chan []int)

	go func() {
		defer close(ch)

		for i := 0; i < len(arrayOfArray); i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- arrayOfArray[i]:
			}
		}
	}()

	return ch
}

func producer(ctx context.Context, wg *sync.WaitGroup, in <-chan []int, messageCh chan<- int, errCh chan<- error) {
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case arr, ok := <-in:
				if !ok {
					return
				}

				for i := 0; i < len(arr); i++ {

					// simulating an error.
					//if arr[i] == 10 {
					//	errCh <- fmt.Errorf("producer interrupted")
					//}

					select {
					case <-ctx.Done():
						return
					case messageCh <- 2 * arr[i]:
					}
				}
			}
		}
	}()
}

func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh <-chan int, errCh chan<- error) {
	go func() {
		wg.Done()

		for {
			select {
			case <-ctx.Done():
				return
			case n, ok := <-messageCh:
				if !ok {
					return
				}
				fmt.Println("consumed: ", n)

				// simulating erros
				//if n == 10 {
				//	errCh <- fmt.Errorf("output error during write")
				//}
			}
		}
	}()
}

func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) <-chan error {
	firstErrCh := make(chan error, 1)
	isFirstError := true
	go func() {
		defer close(firstErrCh)
		for err := range errCh {
			select {
			case <-ctx.Done():
			default:
				cancel()
			}
			if isFirstError {
				firstErrCh <- err
				isFirstError = !isFirstError
			}
		}
	}()

	return firstErrCh
}

希望对你有所帮助!如果有任何其他问题,请随时问我。

英文:

I have a code for consumer and producer in go. Although I have asked this question for code-review here and a good part of the idea was derived from this thread here here is the code in playground.

  • This code has multiple producers and consumers sharing the same channel.
  • This code has an error handling mechanism, if any of the workers (producer or consumer) errors out than all the workers should by halted.

I am concerned about deadlock scenario where all consumers are shut but producer is still adding data to shared channel. To "mitigate" this I have added a context check right before adding data into the data queue - specifically Line 85 in go playground.

However is a dead lock still possible if - the producer checks for context.Done() in Line 85, then the context is cancelled causing all consumers to shut down, and then the producer tries to insert data into the queue ?

If so how to mitigate.

Reposting the code:

package main
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;sync&quot;
)
func main() {
a1 := []int{1, 2, 3, 4, 5}
a2 := []int{5, 4, 3, 1, 1}
a3 := []int{6, 7, 8, 9}
a4 := []int{1, 2, 3, 4, 5}
a5 := []int{5, 4, 3, 1, 1}
a6 := []int{6, 7, 18, 9}
arrayOfArray := [][]int{a1, a2, a3, a4, a5, a6}
ctx, cancel := context.WithCancel(context.Background())
ch1 := read(ctx, arrayOfArray)
messageCh := make(chan int)
errCh := make(chan error)
producerWg := &amp;sync.WaitGroup{}
for i := 0; i &lt; 3; i++ {
producerWg.Add(1)
producer(ctx, producerWg, ch1, messageCh, errCh)
}
consumerWg := &amp;sync.WaitGroup{}
for i := 0; i &lt; 3; i++ {
consumerWg.Add(1)
consumer(ctx, consumerWg, messageCh, errCh)
}
firstError := handleAllErrors(ctx, cancel, errCh)
producerWg.Wait()
close(messageCh)
consumerWg.Wait()
close(errCh)
fmt.Println(&lt;-firstError)
}
func read(ctx context.Context, arrayOfArray [][]int) &lt;-chan []int {
ch := make(chan []int)
go func() {
defer close(ch)
for i := 0; i &lt; len(arrayOfArray); i++ {
select {
case &lt;-ctx.Done():
return
case ch &lt;- arrayOfArray[i]:
}
}
}()
return ch
}
func producer(ctx context.Context, wg *sync.WaitGroup, in &lt;-chan []int, messageCh chan&lt;- int, errCh chan&lt;- error) {
go func() {
defer wg.Done()
for {
select {
case &lt;-ctx.Done():
return
case arr, ok := &lt;-in:
if !ok {
return
}
for i := 0; i &lt; len(arr); i++ {
// simulating an error.
//if arr[i] == 10 {
//	errCh &lt;- fmt.Errorf(&quot;producer interrupted&quot;)
//}
select {
case &lt;-ctx.Done():
return
case messageCh &lt;- 2 * arr[i]:
}
}
}
}
}()
}
func consumer(ctx context.Context, wg *sync.WaitGroup, messageCh &lt;-chan int, errCh chan&lt;- error) {
go func() {
wg.Done()
for {
select {
case &lt;-ctx.Done():
return
case n, ok := &lt;-messageCh:
if !ok {
return
}
fmt.Println(&quot;consumed: &quot;, n)
// simulating erros
//if n == 10 {
//	errCh &lt;- fmt.Errorf(&quot;output error during write&quot;)
//}
}
}
}()
}
func handleAllErrors(ctx context.Context, cancel context.CancelFunc, errCh chan error) &lt;-chan error {
firstErrCh := make(chan error, 1)
isFirstError := true
go func() {
defer close(firstErrCh)
for err := range errCh {
select {
case &lt;-ctx.Done():
default:
cancel()
}
if isFirstError {
firstErrCh &lt;- err
isFirstError = !isFirstError
}
}
}()
return firstErrCh
}

答案1

得分: 1

关于死锁:根据你描述的情况,似乎不可能发生死锁。如果生产者能够发送到通道,这意味着有一个消费者可以接收,而且这将是原子操作,因此不可能出现生产者决定发送到通道,但消费者停止的情况。只有在消费者能够消费时,生产者才会发送到通道。

但是,当然还有可能改进的地方。由于你只对第一个错误感兴趣,你可以简单地这样做:

func main() {
  ...
  var firstErr error
  go func() {
      for err := range errCh {
         if firstErr == nil {
           firstErr = err
         }
      }
  }()
  ...
  close(errCh)
  ...
}
英文:

Regarding deadlock: it doesn't appear to be possible in the scenario you describe. If the producer is able to send to the channel, that means there is a consumer that can receive, and this will happen atomically, so it is not possible to have a producer decide to send to the channel but then the consumer stops. The producer sends to the channel only if the consumer can consume.

But then, there are of course possible improvements. Since you are only interested in the first error, you can simply do:

func main() {
...
var firstErr error
go func() {
for err := range errCh {
if firstErr!=nil {
firstErr=err
}
}
}()
...
close(errCh)
...

答案2

得分: 1

不,你没问题,这不会在生产者写入时发生死锁,因为你在通道写入操作中使用了select语句,所以即使通道写入无法发生(因为消费者已经终止),你仍然会触发上下文取消的条件并终止生产者。

为了演示这个概念,你可以运行这段代码,看到尽管它尝试进行没有读取器的通道写入,但不会发生死锁。

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := make(chan struct{})

	go func() {
		time.Sleep(1 * time.Second)
		cancel()
	}()

	select {
	case ch <- struct{}{}:
	case <-ctx.Done():
		fmt.Println("context canceled")
	}
	fmt.Println("bye!")
}

这是它的playground链接

关于代码简化的问题。如果这是实际代码的一部分,我可能会使用golang.org/x/sync/errgroup中的Group。要么借鉴它们的做法,利用sync.Once使用一个函数包装所有的生产者和消费者,生成goroutine,并且可以处理错误,而不需要在错误处理函数中编写更复杂的错误通道处理代码。

英文:

Nope, you're fine, this won't deadlock on a producer write because you're wrapping the channel writes in a select statement, so even if the channel write can't happen because the consumers have terminated, you'll still hit the context cancellation clause and terminate your producer.

Just to demonstrate the concept, you can run this and see it doesn't deadlock despite the fact that it's attempting a channel write with no readers.

package main
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;time&quot;
)
func main() {
ctx, cancel := context.WithCancel(context.Background())
ch := make(chan struct{})
go func() {
time.Sleep(1 * time.Second)
cancel()
}()
select {
case ch &lt;- struct{}{}:
case &lt;-ctx.Done():
fmt.Println(&quot;context canceled&quot;)
}
fmt.Println(&quot;bye!&quot;)
}

Here's the playground link for it.

Regarding some code simplification. If this is for any sort of real-life code, I'd probably just use a Group from golang.org/x/sync/errgroup. Either that or take a cue from them and leverage sync.Once and wrap all of your producers and consumers with a function that spawns the goroutines and can handle the errors without the more complex error channel draining code you have in your error handling function.

huangapple
  • 本文由 发表于 2023年1月21日 03:14:06
  • 转载请务必保留本文链接:https://go.coder-hub.com/75188466.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定