Idiomatic goroutine termination and error handling
Question
I have a simple concurrency use case in Go, and I cannot figure out an elegant solution to my problem.
I want to write a method fetchAll that queries an unspecified number of resources from remote servers in parallel. If any of the fetches fails, I want to return that first error immediately.
My initial implementation leaks goroutines:
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func fetchAll() error {
	wg := sync.WaitGroup{}
	errs := make(chan error)
	leaks := make(map[int]struct{})
	defer fmt.Println("these goroutines leaked:", leaks)

	// run all the http requests in parallel
	for i := 0; i < 4; i++ {
		leaks[i] = struct{}{}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer delete(leaks, i)

			// pretend this does an http request and returns an error
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			errs <- fmt.Errorf("goroutine %d's error returned", i)
		}(i)
	}

	// wait until all the fetches are done and close the error
	// channel so the loop below terminates
	go func() {
		wg.Wait()
		close(errs)
	}()

	// return the first error
	for err := range errs {
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	fmt.Println(fetchAll())
}
Playground: https://play.golang.org/p/Be93J514R5
I know from reading https://blog.golang.org/pipelines that I can create a signal channel to clean up the other goroutines. Alternatively, I could probably use context to accomplish it. But it seems like such a simple use case should have a simpler solution that I'm missing.
Answer 1
Score: 98
Using errgroup (golang.org/x/sync/errgroup) makes this even simpler. Wait blocks until all the goroutines started with Go have returned. If any of them returns an error, the context derived from WithContext is cancelled so that the remaining goroutines can stop early, and Wait returns that first error to the caller.
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"golang.org/x/sync/errgroup"
)

func fetchAll(ctx context.Context) error {
	errs, ctx := errgroup.WithContext(ctx)

	// run all the http requests in parallel
	for i := 0; i < 4; i++ {
		errs.Go(func() error {
			// pretend this does an http request and returns an error
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			return fmt.Errorf("error in go routine, bailing")
		})
	}

	// Wait for completion and return the first error (if any)
	return errs.Wait()
}

func main() {
	fmt.Println(fetchAll(context.Background()))
}
Answer 2
Score: 29
All but one of your goroutines are leaked, because they're still waiting to send to the errs channel: you never finish the for-range that empties it. You're also leaking the goroutine whose job is to close the errs channel, because the WaitGroup never finishes.
(Also, as Andy pointed out, deleting from a map is not thread-safe, so that would need protection from a mutex.)
However, I don't think maps, mutexes, WaitGroups, contexts etc. are even necessary here. I'd rewrite the whole thing to use just basic channel operations, something like the following:
package main

import (
	"fmt"
	"math/rand"
	"time"
)

func fetchAll() error {
	var N = 4
	quit := make(chan bool)
	errc := make(chan error)
	done := make(chan error)
	for i := 0; i < N; i++ {
		go func(i int) {
			// dummy fetch
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			err := error(nil)
			if rand.Intn(2) == 0 {
				err = fmt.Errorf("goroutine %d's error returned", i)
			}
			ch := done // we'll send to done if nil error and to errc otherwise
			if err != nil {
				ch = errc
			}
			select {
			case ch <- err:
				return
			case <-quit:
				return
			}
		}(i)
	}
	count := 0
	for {
		select {
		case err := <-errc:
			close(quit)
			return err
		case <-done:
			count++
			if count == N {
				return nil // got all N signals, so there was no error
			}
		}
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())
	fmt.Println(fetchAll())
}
Playground link: https://play.golang.org/p/mxGhSYYkOb
EDIT: There indeed was a silly mistake, thanks for pointing it out. I fixed the code above (I think...). I also added some randomness for added Realism™.
Also, I'd like to stress that there really are multiple ways to approach this problem, and my solution is but one way. Ultimately it comes down to personal taste, but in general, you want to strive towards "idiomatic" code - and towards a style that feels natural and easy to understand for you.
Answer 3
Score: 9
Here's a more complete example using errgroup, as suggested by joth. It shows how to process successful results, and it exits on the first error.
Playground: https://play.golang.org/p/rU1v-Mp2ijo
package main

import (
	"context"
	"fmt"
	"math/rand"
	"time"

	"golang.org/x/sync/errgroup"
)

func fetchAll() error {
	g, ctx := errgroup.WithContext(context.Background())
	results := make(chan int)
	for i := 0; i < 4; i++ {
		current := i
		g.Go(func() error {
			// Simulate delay with random errors.
			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
			if rand.Intn(2) == 0 {
				return fmt.Errorf("goroutine %d's error returned", current)
			}
			// Pass processed data to channel, or receive a context completion.
			select {
			case results <- current:
				return nil
			// Close out if another error occurs.
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	}

	// Elegant way to close out the channel when the first error occurs or
	// when processing is successful.
	go func() {
		g.Wait()
		close(results)
	}()

	for result := range results {
		fmt.Println("processed", result)
	}

	// Wait for all fetches to complete.
	return g.Wait()
}

func main() {
	fmt.Println(fetchAll())
}
Answer 4
Score: 2
As long as each goroutine completes, you won't leak anything. You should create the error channel as buffered with the buffer size equal to the number of goroutines so that the send operations on the channel won't block. Each goroutine should always send something on the channel when it finishes, whether it succeeds or fails. The loop at the bottom can then just iterate for the number of goroutines and return if it gets a non-nil error. You don't need the WaitGroup or the other goroutine that closes the channel.
I think the reason it appears that goroutines are leaking is that you return when you get the first error, so some of them are still running.
By the way, maps are not goroutine safe. If you share a map among goroutines and some of them are making changes to the map, you need to protect it with a mutex.
Answer 5
Score: 0
This answer adds the ability to collect the responses into doneData:
package main

import (
	"fmt"
	"math/rand"
	"os"
	"strconv"
)

var doneData []string // responses

func fetchAll(n int, doneCh chan bool, errCh chan error) {
	partialDoneCh := make(chan string)

	for i := 0; i < n; i++ {
		go func(i int) {
			if r := rand.Intn(100); r != 0 && r%10 == 0 {
				// simulate an error
				errCh <- fmt.Errorf("error for reqno=%d", r)
			} else {
				partialDoneCh <- strconv.Itoa(i)
			}
		}(i)
	}

	// mutation of doneData
	// Note: if any request fails, fewer than n results arrive, so this loop
	// never finishes; main returns on the first error instead.
	for d := range partialDoneCh {
		doneData = append(doneData, d)
		if len(doneData) == n {
			close(partialDoneCh)
			doneCh <- true
		}
	}
}

func main() {
	// rand.Seed(1)
	var n int
	var e error
	if len(os.Args) > 1 {
		if n, e = strconv.Atoi(os.Args[1]); e != nil {
			panic(e)
		}
	} else {
		n = 5
	}

	doneCh := make(chan bool)
	errCh := make(chan error)

	go fetchAll(n, doneCh, errCh)
	fmt.Println("main: end")

	select {
	case <-doneCh:
		fmt.Println("success:", doneData)
	case e := <-errCh:
		fmt.Println("failure:", e, doneData)
	}
}
Run it with go run filename.go 50, where 50 is the value of n, i.e. the number of parallel requests.