Close multiple goroutines if an error occurs in one in Go

Consider this function:

```go
func doAllWork() error {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				result, err := work(j)
				if err != nil {
					// can't use `return err` here
					// what should I put instead?
					os.Exit(0)
				}
				_ = result // (result is processed here)
			}
		}()
	}
	wg.Wait()
	return nil
}
```

In each goroutine, the function work() is called 10 times. If a call to work() returns an error in any of the running goroutines, I want all the goroutines to stop immediately and the program to exit.
Is it OK to use os.Exit() here? How should I handle this?

Edit: this question is different from how to stop a goroutine, because here I need to close all goroutines if an error occurs in one of them.


Score: 55

You can use the context package, which was created for exactly this kind of thing (it "carries deadlines, cancelation signals...").

You create a context capable of publishing cancelation signals with context.WithCancel() (the parent context may be the one returned by context.Background()). This returns a cancel() function, which can be used to cancel the worker goroutines (or, more precisely, to signal the intent to cancel to them).
In the worker goroutines you then have to check whether such an intent has been signaled, i.e. whether the channel returned by Context.Done() is closed. The easiest way to do this is to attempt to receive from it, which proceeds immediately if it is closed. To make the check non-blocking (so you can continue if it is not closed), use the select statement with a default branch.

I will use the following work() implementation, which simulates a 10% failure chance and 1 second of work:

```go
func work(i int) (int, error) {
	if rand.Intn(100) < 10 { // 10% chance of failure
		return 0, errors.New("random error")
	}
	time.Sleep(time.Second)
	return 100 + i, nil
}
```

And the doAllWork() may look like this:

```go
func doAllWork() error {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Make sure it's called to release resources even if no errors
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				// Check if an error occurred in any of the other goroutines:
				select {
				case <-ctx.Done():
					return // Error somewhere, terminate
				default: // The default case is required to avoid blocking
				}
				result, err := work(j)
				if err != nil {
					fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
					cancel()
					return
				}
				fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
			}
		}(i)
	}
	wg.Wait()
	// nil unless a worker called cancel(); the deferred cancel() runs after this.
	return ctx.Err()
}
```

This is how it can be tested:

```go
func main() {
	// +1 'cause the Playground's time is fixed.
	// (Since Go 1.20 the global math/rand source is seeded automatically and rand.Seed is deprecated.)
	rand.Seed(time.Now().UnixNano() + 1)
	fmt.Printf("doAllWork: %v\n", doAllWork())
}
```

Output (try it on the Go Playground):

```
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled
```

If there are no errors, e.g. when using the following work() function:

```go
func work(i int) (int, error) {
	time.Sleep(time.Second)
	return 100 + i, nil
}
```

The output would look like this (try it on the Go Playground):

```
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>
```


Basically we only used the Done() channel of the context, so it may seem we could just as easily (if not more easily) use a done channel instead of the Context, closing the channel to do what cancel() does in the above solution.

This is not true. A plain done channel can only be used this way if only one goroutine may close it, but in our case any of the workers may do so, and attempting to close an already closed channel panics (see details here). So you would have to add some kind of synchronization / mutual exclusion around the close(done), which would make the code less readable and more complex. This is exactly what the cancel() function does under the hood, hidden / abstracted away from you: cancel() may be called multiple times, which keeps your code simpler.
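To make the previous point concrete, here is a minimal sketch of the synchronization you would have to write yourself with a plain done channel. It uses sync.Once so that any number of workers can signal "stop" without a second close() panicking. The closer type and its methods are hypothetical. This is not how the context package is implemented internally; it only achieves a comparable effect.

```go
package main

import (
	"fmt"
	"sync"
)

// closer wraps a done channel that may be closed safely from many goroutines.
// (Hypothetical type for illustration; not a standard library API.)
type closer struct {
	once sync.Once
	done chan struct{}
}

func newCloser() *closer {
	return &closer{done: make(chan struct{})}
}

// Close closes the done channel exactly once; later calls are no-ops
// instead of panicking with "close of closed channel".
func (c *closer) Close() {
	c.once.Do(func() { close(c.done) })
}

// Done returns the channel that is closed when Close is first called.
func (c *closer) Done() <-chan struct{} { return c.done }

func main() {
	c := newCloser()
	var wg sync.WaitGroup
	// Several "workers" all try to signal cancellation at the same time.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Close() // a bare close(done) here would panic on the second call
		}()
	}
	wg.Wait()
	<-c.Done() // does not block: the channel is closed
	fmt.Println("done channel closed exactly once")
}
```

context.WithCancel gives you this idempotent close (plus Err(), deadlines and parent/child propagation) without the extra type.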

How to get and return the error(s) from the workers?

For this you may use an error channel:

```go
errs := make(chan error, 2) // Buffer for 2 errors (one per worker)
```

And inside the workers when an error is encountered, send it on the channel instead of printing it:

```go
result, err := work(j)
if err != nil {
	// Error strings should not end with a newline.
	errs <- fmt.Errorf("Worker #%d during %d, error: %v", i, j, err)
	cancel()
	return
}
```

And after wg.Wait() returns, if there was an error, return it (and nil otherwise):

```go
// Return (first) error, if any:
if ctx.Err() != nil {
	return <-errs
}
return nil
```

Output this time (try this on the Go Playground):

```
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error
```

Note that I used a buffered channel whose buffer size equals the number of workers. Since each worker sends at most one error, sending on it never blocks. This also lets you receive and process all errors, not just one (e.g. the first). Another option is to use a channel buffered to hold only 1 error and do a non-blocking send on it, which could look like this:

```go
errs := make(chan error, 1) // Buffered only for the first error
// ...and inside the worker:
result, err := work(j)
if err != nil {
	// Non-blocking send: drop the error if one is already buffered.
	select {
	case errs <- fmt.Errorf("Worker #%d during %d, error: %v", i, j, err):
	default:
	}
	cancel()
	return
}
```


Score: 12

A clearer way to go here is to use errgroup (documentation).

Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.

You can check it out in this example (playground):

```go
package main

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {
	var g errgroup.Group
	var urls = []string{
		"", // (URLs elided in the source)
		"",
		"",
	}
	for _, url := range urls {
		// Launch a goroutine to fetch the URL.
		url := url // per-iteration copy; no longer needed since Go 1.22
		g.Go(func() error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	// Wait for all HTTP fetches to complete.
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	} else {
		// All goroutines have run, and at least one of them returned an error!
		// Note that every goroutine still ran to completion!
		// If you want to stop the other goroutines when one fails, read on!
		fmt.Println("Failed to fetch all URLs.")
	}
}
```

But note: the phrase "The first call to return a non-nil error cancels the group" in the Go documentation is a little misleading.

In fact, if the errgroup.Group was created with the WithContext function, it calls the cancel function of the context returned by WithContext when a goroutine in the group returns an error; otherwise nothing is canceled (read the source code here!).

So, if you want to stop your goroutines, you must use the context returned by WithContext and check it yourself inside them; errgroup will only cancel that context!
Here you can find an example.

To summarize, errgroup can be used in different ways, as shown by the examples.

  1. "just errors", as in the above example:
    Wait waits for all goroutines to end, then returns the first non-nil error from them, if any, or nil.

  2. In parallel:
    You have to create the group with the WithContext function and use the returned context to stop the goroutines.
    I created a playground example here with some sleeps!
    You still have to end each goroutine yourself, but by watching the context you can stop all of them as soon as one fails and the context is canceled.

  3. Pipelines (see more in the examples).


Score: 0




Another way to go here is to use errgroup.WithContext. You can check it out in this example.

In short, g.Wait() waits for all the goroutines to return and then returns the first error, if any. When an error happens in any of the goroutines (a timeout in the provided example), the group cancels its context, and the other goroutines stop by watching the ctx.Done() channel.

  • Posted on August 4, 2017, 15:37:37



