Golang并发问题引入超时

huangapple go评论87阅读模式
英文:

Golang Concurrency Issue to introduce timeout

问题

我帮你翻译一下:

我希望在Go语言中使用Go协程实现并行API调用。一旦请求被发送,
1)我需要等待所有响应(响应时间不同)。
2)如果任何一个请求失败并返回错误,我希望结束(或假装结束)协程。
3)我还希望每个Go协程(或API调用)关联一个超时值。

我已经实现了上述1和2的功能,但需要帮助来实现第3个功能。同时,对于1和2的反馈也会有所帮助。

谢谢。

英文:

I wish to implement parallel api calling in golang using go routines. Once the requests are fired,

  1. I need to wait for all responses (which take different time).
  2. If any of the request fails and returns an error, I wish to end (or pretend) the routines.
  3. I also want to have a timeout value associated with each go routine (or api call).

I have implemented the below for 1 and 2, but need help as to how can I implement 3. Also, feedback on 1 and 2 will also help.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

func main() {
	var wg sync.WaitGroup

	c := make(chan interface{}, 1)
	c2 := make(chan interface{}, 1)
	err := make(chan interface{})

	wg.Add(1)
	go func() {
		defer wg.Done()
		result, e := doSomeWork()
		if e != nil {
			err <- e
			return
		}
		c <- result
	}()

	wg.Add(1)
	go func() {
		defer wg.Done()
		result2, e := doSomeWork2()
		if e != nil {
			err <- e
			return
		}
		c2 <- result2
	}()

	go func() {
		wg.Wait()
		close(c)
		close(c2)
		close(err)
	}()

	for e := range err {
		// here error happend u could exit your caller function
		fmt.Println("Error==>", e)
		return

	}
	fmt.Println(<-c, <-c2)

}

// mimic api call 1
func doSomeWork() (function1, error) {
	time.Sleep(10 * time.Second)
	obj := function1{"ABC", "29"}
	return obj, nil
}

type function1 struct {
	Name string
	Age  string
}

// mimic api call 2
func doSomeWork2() (function2, error) {
	time.Sleep(4 * time.Second)
	r := errors.New("Error Occured")
	if 1 == 2 {
		fmt.Println(r)
	}
	obj := function2{"Delhi", "Delhi"}
	// return error as nil for now
	return obj, nil
}

type function2 struct {
	City  string
	State string
}

Thanks in advance.

答案1

得分: 1

为了支持goroutine工作的超时和取消,标准机制是使用context.Context

ctx := context.Background() // 根上下文

// 使用超时和/或取消机制包装上下文

ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // 设置超时或取消
//ctx, cancel := context.WithCancel(ctx)               // 只取消,没有超时

defer cancel() // 避免内存泄漏,如果我们从未取消/超时

接下来,您的工作goroutine需要支持接收和监视ctx的状态。为了与time.Sleep并行(模拟长时间计算),将sleep转换为基于通道的解决方案:

// 模拟API调用1
func doSomeWork(ctx context.Context) (function1, error) {
    //time.Sleep(10 * time.Second)
    select {
    case <-time.After(10 * time.Second):
        // 等待完成
    case <-ctx.Done():
        return function1{}, ctx.Err()
    }
    // ...
}

如果一个工作goroutine失败,为了向其他工作goroutine发出信号,请求应该被中止,只需调用cancel()函数。

result, e := doSomeWork(ctx)
if e != nil {
    cancel()    // 添加这一行
    err <- e
    return
}

将所有这些组合在一起:

https://play.golang.org/p/1Kpe_tre7XI


编辑:上面的sleep示例显然是一个中止“假”任务的人为示例。在现实世界中,可能涉及到httpSQL DB调用 - 自从go 1.71.8以来 - 标准库为这些潜在的阻塞调用添加了上下文支持:

func doSomeWork(ctx context.Context) (error) {

    // 数据库

    db, err := sql.Open("mysql", "...") // 检查错误

    //rows, err := db.Query("SELECT age from users", age)
    rows, err := db.QueryContext(ctx, "SELECT age from users", age)
    if err != nil {
        return err // 如果上下文被取消,将返回错误
    }

    // http

    // req, err := http.NewRequest("GET", "http://example.com", nil)
    req, err := http.NewRequestWithContext(ctx, "GET", "http://example.com", nil) // 检查错误

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err // 如果上下文被取消,将返回错误
    }

}

编辑(2):为了在不阻塞的情况下轮询上下文的状态,利用selectdefault分支:

select {
case <-ctx.Done():
    return ctx.Err()
default:
    // 如果上下文未完成 - 使用此分支
}

default分支可以选择包含代码,但即使它的代码为空,它的存在也将防止阻塞 - 因此只是在那个瞬间轮询上下文的状态。

英文:

To support timeouts and cancelation of goroutine work, the standard mechanism is to use context.Context.

ctx := context.Background() // root context

// wrap the context with a timeout and/or cancelation mechanism

ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // with timeout or cancel
//ctx, cancel := context.WithCancel(ctx)               // no   timeout just cancel

defer cancel() // avoid memory leak if we never cancel/timeout

Next your worker goroutines need to support taking and monitoring the state of the ctx. To do this in parallel with the time.Sleep (to mimic a long computation), convert the sleep to a channel based solution:

// mimic api call 1
func doSomeWork(ctx context.Context) (function1, error) {
	//time.Sleep(10 * time.Second)
	select {
	case &lt;-time.After(10 * time.Second):
		// wait completed
	case &lt;-ctx.Done():
		return function1{}, ctx.Err()
	}
    // ...
}

And if one worker goroutine fails, to signal to the other worker that the request should be aborted, simply call the cancel() function.

result, e := doSomeWork(ctx)
if e != nil {
    cancel()    // &lt;- add this
	err &lt;- e
	return
}

Pulling this all together:

https://play.golang.org/p/1Kpe_tre7XI


EDIT: the sleep example above is obviously a contrived example of how to abort a "fake" task. In the real world, http or SQL DB calls would be involve - and since go 1.7 & 1.8 - the standard library added context support to any of these potentially blocking calls:

func doSomeWork(ctx context.Context) (error) 

    // DB

    db, err := sql.Open(&quot;mysql&quot;, &quot;...&quot;) // check err

    //rows, err := db.Query(&quot;SELECT age from users&quot;, age)
    rows, err := db.QueryContext(ctx, &quot;SELECT age from users&quot;, age)
    if err != nil {
        return err // will return with error if context is canceled
    }

    // http

    // req, err := http.NewRequest(&quot;GET&quot;, &quot;http://example.com&quot;, nil)
    req, err := http.NewRequestWithContext(ctx, &quot;GET&quot;, &quot;http://example.com&quot;, nil) // check err

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err // will return with error if context is canceled
    }

}

EDIT (2): to poll a context's state without blocking, leverage select's default branch:

 select {
 case &lt;-ctx.Done():
     return ctx.Err()
 default:
     // if ctx is not done - this branch is used
 }

the default branch can optional have code in it, but even if it is empty of code it's presence will prevent blocking - and thus just poll the status of the context in that instant of time.

答案2

得分: 1

这种 fork-and-join 模式正是 golang.org/x/sync/errgroup 设计的初衷。(从一组 goroutine 中确定适当的“第一个错误”可能会非常微妙。)

你可以使用 errgroup.WithContext 来获取一个在组中的任何 goroutine 返回时取消的 context.Context(*Group).Wait 方法 等待 goroutine 完成并返回第一个错误。

对于你的示例,可能会像这样:https://play.golang.org/p/jqYeb4chHCZ。


然后,你可以通过使用 context.WithTimeout 来在任何给定的调用中注入超时。

(然而,根据我的经验,如果你正确地处理了取消操作,显式的超时几乎从来都不会有帮助——最终用户可以在等待时间过长时显式地取消操作,而且如果某些操作花费的时间比你预期的稍微长一点,你可能不希望将服务降级为完全停止。)

英文:

This kind of fork-and-join pattern is exactly what golang.org/x/sync/errgroup was designed for. (Identifying the appropriate “first error” from a group of goroutines can be surprisingly subtle.)

You can use errgroup.WithContext to obtain a context.Context that is cancelled if any of the goroutines in the group returns. The (*Group).Wait method waits for the goroutines to complete and returns the first error.

For your example, that might look something like: https://play.golang.org/p/jqYeb4chHCZ.


You can then inject a timeout within any given call by wrapping the Context using context.WithTimeout.

(However, in my experience if you've plumbed in cancellation correctly, explicit timeouts are almost never helpful — the end user can cancel explicitly if they get tired of waiting, and you probably don't want to promote degraded service to a complete outage if something starts to take just a bit longer than you expected.)

huangapple
  • 本文由 发表于 2021年8月23日 22:53:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/68894802.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定