Close multiple goroutines if an error occurs in one in Go
Question
Consider this function:
func doAllWork() error {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				result, err := work(j)
				if err != nil {
					// can't use `return err` here
					// what should I put instead?
					os.Exit(0)
				}
				_ = result // do something with result
			}
		}()
	}
	wg.Wait()
	return nil
}
In each goroutine, the function work() is called 10 times. If one call to work() returns an error in any of the running goroutines, I want all the goroutines to stop immediately, and the program to exit.
Is it OK to use os.Exit() here? How should I handle this?
Edit: this question is different from "how to stop a goroutine", because here I need to stop all goroutines if an error occurs in one of them.
Answer 1
Score: 55
You can use the context package, which was created for exactly this kind of thing (it "carries deadlines, cancelation signals...").
Create a context capable of publishing cancelation signals with context.WithCancel() (the parent context may be the one returned by context.Background()). This returns a cancel() function, which can be used to cancel the worker goroutines (or, more precisely, to signal the intent to cancel them).
In the worker goroutines you then have to check whether such intent has been signaled, by checking whether the channel returned by Context.Done() is closed. The easiest way is to attempt a receive from it, which proceeds immediately if the channel is closed. To make the check non-blocking (so the worker can continue if the channel is not closed), use a select statement with a default branch.
I will use the following work() implementation, which simulates a 10% chance of failure and 1 second of work:
func work(i int) (int, error) {
	if rand.Intn(100) < 10 { // 10% chance of failure
		return 0, errors.New("random error")
	}
	time.Sleep(time.Second)
	return 100 + i, nil
}
And doAllWork() may look like this:
func doAllWork() error {
	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // Make sure it's called to release resources even if there are no errors
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				// Check if an error occurred in any other goroutine:
				select {
				case <-ctx.Done():
					return // Error somewhere, terminate
				default: // The default case is required to avoid blocking
				}
				result, err := work(j)
				if err != nil {
					fmt.Printf("Worker #%d during %d, error: %v\n", i, j, err)
					cancel()
					return
				}
				fmt.Printf("Worker #%d finished %d, result: %d.\n", i, j, result)
			}
		}(i)
	}
	wg.Wait()
	return ctx.Err()
}
This is how it can be tested:
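func main() {
	// No rand.Seed call is needed: since Go 1.20 the global math/rand source is
	// seeded randomly at startup (the original seeded it with time.Now().UnixNano() + 1
	// because the Playground's time is fixed).
	fmt.Printf("doAllWork: %v\n", doAllWork())
}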
Output (try it on the Go Playground):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #1 during 4, error: random error
Worker #0 finished 3, result: 103.
doAllWork: context canceled
If there are no errors, e.g. when using the following work() function:
func work(i int) (int, error) {
	time.Sleep(time.Second)
	return 100 + i, nil
}
The output would look like this (try it on the Go Playground):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
Worker #0 finished 4, result: 104.
Worker #1 finished 4, result: 104.
Worker #1 finished 5, result: 105.
Worker #0 finished 5, result: 105.
Worker #0 finished 6, result: 106.
Worker #1 finished 6, result: 106.
Worker #1 finished 7, result: 107.
Worker #0 finished 7, result: 107.
Worker #0 finished 8, result: 108.
Worker #1 finished 8, result: 108.
Worker #1 finished 9, result: 109.
Worker #0 finished 9, result: 109.
doAllWork: <nil>
Notes:
Basically we only used the Done() channel of the context, so it might seem we could just as easily (or even more easily) use a done channel instead of the Context, closing the channel to do what cancel() does in the above solution.
This is not true. A plain done channel only works if a single goroutine may close it, but in our case any of the workers may do so, and attempting to close an already closed channel panics (see details here: https://stackoverflow.com/questions/39015602/how-does-a-non-initialized-channel-behave/39016004#39016004). So you would have to add some kind of synchronization / mutual exclusion around close(done), which makes the code less readable and more complex. This is exactly what the cancel() function does under the hood, hidden / abstracted away from you, which is why cancel() may be called multiple times, keeping your code simpler.
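For illustration, here is roughly what such synchronization around close(done) could look like if you hand-rolled it with sync.Once. This is a sketch only: the doneSignal type is hypothetical, and the context package's own implementation differs in detail (it is not claimed to use sync.Once):

```go
package main

import (
	"fmt"
	"sync"
)

// doneSignal is a hypothetical hand-rolled stand-in for context.WithCancel:
// a done channel whose close is guarded by sync.Once, so any number of
// goroutines may call cancel without causing a "close of closed channel" panic.
type doneSignal struct {
	done chan struct{}
	once sync.Once
}

func newDoneSignal() *doneSignal {
	return &doneSignal{done: make(chan struct{})}
}

// cancel closes the done channel exactly once; later calls do nothing.
func (d *doneSignal) cancel() {
	d.once.Do(func() { close(d.done) })
}

func main() {
	d := newDoneSignal()
	var wg sync.WaitGroup
	// Several workers "fail" and all call cancel(); a plain close(d.done)
	// in each of them would panic on the second call.
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.cancel()
		}()
	}
	wg.Wait()
	<-d.done // already closed, so this returns immediately
	fmt.Println("done channel closed once, no panic")
}
```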
How to get and return the error(s) from the workers?
For this you may use an error channel:
errs := make(chan error, 2) // Buffer for 2 errors
And inside the workers when an error is encountered, send it on the channel instead of printing it:
result, err := work(j)
if err != nil {
	errs <- fmt.Errorf("Worker #%d during %d, error: %w", i, j, err)
	cancel()
	return
}
And after wg.Wait(), if there was an error, return it (and nil otherwise):
// Return (first) error, if any:
if ctx.Err() != nil {
	return <-errs
}
return nil
Output this time (try this on the Go Playground):
Worker #0 finished 0, result: 100.
Worker #1 finished 0, result: 100.
Worker #1 finished 1, result: 101.
Worker #0 finished 1, result: 101.
Worker #0 finished 2, result: 102.
Worker #1 finished 2, result: 102.
Worker #1 finished 3, result: 103.
Worker #0 finished 3, result: 103.
doAllWork: Worker #1 during 4, error: random error
Note that I used a buffered channel with a buffer size equal to the number of workers; since each worker sends at most one error, sending on it never blocks. This also makes it possible to receive and process all errors, not just one (e.g. the first). Another option is to use a channel with a buffer of only 1 and do a non-blocking send on it, which could look like this:
errs := make(chan error, 1) // Buffered only for the first error
// ...and inside the worker:
result, err := work(j)
if err != nil {
	// Non-blocking send:
	select {
	case errs <- fmt.Errorf("Worker #%d during %d, error: %w", i, j, err):
	default: // An error is already buffered; drop this one
	}
	cancel()
	return
}
Answer 2
Score: 12
A clearer way to go here is to use errgroup (documentation).
Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
You can check it out in this example (playground):
package main

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {
	var g errgroup.Group
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somestupidname.com/",
	}
	for _, url := range urls {
		// Launch a goroutine to fetch the URL.
		url := url // https://golang.org/doc/faq#closures_and_goroutines (not needed since Go 1.22)
		g.Go(func() error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	// Wait for all HTTP fetches to complete.
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	} else {
		// All goroutines have run, and at least one of them returned an error!
		// But all of them had to finish their work first!
		// If you want to stop the other goroutines when one fails, read on!
		fmt.Println("Failed to fetch all URLs.")
	}
}
But note: the phrase "The first call to return a non-nil error cancels the group" in the Go documentation is a little misleading.
In fact, an errgroup.Group created with a context (via the WithContext function) calls the cancel function of the context returned by WithContext when a goroutine in the group returns an error; if the group was not created with WithContext, nothing is canceled at all (read the source code here!).
So if you want to stop your goroutines, you must use the context returned by WithContext and check it yourself inside them; errgroup will only cancel that context!
Here you can find an example.
To summarize, errgroup can be used in different ways, as shown by the examples:
- "Just errors", as in the example above: Wait waits until all goroutines have finished, then returns the first non-nil error from them, if any, or nil otherwise.
- In parallel: you have to create the group with the WithContext function and use the context to manage cancelation. I created a playground example here with some sleeps! You have to stop each goroutine yourself, but by watching the context you can end all of them as soon as it is canceled (which the group does when one of them fails).
- Pipelines (see more in the examples).
Answer 3
Score: 0
Another way to go here is to use errgroup.WithContext. You can check it out in this example.
In short, g.Wait() waits until all goroutines have returned and then returns the first non-nil error, if any. When an error happens in any of the goroutines (a timeout in the provided example), the group cancels the context returned by WithContext, so the other goroutines can stop their work early by watching the ctx.Done() channel.