How do I handle errors in a worker pool using WaitGroup?


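Question

I ran into a problem using sync.WaitGroup and select together. If you look at the following HTTP request pool, you will notice that if an error occurs it will never be reported, because wg.Wait() blocks and nothing reads from the channel anymore.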

package pool

import (
	"fmt"
	"log"
	"net/http"
	"sync"
)

var (
	MaxPoolQueue  = 100
	MaxPoolWorker = 10
)

type Pool struct {
	wg *sync.WaitGroup

	queue  chan *http.Request
	errors chan error
}

func NewPool() *Pool {
	return &Pool{
		wg: &sync.WaitGroup{},

		queue:  make(chan *http.Request, MaxPoolQueue),
		errors: make(chan error),
	}
}

func (p *Pool) Add(r *http.Request) {
	p.wg.Add(1)

	p.queue <- r
}

func (p *Pool) Run() error {
	for i := 0; i < MaxPoolWorker; i++ {
		go p.doWork()
	}

	select {
	case err := <-p.errors:
		return err
	default:
		p.wg.Wait()
	}

	return nil
}

func (p *Pool) doWork() {
	for r := range p.queue {
		fmt.Printf("Request to %s\n", r.Host)

		p.wg.Done()

		_, err := http.DefaultClient.Do(r)

		if err != nil {
			log.Fatal(err)

			p.errors <- err
		} else {
			fmt.Printf("no error\n")
		}
	}
}

Source can be found here.

How can I still use WaitGroup but also get errors from the goroutines?


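Answer 1

Score: 7

I found the answer myself just as I was writing the question, and since I think it is an interesting case I would like to share it with you.

The trick to using sync.WaitGroup and chan together is to take the following select: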

select {
case err := <-p.errors:
	return err
default:
	p.wg.Done()
}

and wrap it in a for loop:

for {
	select {
	case err := <-p.errors:
		return err
	default:
		p.wg.Done()
	}
}

In this case select will always check for errors and wait if nothing happens.


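Answer 2

Score: 4

It looks a bit like the fail-fast mechanism enabled by the Tomb library (Tomb V2 GoDoc):

> The tomb package handles clean goroutine tracking and termination.
>
> If any of the tracked goroutines returns a non-nil error, or the Kill or Killf method is called by any goroutine in the system (tracked or not), the tomb Err is set, Alive is set to false, and the Dying channel is closed to flag that all tracked goroutines are supposed to willingly terminate as soon as possible.
>
> Once all tracked goroutines terminate, the Dead channel is closed, and Wait unblocks and returns the first non-nil error presented to the tomb via a result or an explicit Kill or Killf method call, or nil if there were no errors.

You can see an example in this playground:

(extract)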

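// start runs all the given functions concurrently
// until either they all complete or one returns an
// error, in which case it returns that error.
//
// The functions are passed a channel which will be closed
// when the function should stop.
func start(funcs []func(stop <-chan struct{}) error) error {
	var tomb tomb.Tomb
	var wg sync.WaitGroup
	allDone := make(chan struct{})
	// Start all the functions.
	for _, f := range funcs {
		f := f
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := f(tomb.Dying()); err != nil {
				tomb.Kill(err)
			}
		}()
	}
	// Start a goroutine to wait for them all to finish.
	go func() {
		wg.Wait()
		close(allDone)
	}()
	// Wait for them all to finish, or one to fail.
	select {
	case <-allDone:
	case <-tomb.Dying():
	}
	// Tomb.Done is part of the tomb v1 API (gopkg.in/tomb.v1); tomb.v2 has no Done method.
	tomb.Done()
	return tomb.Err()
}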

Answer 3

Score: 0

A simpler implementation would look like the one below. (Check it on play.golang: https://play.golang.org/p/TYxxsDRt5Wu)

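package main

import (
	"fmt"
	"sync"
	"time"
)

type Error struct {
	message string
}

func (e Error) Error() string {
	return e.message
}

func main() {
	var wg sync.WaitGroup
	waitGroupLength := 8
	// Unbuffered: a send completes only once main has received the error,
	// so every error is read before its sender calls wg.Done(). With a
	// buffered channel, an error could still sit in the buffer when
	// finished closes, and select might pick finished and drop it.
	errChannel := make(chan error)

	// Set up the WaitGroup to match the number of goroutines we'll launch.
	wg.Add(waitGroupLength)
	// Closed after wg.Wait() returns; selecting on it together with
	// errChannel is why the error handling works and doesn't deadlock.
	finished := make(chan struct{})

	for i := 0; i < waitGroupLength; i++ {
		go func(i int) {
			// Mark the WaitGroup as done on exit so Wait does not hang.
			defer wg.Done()

			fmt.Printf("Go routine %d executed\n", i+1)
			time.Sleep(time.Duration(waitGroupLength - i))

			if i%4 == 1 {
				errChannel <- Error{fmt.Sprintf("Errored on routine %d", i+1)}
			}
		}(i)
	}

	go func() {
		wg.Wait()
		close(finished)
	}()

L:
	for {
		select {
		case <-finished:
			break L // leaves the for loop, not just the select
		case err := <-errChannel:
			if err != nil {
				fmt.Println("error ", err)
				// handle your error
			}
		}
	}

	fmt.Println("Executed all go routines")
}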
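The example above handles each error as it arrives. If the errors are only needed after every goroutine has finished, a different technique avoids the extra channel and goroutine: give each goroutine its own slice slot and combine the results with errors.Join. This sketch assumes Go 1.20 or later (for errors.Join); collectErrors is a hypothetical helper that reuses the i%4 == 1 failure rule from the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// collectErrors (hypothetical) runs n goroutines and returns all of their
// errors joined together, or nil if none of them failed.
func collectErrors(n int) error {
	var wg sync.WaitGroup
	errs := make([]error, n) // one slot per goroutine, so no locking is needed

	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i%4 == 1 {
				errs[i] = fmt.Errorf("errored on routine %d", i+1)
			}
		}(i)
	}

	wg.Wait()
	// errors.Join drops nil entries and returns nil if every entry is nil.
	return errors.Join(errs...)
}

func main() {
	if err := collectErrors(8); err != nil {
		fmt.Println(err)
	}
}
```

Run as-is, this prints "errored on routine 2" and "errored on routine 6" on separate lines, in index order, because errors.Join separates the messages with newlines.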

huangapple
  • Published on 2014-07-13 01:53:32
  • When reposting, please keep the link to this article: https://go.coder-hub.com/24715754.html