在故障情况下管理生产者-消费者死锁

huangapple go评论111阅读模式
英文:

Managing Producer Consumer deadlock in case of failure

问题

假设我有读取器(reader)、操作器(manipulator)和消费者(consumer),它们分别运行在不同的 goroutine 中:

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "golang.org/x/sync/errgroup"
  6. "github.com/pkg/errors"
  7. )
  // Reader emits the integers 0 through 99 on chanFromReader and closes the
  // channel when it returns, so the downstream stage can detect end of input.
  //
  // It returns nil once every value has been sent. If ctx is cancelled while a
  // send is pending it stops early and returns ctx.Err(), so a caller can tell
  // an aborted run from a completed one.
  func Reader(ctx context.Context, chanFromReader chan int) error {
  	defer close(chanFromReader)

  	for i := 0; i < 100; i++ {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case chanFromReader <- i:
  		}
  	}
  	return nil
  }
  // Manipulate doubles each value received from chanFromReader and forwards it
  // on chanToWriter, closing chanToWriter when it returns. It returns nil when
  // chanFromReader is closed, and ctx.Err() when ctx is cancelled while it is
  // waiting for input.
  //
  // NOTE: the send to chanToWriter is not guarded by ctx. If the consumer of
  // chanToWriter stops receiving, this goroutine blocks on that send forever,
  // even after ctx is cancelled. This is the deadlock the example demonstrates,
  // so the send is intentionally kept exactly as the question presents it.
  func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
  	defer close(chanToWriter)

  	for {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case x, ok := <-chanFromReader:
  			if !ok {
  				return nil
  			}
  			chanToWriter <- 2 * x
  		}
  	}
  }
  // Writer prints every value received from chanToWriter.
  //
  // It returns nil when chanToWriter is closed, ctx.Err() when ctx is cancelled
  // while it is waiting for input, and a non-nil error right after printing the
  // value 10, which simulates a failing consumer.
  func Writer(ctx context.Context, chanToWriter chan int) error {
  	for {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case x, ok := <-chanToWriter:
  			if !ok {
  				return nil
  			}
  			fmt.Println("Writer: ", x)
  			if x == 10 {
  				return errors.New("Generate some error in writer")
  			}
  		}
  	}
  }
  49. func main() {
  50. g, ctx := errgroup.WithContext(context.Background())
  51. chanFromReader := make(chan int)
  52. chanToWriter := make(chan int)
  53. func(ctx context.Context, chanToWriter chan int) {
  54. g.Go(func() error {
  55. return Writer(ctx, chanToWriter)
  56. })
  57. }(ctx, chanToWriter)
  58. func(ctx context.Context, chanFromReader chan int, chanToWriter chan int) {
  59. g.Go(func() error {
  60. return Manipulate(ctx, chanFromReader, chanToWriter)
  61. })
  62. }(ctx, chanFromReader, chanToWriter)
  63. func(ctx context.Context, chanFromReader chan int) {
  64. g.Go(func() error {
  65. return Reader(ctx, chanFromReader)
  66. })
  67. }(ctx, chanFromReader)
  68. g.Wait()
  69. fmt.Println("Main wait done")
  70. }

如果写入器由于某种原因失败,我无法中止其余的 goroutine。在上面的示例中,尽管它们都监听 ctx 的取消信号,但在写入器失败时仍然会发生死锁,有没有办法解决这个问题?

我考虑添加以下内容:

  // Manipulate doubles each value received from chanFromReader and forwards it
  // on chanToWriter, closing chanToWriter when it returns.
  //
  // Both the receive and the send are raced against ctx.Done(), so the function
  // returns ctx.Err() as soon as ctx is cancelled, even if nobody is receiving
  // from chanToWriter any more. It returns nil when chanFromReader is closed.
  func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
  	defer close(chanToWriter)

  	for {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case x, ok := <-chanFromReader:
  			if !ok {
  				return nil
  			}
  			// Guard the send as well: the consumer may already be gone.
  			select {
  			case <-ctx.Done():
  				return ctx.Err()
  			case chanToWriter <- 2 * x:
  			}
  		}
  	}
  }

这样可以解决问题,但看起来不太干净...

英文:

Say I have a reader, a manipulator, and a consumer, each running in its own goroutine:

  1. package main
  2. import (
  3. &quot;context&quot;
  4. &quot;fmt&quot;
  5. &quot;golang.org/x/sync/errgroup&quot;
  6. &quot;github.com/pkg/errors&quot;
  7. )
  // Reader emits the integers 0 through 99 on chanFromReader and closes the
  // channel when it returns, so the downstream stage can detect end of input.
  //
  // It returns nil once every value has been sent. If ctx is cancelled while a
  // send is pending it stops early and returns ctx.Err(), so a caller can tell
  // an aborted run from a completed one.
  func Reader(ctx context.Context, chanFromReader chan int) error {
  	defer close(chanFromReader)

  	for i := 0; i < 100; i++ {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case chanFromReader <- i:
  		}
  	}
  	return nil
  }
  // Manipulate doubles each value received from chanFromReader and forwards it
  // on chanToWriter, closing chanToWriter when it returns. It returns nil when
  // chanFromReader is closed, and ctx.Err() when ctx is cancelled while it is
  // waiting for input.
  //
  // NOTE: the send to chanToWriter is not guarded by ctx. If the consumer of
  // chanToWriter stops receiving, this goroutine blocks on that send forever,
  // even after ctx is cancelled. This is the deadlock the example demonstrates,
  // so the send is intentionally kept exactly as the question presents it.
  func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
  	defer close(chanToWriter)

  	for {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case x, ok := <-chanFromReader:
  			if !ok {
  				return nil
  			}
  			chanToWriter <- 2 * x
  		}
  	}
  }
  // Writer prints every value received from chanToWriter.
  //
  // It returns nil when chanToWriter is closed, ctx.Err() when ctx is cancelled
  // while it is waiting for input, and a non-nil error right after printing the
  // value 10, which simulates a failing consumer.
  func Writer(ctx context.Context, chanToWriter chan int) error {
  	for {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case x, ok := <-chanToWriter:
  			if !ok {
  				return nil
  			}
  			fmt.Println("Writer: ", x)
  			if x == 10 {
  				return errors.New("Generate some error in writer")
  			}
  		}
  	}
  }
  49. func main() {
  50. g, ctx := errgroup.WithContext(context.Background())
  51. chanFromReader := make(chan int)
  52. chanToWriter := make(chan int)
  53. func(ctx context.Context, chanToWriter chan int) {
  54. g.Go(func() error {
  55. return Writer(ctx, chanToWriter)
  56. })
  57. }(ctx, chanToWriter)
  58. func(ctx context.Context, chanFromReader chan int, chanToWriter chan int) {
  59. g.Go(func() error {
  60. return Manipulate(ctx, chanFromReader, chanToWriter)
  61. })
  62. }(ctx, chanFromReader, chanToWriter)
  63. func(ctx context.Context, chanFromReader chan int) {
  64. g.Go(func() error {
  65. return Reader(ctx, chanFromReader)
  66. })
  67. }(ctx, chanFromReader)
  68. g.Wait()
  69. fmt.Println(&quot;Main wait done&quot;)
  70. }

https://play.golang.org/p/whslVE3rzel

If the writer fails for some reason, I have trouble aborting the rest of the goroutines.
In the example above, for instance, even though they all listen on ctx for cancellation, they still deadlock when the writer fails. Is there a workaround for this?

I thought of adding this:

  // Manipulate doubles each value received from chanFromReader and forwards it
  // on chanToWriter, closing chanToWriter when it returns.
  //
  // Both the receive and the send are raced against ctx.Done(), so the function
  // returns ctx.Err() as soon as ctx is cancelled, even if nobody is receiving
  // from chanToWriter any more. It returns nil when chanFromReader is closed.
  func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
  	defer close(chanToWriter)

  	for {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case x, ok := <-chanFromReader:
  			if !ok {
  				return nil
  			}
  			// Guard the send as well: the consumer may already be gone.
  			select {
  			case <-ctx.Done():
  				return ctx.Err()
  			case chanToWriter <- 2 * x:
  			}
  		}
  	}
  }

which solves it, but it looks so unclean...

答案1

得分: 1

我建议的解决方案是,每个通道只能由创建它的代码关闭。可以让创建通道并负责关闭它的函数只返回一个只接收(receive-only)通道,从而强制保证这一点:

(感谢mh-cbon进一步完善此代码:)

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "log"
  6. "sync"
  7. )
  // read starts a goroutine that sends the integers 0 through 11 on the
  // returned value channel, stopping early if ctx is cancelled.
  //
  // Both returned channels are receive-only for the caller and are closed by
  // the goroutine when it exits (the value channel first, then the error
  // channel). This stage never sends on the error channel; it exists so every
  // stage exposes the same shape to the error collector.
  func read(ctx context.Context) (<-chan int, <-chan error) {
  	ch := make(chan int)
  	e := make(chan error)

  	go func() {
  		defer close(e)
  		defer close(ch)
  		for i := 0; i < 12; i++ {
  			select {
  			case <-ctx.Done():
  				return
  			case ch <- i:
  			}
  		}
  	}()

  	return ch, e
  }
  // manipulate starts a goroutine that doubles every value received from in
  // and sends the result on the returned value channel.
  //
  // The goroutine runs until in is closed, then closes the value channel and
  // the error channel. Each send blocks until it is received, so the consumer
  // must keep draining the value channel until it is closed. This stage never
  // sends on the error channel.
  func manipulate(in <-chan int) (<-chan int, <-chan error) {
  	ch := make(chan int)
  	e := make(chan error)

  	go func() {
  		defer close(e)
  		defer close(ch)
  		for n := range in {
  			ch <- 2 * n
  		}
  	}()

  	return ch, e
  }
  // write starts a goroutine that prints every value received from in and
  // returns a receive-only channel on which failures are reported.
  //
  // When the value 10 is received an error is sent on the returned channel.
  // The goroutine does not stop after reporting: it keeps consuming in until
  // in is closed, and only then closes the error channel. The error send is
  // unbuffered, so something must be receiving from the returned channel.
  func write(in <-chan int) <-chan error {
  	e := make(chan error)

  	go func() {
  		defer close(e)
  		for n := range in {
  			fmt.Println("written: ", n)
  			if n == 10 {
  				e <- fmt.Errorf("output error during write")
  			}
  		}
  	}()

  	return e
  }
  // collectErrors logs every error received from the given channels and
  // returns once all of them have been closed. Each channel is drained in its
  // own goroutine, so a slow or idle channel does not hold up the others.
  // Calling it with no channels returns immediately.
  func collectErrors(errs ...<-chan error) {
  	var wg sync.WaitGroup

  	for _, c := range errs {
  		wg.Add(1)
  		// Pass the channel as an argument so each goroutine gets its own copy.
  		go func(c <-chan error) {
  			defer wg.Done()
  			for err := range c {
  				log.Printf("%v", err)
  			}
  		}(c)
  	}

  	wg.Wait()
  }
  62. func main() {
  63. ctx, cancel := context.WithCancel(context.Background())
  64. defer cancel()
  65. ch1, err1 := read(ctx)
  66. ch2, err2 := manipulate(ch1)
  67. err3 := write(ch2)
  68. collectErrors(err1, err2, err3)
  69. fmt.Println("main wait complete")
  70. }

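这样,每个通道都能可靠地关闭。需要注意的是:按上面的代码,write 出错后只是把错误发送出去并继续读取输入,main 也只是在退出时通过 defer 调用 cancel,因此写入错误本身并不会取消上下文;流水线之所以不会死锁,是因为 write 会一直读取到输入通道关闭为止。如果希望在出现第一个错误时就停止其他 goroutine,需要在收到错误时显式调用 cancel()。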

英文:

I would propose a solution where each channel gets closed only by the code that creates it. This can be enforced by returning a receive-only channel from the function that creates the channel and is responsible for closing it:

(kudos to mh-cbon for further refining this:)

https://play.golang.org/p/Tq4OVW5sSP4

  1. package main
  2. import (
  3. &quot;context&quot;
  4. &quot;fmt&quot;
  5. &quot;log&quot;
  6. &quot;sync&quot;
  7. )
  // read starts a goroutine that sends the integers 0 through 11 on the
  // returned value channel, stopping early if ctx is cancelled.
  //
  // Both returned channels are receive-only for the caller and are closed by
  // the goroutine when it exits (the value channel first, then the error
  // channel). This stage never sends on the error channel; it exists so every
  // stage exposes the same shape to the error collector.
  func read(ctx context.Context) (<-chan int, <-chan error) {
  	ch := make(chan int)
  	e := make(chan error)

  	go func() {
  		defer close(e)
  		defer close(ch)
  		for i := 0; i < 12; i++ {
  			select {
  			case <-ctx.Done():
  				return
  			case ch <- i:
  			}
  		}
  	}()

  	return ch, e
  }
  // manipulate starts a goroutine that doubles every value received from in
  // and sends the result on the returned value channel.
  //
  // The goroutine runs until in is closed, then closes the value channel and
  // the error channel. Each send blocks until it is received, so the consumer
  // must keep draining the value channel until it is closed. This stage never
  // sends on the error channel.
  func manipulate(in <-chan int) (<-chan int, <-chan error) {
  	ch := make(chan int)
  	e := make(chan error)

  	go func() {
  		defer close(e)
  		defer close(ch)
  		for n := range in {
  			ch <- 2 * n
  		}
  	}()

  	return ch, e
  }
  // write starts a goroutine that prints every value received from in and
  // returns a receive-only channel on which failures are reported.
  //
  // When the value 10 is received an error is sent on the returned channel.
  // The goroutine does not stop after reporting: it keeps consuming in until
  // in is closed, and only then closes the error channel. The error send is
  // unbuffered, so something must be receiving from the returned channel.
  func write(in <-chan int) <-chan error {
  	e := make(chan error)

  	go func() {
  		defer close(e)
  		for n := range in {
  			fmt.Println("written: ", n)
  			if n == 10 {
  				e <- fmt.Errorf("output error during write")
  			}
  		}
  	}()

  	return e
  }
  // collectErrors logs every error received from the given channels and
  // returns once all of them have been closed. Each channel is drained in its
  // own goroutine, so a slow or idle channel does not hold up the others.
  // Calling it with no channels returns immediately.
  func collectErrors(errs ...<-chan error) {
  	var wg sync.WaitGroup

  	for _, c := range errs {
  		wg.Add(1)
  		// Pass the channel as an argument so each goroutine gets its own copy.
  		go func(c <-chan error) {
  			defer wg.Done()
  			for err := range c {
  				log.Printf("%v", err)
  			}
  		}(c)
  	}

  	wg.Wait()
  }
  62. func main() {
  63. ctx, cancel := context.WithCancel(context.Background())
  64. defer cancel()
  65. ch1, err1 := read(ctx)
  66. ch2, err2 := manipulate(ch1)
  67. err3 := write(ch2)
  68. collectErrors(err1, err2, err3)
  69. fmt.Println(&quot;main wait complete&quot;)
  70. }

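This way, each channel gets closed reliably. Note that, in the code as shown, an I/O error from write is only reported and logged: write keeps draining its input and main calls cancel only via defer on exit, so the error does not cancel the context. The pipeline does not deadlock because write keeps receiving until its input channel is closed. To actually shut down the other goroutines on the first error, call cancel() when an error is received.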

huangapple
  • 本文由 发表于 2021年8月14日 00:40:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/68775773.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定