Go workerpool implementation using errgroup, goroutines stuck
Question
I have implemented a worker-pool pattern using errgroup so that an error in any goroutine can be caught. Here are the details:
```go
jobs := make(chan UsersInfo, totalUsers)
results := make(chan UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)
for i := 1; i <= 4; i++ {
	g.Go(func() error {
		err := workerUser(gCtx, jobs, results)
		if err != nil {
			return err
		}
		return nil
	})
}
for _, user := range usersResp {
	jobs <- user
}
close(jobs)
var usersArray []UsersInfo
for i := 1; i <= totalUsers; i++ {
	r := <-results
	usersArray = append(usersArray, r)
}
if err := g.Wait(); err != nil {
	return nil, err
}
```
The worker function is implemented like this:
```go
func workerUser(ctx context.Context, jobs <-chan UsersInfo, results chan<- UsersInfo) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case user, _ := <-jobs:
			userInfo, err := CallUserAPI(ctx, user)
			if err != nil {
				return err
			}
			results <- userInfo
		}
	}
}
```
`CallUserAPI` returns a 403 Forbidden error, which should lead to `g.Wait()` and stop all goroutines immediately once any of them returns a non-nil error. But that is not happening here: `g.Wait()` is never called.
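The expectation above depends on how `errgroup` behaves. As documented for `golang.org/x/sync/errgroup`, the context returned by `WithContext` is canceled the first time a function passed to `Go` returns a non-nil error, and `Wait` blocks until all of those functions have returned. Nothing is stopped preemptively, and `Wait` only runs once the caller's code reaches it. The sketch below mimics those semantics with the standard library only, so it runs without extra modules; `group`, `withContext`, `run` and `errForbidden` are hypothetical names, not errgroup's API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// group is a hypothetical stdlib-only stand-in that mimics errgroup.WithContext:
// the first non-nil error cancels the shared context, and Wait blocks until
// every function passed to Go has returned.
type group struct {
	wg     sync.WaitGroup
	once   sync.Once
	err    error
	cancel context.CancelFunc
}

func withContext(parent context.Context) (*group, context.Context) {
	ctx, cancel := context.WithCancel(parent)
	return &group{cancel: cancel}, ctx
}

func (g *group) Go(f func() error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		if err := f(); err != nil {
			// Keep only the first error and cancel the context exactly once.
			g.once.Do(func() {
				g.err = err
				g.cancel()
			})
		}
	}()
}

// Wait stops nothing by itself; it returns only after every goroutine has returned.
func (g *group) Wait() error {
	g.wg.Wait()
	g.cancel()
	return g.err
}

var errForbidden = errors.New("403 Forbidden")

func run() error {
	g, ctx := withContext(context.Background())
	g.Go(func() error { return errForbidden }) // fails immediately
	g.Go(func() error {
		<-ctx.Done() // exits only because it watches the context
		return ctx.Err()
	})
	return g.Wait()
}

func main() {
	fmt.Println(run()) // 403 Forbidden
}
```

The second goroutine only finishes because it selects on `ctx.Done()`; a goroutine blocked elsewhere (for example on a channel send or receive) would keep `Wait` from returning.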
Answer 1
Score: 3
There are a couple of problems:
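- The loop

  ```go
  for i := 1; i <= totalUsers; i++ {
  	r := <-results
  	usersArray = append(usersArray, r)
  }
  ```

  waits for the workers to send a result for each user. That will not happen when `CallUserAPI` returns an error: the failing user never produces a result, so the main goroutine blocks forever on `<-results` and never reaches `g.Wait()`.
- The worker does not handle the case where `jobs` is closed. Because `case user, _ := <-jobs:` discards the second value, a worker receiving from the closed channel immediately gets a zero-value `UsersInfo` and calls `CallUserAPI` with it instead of returning.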
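The second point follows from Go's channel semantics: once a channel is closed and its buffer is drained, every receive returns immediately with the element type's zero value and `ok == false`. A small demonstration, assuming a hypothetical `UsersInfo` with just a `Name` field and a hypothetical helper `receiveAfterClose`:

```go
package main

import "fmt"

// UsersInfo is a hypothetical stand-in for the question's type.
type UsersInfo struct {
	Name string
}

// receiveAfterClose receives from a closed channel the way the original worker does.
func receiveAfterClose() (UsersInfo, bool) {
	jobs := make(chan UsersInfo, 1)
	jobs <- UsersInfo{Name: "alice"}
	close(jobs)
	<-jobs // values buffered before close are still delivered
	// Every further receive returns at once with the zero value and ok == false.
	user, ok := <-jobs
	return user, ok
}

func main() {
	user, ok := receiveAfterClose()
	fmt.Printf("%q %v\n", user.Name, ok) // "" false
}
```

Checking `ok`, as the fixed worker does, is what lets the worker distinguish "no more jobs" from a real job.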
Here's code that fixes both of these problems:
Declare a type that specifies the user to work on and where to put the result:
```go
type job struct {
	user   UsersInfo
	result *UsersInfo
}
```
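Writing through `result` means each job owns exactly one element of a preallocated slice: results keep the order of the input, no `results` channel or mutex is needed, and different goroutines may safely write to distinct slice elements. A stripped-down sketch of that idea, where upper-casing the name stands in for `CallUserAPI` and `fillInOrder` is a hypothetical helper (a `sync.WaitGroup` replaces errgroup here only to keep the example stdlib-only):

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// UsersInfo is a hypothetical placeholder; job mirrors the answer's type.
type UsersInfo struct{ Name string }

type job struct {
	user   UsersInfo
	result *UsersInfo
}

// fillInOrder writes each result through its own pointer, so results keep the
// input order and no two goroutines ever write to the same element.
func fillInOrder(users []UsersInfo) []UsersInfo {
	out := make([]UsersInfo, len(users))
	var wg sync.WaitGroup
	for i, u := range users {
		j := job{user: u, result: &out[i]}
		wg.Add(1)
		go func() {
			defer wg.Done()
			*j.result = UsersInfo{Name: strings.ToUpper(j.user.Name)}
		}()
	}
	wg.Wait() // reading out is safe only after every writer has finished
	return out
}

func main() {
	fmt.Println(fillInOrder([]UsersInfo{{Name: "alice"}, {Name: "bob"}})) // [{ALICE} {BOB}]
}
```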
Modify the worker to use this new type, and make it exit when `jobs` is closed:
```go
func workerUser(ctx context.Context, jobs <-chan job) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case j, ok := <-jobs:
			if !ok {
				// No more jobs, exit.
				return nil
			}
			var err error
			*j.result, err = CallUserAPI(ctx, j.user)
			if err != nil {
				return err
			}
		}
	}
}
```
Glue it all together in the main goroutine:
```go
jobs := make(chan job, totalUsers)
usersArray := make([]UsersInfo, totalUsers)
g, gCtx := errgroup.WithContext(ctx)

// Start the workers.
for i := 1; i <= 4; i++ {
	g.Go(func() error {
		return workerUser(gCtx, jobs)
	})
}

// Feed the workers. Assuming len(usersResp) == totalUsers, the buffer holds
// every job, so these sends do not block even if the workers exit early.
for i, user := range usersResp {
	jobs <- job{user: user, result: &usersArray[i]}
}

// Close the channel to indicate that no more jobs will be sent.
// The workers exit via the `if !ok { return nil }` statement.
close(jobs)

// Wait for the workers to complete.
if err := g.Wait(); err != nil {
	return nil, err
}

// It is now safe to access the results in usersArray.
```