英文:
Golang Concurrency Issue to introduce timeout
问题
我帮你翻译一下:
我希望在Go语言中使用Go协程实现并行API调用。一旦请求被发送,
1)我需要等待所有响应(响应时间不同)。
2)如果任何一个请求失败并返回错误,我希望结束(或假装结束)协程。
3)我还希望每个Go协程(或API调用)关联一个超时值。
我已经实现了上述1和2的功能,但需要帮助来实现第3个功能。同时,对于1和2的反馈也会有所帮助。
谢谢。
英文:
I wish to implement parallel api calling in golang using go routines. Once the requests are fired,
- I need to wait for all responses (which take different time).
- If any of the request fails and returns an error, I wish to end (or pretend) the routines.
- I also want to have a timeout value associated with each go routine (or api call).
I have implemented the below for 1 and 2, but need help as to how can I implement 3. Also, feedback on 1 and 2 will also help.
package main
import (
"errors"
"fmt"
"sync"
"time"
)
func main() {
var wg sync.WaitGroup
c := make(chan interface{}, 1)
c2 := make(chan interface{}, 1)
err := make(chan interface{})
wg.Add(1)
go func() {
defer wg.Done()
result, e := doSomeWork()
if e != nil {
err <- e
return
}
c <- result
}()
wg.Add(1)
go func() {
defer wg.Done()
result2, e := doSomeWork2()
if e != nil {
err <- e
return
}
c2 <- result2
}()
go func() {
wg.Wait()
close(c)
close(c2)
close(err)
}()
for e := range err {
// here error happend u could exit your caller function
fmt.Println("Error==>", e)
return
}
fmt.Println(<-c, <-c2)
}
// mimic api call 1
func doSomeWork() (function1, error) {
time.Sleep(10 * time.Second)
obj := function1{"ABC", "29"}
return obj, nil
}
type function1 struct {
Name string
Age string
}
// mimic api call 2
func doSomeWork2() (function2, error) {
time.Sleep(4 * time.Second)
r := errors.New("Error Occured")
if 1 == 2 {
fmt.Println(r)
}
obj := function2{"Delhi", "Delhi"}
// return error as nil for now
return obj, nil
}
type function2 struct {
City string
State string
}
Thanks in advance.
答案1
得分: 1
为了支持goroutine工作的超时和取消,标准机制是使用context.Context。
ctx := context.Background() // 根上下文
// 使用超时和/或取消机制包装上下文
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // 设置超时或取消
//ctx, cancel := context.WithCancel(ctx) // 只取消,没有超时
defer cancel() // 避免内存泄漏,如果我们从未取消/超时
接下来,您的工作goroutine需要支持接收和监视ctx
的状态。为了与time.Sleep
并行(模拟长时间计算),将sleep转换为基于通道的解决方案:
// 模拟API调用1
func doSomeWork(ctx context.Context) (function1, error) {
//time.Sleep(10 * time.Second)
select {
case <-time.After(10 * time.Second):
// 等待完成
case <-ctx.Done():
return function1{}, ctx.Err()
}
// ...
}
如果一个工作goroutine失败,为了向其他工作goroutine发出信号,请求应该被中止,只需调用cancel()
函数。
result, e := doSomeWork(ctx)
if e != nil {
cancel() // 添加这一行
err <- e
return
}
将所有这些组合在一起:
https://play.golang.org/p/1Kpe_tre7XI
编辑:上面的sleep示例显然是一个中止“假”任务的人为示例。在现实世界中,可能涉及到http
或SQL DB
调用 - 自从go 1.7
和1.8
以来 - 标准库为这些潜在的阻塞调用添加了上下文支持:
func doSomeWork(ctx context.Context) (error) {
// 数据库
db, err := sql.Open("mysql", "...") // 检查错误
//rows, err := db.Query("SELECT age from users", age)
rows, err := db.QueryContext(ctx, "SELECT age from users", age)
if err != nil {
return err // 如果上下文被取消,将返回错误
}
// http
// req, err := http.NewRequest("GET", "http://example.com", nil)
req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil) // 检查错误
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err // 如果上下文被取消,将返回错误
}
}
编辑(2):为了在不阻塞的情况下轮询上下文的状态,利用select
的default
分支:
select {
case <-ctx.Done():
return ctx.Err()
default:
// 如果上下文未完成 - 使用此分支
}
default
分支可以选择包含代码,但即使它的代码为空,它的存在也将防止阻塞 - 因此只是在那个瞬间轮询上下文的状态。
英文:
To support timeouts and cancelation of goroutine work, the standard mechanism is to use context.Context.
ctx := context.Background() // root context
// wrap the context with a timeout and/or cancelation mechanism
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // with timeout or cancel
//ctx, cancel := context.WithCancel(ctx) // no timeout just cancel
defer cancel() // avoid memory leak if we never cancel/timeout
Next your worker goroutines need to support taking and monitoring the state of the ctx
. To do this in parallel with the time.Sleep
(to mimic a long computation), convert the sleep to a channel based solution:
// mimic api call 1
func doSomeWork(ctx context.Context) (function1, error) {
//time.Sleep(10 * time.Second)
select {
case <-time.After(10 * time.Second):
// wait completed
case <-ctx.Done():
return function1{}, ctx.Err()
}
// ...
}
And if one worker goroutine fails, to signal to the other worker that the request should be aborted, simply call the cancel()
function.
result, e := doSomeWork(ctx)
if e != nil {
cancel() // <- add this
err <- e
return
}
Pulling this all together:
https://play.golang.org/p/1Kpe_tre7XI
EDIT: the sleep example above is obviously a contrived example of how to abort a "fake" task. In the real world, http
or SQL DB
calls would be involve - and since go 1.7
& 1.8
- the standard library added context support to any of these potentially blocking calls:
func doSomeWork(ctx context.Context) (error)
// DB
db, err := sql.Open("mysql", "...") // check err
//rows, err := db.Query("SELECT age from users", age)
rows, err := db.QueryContext(ctx, "SELECT age from users", age)
if err != nil {
return err // will return with error if context is canceled
}
// http
// req, err := http.NewRequest("GET", "http://example.com", nil)
req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil) // check err
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err // will return with error if context is canceled
}
}
EDIT (2): to poll a context's state without blocking, leverage select
's default
branch:
select {
case <-ctx.Done():
return ctx.Err()
default:
// if ctx is not done - this branch is used
}
the default
branch can optional have code in it, but even if it is empty of code it's presence will prevent blocking - and thus just poll the status of the context in that instant of time.
答案2
得分: 1
这种 fork-and-join 模式正是 golang.org/x/sync/errgroup
设计的初衷。(从一组 goroutine 中确定适当的“第一个错误”可能会非常微妙。)
你可以使用 errgroup.WithContext
来获取一个在组中的任何 goroutine 返回时取消的 context.Context
。(*Group).Wait
方法 等待 goroutine 完成并返回第一个错误。
对于你的示例,可能会像这样:https://play.golang.org/p/jqYeb4chHCZ。
然后,你可以通过使用 context.WithTimeout
来在任何给定的调用中注入超时。
(然而,根据我的经验,如果你正确地处理了取消操作,显式的超时几乎从来都不会有帮助——最终用户可以在等待时间过长时显式地取消操作,而且如果某些操作花费的时间比你预期的稍微长一点,你可能不希望将服务降级为完全停止。)
英文:
This kind of fork-and-join pattern is exactly what golang.org/x/sync/errgroup
was designed for. (Identifying the appropriate “first error” from a group of goroutines can be surprisingly subtle.)
You can use errgroup.WithContext
to obtain a context.Context
that is cancelled if any of the goroutines in the group returns. The (*Group).Wait
method waits for the goroutines to complete and returns the first error.
For your example, that might look something like: https://play.golang.org/p/jqYeb4chHCZ.
You can then inject a timeout within any given call by wrapping the Context
using context.WithTimeout
.
(However, in my experience if you've plumbed in cancellation correctly, explicit timeouts are almost never helpful — the end user can cancel explicitly if they get tired of waiting, and you probably don't want to promote degraded service to a complete outage if something starts to take just a bit longer than you expected.)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论