Go workerpool implementation using errgroup, goroutines stuck
Question
I have implemented a worker pool pattern using errgroup so that an error in any goroutine can be caught. Here are the details:
jobs := make(chan UsersInfo, totalUsers)
results := make(chan UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)
for i := 1; i <= 4; i++ {
	g.Go(func() error {
		err := workerUser(gCtx, jobs, results)
		if err != nil {
			return err
		}
		return nil
	})
}
for _, user := range usersResp {
	jobs <- user
}
close(jobs)
var usersArray []UsersInfo
for i := 1; i <= totalUsers; i++ {
	r := <-results
	usersArray = append(usersArray, r)
}
if err := g.Wait(); err != nil {
	return nil, err
}
The worker function is implemented like this:
func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case user, _ := <-jobs:
			userInfo, err := CallUserAPI(ctx, user)
			if err != nil {
				return err
			}
			results <- userInfo
		}
	}
}
CallUserAPI returns a 403 Forbidden error, and I expected that a non-nil error would make g.Wait() return and stop all goroutines immediately. That is not what happens: g.Wait() is never reached.
Answer 1
Score: 3
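There are a couple of problems:
- The loop `for i := 1; i <= totalUsers; i++ { r := <-results; usersArray = append(usersArray, r) }` waits for the workers to send a result for each user. That will not happen when `CallUserAPI` returns an error: the failing worker exits without sending, so the main goroutine blocks on `<-results` forever and never reaches `g.Wait()`.
- The worker does not handle the case where `jobs` is closed. A receive from a closed channel returns the zero value immediately, so `case user, _ := <-jobs:` keeps calling `CallUserAPI` with an empty `UsersInfo` instead of exiting.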
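The second problem comes down to how receives on a closed channel behave. This small stdlib-only sketch contrasts ignoring the `ok` flag, as the original worker does, with checking it. The helpers `drain`, `drainUntilClosed`, and `newClosedJobs` are hypothetical, and plain strings stand in for `UsersInfo`.

```go
package main

import "fmt"

// drain mimics the original worker's `case user, _ := <-jobs:` receive:
// it ignores the ok flag and performs a fixed number of receives.
func drain(jobs <-chan string, receives int) []string {
	var got []string
	for i := 0; i < receives; i++ {
		v, _ := <-jobs // after close, returns "" immediately instead of blocking
		got = append(got, v)
	}
	return got
}

// drainUntilClosed uses the comma-ok receive to stop once jobs is closed and empty.
func drainUntilClosed(jobs <-chan string) []string {
	var got []string
	for {
		v, ok := <-jobs
		if !ok {
			return got // channel closed and drained
		}
		got = append(got, v)
	}
}

// newClosedJobs returns a buffered channel holding items, already closed.
func newClosedJobs(items ...string) chan string {
	ch := make(chan string, len(items))
	for _, it := range items {
		ch <- it
	}
	close(ch)
	return ch
}

func main() {
	fmt.Printf("%q\n", drain(newClosedJobs("alice", "bob"), 4))
	fmt.Printf("%q\n", drainUntilClosed(newClosedJobs("alice", "bob")))
}
```

This prints `["alice" "bob" "" ""]` and then `["alice" "bob"]`. The two empty strings are the kind of zero values the original worker would pass to `CallUserAPI`.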
Here's code that fixes both of these problems:
Declare a type that specifies the user to work on and where to put the result:
type job struct {
	user UsersInfo
	result *UsersInfo
}
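Carrying a pointer to the result slot means the workers no longer need a results channel, and the output keeps the input order even though jobs finish in any order. Each job writes to a different slice element, so no mutex is needed as long as the results are read only after the workers finish. Here is a minimal stdlib-only sketch of that idea. The `slotJob` type and `fillSlots` function are hypothetical, and `strings.ToUpper` stands in for `CallUserAPI`.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// slotJob mirrors the answer's job type: an input plus a pointer to the
// slice element that will hold its result (illustrative names).
type slotJob struct {
	input  string
	result *string
}

// fillSlots processes inputs with a fixed number of workers; every job
// writes only to its own element, so results stay in input order.
func fillSlots(inputs []string, workers int) []string {
	results := make([]string, len(inputs))
	jobs := make(chan slotJob, len(inputs))
	for i, in := range inputs {
		jobs <- slotJob{input: in, result: &results[i]}
	}
	close(jobs)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				*j.result = strings.ToUpper(j.input) // stand-in for CallUserAPI
			}
		}()
	}
	wg.Wait() // after this, every write to results is visible to the caller
	return results
}

func main() {
	fmt.Println(fillSlots([]string{"a", "b", "c", "d", "e"}, 3))
}
```

It prints `[A B C D E]` regardless of which worker handled which job.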
Modify the worker to use this new type. Also, modify the worker to exit when jobs is closed.
func workerUser(ctx context.Context, jobs <-chan job) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case job, ok := <-jobs:
			if !ok {
				// No more jobs, exit.
				return nil
			}
			var err error
			*job.result, err = CallUserAPI(ctx, job.user)
			if err != nil {
				return err
			}
		}
	}
}
Glue it all together in the main goroutine:
jobs := make(chan job, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)
// Start the workers.
for i := 1; i <= 4; i++ {
	g.Go(func() error {
		return workerUser(gCtx, jobs)
	})
}
// Feed the workers.  
for i, user := range usersResp {
	jobs <- job{user: user, result: &usersArray[i]}
}
// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)
// Wait for the workers to complete.
if err := g.Wait(); err != nil {
	return nil, err
}
// It is now safe to access the results in usersArray.