WaitGroup.Wait()的超时时间

huangapple go评论87阅读模式
英文:

Timeout for WaitGroup.Wait()

问题

WaitGroup.Wait()分配超时的惯用方法是什么?

我想这样做的原因是为了保护我的“调度器”不会永远等待一个错误的“工作者”。这引发了一些哲学问题(即系统如何可靠地在出现错误的工作者后继续运行?),但我认为这超出了这个问题的范围。

我有一个答案,我会提供给你。现在我把它写下来,它似乎并不那么糟糕,但仍然感觉比应该的更复杂。我想知道是否有更简单、更惯用的方法,或者甚至是不使用WaitGroups的替代方法。

英文:

What is an idiomatic way to assign a timeout to WaitGroup.Wait() ?

The reason I want to do this, is to safeguard my 'scheduler' from potentially awaiting an errant 'worker' for ever. This leads to some philosophical questions (i.e. how can the system reliably continue once it has errant workers?), but I think that's out of scope for this question.

I have an answer which I'll provide. Now that I've written it down, it doesn't seem so bad but it still feels more convoluted than it ought to. I'd like to know if there's something available which is simpler, more idiomatic, or even an alternative approach which doesn't use WaitGroups.

答案1

得分: 63

你发布的解决方案大部分都很好。以下是一些建议来改进它:

  • 作为完成信号,你可以选择关闭通道而不是在通道上发送一个值,对于已关闭的通道,接收操作总是可以立即进行
  • 最好使用defer语句来发出完成信号,即使函数异常终止,它也会执行。
  • 如果只有一个“作业”需要等待,你可以完全省略WaitGroup,当作业完成时,只需发送一个值或关闭通道(与你在select语句中使用的通道相同)。
  • 指定1秒的持续时间很简单:timeout := time.Second。例如,指定2秒的持续时间是:timeout := 2 * time.Second。你不需要进行转换,time.Second已经是time.Duration类型,将其与像2这样的无类型常量相乘也会产生一个time.Duration类型的值。

我还会创建一个包装这个功能的辅助/实用函数。请注意,WaitGroup必须作为指针传递,否则副本将无法“接收”WaitGroup.Done()的调用。类似这样:

// waitTimeout 等待 waitgroup 直到超时。
// 如果等待超时,则返回 true。
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    c := make(chan struct{})
    go func() {
        defer close(c)
        wg.Wait()
    }()
    select {
    case <-c:
        return false // 正常完成
    case <-time.After(timeout):
        return true // 超时
    }
}

使用它:

if waitTimeout(&wg, time.Second) {
    fmt.Println("等待组超时")
} else {
    fmt.Println("等待组完成")
}

Go Playground上尝试一下。

英文:

Mostly your solution you posted below is as good as it can get. Couple of tips to improve it:

  • Alternatively you may close the channel to signal completion instead of sending a value on it, a receive operation on a closed channel can always proceed immediately.
  • And it's better to use defer statement to signal completion, it is executed even if a function terminates abruptly.
  • Also if there is only one "job" to wait for, you can completely omit the WaitGroup and just send a value or close the channel when job is complete (the same channel you use in your select statement).
  • Specifying 1 second duration is as simple as: timeout := time.Second. Specifying 2 seconds for example is: timeout := 2 * time.Second. You don't need the conversion, time.Second is already of type time.Duration, multiplying it with an untyped constant like 2 will also yield a value of type time.Duration.

I would also create a helper / utility function wrapping this functionality. Note that WaitGroup must be passed as a pointer else the copy will not get "notified" of the WaitGroup.Done() calls. Something like:

// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()
	select {
   	case &lt;-c:
		return false // completed normally
	case &lt;-time.After(timeout):
		return true // timed out
	}
}

Using it:

if waitTimeout(&amp;wg, time.Second) {
	fmt.Println(&quot;Timed out waiting for wait group&quot;)
} else {
	fmt.Println(&quot;Wait group finished&quot;)
}

Try it on the Go Playground.

答案2

得分: 7

我是你的中文翻译助手,以下是翻译好的内容:

我是这样做的:http://play.golang.org/p/eWv0fRlLEC

go func() {
    wg.Wait()
    c <- struct{}{}
}()
timeout := time.Duration(1) * time.Second
fmt.Printf("等待 waitgroup(最多 %s)\n", timeout)
select {
case <-c:
    fmt.Printf("Wait group 完成\n")
case <-time.After(timeout):
    fmt.Printf("等待 wait group 超时\n")
}
fmt.Printf("终于自由了\n")

它运行良好,但这是最好的方法吗?

英文:

I did it like this: http://play.golang.org/p/eWv0fRlLEC

go func() {
	wg.Wait()
	c &lt;- struct{}{}
}()
timeout := time.Duration(1) * time.Second
fmt.Printf(&quot;Wait for waitgroup (up to %s)\n&quot;, timeout)
select {
case &lt;-c:
	fmt.Printf(&quot;Wait group finished\n&quot;)
case &lt;-time.After(timeout):
	fmt.Printf(&quot;Timed out waiting for wait group\n&quot;)
}
fmt.Printf(&quot;Free at last\n&quot;)

It works fine, but is it the best way to do it?

答案3

得分: 6

大多数现有的答案都建议泄漏goroutine。将超时分配给WaitGroup.Wait的惯用方法是使用底层的sync/atomic包原语。我使用atomic包从@icza的答案中获取了代码,并添加了上下文取消,因为这是一种通知超时的惯用方法。

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var submitCount int32
	// 使用这个代替 wg.Add(1)
	atomic.AddInt32(&submitCount, 1)

	// 使用这个代替 wg.Done()
	// atomic.AddInt32(&submitCount, -1)

	timeout := time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	fmt.Printf("等待 waitgroup(最多 %s)\n", timeout)

	waitWithCtx(ctx, &submitCount)

	fmt.Println("终于自由了")
}

// waitWithCtx 当传递的计数器降为零或上下文被取消时返回
func waitWithCtx(ctx context.Context, counter *int32) {
	ticker := time.NewTicker(10 * time.Millisecond)
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if atomic.LoadInt32(counter) == 0 {
				return
			}
		}
	}
}

在Go Playground中查看相同的代码

英文:

Most existing answers suggest leaking goroutines. The idiomatic way to assign a timeout to WaitGroup.Wait is to use underlying sync/atomic package primitives. I took code from @icza answer and rewrote it using the atomic package, and added context cancelation as that's an idiomatic way to notify of a timeout.

package main

import (
	&quot;context&quot;
	&quot;fmt&quot;
	&quot;sync/atomic&quot;
	&quot;time&quot;
)

func main() {
	var submitCount int32
	// run this instead of wg.Add(1)
	atomic.AddInt32(&amp;submitCount, 1)

	// run this instead of wg.Done()
	// atomic.AddInt32(&amp;submitCount, -1)

	timeout := time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	fmt.Printf(&quot;Wait for waitgroup (up to %s)\n&quot;, timeout)

	waitWithCtx(ctx, &amp;submitCount)

	fmt.Println(&quot;Free at last&quot;)
}

// waitWithCtx returns when passed counter drops to zero
// or when context is cancelled
func waitWithCtx(ctx context.Context, counter *int32) {
	ticker := time.NewTicker(10 * time.Millisecond)
	for {
		select {
		case &lt;-ctx.Done():
			return
		case &lt;-ticker.C:
			if atomic.LoadInt32(counter) == 0 {
				return
			}
		}
	}
}

Same code in Go Playground

答案4

得分: 4

投入一个不会泄漏goroutine或依赖轮询(休眠)的解决方案:

import "atomic"

type WaitGroup struct {
	count int32
	done chan struct{}
}

func NewWaitGroup() *WaitGroup {
	return &WaitGroup{
		done: make(chan struct{}),
	}
}

func (wg *WaitGroup) Add(i int32) {
	select {
	case <-wg.done:
		panic("使用已关闭的WaitGroup")
	default:
	}
	atomic.AddInt32(&wg.count, i)
}

func (wg *WaitGroup) Done() {
	i := atomic.AddInt32(&wg.count, -1)
	if i == 0 {
		close(wg.done)
	}
	if i < 0 {
		panic("Done() 调用过多")
	}
}

func (wg *WaitGroup) C() <-chan struct{} {
	return wg.done
}

用法:

wg := NewWaitGroup()
wg.Add(1)
go func() {
  // 做一些事情
  wg.Done()
}

select {
case <-wg.C():
  fmt.Printf("完成!\n")
case <-time.After(time.Second):
  fmt.Printf("超时!\n")
}
英文:

Throwing in a solution which does not leak a goroutine, or rely on polling (sleeps):

import &quot;atomic&quot;

type WaitGroup struct {
	count int32
	done chan struct{}
}

func NewWaitGroup() *WaitGroup {
	return &amp;WaitGroup{
		done: make(chan struct{}),
	}
}

func (wg *WaitGroup) Add(i int32) {
	select {
	case &lt;-wg.done:
		panic(&quot;use of an already closed WaitGroup&quot;)
	default:
	}
	atomic.AddInt32(&amp;wg.count, i)
}

func (wg *WaitGroup) Done() {
	i := atomic.AddInt32(&amp;wg.count, -1)
	if i == 0 {
		close(wg.done)
	}
	if i &lt; 0 {
		panic(&quot;too many Done() calls&quot;)
	}
}

func (wg *WaitGroup) C() &lt;-chan struct{} {
	return wg.done
}

Usage:

wg := NewWaitGroup()
wg.Add(1)
go func() {
  // do stuff
  wg.Done()
}

select {
case &lt;-wg.C():
  fmt.Printf(&quot;Completed!\n&quot;)
case &lt;-time.After(time.Second):
  fmt.Printf(&quot;Timed out!\n&quot;)
}

答案5

得分: 3

这是一个不好的主意。不要放弃 goroutines,这样做可能会引入竞争条件、资源泄漏和意外情况,最终影响应用程序的稳定性。

相反,应该在代码中始终使用超时来确保没有 goroutine 被永久阻塞或运行时间过长。

实现这一目标的惯用方式是使用 context.WithTimeout()

ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
defer cancel()

// 现在使用给定的 ctx 执行任何 I/O 操作:
go func() {
  err = example.Connect(ctx)
  if err != nil { /* 处理错误并退出 goroutine */ }
  . . .
}()

现在你可以安全地使用 WaitGroup.Wait(),知道它总是会及时完成。

英文:

This is a bad idea. Do not abandon goroutines, doing so may introduce races, resource leaks and unexpected conditions, ultimately impacting the stability of your application.

Instead use timeouts throughout your code consistently in order to make sure no goroutine is blocked forever or takes too long to run.

The idiomatic way for achieving that is via context.WithTimeout():

ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
defer cancel()

// Now perform any I/O using the given ctx:
go func() {
  err = example.Connect(ctx)
  if err != nil { /* handle err and exit goroutine */ }
  . . .
}()

Now you can safely use WaitGroup.Wait(), knowing it will always finish in a timely manner.

答案6

得分: 2

以下代码将不会引入任何泄漏的 goroutine

func callingFunc() {
    ...
    wg := new(sync.WaitGroup)
    for _, msg := range msgs {
        wg.Add(1)
        go wrapperParallelCall(ctx, params, wg)
    }

    wg.Wait()
}

func wrapperParallelCall(ctx, params, wg) {
    ctx, cancel := context.WithTimeout(ctx, time.Second)
    defer wg.Done()
    defer cancel()

    originalSequenceCall(ctx, params)
}

func originalSequenceCall(ctx, params) {...}

这段代码使用了sync.WaitGroup来等待所有并发调用完成,并且在每个并发调用中使用了context.WithTimeout来设置超时时间。在每个并发调用结束后,使用defer语句调用wg.Done()cancel()来释放资源。

英文:

The following will not introduce any leaking goroutines

func callingFunc() {
	...
	wg := new(sync.WaitGroup)
	for _, msg := range msgs {
		wg.Add(1)
		go wrapperParallelCall(ctx, params, wg)
	}

	wg.Wait()
}

func wrapperParallelCall(ctx, params, wg) {
	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer wg.Done()
	defer cancel()

	originalSequenceCall(ctx, params)
}

func originalSequenceCall(ctx, params) {...}

答案7

得分: 1

另一种不泄漏wg.Wait()例程的解决方案是使用(得到良好支持且广泛使用的)golang.org/x/sync/semaphore

  • 使用sem.NewWeighted(N)代替sync.WaitGroup{}
  • 使用err := sem.Acquire(ctx, 1)代替wg.Add(1)
  • 使用defer sem.Release(1)代替defer wg.Done()
  • 使用带有超时上下文的sem.Acquire(ctx, N)代替wg.Wait()
  • 注意,这仅在特定用例中等效于sync.WaitGroup(当您只调用Add(1)Release(1) N次时)。仔细阅读文档。

示例代码如下:

package main

import (
	"context"
	"log"
	"time"

	"golang.org/x/sync/semaphore"
)

func worker(n int) {
	time.Sleep(time.Duration(n) * time.Second)
	log.Printf("Worker %v finished", n)
}

func main() {

	const N = 5
	sem := semaphore.NewWeighted(N)

	for i := 0; i < N; i++ {

		err := sem.Acquire(context.Background(), 1)
		if err != nil {
			log.Fatal("sem.Acquire err", err)
		}
		go func(n int) {
			defer sem.Release(1)
			worker(n)
		}(i)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	err := sem.Acquire(ctx, N)
	if err != nil {
		log.Println("sem.Acquire err:", err)
		return
	}

	log.Println("sem.Acquire ok")
}

运行结果如下:

2009/11/10 23:00:00 Worker 0 finished
2009/11/10 23:00:01 Worker 1 finished
2009/11/10 23:00:02 Worker 2 finished
2009/11/10 23:00:02 sem.Acquire err: context deadline exceeded
英文:

Another solution without leaking wg.Wait() routine: just use (well-suported and widely-used) golang.org/x/sync/semaphore:

  • Instead of sync.WaitGroup{} use sem.NewWeighted(N) (you have to know N in advance)
  • Instead of wg.Add(1) use err := sem.Acquire(ctx, 1)
  • Instead of defer wg.Done() use defer sem.Release(1)
  • Instead of wg.Wait() you can use sem.Acquire(ctx, N) with context with timeout.
  • Watch out, this is only equivalent to sync.WaitGroup in this specific use-case (when you only call Add(1) and Release(1) N times). Read the documentation carefully.

Example:

package main

import (
	&quot;context&quot;
	&quot;log&quot;
	&quot;time&quot;

	&quot;golang.org/x/sync/semaphore&quot;
)

func worker(n int) {
	time.Sleep(time.Duration(n) * time.Second)
	log.Printf(&quot;Worker %v finished&quot;, n)
}

func main() {

	const N = 5
	sem := semaphore.NewWeighted(N)

	for i := 0; i &lt; N; i++ {

		err := sem.Acquire(context.Background(), 1)
		if err != nil {
			log.Fatal(&quot;sem.Acquire err&quot;, err)
		}
		go func(n int) {
			defer sem.Release(1)
			worker(n)
		}(i)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()

	err := sem.Acquire(ctx, N)
	if err != nil {
		log.Println(&quot;sem.Acquire err:&quot;, err)
		return
	}

	log.Println(&quot;sem.Acquire ok&quot;)
}

Which results in:

2009/11/10 23:00:00 Worker 0 finished
2009/11/10 23:00:01 Worker 1 finished
2009/11/10 23:00:02 Worker 2 finished
2009/11/10 23:00:02 sem.Acquire err: context deadline exceeded

答案8

得分: 0

我写了一个封装并发逻辑的库https://github.com/shomali11/parallelizer,你也可以传递一个超时时间。

以下是一个没有超时的示例:

func main() {
    group := parallelizer.DefaultGroup()

    group.Add(func() {
        for char := 'a'; char < 'a'+3; char++ {
            fmt.Printf("%c ", char)
        }
    })

    group.Add(func() {
        for number := 1; number < 4; number++ {
            fmt.Printf("%d ", number)
        }
    })

    err := group.Run()

    fmt.Println()
    fmt.Println("Done")
    fmt.Printf("Error: %v", err)
}

输出:

a 1 b 2 c 3 
Done
Error: <nil>

以下是一个带有超时的示例:

func main() {
    options := &parallelizer.Options{Timeout: time.Second}
    group := parallelizer.NewGroup(options)

    group.Add(func() {
        time.Sleep(time.Minute)

        for char := 'a'; char < 'a'+3; char++ {
            fmt.Printf("%c ", char)
        }
    })

    group.Add(func() {
        time.Sleep(time.Minute)

        for number := 1; number < 4; number++ {
            fmt.Printf("%d ", number)
        }
    })

    err := group.Run()

    fmt.Println()
    fmt.Println("Done")
    fmt.Printf("Error: %v", err)
}

输出:

Done
Error: timeout
英文:

I wrote a library that encapsulates the concurrency logic https://github.com/shomali11/parallelizer which you can also pass a timeout.

Here is an example without a timeout:

func main() {
	group := parallelizer.DefaultGroup()

    group.Add(func() {
	    for char := &#39;a&#39;; char &lt; &#39;a&#39;+3; char++ {
		    fmt.Printf(&quot;%c &quot;, char)
	    }
    })

	group.Add(func() {
    	for number := 1; number &lt; 4; number++ {
	    	fmt.Printf(&quot;%d &quot;, number)
	    }
    })

    err := group.Run()

	fmt.Println()
    fmt.Println(&quot;Done&quot;)
    fmt.Printf(&quot;Error: %v&quot;, err)
}

Output:

a 1 b 2 c 3 
Done
Error: &lt;nil&gt;

Here is an example with a timeout:

func main() {
    options := &amp;parallelizer.Options{Timeout: time.Second}
	group := parallelizer.NewGroup(options)

    group.Add(func() {
	    time.Sleep(time.Minute)

		for char := &#39;a&#39;; char &lt; &#39;a&#39;+3; char++ {
    		fmt.Printf(&quot;%c &quot;, char)
    	}
    })

	group.Add(func() {
    	time.Sleep(time.Minute)

	    for number := 1; number &lt; 4; number++ {
		    fmt.Printf(&quot;%d &quot;, number)
	    }
    })

    err := group.Run()

	fmt.Println()
    fmt.Println(&quot;Done&quot;)
    fmt.Printf(&quot;Error: %v&quot;, err)
}

Output:

Done
Error: timeout

答案9

得分: 0

这不是对这个问题的实际回答,而是我在遇到这个问题时的(简单得多)解决方案。

我的“工作者”正在进行http.Get()请求,所以我只需在http客户端上设置超时时间。

urls := []string{"http://1.jpg", "http://2.jpg"}
wg := &sync.WaitGroup{}
for _, url := range urls {
    wg.Add(1)
    go func(url string) {
        client := http.Client{
            Timeout: time.Duration(3 * time.Second), // 只希望得到非常快速的响应
        }
        resp, err := client.Get(url)
        //... 检查错误
        //... 当没有错误时对图像进行处理
        //...

        wg.Done()
    }(url)
}

wg.Wait()

希望对你有帮助!

英文:

This is not an actual answer to this question but was the (much simpler) solution to my little problem when I had this question.

My 'workers' were doing http.Get() requests so I just set the timeout on the http client.

urls := []string{&quot;http://1.jpg&quot;, &quot;http://2.jpg&quot;}
wg := &amp;sync.WaitGroup{}
for _, url := range urls {
	wg.Add(1)
	go func(url string) {
		client := http.Client{
			Timeout: time.Duration(3 * time.Second), // only want very fast responses
		}
		resp, err := client.Get(url)
		//... check for errors
		//... do something with the image when there are no errors
		//...

		wg.Done()
	}(url)
	
}
wg.Wait()

答案10

得分: 0

我们的一个系统也有同样的需求。通过将上下文传递给goroutine,并在超时时关闭该上下文,我们可以防止goroutine泄漏。

func main() {
    ctx := context.Background()
    ctxWithCancel, cancelFunc := context.WithCancel(ctx)
    var wg sync.WaitGroup
    Provide(ctxWithCancel, 5, &wg)
    Provide(ctxWithCancel, 5, &wg)
    c := make(chan struct{})
    go func() {
        wg.Wait()
        c <- struct{}{}
        fmt.Println("closed")
    }()

    select {
    case <-c:
    case <-time.After(20 * time.Millisecond):
        cancelFunc()
        fmt.Println("timeout")
    }
}

func Work(ctx context.Context, to int) {
    for i := 0; i < to; i++ {
        select {
        case <-ctx.Done():
            return
        default:
            fmt.Println(i)
            time.Sleep(10 * time.Millisecond)
        }
    }
}

func Provide(ctx context.Context, to int, wg *sync.WaitGroup) {
    wg.Add(1)
    go func() {
        Work(ctx, to)
        wg.Done()
    }()
}
英文:

We had the same need for one of our systems. by passing a context to goroutines and closing that context when we are facing timeout, we would prevent goroutine leaks.

func main() {
	ctx := context.Background()
	ctxWithCancel, cancelFunc := context.WithCancel(ctx)
	var wg sync.WaitGroup
	Provide(ctxWithCancel, 5, &amp;wg)
	Provide(ctxWithCancel, 5, &amp;wg)
	c := make(chan struct{})
	go func() {
		wg.Wait()
		c &lt;- struct{}{}
		fmt.Println(&quot;closed&quot;)
	}()

	select {
	case &lt;-c:
	case &lt;-time.After(20 * time.Millisecond):
		cancelFunc()
		fmt.Println(&quot;timeout&quot;)
	}
}

func Work(ctx context.Context, to int) {
	for i := 0; i &lt; to; i++ {
		select {
		case &lt;-ctx.Done():
			return
		default:
			fmt.Println(i)
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func Provide(ctx context.Context, to int, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		Work(ctx, to)
		wg.Done()
	}()
}

huangapple
  • 本文由 发表于 2015年9月29日 17:39:45
  • 转载请务必保留本文链接:https://go.coder-hub.com/32840687.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定