Go workerpool 使用 errgroup 实现,goroutine 卡住的问题

huangapple go评论101阅读模式
英文:

Go workerpool implementation using errgroup, goroutines stuck

问题

我已经使用errgroup实现了一个使用workerpool模式的代码,这样可以捕获任何goroutine中的错误。以下是详细信息:

jobs := make(chan UsersInfo, totalUsers)
results := make(chan UsersInfo, totalUsers)

g, gCtx := errgroup.WithContext(ctx)

for i := 1; i <= 4; i++ {
    g.Go(func() error {
        err := workerUser(gCtx, jobs, results)
        if err != nil {
            return err
        }
        return nil
    })
}

for _, user := range usersResp {
    jobs <- user
}
close(jobs)

var usersArray []UsersInfo
for i := 1; i <= totalUsers; i++ {
    r := <-results
    usersArray = append(usersArray, r)
}

if err := g.Wait(); err != nil {
    return nil, err
}

然后,worker函数实现如下:

func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case user, _ := <-jobs:
            userInfo, err := CallUserAPI(ctx, user)
            if err != nil {
                return err
            }
            results <- userInfo
        }
    }
}

CallUserAPI返回403 Forbidden错误时,应该调用g.Wait()并立即停止所有goroutine。但在这里并非如此,g.Wait()从未被调用。

英文:

I have implemented a workerpool pattern using errgroup, so that error in any goroutines can be caught. Here is my detail:

    jobs := make(chan UsersInfo, totalUsers)
	results := make(chan UsersInfo, totalUsers)

	g, gCtx := errgroup.WithContext(ctx)

	for i := 1; i &lt;= 4; i++ {
		g.Go(func() error {
			err := workerUser(gCtx, jobs, results)
			if err != nil {
				return err
			}
			return nil
		})
	}

	for _, user := range usersResp {
		jobs &lt;- user
	}
	close(jobs)

    var usersArray []UsersInfo
	for  i := 1; i &lt;= totalUsers; i++ {
        r := &lt;-results
		usersArray = append(usersArray, r)
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

Then the worker function implemented like this:

func workerUser(ctx context.Context, jobs &lt;-chan UsersInfo, results chan&lt;- UsersInfo) error {
  for {
	select {
	case &lt;-ctx.Done():
		return ctx.Err()
	case user, _ := &lt;-jobs:
		userInfo, err := CallUserAPI(ctx, user)
		if err != nil {
			return err
		}
		results &lt;- userInfo
	}
 }
}

The CallUserAPI return 403 Forbidden error which should invoke g.wait() and should stop all goroutines instantly when there is non-nil error. But this is not the case here, g.wait() is never called.

答案1

得分: 3

有几个问题:

  • 循环部分

      for  i := 1; i &lt;= totalUsers; i++ {
          r := &lt;-results
          usersArray = append(usersArray, r)
      }
    

    等待工作线程为每个用户发送结果。当CallUserAPI返回错误时,这将不会发生。

  • 工作线程没有处理jobs关闭的情况。

以下是修复这两个问题的代码:

声明一个指定要处理的用户和结果放置位置的类型:

type job struct {
	user UsersInfo
	result *UsersInfo
}

修改工作线程以使用这个新类型。同时,修改工作线程在jobs关闭时退出。

func workerUser(ctx context.Context, jobs &lt;-chan job) error {
	for {
		select {
		case &lt;-ctx.Done():
			return ctx.Err()
		case job, ok := &lt;-jobs:
			if !ok {
				// 没有更多的任务,退出。
				return nil
			}
			var err error
			*job.result, err = CallUserAPI(ctx, job.user)
			if err != nil {
				return err
			}
		}
	}
}

在主goroutine中将它们组合在一起:

jobs := make(chan UsersInfo, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// 启动工作线程。
for i := 1; i &lt;= 4; i++ {
	g.Go(func() error {
		return workerUser(gCtx, jobs)
	})
}

// 提供任务给工作线程。
for i, user := range usersResp {
	jobs &lt;- job{user: user, result: &amp;usersArray[i]}
}

// 关闭通道以指示不再发送任务。
// 工作线程通过`if !ok { return nil }`语句退出。
close(jobs)

// 等待工作线程完成。
if err := g.Wait(); err != nil {
	return nil, err
}

// 现在可以安全地访问usersArray中的结果。
英文:

There are a couple of problems:

  • The loop

      for  i := 1; i &lt;= totalUsers; i++ {
          r := &lt;-results
          usersArray = append(usersArray, r)
      }
    

    waits for the workers to send a result for each user. That will not happen when CallUserAPI returns an error.

  • The worker does not handle the case where jobs is closed.

Here's code that fixes both of these problems:

Declare a type that specifies the user to work on and where to put the result:

type job struct {
	user UsersInfo
	result *UsersInfo
}

Modify the worker to use this new type. Also, modify the worker to exit when jobs is closed.

func workerUser(ctx context.Context, jobs &lt;-chan job) error {
	for {
		select {
		case &lt;-ctx.Done():
			return ctx.Err()
		case job, ok := &lt;-jobs:
			if !ok {
				// No more jobs, exit.
				return nil
			}
			var err error
			*job.result, err = CallUserAPI(ctx, job.user)
			if err != nil {
				return err
			}
		}
	}
}

Glue it all together in the main goroutine:

jobs := make(chan UsersInfo, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i &lt;= 4; i++ {
	g.Go(func() error {
		return workerUser(gCtx, jobs)
	})
}

// Feed the workers.  
for i, user := range usersResp {
	jobs &lt;- job{user: user, result: &amp;usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
	return nil, err
}

// It is now safe to access the results in usersArray.

huangapple
  • 本文由 发表于 2023年5月3日 19:58:39
  • 转载请务必保留本文链接:https://go.coder-hub.com/76163543.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定