Go workerpool 使用 errgroup 实现,goroutine 卡住的问题

huangapple go评论143阅读模式
英文:

Go workerpool implementation using errgroup, goroutines stuck

问题

我已经使用errgroup实现了一个使用workerpool模式的代码,这样可以捕获任何goroutine中的错误。以下是详细信息:

  1. jobs := make(chan UsersInfo, totalUsers)
  2. results := make(chan UsersInfo, totalUsers)
  3. g, gCtx := errgroup.WithContext(ctx)
  4. for i := 1; i <= 4; i++ {
  5. g.Go(func() error {
  6. err := workerUser(gCtx, jobs, results)
  7. if err != nil {
  8. return err
  9. }
  10. return nil
  11. })
  12. }
  13. for _, user := range usersResp {
  14. jobs <- user
  15. }
  16. close(jobs)
  17. var usersArray []UsersInfo
  18. for i := 1; i <= totalUsers; i++ {
  19. r := <-results
  20. usersArray = append(usersArray, r)
  21. }
  22. if err := g.Wait(); err != nil {
  23. return nil, err
  24. }

然后,worker函数实现如下:

  1. func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
  2. for {
  3. select {
  4. case <-ctx.Done():
  5. return ctx.Err()
  6. case user, _ := <-jobs:
  7. userInfo, err := CallUserAPI(ctx, user)
  8. if err != nil {
  9. return err
  10. }
  11. results <- userInfo
  12. }
  13. }
  14. }

CallUserAPI返回403 Forbidden错误时,应该调用g.Wait()并立即停止所有goroutine。但在这里并非如此,g.Wait()从未被调用。

英文:

I have implemented a workerpool pattern using errgroup, so that error in any goroutines can be caught. Here is my detail:

  1. jobs := make(chan UsersInfo, totalUsers)
  2. results := make(chan UsersInfo, totalUsers)
  3. g, gCtx := errgroup.WithContext(ctx)
  4. for i := 1; i &lt;= 4; i++ {
  5. g.Go(func() error {
  6. err := workerUser(gCtx, jobs, results)
  7. if err != nil {
  8. return err
  9. }
  10. return nil
  11. })
  12. }
  13. for _, user := range usersResp {
  14. jobs &lt;- user
  15. }
  16. close(jobs)
  17. var usersArray []UsersInfo
  18. for i := 1; i &lt;= totalUsers; i++ {
  19. r := &lt;-results
  20. usersArray = append(usersArray, r)
  21. }
  22. if err := g.Wait(); err != nil {
  23. return nil, err
  24. }

Then the worker function implemented like this:

  1. func workerUser(ctx context.Context, jobs &lt;-chan UsersInfo, results chan&lt;- UsersInfo) error {
  2. for {
  3. select {
  4. case &lt;-ctx.Done():
  5. return ctx.Err()
  6. case user, _ := &lt;-jobs:
  7. userInfo, err := CallUserAPI(ctx, user)
  8. if err != nil {
  9. return err
  10. }
  11. results &lt;- userInfo
  12. }
  13. }
  14. }

The CallUserAPI return 403 Forbidden error which should invoke g.wait() and should stop all goroutines instantly when there is non-nil error. But this is not the case here, g.wait() is never called.

答案1

得分: 3

有几个问题:

  • 循环部分

    1. for i := 1; i &lt;= totalUsers; i++ {
    2. r := &lt;-results
    3. usersArray = append(usersArray, r)
    4. }

    等待工作线程为每个用户发送结果。当CallUserAPI返回错误时,这将不会发生。

  • 工作线程没有处理jobs关闭的情况。

以下是修复这两个问题的代码:

声明一个指定要处理的用户和结果放置位置的类型:

  1. type job struct {
  2. user UsersInfo
  3. result *UsersInfo
  4. }

修改工作线程以使用这个新类型。同时,修改工作线程在jobs关闭时退出。

  1. func workerUser(ctx context.Context, jobs &lt;-chan job) error {
  2. for {
  3. select {
  4. case &lt;-ctx.Done():
  5. return ctx.Err()
  6. case job, ok := &lt;-jobs:
  7. if !ok {
  8. // 没有更多的任务,退出。
  9. return nil
  10. }
  11. var err error
  12. *job.result, err = CallUserAPI(ctx, job.user)
  13. if err != nil {
  14. return err
  15. }
  16. }
  17. }
  18. }

在主goroutine中将它们组合在一起:

  1. jobs := make(chan UsersInfo, totalUsers)
  2. usersArray := make([]UsersInfo, totalUsers)
  3. g, gCtx := errgroup.WithContext(ctx)
  4. // 启动工作线程。
  5. for i := 1; i &lt;= 4; i++ {
  6. g.Go(func() error {
  7. return workerUser(gCtx, jobs)
  8. })
  9. }
  10. // 提供任务给工作线程。
  11. for i, user := range usersResp {
  12. jobs &lt;- job{user: user, result: &amp;usersArray[i]}
  13. }
  14. // 关闭通道以指示不再发送任务。
  15. // 工作线程通过`if !ok { return nil }`语句退出。
  16. close(jobs)
  17. // 等待工作线程完成。
  18. if err := g.Wait(); err != nil {
  19. return nil, err
  20. }
  21. // 现在可以安全地访问usersArray中的结果。
英文:

There are a couple of problems:

  • The loop

    1. for i := 1; i &lt;= totalUsers; i++ {
    2. r := &lt;-results
    3. usersArray = append(usersArray, r)
    4. }

    waits for the workers to send a result for each user. That will not happen when CallUserAPI returns an error.

  • The worker does not handle the case where jobs is closed.

Here's code that fixes both of these problems:

Declare a type that specifies the user to work on and where to put the result:

  1. type job struct {
  2. user UsersInfo
  3. result *UsersInfo
  4. }

Modify the worker to use this new type. Also, modify the worker to exit when jobs is closed.

  1. func workerUser(ctx context.Context, jobs &lt;-chan job) error {
  2. for {
  3. select {
  4. case &lt;-ctx.Done():
  5. return ctx.Err()
  6. case job, ok := &lt;-jobs:
  7. if !ok {
  8. // No more jobs, exit.
  9. return nil
  10. }
  11. var err error
  12. *job.result, err = CallUserAPI(ctx, job.user)
  13. if err != nil {
  14. return err
  15. }
  16. }
  17. }
  18. }

Glue it all together in the main goroutine:

  1. jobs := make(chan UsersInfo, totalUsers)
  2. usersArray := make([]UsersInfo, totalUsers)
  3. g, gCtx := errgroup.WithContext(ctx)
  4. // Start the workers.
  5. for i := 1; i &lt;= 4; i++ {
  6. g.Go(func() error {
  7. return workerUser(gCtx, jobs)
  8. })
  9. }
  10. // Feed the workers.
  11. for i, user := range usersResp {
  12. jobs &lt;- job{user: user, result: &amp;usersArray[i]}
  13. }
  14. // Close the channel to indicate that no more jobs will be sent.
  15. // The workers exit via the `if !ok { return nil }` statement.
  16. close(jobs)
  17. // Wait for the workers to complete.
  18. if err := g.Wait(); err != nil {
  19. return nil, err
  20. }
  21. // It is now safe to access the results in usersArray.

huangapple
  • 本文由 发表于 2023年5月3日 19:58:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/76163543.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定