Handle goroutine termination and error handling via error group?
Question
I am trying to read multiple files in parallel so that each goroutine reading a file writes its data to a channel, and a single goroutine listens on that channel and adds the data to a map. Here is my example from the playground:
package main

import (
	"fmt"
	"sync"
)

func main() {
	var myFiles = []string{"file1", "file2", "file3"}
	var myMap = make(map[string][]byte)
	dataChan := make(chan fileData, len(myFiles))
	wg := sync.WaitGroup{}
	defer close(dataChan)
	// we create a wait group of N
	wg.Add(len(myFiles))
	for _, file := range myFiles {
		// we create N go-routines, one per file, each one will return a struct containing their filename and bytes from
		// the file via the dataChan channel
		go getBytesFromFile(file, dataChan, &wg)
	}
	// we wait until the wait group is decremented to zero by each instance of getBytesFromFile() calling waitGroup.Done()
	wg.Wait()
	for i := 0; i < len(myFiles); i++ {
		// we can now read from the data channel N times.
		file := <-dataChan
		myMap[file.name] = file.bytes
	}
	fmt.Printf("%+v\n", myMap)
}

type fileData struct {
	name  string
	bytes []byte
}

// how to handle error from this method if reading file got messed up?
func getBytesFromFile(file string, dataChan chan fileData, waitGroup *sync.WaitGroup) {
	bytes := openFileAndGetBytes(file)
	dataChan <- fileData{name: file, bytes: bytes}
	waitGroup.Done()
}

func openFileAndGetBytes(file string) []byte {
	return []byte(fmt.Sprintf("these are some bytes for file %s", file))
}
Problem Statement
How can I use golang.org/x/sync/errgroup to wait on and handle errors from goroutines, or is there a better way, like using a semaphore? For example, if any one of my goroutines fails to read data from a file, I want to cancel all the remaining ones (in which case that error is the one that bubbles back up to the caller). In the success case, it should automatically wait for all the supplied goroutines to complete successfully.
I also don't want to spawn 100 goroutines if the total number of files is 100. I want to control the parallelism, if there is any way to do that.
Answer 1
Score: 7
> How can I use golang.org/x/sync/errgroup to wait on and handle errors from goroutines or if there is any better way like using semaphore? For example [...] I want to cancels all those remaining in the case of any one routine returning an error (in which case that error is the one bubble back up to the caller). And it should automatically waits for all the supplied go routines to complete successfully for success case.
There are many ways to communicate error states across goroutines. errgroup does a bunch of heavy lifting though, and is appropriate for this case. Otherwise you're going to end up implementing the same thing.
To use errgroup we'll need to handle errors (and for your demo, generate some). In addition, to cancel existing goroutines, we'll use a context from errgroup.WithContext.
From the errgroup reference,
> Package errgroup provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.
Your play doesn't support any error handling. We can't collect and cancel on errors if we don't do any error handling. So I added some code to inject error handling:
func openFileAndGetBytes(file string) (string, error) {
	if file == "file2" {
		return "", fmt.Errorf("%s cannot be read", file)
	}
	return fmt.Sprintf("these are some bytes for file %s", file), nil
}
Then that error had to be passed back from getBytesFromFile as well:
func getBytesFromFile(file string, dataChan chan fileData) error {
	bytes, err := openFileAndGetBytes(file)
	if err == nil {
		dataChan <- fileData{name: file, bytes: bytes}
	}
	return err
}
Now that we've done that, we can turn our attention to how we're going to start up a number of goroutines.
> I also don't want to spawn 100 go-routines if total number of files is 100. I want to control the parallelism if possible if there is any way.
Written well, the number of tasks, channel size, and number of workers are typically independent values. The trick is to use channel closure - and in your case, context cancellation - to communicate state between the goroutines. We'll need an additional channel for the distribution of filenames, and an additional goroutine for the collection of the results.
To illustrate this point, my code uses 3 workers, and adds a few more files. My channels are unbuffered. This allows us to see some of the files get processed, while others are aborted. If you buffer the channels, the example will still work, but it's more likely for additional work to be processed before the cancellation is handled. Experiment with buffer size along with worker count and number of files to process.
var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
fileChan := make(chan string)
dataChan := make(chan fileData)
To start up the workers, instead of starting one for each file, we start the number we desire - here, 3.
for i := 0; i < 3; i++ {
	worker_num := i
	g.Go(func() error {
		for file := range fileChan {
			if err := getBytesFromFile(file, dataChan); err != nil {
				fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
				return err
			} else if err := ctx.Err(); err != nil {
				fmt.Println("worker", worker_num, "context error in worker:", err.Error())
				return err
			}
		}
		fmt.Println("worker", worker_num, "processed all work on channel")
		return nil
	})
}
The workers call your getBytesFromFile function. If it returns an err, we return an err. errgroup will cancel our context automatically in this case. However, the exact order of operations is not deterministic, so more files may or may not get processed before the context is cancelled. I'll show several possibilities below.
By ranging over fileChan, the worker automatically picks up end of work from the channel closure. If we get an error, we can return it to errgroup immediately. Otherwise, if the context has been cancelled, we can return the cancellation error immediately.
You might think that g.Go would automatically cancel our function. But it cannot. There is no way to cancel a running function in Go other than process termination. The function passed to errgroup.Group.Go must cancel itself when appropriate based on the state of its context.
Now we can turn our attention to the thing that puts the files on fileChan. We have 2 options here. First, we can use a buffered channel of the size of myFiles, like you did, and fill the entire channel with pending jobs. This is only an option if you know the number of jobs when you create the channel. The other option is to use an additional "distribution" goroutine that can block on writes to fileChan so that our "main" goroutine can continue.
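The first option might look roughly like the sketch below. fillJobs is a hypothetical helper, not something from the code in this answer. Because the buffer is as large as the job list, every send succeeds without a receiver, and the channel can be closed before any worker starts.

```go
package main

import "fmt"

// fillJobs is a hypothetical helper: it sizes the buffer to the number of
// jobs, so every send succeeds without a receiver, then closes the channel.
func fillJobs(files []string) <-chan string {
	jobs := make(chan string, len(files))
	for _, f := range files {
		jobs <- f // never blocks: the buffer has room for every job
	}
	close(jobs) // workers ranging over jobs stop once it is drained
	return jobs
}

func main() {
	jobs := fillJobs([]string{"file1", "file2", "file3"})
	for f := range jobs {
		fmt.Println("got", f)
	}
}
```

With this approach nothing stops feeding the queue after a failure, so the workers alone have to check the context and stop pulling jobs. The rest of this answer takes the second option.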
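// dispatch files
g.Go(func() error {
	defer close(fileChan)
	done := ctx.Done()
	for _, file := range myFiles {
		select {
		case fileChan <- file:
			// handed to a worker; move on to the next file
		case <-done:
			// return here: a bare break would only leave the select, not the loop
			return ctx.Err()
		}
	}
	return ctx.Err()
})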
I'm not sure it's strictly necessary to put this in the same errgroup in this case, because we can't get an error in the distributor goroutine. But this general pattern, drawn from the Pipeline example from errgroup, works regardless of whether the work dispatcher might generate errors.
This function is pretty simple, but the magic is in select along with the ctx.Done() channel. Either we write to the work channel, or we fail if our context is done. This allows us to stop distributing work when one worker has failed one file.
We defer close(fileChan) so that, regardless of why we have finished (either we distributed all work, or the context was cancelled), the workers know there will be no more work on the incoming work queue (i.e. fileChan).
We need one more synchronization mechanism: once all the work is distributed, and all the results are in or the remaining work has been cancelled (i.e. after our errgroup's Wait() returns), we need to close our results channel, dataChan. This signals the results collector that there are no more results to be collected.
var err error // we'll need this later!
go func() {
	err = g.Wait()
	close(dataChan)
}()
We can't - and don't need to - put this in the errgroup.Group. The function can't return an error, and it can't wait for itself to close(dataChan). So it goes into a regular old goroutine, sans errgroup.
Finally we can collect the results. With dedicated worker goroutines, a distributor goroutine, and a goroutine waiting on the work and notifying that there will be no more writes to dataChan, we can collect all the results right in the "primary" goroutine in main.
for data := range dataChan {
	myMap[data.name] = data.bytes
}
if err != nil { // this was set in our final goroutine, remember
	fmt.Println("errgroup Error:", err.Error())
}
I made a few small changes so that it was easier to see the output. You may already have noticed I changed the file contents from []byte to string. This was purely so that the results were easy to read. To the same end, I am using encoding/json to format the results so that they are easy to read and paste into SO. This is a common pattern that I often use to indent structured data:
enc := json.NewEncoder(os.Stdout)
enc.SetIndent("", " ")
if err := enc.Encode(myMap); err != nil {
	panic(err)
}
Finally we're ready to run. Now we can see a number of different results depending on just what order the goroutines execute. But all of them are valid execution paths.
worker 2 failed to process file2 : file2 cannot be read
worker 0 context error in worker: context canceled
worker 1 context error in worker: context canceled
errgroup Error: file2 cannot be read
{
"file1": "these are some bytes for file file1",
"file3": "these are some bytes for file file3"
}
Program exited.
In this result, the remaining work (file4, file5, and file6) was not added to the channel. Remember, an unbuffered channel stores no data. For those tasks to be written to the channel, a worker would have to be there to read them. Instead, the context was cancelled after file2 failed, and the distribution function followed the <-done path within its select. file1 and file3 were already processed.
Here's a different result (I just ran the playground share a few times to get different results).
worker 1 failed to process file2 : file2 cannot be read
worker 2 processed all work on channel
worker 0 processed all work on channel
errgroup Error: file2 cannot be read
{
"file1": "these are some bytes for file file1",
"file3": "these are some bytes for file file3",
"file4": "these are some bytes for file file4",
"file5": "these are some bytes for file file5",
"file6": "these are some bytes for file file6"
}
In this case, it looks a little like our cancellation failed. But what really happened is that the goroutines just "happened" to queue and finish the rest of the work before errgroup picked up on the worker's failure and cancelled the context.
What errgroup does
When you use errgroup, you're really getting 2 things out of it:
- easily accessing the first error your workers returned;
- getting a context that errgroup will cancel for you when a worker returns an error.
Keep in mind that errgroup does not cancel goroutines. This tripped me up a bit at first. errgroup cancels the context. It's your responsibility to apply the status of that context to your goroutines (remember, the goroutine must end itself; errgroup can't end it).
A final aside about contexts with file operations, and failing outstanding work
Most of your file operations, e.g. io.Copy or os.ReadFile, are actually a loop of subsequent Read operations. But io and os don't support contexts directly. So if you have a worker reading a file, and you don't implement the Read loop yourself, you won't have an opportunity to cancel based on context. That's probably okay in your case - sure, you may have read some more files than you really needed to, but only because you were already reading them when the error occurred. I would personally accept this state of affairs and not implement my own read loop.
The code
https://go.dev/play/p/9qfESp_eB-C
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"

	"golang.org/x/sync/errgroup"
)

func main() {
	var myFiles = []string{"file1", "file2", "file3", "file4", "file5", "file6"}
	fileChan := make(chan string)
	dataChan := make(chan fileData)
	g, ctx := errgroup.WithContext(context.Background())

	// start a fixed number of workers, independent of the number of files
	for i := 0; i < 3; i++ {
		worker_num := i
		g.Go(func() error {
			for file := range fileChan {
				if err := getBytesFromFile(file, dataChan); err != nil {
					fmt.Println("worker", worker_num, "failed to process", file, ":", err.Error())
					return err
				} else if err := ctx.Err(); err != nil {
					fmt.Println("worker", worker_num, "context error in worker:", err.Error())
					return err
				}
			}
			fmt.Println("worker", worker_num, "processed all work on channel")
			return nil
		})
	}

	// dispatch files
	g.Go(func() error {
		defer close(fileChan)
		done := ctx.Done()
		for _, file := range myFiles {
			if err := ctx.Err(); err != nil {
				return err
			}
			select {
			case fileChan <- file:
				// handed to a worker; move on to the next file
			case <-done:
				// return here: a bare break would only leave the select, not the loop
				return ctx.Err()
			}
		}
		return ctx.Err()
	})

	// close the results channel once every worker and the dispatcher have returned
	var err error
	go func() {
		err = g.Wait()
		close(dataChan)
	}()

	// collect results in the main goroutine until dataChan is closed
	var myMap = make(map[string]string)
	for data := range dataChan {
		myMap[data.name] = data.bytes
	}
	if err != nil {
		fmt.Println("errgroup Error:", err.Error())
	}

	enc := json.NewEncoder(os.Stdout)
	enc.SetIndent("", " ")
	if err := enc.Encode(myMap); err != nil {
		panic(err)
	}
}

type fileData struct {
	name, bytes string
}

func getBytesFromFile(file string, dataChan chan fileData) error {
	bytes, err := openFileAndGetBytes(file)
	if err == nil {
		dataChan <- fileData{name: file, bytes: bytes}
	}
	return err
}

// openFileAndGetBytes fakes a read; "file2" always fails to demonstrate cancellation
func openFileAndGetBytes(file string) (string, error) {
	if file == "file2" {
		return "", fmt.Errorf("%s cannot be read", file)
	}
	return fmt.Sprintf("these are some bytes for file %s", file), nil
}