当我返回多个错误时,Go协程会发生恐慌。

huangapple go评论73阅读模式
英文:

go routine panics when i return multiple errors

问题

我正在玩弄工作池,并且我想在返回错误之前将所有来自工作池的错误合并。我已经编写了一个示例代码,但是我遇到了死锁的问题。

我想要实现什么?
一个客户端发送100个请求,我想首先将这些请求添加到作业队列中,并将其分派给n个后台执行任务的Go协程,如果有错误发生,我希望在将所有错误发送给客户端之前累积所有这些错误。我已经编写了一段代码,有人可以解释一下问题出在哪里,以及如何解决死锁问题吗?

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/apex/log"
	"github.com/hashicorp/go-multierror"
)

type Manager struct {
	taskChan    chan int
	wg          *sync.WaitGroup
	QuitChan    chan bool
	ErrorChan   chan error
	busyWorkers int64
}

func main() {
	fmt.Println("Hello, 世界")
	m := New()

	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	//defer cancel()
	for i := 0; i < 3; i++ {
		m.wg.Add(1)
		go m.run(ctx, test)
	}

	for i := 1; i < 5; i++ {
		m.taskChan <- i
	}
	close(m.taskChan)
	go func(*Manager) {
		if len(m.taskChan) == 0 {
			m.QuitChan <- true
		}
	}(m)

	var errors error
	for {
		select {
		case err := <-m.ErrorChan:
			errors = multierror.Append(errors, err)
			if m.busyWorkers == int64(0) {
				break
			}
		default:
			fmt.Println("hello")
		}
	}
	m.wg.Wait()
	fmt.Println(errors)

}

func New() *Manager {
	return &Manager{taskChan: make(chan int),
		wg:        new(sync.WaitGroup),
		QuitChan:  make(chan bool),
		ErrorChan: make(chan error),
	}
}

func (m *Manager) run(ctx context.Context, fn func(a, b int) error) {
	defer m.wg.Done()
	defer fmt.Println("finished working")
	for {
		select {
		case t, ok := <-m.taskChan:
			if ok {
				atomic.AddInt64(&m.busyWorkers, 1)
				err := fn(t, t)
				if err != nil {

					m.ErrorChan <- err
				}
				atomic.AddInt64(&m.busyWorkers, -1)
			}

		case <-ctx.Done():
			log.Infof("closing channel %v", ctx.Err())
			return
		case <-m.QuitChan:

			return
		}

	}
}

// this can return error or not, this is the main driver func, but i'm propagating 
//errors so that i can understand where i am going wrong
func test(a, b int) error {
	fmt.Println(a, b)
	return fmt.Errorf("dummy error %v", a)

}

请注意,我只会返回翻译好的代码部分,不会回答关于翻译的问题。

英文:

I am playing around with worker pools and i want to consolidate all errors from worker pools before i return the error. I've written a sample code but i am entering a deadlock.

What am i trying to achieve?
a client send 100 requests,i want to first add those requests to a job queue and dispatch it to n number of go routines that does tasks in background , if at all there are errors i want to accumulate all these errors before i send all errors to the client. I have written a snippet, can someone explain what's wrong and how to mitigate the deadlock.

package main
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;sync&quot;
&quot;sync/atomic&quot;
&quot;time&quot;
&quot;github.com/apex/log&quot;
&quot;github.com/hashicorp/go-multierror&quot;
)
type Manager struct {
taskChan    chan int
wg          *sync.WaitGroup
QuitChan    chan bool
ErrorChan   chan error
busyWorkers int64
}
func main() {
fmt.Println(&quot;Hello, 世界&quot;)
m := New()
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
//defer cancel()
for i := 0; i &lt; 3; i++ {
m.wg.Add(1)
go m.run(ctx, test)
}
for i := 1; i &lt; 5; i++ {
m.taskChan &lt;- i
}
close(m.taskChan)
go func(*Manager) {
if len(m.taskChan) == 0 {
m.QuitChan &lt;- true
}
}(m)
var errors error
for {
select {
case err := &lt;-m.ErrorChan:
errors = multierror.Append(errors, err)
if m.busyWorkers == int64(0) {
break
}
default:
fmt.Println(&quot;hello&quot;)
}
}
m.wg.Wait()
fmt.Println(errors)
}
func New() *Manager {
return &amp;Manager{taskChan: make(chan int),
wg:        new(sync.WaitGroup),
QuitChan:  make(chan bool),
ErrorChan: make(chan error),
}
}
func (m *Manager) run(ctx context.Context, fn func(a, b int) error) {
defer m.wg.Done()
defer fmt.Println(&quot;finished working&quot;)
for {
select {
case t, ok := &lt;-m.taskChan:
if ok {
atomic.AddInt64(&amp;m.busyWorkers, 1)
err := fn(t, t)
if err != nil {
m.ErrorChan &lt;- err
}
atomic.AddInt64(&amp;m.busyWorkers, -1)
}
case &lt;-ctx.Done():
log.Infof(&quot;closing channel %v&quot;, ctx.Err())
return
case &lt;-m.QuitChan:
return
}
}
}
// this can return error or not, this is the main driver func, but i&#39;m propagating 
//errors so that i can understand where i am going wrong
func test(a, b int) error {
fmt.Println(a, b)
return fmt.Errorf(&quot;dummy error %v&quot;, a)
}

答案1

得分: 1

你有3个工人,他们都返回错误。

你的主线程试图将5个任务放入队列中。一旦前3个任务被工人接收,主线程就会被阻塞,等待一个新的工人在taskChan上接收,而你的3个工人都被阻塞在试图在ErrorChan上发送数据。

换句话说,发生了死锁。

也许你想把taskChan变成一个带缓冲区的通道?这样你就可以在缓冲区满之前无阻塞地发送数据。

taskChan: make(chan int, 10)
英文:

You have 3 workers who all return errors.

Your main thread tries to put 5 jobs in the queue. Once the first 3 has been taken by your workers, the main thread is stuck waiting for a new worker to receive on taskChan and all your 3 workers are stuck trying to send data on ErrorChan.

In other words, deadlock.

Maybe you wanted to make taskChan a buffered channel? That way you can send data on it until the buffer is full without blocking.

taskChan: make(chan int, 10)

huangapple
  • 本文由 发表于 2022年2月23日 03:32:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/71227265.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定