How to implement a timeout when using sync.WaitGroup.Wait?
Question
I have run into a situation where I want to track some goroutines until they sync at a specific point, for example when all the URLs have been fetched. Then we can collect the results and show them in a specific order.
I think this is where a barrier comes in. In Go, that is sync.WaitGroup. However, in a real situation we cannot be sure that all the fetch operations will succeed in a short time, so I want to introduce a timeout when waiting for the fetch operations.
I am a newbie to Golang, so can someone give me some advice?
What I am looking for is something like this:
wg := &sync.WaitGroup{}
select {
case <-wg.Wait():
	// All done!
case <-time.After(500 * time.Millisecond):
	// Hit timeout.
}
I know that Wait does not return a channel, so this does not compile.
Answer 1
Score: 34
If all you want is your neat select, you can easily convert a blocking function into a channel by spawning a goroutine that calls the function and then closes (or sends on) a channel once it returns.
done := make(chan struct{})
go func() {
	wg.Wait()
	close(done)
}()
select {
case <-done:
	// All done!
case <-time.After(500 * time.Millisecond):
	// Hit timeout.
}
Answer 2
Score: 2
Send your results to a buffered channel that is large enough to hold all of them, so that no sender blocks, and read them in a for-select loop in the main goroutine:
func work(msg string, d time.Duration, ret chan<- string) {
	time.Sleep(d) // Work emulation.
	select {
	case ret <- msg:
	default:
	}
}
// ...
const N = 2
ch := make(chan string, N)
go work("printed", 100*time.Millisecond, ch)
go work("not printed", 1000*time.Millisecond, ch)
timeout := time.After(500 * time.Millisecond)
loop:
for received := 0; received < N; received++ {
	select {
	case msg := <-ch:
		fmt.Println(msg)
	case <-timeout:
		fmt.Println("timeout!")
		break loop
	}
}
Playground: http://play.golang.org/p/PxeEEJo2dz.
See also: Go Concurrency Patterns: Timing out, moving on.
Answer 3
Score: 1
Another way to do it is to monitor the timeout internally. Your question does not give much detail, so I am going to assume you start your goroutines in a loop; even if you do not, you can refactor this to work for you. You could use either of these two examples: the first times out each request individually, and the second times out the entire batch of requests and moves on if too much time has passed.
var wg sync.WaitGroup
wg.Add(1)
go func() {
	// Buffered so the inner goroutine can still send after a timeout.
	success := make(chan struct{}, 1)
	go func() {
		// send your request and wait for a response
		// pretend response was received
		time.Sleep(5 * time.Second)
		success <- struct{}{}
		// goroutine will close gracefully after return
		fmt.Println("Returned Gracefully")
	}()
	select {
	case <-success:
	case <-time.After(1 * time.Second):
	}
	wg.Done()
	// everything should be garbage collected and no longer take up space
}()
wg.Wait()
// do whatever with what you got
fmt.Println("Done")
time.Sleep(10 * time.Second)
fmt.Println("Checking to make sure nothing throws errors after limbo goroutine is done")
Or, if you just want a general, easy way to time out ALL requests, you could do something like this:
var wg sync.WaitGroup
// Buffered with room for both signals below, so neither sender can
// block forever after the other one has already been received.
waiter := make(chan int, 2)
wg.Add(1)
go func() {
	success := make(chan struct{}, 1)
	go func() {
		// send your request and wait for a response
		// pretend response was received
		time.Sleep(5 * time.Second)
		success <- struct{}{}
		// goroutine will close gracefully after return
		fmt.Println("Returned Gracefully")
	}()
	select {
	case <-success:
	case <-time.After(1 * time.Second):
		// control the timeout for each request individually to make sure that
		// wg.Done gets called and lets the goroutine holding the .Wait close
	}
	wg.Done()
	// everything should be garbage collected and no longer take up space
}()
go func() {
	// signal as soon as the WaitGroup is released
	wg.Wait()
	waiter <- 1
	fmt.Println("Returned Two")
}()
go func() {
	// signal after the batch-wide timeout, however long that is
	time.Sleep(time.Second * 5)
	waiter <- 1
	fmt.Println("Returned One")
}()
// block until it either times out or .Wait stops blocking
<-waiter
// do whatever with what you got
fmt.Println("Done")
time.Sleep(10 * time.Second)
fmt.Println("Checking to make sure nothing throws errors after limbo goroutine is done")
This way your WaitGroup stays in sync and you will not have any goroutines left in limbo.
Try it here: http://play.golang.org/p/g0J_qJ1BUT. You can change the variables around to see it behave differently.
# Answer 4
**Score**: 1
If you would like to avoid mixing concurrency logic with business logic, I wrote this library, https://github.com/shomali11/parallelizer, to help you with that. It encapsulates the concurrency logic so you do not have to worry about it.
So in your example:
package main

import (
	"fmt"
	"time"

	"github.com/shomali11/parallelizer"
)

func main() {
	urls := []string{ ... }
	results := make([]*HttpResponse, len(urls))

	options := &parallelizer.Options{Timeout: time.Second}
	group := parallelizer.NewGroup(options)
	for index, url := range urls {
		// Pass the loop variables as arguments so each job gets its own copy.
		group.Add(func(index int, url string, results *[]*HttpResponse) func() {
			return func() {
				...
				(*results)[index] = &HttpResponse{url, response, err}
			}
		}(index, url, &results))
	}

	err := group.Run()

	fmt.Println("Done")
	fmt.Printf("Results: %v\n", results)
	fmt.Printf("Error: %v\n", err) // nil if it completed, non-nil if it timed out
}