英文:
Golang Concurrency Issue to introduce timeout
问题
我帮你翻译一下:
我希望在Go语言中使用Go协程实现并行API调用。一旦请求被发送,
1)我需要等待所有响应(响应时间不同)。
2)如果任何一个请求失败并返回错误,我希望结束(或假装结束)协程。
3)我还希望每个Go协程(或API调用)关联一个超时值。
我已经实现了上述1和2的功能,但需要帮助来实现第3个功能。同时,对于1和2的反馈也会有所帮助。
谢谢。
英文:
I wish to implement parallel api calling in golang using go routines. Once the requests are fired,
- I need to wait for all responses (which take different time).
 - If any of the request fails and returns an error, I wish to end (or pretend) the routines.
 - I also want to have a timeout value associated with each go routine (or api call).
 
I have implemented the below for 1 and 2, but need help as to how can I implement 3. Also, feedback on 1 and 2 will also help.
package main
import (
	"errors"
	"fmt"
	"sync"
	"time"
)
func main() {
	var wg sync.WaitGroup
	c := make(chan interface{}, 1)
	c2 := make(chan interface{}, 1)
	err := make(chan interface{})
	wg.Add(1)
	go func() {
		defer wg.Done()
		result, e := doSomeWork()
		if e != nil {
			err <- e
			return
		}
		c <- result
	}()
	wg.Add(1)
	go func() {
		defer wg.Done()
		result2, e := doSomeWork2()
		if e != nil {
			err <- e
			return
		}
		c2 <- result2
	}()
	go func() {
		wg.Wait()
		close(c)
		close(c2)
		close(err)
	}()
	for e := range err {
		// here error happend u could exit your caller function
		fmt.Println("Error==>", e)
		return
	}
	fmt.Println(<-c, <-c2)
}
// mimic api call 1
func doSomeWork() (function1, error) {
	time.Sleep(10 * time.Second)
	obj := function1{"ABC", "29"}
	return obj, nil
}
type function1 struct {
	Name string
	Age  string
}
// mimic api call 2
func doSomeWork2() (function2, error) {
	time.Sleep(4 * time.Second)
	r := errors.New("Error Occured")
	if 1 == 2 {
		fmt.Println(r)
	}
	obj := function2{"Delhi", "Delhi"}
	// return error as nil for now
	return obj, nil
}
type function2 struct {
	City  string
	State string
}
Thanks in advance.
答案1
得分: 1
为了支持goroutine工作的超时和取消,标准机制是使用context.Context。
ctx := context.Background() // 根上下文
// 使用超时和/或取消机制包装上下文
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // 设置超时或取消
//ctx, cancel := context.WithCancel(ctx)               // 只取消,没有超时
defer cancel() // 避免内存泄漏,如果我们从未取消/超时
接下来,您的工作goroutine需要支持接收和监视ctx的状态。为了与time.Sleep并行(模拟长时间计算),将sleep转换为基于通道的解决方案:
// 模拟API调用1
func doSomeWork(ctx context.Context) (function1, error) {
    //time.Sleep(10 * time.Second)
    select {
    case <-time.After(10 * time.Second):
        // 等待完成
    case <-ctx.Done():
        return function1{}, ctx.Err()
    }
    // ...
}
如果一个工作goroutine失败,为了向其他工作goroutine发出信号,请求应该被中止,只需调用cancel()函数。
result, e := doSomeWork(ctx)
if e != nil {
    cancel()    // 添加这一行
    err <- e
    return
}
将所有这些组合在一起:
https://play.golang.org/p/1Kpe_tre7XI
编辑:上面的sleep示例显然是一个中止“假”任务的人为示例。在现实世界中,可能涉及到http或SQL DB调用 - 自从go 1.7和1.8以来 - 标准库为这些潜在的阻塞调用添加了上下文支持:
func doSomeWork(ctx context.Context) (error) {
    // 数据库
    db, err := sql.Open("mysql", "...") // 检查错误
    //rows, err := db.Query("SELECT age from users", age)
    rows, err := db.QueryContext(ctx, "SELECT age from users", age)
    if err != nil {
        return err // 如果上下文被取消,将返回错误
    }
    // http
    // req, err := http.NewRequest("GET", "http://example.com", nil)
    req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil) // 检查错误
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err // 如果上下文被取消,将返回错误
    }
}
编辑(2):为了在不阻塞的情况下轮询上下文的状态,利用select的default分支:
select {
case <-ctx.Done():
    return ctx.Err()
default:
    // 如果上下文未完成 - 使用此分支
}
default分支可以选择包含代码,但即使它的代码为空,它的存在也将防止阻塞 - 因此只是在那个瞬间轮询上下文的状态。
英文:
To support timeouts and cancelation of goroutine work, the standard mechanism is to use context.Context.
ctx := context.Background() // root context
// wrap the context with a timeout and/or cancelation mechanism
ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // with timeout or cancel
//ctx, cancel := context.WithCancel(ctx)               // no   timeout just cancel
defer cancel() // avoid memory leak if we never cancel/timeout
Next your worker goroutines need to support taking and monitoring the state of the ctx. To do this in parallel with the time.Sleep (to mimic a long computation), convert the sleep to a channel based solution:
// mimic api call 1
func doSomeWork(ctx context.Context) (function1, error) {
	//time.Sleep(10 * time.Second)
	select {
	case <-time.After(10 * time.Second):
		// wait completed
	case <-ctx.Done():
		return function1{}, ctx.Err()
	}
    // ...
}
And if one worker goroutine fails, to signal to the other worker that the request should be aborted, simply call the cancel() function.
result, e := doSomeWork(ctx)
if e != nil {
    cancel()    // <- add this
	err <- e
	return
}
Pulling this all together:
https://play.golang.org/p/1Kpe_tre7XI
EDIT: the sleep example above is obviously a contrived example of how to abort a "fake" task. In the real world, http or SQL DB calls would be involve - and since go 1.7 & 1.8 - the standard library added context support to any of these potentially blocking calls:
func doSomeWork(ctx context.Context) (error) 
    // DB
    db, err := sql.Open("mysql", "...") // check err
    //rows, err := db.Query("SELECT age from users", age)
    rows, err := db.QueryContext(ctx, "SELECT age from users", age)
    if err != nil {
        return err // will return with error if context is canceled
    }
    // http
    // req, err := http.NewRequest("GET", "http://example.com", nil)
    req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil) // check err
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err // will return with error if context is canceled
    }
}
EDIT (2): to poll a context's state without blocking, leverage select's default branch:
 select {
 case <-ctx.Done():
     return ctx.Err()
 default:
     // if ctx is not done - this branch is used
 }
the default branch can optional have code in it, but even if it is empty of code it's presence will prevent blocking - and thus just poll the status of the context in that instant of time.
答案2
得分: 1
这种 fork-and-join 模式正是 golang.org/x/sync/errgroup 设计的初衷。(从一组 goroutine 中确定适当的“第一个错误”可能会非常微妙。)
你可以使用 errgroup.WithContext 来获取一个在组中的任何 goroutine 返回时取消的 context.Context。(*Group).Wait 方法 等待 goroutine 完成并返回第一个错误。
对于你的示例,可能会像这样:https://play.golang.org/p/jqYeb4chHCZ。
然后,你可以通过使用 context.WithTimeout 来在任何给定的调用中注入超时。
(然而,根据我的经验,如果你正确地处理了取消操作,显式的超时几乎从来都不会有帮助——最终用户可以在等待时间过长时显式地取消操作,而且如果某些操作花费的时间比你预期的稍微长一点,你可能不希望将服务降级为完全停止。)
英文:
This kind of fork-and-join pattern is exactly what golang.org/x/sync/errgroup was designed for. (Identifying the appropriate “first error” from a group of goroutines can be surprisingly subtle.)
You can use errgroup.WithContext to obtain a context.Context that is cancelled if any of the goroutines in the group returns. The (*Group).Wait method waits for the goroutines to complete and returns the first error.
For your example, that might look something like: https://play.golang.org/p/jqYeb4chHCZ.
You can then inject a timeout within any given call by wrapping the Context using context.WithTimeout.
(However, in my experience if you've plumbed in cancellation correctly, explicit timeouts are almost never helpful — the end user can cancel explicitly if they get tired of waiting, and you probably don't want to promote degraded service to a complete outage if something starts to take just a bit longer than you expected.)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论