英文:
Managing Producer Consumer deadlock in case of failure
问题
说我有一个在不同例程中的读取器、操作器和消费器的情况:
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"github.com/pkg/errors"
)
func Reader(ctx context.Context, chanFromReader chan int) error {
defer close(chanFromReader)
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
return nil
case chanFromReader <- i:
}
}
return nil
}
func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
defer close(chanToWriter)
for {
select {
case <-ctx.Done():
return nil
case x, ok := <-chanFromReader:
if !ok {
return nil
}
chanToWriter <- 2 * x
}
}
}
func Writer(ctx context.Context, chanToWriter chan int) error {
for {
select {
case <-ctx.Done():
return nil
case x, ok := <-chanToWriter:
if !ok {
return nil
}
fmt.Println("Writer: ", x)
if x == 10 {
return errors.New("Generate some error in writer")
}
}
}
}
func main() {
g, ctx := errgroup.WithContext(context.Background())
chanFromReader := make(chan int)
chanToWriter := make(chan int)
func(ctx context.Context, chanToWriter chan int) {
g.Go(func() error {
return Writer(ctx, chanToWriter)
})
}(ctx, chanToWriter)
func(ctx context.Context, chanFromReader chan int, chanToWriter chan int) {
g.Go(func() error {
return Manipulate(ctx, chanFromReader, chanToWriter)
})
}(ctx, chanFromReader, chanToWriter)
func(ctx context.Context, chanFromReader chan int) {
g.Go(func() error {
return Reader(ctx, chanFromReader)
})
}(ctx, chanFromReader)
g.Wait()
fmt.Println("Main wait done")
}
如果写入器由于某种原因失败,我无法中止其他例程。在上面的示例中,尽管它们监听 ctx 进行取消,但仍会在写入器失败的情况下发生死锁,有没有解决这个问题的方法?
我考虑添加以下内容:
func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
defer close(chanToWriter)
for {
select {
case <-ctx.Done():
return nil
case x, ok := <-chanFromReader:
if !ok {
return nil
}
select {
case <-ctx.Done():
return nil
case chanToWriter <- 2 * x:
}
}
}
}
这样可以解决问题,但看起来不太干净...
英文:
say I have a case of reader, manipulator, consumer in different routines:
package main
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"github.com/pkg/errors"
)
func Reader(ctx context.Context, chanFromReader chan int) error {
defer close(chanFromReader)
for i := 0; i < 100; i++ {
select {
case <-ctx.Done():
return nil
case chanFromReader <- i:
}
}
return nil
}
func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
defer close(chanToWriter)
for {
select {
case <-ctx.Done():
return nil
case x, ok := <-chanFromReader:
if !ok {
return nil
}
chanToWriter <- 2 * x
}
}
}
func Writer(ctx context.Context, chanToWriter chan int) error {
for {
select {
case <-ctx.Done():
return nil
case x, ok := <-chanToWriter:
if !ok {
return nil
}
fmt.Println("Writer: ", x)
if x == 10 {
return errors.New("Generate some error in writer")
}
}
}
}
func main() {
g, ctx := errgroup.WithContext(context.Background())
chanFromReader := make(chan int)
chanToWriter := make(chan int)
func(ctx context.Context, chanToWriter chan int) {
g.Go(func() error {
return Writer(ctx, chanToWriter)
})
}(ctx, chanToWriter)
func(ctx context.Context, chanFromReader chan int, chanToWriter chan int) {
g.Go(func() error {
return Manipulate(ctx, chanFromReader, chanToWriter)
})
}(ctx, chanFromReader, chanToWriter)
func(ctx context.Context, chanFromReader chan int) {
g.Go(func() error {
return Reader(ctx, chanFromReader)
})
}(ctx, chanFromReader)
g.Wait()
fmt.Println("Main wait done")
}
https://play.golang.org/p/whslVE3rzel
In case the writer fails for some reason, I'm having trouble aborting the rest of the routines.
In the example above for instance, though they listen on ctx for cancellation they still deadlock on case of fail in writer, is there a workaround this?
I thought of adding this:
func Manipulate(ctx context.Context, chanFromReader chan int, chanToWriter chan int) error {
defer close(chanToWriter)
for {
select {
case <-ctx.Done():
return nil
case x, ok := <-chanFromReader:
if !ok {
return nil
}
select {
case <-ctx.Done():
return nil
case chanToWriter <- 2 * x:
}
}
}
}
which solves it, but it looks so unclean...
答案1
得分: 1
我建议的解决方案是,每个通道只能由创建它的代码关闭。可以通过从创建通道的函数返回一个只接收通道来实现这一点,并负责关闭它:
(感谢mh-cbon进一步完善此代码:)
package main
import (
"context"
"fmt"
"log"
"sync"
)
func read(ctx context.Context) (<-chan int, <-chan error) {
ch := make(chan int)
e := make(chan error)
go func() {
defer close(e)
defer close(ch)
for i := 0; i < 12; i++ {
select {
case <-ctx.Done():
return
case ch <- i:
}
}
}()
return ch, e
}
func manipulate(in <-chan int) (<-chan int, <-chan error) {
ch := make(chan int)
e := make(chan error)
go func() {
defer close(e)
defer close(ch)
for n := range in {
ch <- 2 * n
}
}()
return ch, e
}
func write(in <-chan int) <-chan error {
e := make(chan error)
go func() {
defer close(e)
for n := range in {
fmt.Println("written: ", n)
if n == 10 {
e <- fmt.Errorf("output error during write")
}
}
}()
return e
}
func collectErrors(errs ...<-chan error) {
var wg sync.WaitGroup
for i := 0; i < len(errs); i++ {
wg.Add(1)
go func(errs <-chan error) {
defer wg.Done()
for err := range errs {
log.Printf("%v", err)
}
}(errs[i])
}
wg.Wait()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch1, err1 := read(ctx)
ch2, err2 := manipulate(ch1)
err3 := write(ch2)
collectErrors(err1, err2, err3)
fmt.Println("main wait complete")
}
这样,每个通道都能可靠地关闭,并且来自写入的I/O错误将导致子上下文被取消,关闭其他goroutine。
英文:
I would propose a solution where each channel gets closed only by the code that creates it. This can be enforced by returning a receive-only channel from the function that creates the channel and is responsible for closing it:
(kudos to mh-cbon for further refining this:)
https://play.golang.org/p/Tq4OVW5sSP4
package main
import (
"context"
"fmt"
"log"
"sync"
)
func read(ctx context.Context) (<-chan int, <-chan error) {
ch := make(chan int)
e := make(chan error)
go func() {
defer close(e)
defer close(ch)
for i := 0; i < 12; i++ {
select {
case <-ctx.Done():
return
case ch <- i:
}
}
}()
return ch, e
}
func manipulate(in <-chan int) (<-chan int, <-chan error) {
ch := make(chan int)
e := make(chan error)
go func() {
defer close(e)
defer close(ch)
for n := range in {
ch <- 2 * n
}
}()
return ch, e
}
func write(in <-chan int) <-chan error {
e := make(chan error)
go func() {
defer close(e)
for n := range in {
fmt.Println("written: ", n)
if n == 10 {
e <- fmt.Errorf("output error during write")
}
}
}()
return e
}
func collectErrors(errs ...<-chan error) {
var wg sync.WaitGroup
for i := 0; i < len(errs); i++ {
wg.Add(1)
go func(errs <-chan error) {
defer wg.Done()
for err := range errs {
log.Printf("%v", err)
}
}(errs[i])
}
wg.Wait()
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch1, err1 := read(ctx)
ch2, err2 := manipulate(ch1)
err3 := write(ch2)
collectErrors(err1, err2, err3)
fmt.Println("main wait complete")
}
This way, each channel gets closed reliably, and the I/O errors from write will cause the child context to be cancelled, shutting down the other goroutines.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论