在故障情况下管理生产者-消费者死锁

huangapple go评论70阅读模式
英文:

Managing Producer Consumer deadlock in case of failure

问题

假设我有读取器、处理器和消费者,它们分别运行在不同的 goroutine 中:

package main

import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
	"github.com/pkg/errors"
)

// Reader is the first pipeline stage: it emits the integers 0 through 99 on
// chanFromReader, in order, and closes the channel when it stops so the next
// stage can detect the end of input.
//
// It stops early as soon as ctx is cancelled. The return value is always nil,
// whether the stage finished normally or was cancelled.
func Reader(ctx context.Context, chanFromReader chan int) error {
	defer close(chanFromReader)

	const count = 100
	cancelled := ctx.Done()
	for next := 0; next < count; next++ {
		select {
		case chanFromReader <- next:
			// Delivered; move on to the next value.
		case <-cancelled:
			return nil
		}
	}
	return nil
}

// Manipulate is the middle pipeline stage: it receives values from
// chanFromReader, doubles each one and forwards it on chanToWriter.
// chanToWriter is closed when the stage returns.
//
// The stage returns nil when chanFromReader is closed or when ctx is
// cancelled while it is waiting for input.
//
// NOTE: the forwarding send below is NOT guarded by ctx. If the consumer of
// chanToWriter has already stopped receiving, this send blocks forever and
// cancellation is never observed. This is the deadlock the question is about.
func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
	defer close(chanToWriter)

	cancelled := ctx.Done()
	for {
		var (
			v    int
			open bool
		)
		select {
		case v, open = <-chanFromReader:
		case <-cancelled:
			return nil
		}
		if !open {
			// Upstream finished; nothing more to forward.
			return nil
		}
		// Unconditional, blocking send (see NOTE above).
		chanToWriter <- 2 * v
	}
}

// Writer is the final pipeline stage: it prints every value received from
// chanToWriter.
//
// It returns nil when chanToWriter is closed or ctx is cancelled. To simulate
// a failing sink it returns an error immediately after printing the value 10.
func Writer(ctx context.Context, chanToWriter chan int) error {
	cancelled := ctx.Done()
	for {
		var (
			v    int
			open bool
		)
		select {
		case v, open = <-chanToWriter:
		case <-cancelled:
			return nil
		}
		if !open {
			// Upstream closed the channel: normal end of input.
			return nil
		}

		fmt.Println("Writer: ", v)
		if v == 10 {
			return errors.New("Generate some error in writer")
		}
	}
}

// main wires the three stages together (Reader -> Manipulate -> Writer) over
// two unbuffered channels and runs each stage in its own goroutine inside an
// errgroup.
//
// The ctx returned by errgroup.WithContext is cancelled the first time a stage
// returns a non-nil error; every stage receives that ctx. The stages are
// started consumer-first (Writer, Manipulate, Reader).
func main() {
	g, ctx := errgroup.WithContext(context.Background())
	chanFromReader := make(chan int)
	chanToWriter := make(chan int)

	g.Go(func() error { return Writer(ctx, chanToWriter) })
	g.Go(func() error { return Manipulate(ctx, chanFromReader, chanToWriter) })
	g.Go(func() error { return Reader(ctx, chanFromReader) })

	// Wait blocks until every stage has returned and yields the first
	// non-nil error; report it instead of silently dropping it.
	if err := g.Wait(); err != nil {
		fmt.Println("Pipeline failed:", err)
	}
	fmt.Println("Main wait done")
}

如果写入器由于某种原因失败,我无法中止其他例程。在上面的示例中,尽管它们监听 ctx 进行取消,但仍会在写入器失败的情况下发生死锁,有没有解决这个问题的方法?

我考虑添加以下内容:

// Manipulate is the middle pipeline stage: it receives values from
// chanFromReader, doubles each one and forwards it on chanToWriter.
// chanToWriter is closed when the stage returns.
//
// Both the receive and the send are raced against ctx, so the stage returns
// (with nil) on cancellation even if the consumer of chanToWriter has stopped
// receiving. It also returns nil when chanFromReader is closed.
func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
	defer close(chanToWriter)

	cancelled := ctx.Done()
	for {
		var (
			v    int
			open bool
		)
		select {
		case v, open = <-chanFromReader:
		case <-cancelled:
			return nil
		}
		if !open {
			// Upstream finished; nothing more to forward.
			return nil
		}

		// Guard the send as well: the downstream stage may be gone.
		select {
		case chanToWriter <- 2 * v:
		case <-cancelled:
			return nil
		}
	}
}

这样可以解决问题,但看起来不太干净...

英文:

Say I have a reader, a manipulator, and a consumer, each running in its own goroutine:

package main

import (
	"context"
	"fmt"
	"golang.org/x/sync/errgroup"
	"github.com/pkg/errors"
)

// Reader is the first pipeline stage: it emits the integers 0 through 99 on
// chanFromReader, in order, and closes the channel when it stops.
//
// It stops early as soon as ctx is cancelled. The return value is always nil.
func Reader(ctx context.Context, chanFromReader chan int) error {
	defer close(chanFromReader)
	for i := 0; i < 100; i++ {
		select {
		case <-ctx.Done():
			return nil
		case chanFromReader <- i:
		}
	}
	return nil
}

// Manipulate is the middle pipeline stage: it receives values from
// chanFromReader, doubles each one and forwards it on chanToWriter.
// chanToWriter is closed when the stage returns.
//
// It returns nil when chanFromReader is closed or when ctx is cancelled while
// it is waiting for input.
//
// NOTE: the forwarding send is NOT guarded by ctx. If the consumer of
// chanToWriter has already stopped receiving, the send blocks forever. This
// is the deadlock the question is about.
func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
	defer close(chanToWriter)
	for {
		select {
		case <-ctx.Done():
			return nil
		case x, ok := <-chanFromReader:
			if !ok {
				return nil
			}
			chanToWriter <- 2 * x
		}
	}
}

// Writer is the final pipeline stage: it prints every value received from
// chanToWriter.
//
// It returns nil when chanToWriter is closed or ctx is cancelled. To simulate
// a failing sink it returns an error immediately after printing the value 10.
func Writer(ctx context.Context, chanToWriter chan int) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case x, ok := <-chanToWriter:
			if !ok {
				return nil
			}
			fmt.Println("Writer: ", x)
			if x == 10 {
				return errors.New("Generate some error in writer")
			}
		}
	}
}

func main() {
g, ctx := errgroup.WithContext(context.Background())
chanFromReader := make(chan int)
chanToWriter := make(chan int)
func(ctx context.Context, chanToWriter chan int) {
g.Go(func() error {
return Writer(ctx, chanToWriter)
})
}(ctx, chanToWriter)
func(ctx context.Context, chanFromReader chan int, chanToWriter chan int) {
g.Go(func() error {
return Manipulate(ctx, chanFromReader, chanToWriter)
})
}(ctx, chanFromReader, chanToWriter)
func(ctx context.Context, chanFromReader chan int) {
g.Go(func() error {
return Reader(ctx, chanFromReader)
})
}(ctx, chanFromReader)
g.Wait()
fmt.Println(&quot;Main wait done&quot;)
}

https://play.golang.org/p/whslVE3rzel

In case the writer fails for some reason, I'm having trouble aborting the rest of the goroutines.
In the example above, for instance, although they all listen on ctx for cancellation, they still deadlock when the writer fails. Is there a workaround for this?

I thought of adding this:

// Manipulate is the middle pipeline stage: it receives values from
// chanFromReader, doubles each one and forwards it on chanToWriter.
// chanToWriter is closed when the stage returns.
//
// Both the receive and the send are raced against ctx, so the stage returns
// (with nil) on cancellation even if the consumer of chanToWriter has stopped
// receiving. It also returns nil when chanFromReader is closed.
func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
	defer close(chanToWriter)
	for {
		select {
		case <-ctx.Done():
			return nil
		case x, ok := <-chanFromReader:
			if !ok {
				return nil
			}
			// Guard the send as well: the downstream stage may be gone.
			select {
			case <-ctx.Done():
				return nil
			case chanToWriter <- 2 * x:
			}
		}
	}
}

which solves it, but it looks so unclean...

答案1

得分: 1

我建议的解决方案是:每个通道只由创建它的代码关闭。要强制做到这一点,可以让创建通道并负责关闭它的函数只返回一个只接收(receive-only)通道:

(感谢mh-cbon进一步完善此代码:)

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
)

// read starts the source stage in its own goroutine and returns two
// receive-only channels: one carrying the integers 0 through 11, in order,
// and one for errors. This stage never sends on the error channel.
//
// The goroutine stops early when ctx is cancelled. On exit it closes the
// value channel first and the error channel second.
func read(ctx context.Context) (<-chan int, <-chan error) {
	values := make(chan int)
	errs := make(chan error)

	go func() {
		// Deferred calls run last-in-first-out: values closes before errs.
		defer close(errs)
		defer close(values)

		const count = 12
		cancelled := ctx.Done()
		for next := 0; next < count; next++ {
			select {
			case values <- next:
			case <-cancelled:
				return
			}
		}
	}()

	return values, errs
}

// manipulate starts the transform stage in its own goroutine and returns two
// receive-only channels: one carrying every value read from in multiplied by
// two, and one for errors. This stage never sends on the error channel.
//
// The goroutine runs until in is closed; it then closes the value channel
// first and the error channel second. The send is a plain blocking send, so
// the stage relies on its consumer to keep receiving.
func manipulate(in <-chan int) (<-chan int, <-chan error) {
	doubled := make(chan int)
	errs := make(chan error)

	go func() {
		defer close(errs)
		defer close(doubled)

		for {
			v, open := <-in
			if !open {
				return
			}
			doubled <- 2 * v
		}
	}()

	return doubled, errs
}

// write starts the sink stage in its own goroutine and returns a receive-only
// error channel. Every value read from in is printed; when the value 10 is
// seen, a simulated output error is sent on the error channel.
//
// An error does not stop the stage: after the error has been received by the
// caller the goroutine keeps consuming in until it is closed, and only then
// closes the error channel. The error send blocks until someone receives it.
func write(in <-chan int) <-chan error {
	errs := make(chan error)

	go func() {
		defer close(errs)

		for {
			v, open := <-in
			if !open {
				return
			}
			fmt.Println("written: ", v)
			if v != 10 {
				continue
			}
			errs <- fmt.Errorf("output error during write")
		}
	}()

	return errs
}

// collectErrors drains every given error channel concurrently, one goroutine
// per channel, logging each error with log.Printf. It returns only after all
// of the channels have been closed and drained.
func collectErrors(errs ...<-chan error) {
	var wg sync.WaitGroup
	wg.Add(len(errs))

	for _, src := range errs {
		go func(src <-chan error) {
			defer wg.Done()
			for {
				err, open := <-src
				if !open {
					return
				}
				log.Printf("%v", err)
			}
		}(src)
	}

	wg.Wait()
}

// main builds the read -> manipulate -> write pipeline and waits until every
// stage's error channel has been closed.
//
// Each error channel is routed through cancelOnError, so the first error
// reported by any stage cancels ctx. That stops read, and the closing of its
// output channel then lets manipulate and write finish in turn.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	ch1, err1 := read(ctx)
	ch2, err2 := manipulate(ch1)
	err3 := write(ch2)

	collectErrors(
		cancelOnError(cancel, err1),
		cancelOnError(cancel, err2),
		cancelOnError(cancel, err3),
	)
	fmt.Println("main wait complete")
}

// cancelOnError relays every error received from in to the returned channel,
// calling cancel before forwarding each one. The returned channel is closed
// once in has been closed and drained.
func cancelOnError(cancel context.CancelFunc, in <-chan error) <-chan error {
	out := make(chan error)
	go func() {
		defer close(out)
		for err := range in {
			cancel() // calling a CancelFunc more than once is harmless
			out <- err
		}
	}()
	return out
}

这样,每个通道都能可靠地关闭;写入阶段的 I/O 错误会通过错误通道上报,在收到错误时取消上下文(调用 cancel)即可让其他 goroutine 依次退出。

英文:

I would propose a solution where each channel gets closed only by the code that creates it. This can be enforced by returning a receive-only channel from the function that creates the channel and is responsible for closing it:

(kudos to mh-cbon for further refining this:)

https://play.golang.org/p/Tq4OVW5sSP4

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
)

// read starts the source stage in its own goroutine and returns two
// receive-only channels: one carrying the integers 0 through 11, in order,
// and one for errors. This stage never sends on the error channel.
//
// The goroutine stops early when ctx is cancelled. On exit it closes the
// value channel first and the error channel second.
func read(ctx context.Context) (<-chan int, <-chan error) {
	ch := make(chan int)
	e := make(chan error)

	go func() {
		defer close(e)
		defer close(ch)

		for i := 0; i < 12; i++ {
			select {
			case <-ctx.Done():
				return
			case ch <- i:
			}
		}
	}()
	return ch, e
}

// manipulate starts the transform stage in its own goroutine and returns two
// receive-only channels: one carrying every value read from in multiplied by
// two, and one for errors. This stage never sends on the error channel.
//
// The goroutine runs until in is closed; it then closes the value channel
// first and the error channel second.
func manipulate(in <-chan int) (<-chan int, <-chan error) {
	ch := make(chan int)
	e := make(chan error)

	go func() {
		defer close(e)
		defer close(ch)

		for n := range in {
			ch <- 2 * n
		}
	}()
	return ch, e
}

// write starts the sink stage in its own goroutine and returns a receive-only
// error channel. Every value read from in is printed; when the value 10 is
// seen, a simulated output error is sent on the error channel.
//
// An error does not stop the stage: it keeps consuming in until in is closed,
// and only then closes the error channel.
func write(in <-chan int) <-chan error {
	e := make(chan error)
	go func() {
		defer close(e)
		for n := range in {
			fmt.Println("written: ", n)
			if n == 10 {
				e <- fmt.Errorf("output error during write")
			}
		}
	}()
	return e
}

// collectErrors drains every given error channel concurrently, one goroutine
// per channel, logging each error with log.Printf. It returns only after all
// of the channels have been closed and drained.
func collectErrors(errs ...<-chan error) {
	var wg sync.WaitGroup
	for i := 0; i < len(errs); i++ {
		wg.Add(1)
		go func(errs <-chan error) {
			defer wg.Done()
			for err := range errs {
				log.Printf("%v", err)
			}
		}(errs[i])
	}
	wg.Wait()
}

func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch1, err1 := read(ctx)
ch2, err2 := manipulate(ch1)
err3 := write(ch2)
collectErrors(err1, err2, err3)
fmt.Println(&quot;main wait complete&quot;)
}

This way, each channel gets closed reliably, and I/O errors from write are reported on its error channel; cancelling the context when such an error arrives shuts down the other goroutines.

huangapple
  • 本文由 发表于 2021年8月14日 00:40:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/68775773.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定