Timeout for WaitGroup.Wait()
Question
What is an idiomatic way to assign a timeout to WaitGroup.Wait()?
The reason I want to do this is to safeguard my 'scheduler' from potentially waiting forever on an errant 'worker'. This raises some philosophical questions (i.e. how can the system reliably continue once it has errant workers?), but I think that's out of scope for this question.
I have an answer, which I'll provide. Now that I've written it down it doesn't seem so bad, but it still feels more convoluted than it ought to be. I'd like to know whether there is something simpler or more idiomatic, or even an alternative approach that doesn't use WaitGroups.
Answer 1
Score: 63
The solution you posted below is mostly as good as it can get. A couple of tips to improve it:
- Instead of sending a value on the channel, you can close it to signal completion. A receive operation on a closed channel always proceeds immediately.
- It's better to use a defer statement to signal completion, because it is executed even if the function terminates abruptly.
- If there is only one "job" to wait for, you can omit the WaitGroup entirely and just send a value on (or close) the channel when the job is complete. This is the same channel you use in your select statement.
- Specifying a 1 second duration is as simple as: timeout := time.Second. Specifying 2 seconds, for example, is: timeout := 2 * time.Second. You don't need the conversion. time.Second is already of type time.Duration, and multiplying it by an untyped constant like 2 also yields a value of type time.Duration.
I would also create a helper / utility function that wraps this functionality. Note that the WaitGroup must be passed as a pointer. Otherwise the copy will not get "notified" of the WaitGroup.Done() calls. Something like:
// waitTimeout waits for the waitgroup for the specified max timeout.
// Returns true if waiting timed out.
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()
	select {
	case <-c:
		return false // completed normally
	case <-time.After(timeout):
		return true // timed out
	}
}
Using it:
if waitTimeout(&wg, time.Second) {
	fmt.Println("Timed out waiting for wait group")
} else {
	fmt.Println("Wait group finished")
}
Try it on the Go Playground.
Answer 2
Score: 7
I did it like this: http://play.golang.org/p/eWv0fRlLEC
c := make(chan struct{}) // declaration assumed; the snippet as posted omits it
go func() {
	wg.Wait()
	c <- struct{}{} // unbuffered: if the timeout fires first, this send blocks forever
}()
timeout := time.Duration(1) * time.Second
fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)
select {
case <-c:
	fmt.Printf("Wait group finished\n")
case <-time.After(timeout):
	fmt.Printf("Timed out waiting for wait group\n")
}
fmt.Printf("Free at last\n")
It works fine, but is it the best way to do it?
Answer 3
Score: 6
Most existing answers suggest leaking goroutines. The idiomatic way to assign a timeout to WaitGroup.Wait is to use the underlying sync/atomic package primitives. I took the code from @icza's answer, rewrote it using the atomic package, and added context cancellation, as that's an idiomatic way to notify of a timeout.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var submitCount int32
	// run this instead of wg.Add(1)
	atomic.AddInt32(&submitCount, 1)

	// run this instead of wg.Done()
	// atomic.AddInt32(&submitCount, -1)

	timeout := time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	fmt.Printf("Wait for waitgroup (up to %s)\n", timeout)

	waitWithCtx(ctx, &submitCount)

	fmt.Println("Free at last")
}

// waitWithCtx returns when passed counter drops to zero
// or when context is cancelled
func waitWithCtx(ctx context.Context, counter *int32) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop() // release the ticker's resources when we return
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if atomic.LoadInt32(counter) == 0 {
				return
			}
		}
	}
}
Answer 4
Score: 4
Throwing in a solution that neither leaks a goroutine nor relies on polling (sleeps):
import "sync/atomic"

type WaitGroup struct {
	count int32
	done  chan struct{}
}

func NewWaitGroup() *WaitGroup {
	return &WaitGroup{
		done: make(chan struct{}),
	}
}

func (wg *WaitGroup) Add(i int32) {
	select {
	case <-wg.done:
		panic("use of an already closed WaitGroup")
	default:
	}
	atomic.AddInt32(&wg.count, i)
}

func (wg *WaitGroup) Done() {
	i := atomic.AddInt32(&wg.count, -1)
	if i == 0 {
		close(wg.done)
	}
	if i < 0 {
		panic("too many Done() calls")
	}
}

func (wg *WaitGroup) C() <-chan struct{} {
	return wg.done
}
Usage:
wg := NewWaitGroup()
wg.Add(1)
go func() {
	// do stuff
	wg.Done()
}()
select {
case <-wg.C():
	fmt.Printf("Completed!\n")
case <-time.After(time.Second):
	fmt.Printf("Timed out!\n")
}
Answer 5
Score: 3
This is a bad idea. Do not abandon goroutines. Doing so may introduce races, resource leaks and unexpected conditions, which ultimately hurt the stability of your application.
Instead, use timeouts consistently throughout your code to make sure no goroutine is blocked forever or takes too long to run.
The idiomatic way to achieve that is via context.WithTimeout():
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
// Now perform any I/O using the given ctx:
go func() {
	err := example.Connect(ctx)
	if err != nil {
		// handle err and exit goroutine
		return
	}
	// ...
}()
Now you can safely use WaitGroup.Wait(), knowing it will always finish in a timely manner.
Answer 6
Score: 2
The following will not introduce any leaking goroutines:
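// Message is a placeholder for the per-call parameters (hypothetical).
type Message struct{ /* ... */ }

func callingFunc() {
	// ... (ctx and msgs are set up here)
	wg := new(sync.WaitGroup)
	for _, msg := range msgs {
		wg.Add(1)
		go wrapperParallelCall(ctx, msg, wg)
	}
	wg.Wait()
}

func wrapperParallelCall(ctx context.Context, params Message, wg *sync.WaitGroup) {
	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer wg.Done()
	defer cancel()
	originalSequenceCall(ctx, params)
}

func originalSequenceCall(ctx context.Context, params Message) { /* ... */ }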
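This code uses sync.WaitGroup to wait for all the concurrent calls to finish, and each concurrent call uses context.WithTimeout to set its own timeout. When each call ends, the deferred calls to wg.Done() and cancel() release its resources. This bounds wg.Wait() only if originalSequenceCall actually returns once its ctx is done.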
Answer 7
Score: 1
Another solution that doesn't leak a wg.Wait() goroutine: just use the (well-supported and widely used) golang.org/x/sync/semaphore package:
- Instead of sync.WaitGroup{}, use semaphore.NewWeighted(N). You have to know N in advance.
- Instead of wg.Add(1), use err := sem.Acquire(ctx, 1).
- Instead of defer wg.Done(), use defer sem.Release(1).
- Instead of wg.Wait(), you can use sem.Acquire(ctx, N) with a context that has a timeout.
- Watch out: this is only equivalent to sync.WaitGroup in this specific use case, where you only call Acquire(ctx, 1) (in place of Add(1)) and Release(1), N times each. Read the documentation carefully.
package main

import (
	"context"
	"log"
	"time"

	"golang.org/x/sync/semaphore"
)

func worker(n int) {
	time.Sleep(time.Duration(n) * time.Second)
	log.Printf("Worker %v finished", n)
}

func main() {
	const N = 5
	sem := semaphore.NewWeighted(N)

	for i := 0; i < N; i++ {
		err := sem.Acquire(context.Background(), 1)
		if err != nil {
			log.Fatal("sem.Acquire err: ", err)
		}
		go func(n int) {
			defer sem.Release(1)
			worker(n)
		}(i)
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
	defer cancel()
	err := sem.Acquire(ctx, N)
	if err != nil {
		log.Println("sem.Acquire err:", err)
		return
	}
	log.Println("sem.Acquire ok")
}
Which results in:
2009/11/10 23:00:00 Worker 0 finished
2009/11/10 23:00:01 Worker 1 finished
2009/11/10 23:00:02 Worker 2 finished
2009/11/10 23:00:02 sem.Acquire err: context deadline exceeded
Answer 8
Score: 0
I wrote a library that encapsulates the concurrency logic, https://github.com/shomali11/parallelizer, and you can also pass it a timeout.
Here is an example without a timeout:
func main() {
	group := parallelizer.DefaultGroup()

	group.Add(func() {
		for char := 'a'; char < 'a'+3; char++ {
			fmt.Printf("%c ", char)
		}
	})

	group.Add(func() {
		for number := 1; number < 4; number++ {
			fmt.Printf("%d ", number)
		}
	})

	err := group.Run()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}
Output:
a 1 b 2 c 3
Done
Error: <nil>
Here is an example with a timeout:
func main() {
	options := &parallelizer.Options{Timeout: time.Second}
	group := parallelizer.NewGroup(options)

	group.Add(func() {
		time.Sleep(time.Minute)
		for char := 'a'; char < 'a'+3; char++ {
			fmt.Printf("%c ", char)
		}
	})

	group.Add(func() {
		time.Sleep(time.Minute)
		for number := 1; number < 4; number++ {
			fmt.Printf("%d ", number)
		}
	})

	err := group.Run()

	fmt.Println()
	fmt.Println("Done")
	fmt.Printf("Error: %v", err)
}
Output:
Done
Error: timeout
Answer 9
Score: 0
This is not an actual answer to this question, but it was the (much simpler) solution to my little problem when I had this question.
My 'workers' were doing http.Get() requests, so I just set the timeout on the HTTP client.
urls := []string{"http://1.jpg", "http://2.jpg"}
wg := &sync.WaitGroup{}
for _, url := range urls {
	wg.Add(1)
	go func(url string) {
		defer wg.Done() // runs on every return path
		client := http.Client{
			Timeout: 3 * time.Second, // only want very fast responses
		}
		resp, err := client.Get(url)
		if err != nil {
			// ... handle the error (a timeout is reported here)
			return
		}
		defer resp.Body.Close()
		// ... do something with the image
	}(url)
}
wg.Wait()
Answer 10
Score: 0
We had the same need in one of our systems. By passing a context to the goroutines and cancelling that context when we hit the timeout, we prevent goroutine leaks.
func main() {
	ctx := context.Background()
	ctxWithCancel, cancelFunc := context.WithCancel(ctx)
	defer cancelFunc()
	var wg sync.WaitGroup
	Provide(ctxWithCancel, 5, &wg)
	Provide(ctxWithCancel, 5, &wg)
	c := make(chan struct{})
	go func() {
		wg.Wait()
		// close instead of sending: an unbuffered send would block forever
		// once the select below has already taken the timeout branch
		close(c)
		fmt.Println("closed")
	}()
	select {
	case <-c:
	case <-time.After(20 * time.Millisecond):
		cancelFunc()
		fmt.Println("timeout")
	}
}

func Work(ctx context.Context, to int) {
	for i := 0; i < to; i++ {
		select {
		case <-ctx.Done():
			return
		default:
			fmt.Println(i)
			time.Sleep(10 * time.Millisecond)
		}
	}
}

func Provide(ctx context.Context, to int, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		Work(ctx, to)
	}()
}