英文:
Wait for multiple callbacks with timeout in go without busy waiting or polling
问题
在Go语言中,我有两个回调函数最终没有触发。
registerCb(func() {...})
registerCb(func() {...})
/* 等待两个函数执行,如果超时则返回 */
我想等待它们都执行完,但如果其中一个没有执行,就设置一个超时。
sync.WaitGroup不适用,因为它是阻塞的而不是基于通道的。此外,在回调函数之外调用WaitGroup.Done()会导致恐慌。
我目前的解决方案是使用两个布尔值和一个忙等待循环。但这并不令人满意。
是否有任何不使用轮询或忙等待的惯用方法?
**更新:**
以下是一些演示忙等待解决方案的代码,但应在两个回调函数都触发或超时后立即返回,而不使用轮询。
```go
package main
import (
"fmt"
"log"
"sync"
"time"
)
var cbOne func()
var cbTwo func()
func registerCbOne(cb func()) {
cbOne = cb
}
func registerCbTwo(cb func()) {
cbTwo = cb
}
func executeCallbacks() {
<-time.After(1 * time.Second)
cbOne()
// 可能永远不会发生
//<-time.After(1 * time.Second)
//cbTwo()
}
func main() {
// 后台的某个进程将执行我们的回调函数
go func() {
executeCallbacks()
}()
err := WaitAllOrTimeout(3 * time.Second)
if err != nil {
fmt.Println("Error: ", err.Error())
}
fmt.Println("Hello, playground")
}
func WaitAllOrTimeout(to time.Duration) error {
cbOneDoneCh := make(chan bool, 1)
cbTwoDoneCh := make(chan bool, 1)
cbOneDone := false
cbTwoDone := false
registerCbOne(func() {
fmt.Println("cb One")
cbOneDoneCh <- true
})
registerCbTwo(func() {
fmt.Println("cb Two")
cbTwoDoneCh <- true
})
// 等待cbOne和cbTwo被执行或超时
// 忙等待解决方案
for {
select {
case <-time.After(to):
if cbOneDone && cbTwoDone {
fmt.Println("Both CB executed (we could poll more often)")
return nil
}
fmt.Println("Timeout!")
return fmt.Errorf("Timeout")
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
}
英文:
In go I have two callbacks that eventually do not fire.
registerCb(func() {...})
registerCb(func() {...})
/* Wait for both func to execute with timeout */
I want to wait for both of them but having a timeout if one is not executed.
sync.WaitGroup does not work, since it is blocking and not channel based. Also you call WaitGroup.Done() without the risk of panic outside the callbacks.
My current solution is using just two booleans and a busy wait loop. But that's not satisfying.
Is there any idiomatic way that do not use polling or busy waiting?
Update:
Here is some code that demonstrates a busy wait solution but should return as soon as both callbacks are fired or after the timeout, without using polling
package main
import (
"fmt"
"log"
"sync"
"time"
)
var cbOne func()
var cbTwo func()
func registerCbOne(cb func()) {
cbOne = cb
}
func registerCbTwo(cb func()) {
cbTwo = cb
}
func executeCallbacks() {
<-time.After(1 * time.Second)
cbOne()
// Might never happen
//<-time.After(1 * time.Second)
//cbTwo()
}
func main() {
// Some process in background will execute our callbacks
go func() {
executeCallbacks()
}()
err := WaitAllOrTimeout(3 * time.Second)
if err != nil {
fmt.Println("Error: ", err.Error())
}
fmt.Println("Hello, playground")
}
func WaitAllOrTimeout(to time.Duration) error {
cbOneDoneCh := make(chan bool, 1)
cbTwoDoneCh := make(chan bool, 1)
cbOneDone := false
cbTwoDone := false
registerCbOne(func() {
fmt.Println("cb One");
cbOneDoneCh <- true
})
registerCbTwo(func() {
fmt.Println("cb Two");
cbTwoDoneCh <- true
})
// Wait for cbOne and cbTwo to be executed or a timeout
// Busywait solution
for {
select {
case <-time.After(to):
if cbOneDone && cbTwoDone {
fmt.Println("Both CB executed (we could poll more often)")
return nil
}
fmt.Println("Timeout!")
return fmt.Errorf("Timeout")
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
}
答案1
得分: 2
func wait(ctx context.Context, wg *sync.WaitGroup) error {
done := make(chan struct{}, 1)
go func() {
wg.Wait()
done <- struct{}{}
}()
select {
case <-done:
// 计数器为0,表示所有回调函数已完成。
return nil
case <-ctx.Done():
// 上下文被取消。
return ctx.Err()
}
}
或者,你可以传递一个 `time.Duration` 值,并在 `<-time.After(d)` 上阻塞,而不是在 `<-ctx.Done()` 上阻塞,但我认为使用上下文更符合惯用法。
英文:
func wait(ctx context.Context, wg *sync.WaitGroup) error {
done := make(chan struct{}, 1)
go func() {
wg.Wait()
done <- struct{}{}
}()
select {
case <-done:
// Counter is 0, so all callbacks completed.
return nil
case <-ctx.Done():
// Context cancelled.
return ctx.Err()
}
}
Alternatively, you can pass a time.Duration
and block on <-time.After(d)
rather than on <-ctx.Done()
, but I would argue that using context is more idiomatic.
答案2
得分: 1
以下代码提供了两种变体:
- 第一种是常规模式,没有花哨的东西,它完成了工作并且做得很好。你将回调函数启动到一个例程中,使它们推送到一个接收器中,监听该接收器以获取结果或超时。要注意接收器通道的初始容量,以防止泄漏例程,它必须与回调函数的数量匹配。
- 第二种将同步机制封装到小函数中进行组装,提供了两种等待方法,waitAll和waitOne。写起来很好,但效率肯定较低,分配更多,与更多通道来回交互,推理更复杂,更微妙。
package main
import (
"fmt"
"log"
"sync"
"time"
)
func main() {
ExampleOne()
ExampleTwo()
ExampleThree()
fmt.Println("Hello, playground")
}
func ExampleOne() {
log.Println("start reg")
errs := make(chan error, 2)
go func() {
fn := callbackWithOpts("reg: so slow", 2*time.Second, nil)
errs <- fn()
}()
go func() {
fn := callbackWithOpts("reg: too fast", time.Millisecond, fmt.Errorf("broke!"))
errs <- fn()
}()
select {
case err := <-errs: // 只捕获一个结果,最快完成的。
if err != nil {
log.Println(err)
}
case <-time.After(time.Second): // 或者等待那么长时间,以防它们都很慢。
}
log.Println("done reg")
}
func ExampleTwo() {
log.Println("start wait")
errs := waitAll(
withTimeout(time.Second,
callbackWithOpts("waitAll: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitAll: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done wait")
}
func ExampleThree() {
log.Println("start waitOne")
errs := waitOne(
withTimeout(time.Second,
callbackWithOpts("waitOne: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitOne: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done waitOne")
}
// 一个可配置的回调函数
func callbackWithOpts(msg string, tout time.Duration, err error) func() error {
return func() error {
<-time.After(tout)
fmt.Println(msg)
return err
}
}
// withTimeout 返回一个函数,该函数返回第一个错误或超时并返回nil
func withTimeout(tout time.Duration, h func() error) func() error {
return func() error {
d := make(chan error, 1)
go func() {
d <- h()
}()
select {
case err := <-d:
return err
case <-time.After(tout):
}
return nil
}
}
// wait 启动所有func()并将它们的错误返回到返回的错误通道中;(合并)
// 调用者负责处理输出错误通道。
func waitAll(h ...func() error) chan error {
d := make(chan error, len(h))
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
wg.Wait()
close(d)
}()
return d
}
// wait 启动所有func()并将第一个错误返回到返回的错误通道中
// 调用者负责处理输出错误通道。
func waitOne(h ...func() error) chan error {
d := make(chan error, len(h))
one := make(chan error, 1)
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
for err := range d {
one <- err
close(one)
break
}
}()
go func() {
wg.Wait()
close(d)
}()
return one
}
func trim(err chan error) chan error {
out := make(chan error)
go func() {
for e := range err {
out <- e
}
close(out)
}()
return out
}
英文:
below code present two variations,
- the first is the regular pattern, nothing fancy, it does the job and does it well. You launch your callbacks into a routine, you make them push to a sink, listen that sink for a result or timeout. Take care to the sink channel initial capacity, to prevent leaking a routine it must match the number of callbacks.
- the second factories out the synchronization mechanisms into small functions to assemble, two wait methods are provided, waitAll and waitOne. Nice to write, but definitely less efficient, more allocations, more back and forth with more channels, more complex to reason about, more subtle.
package main
import (
"fmt"
"log"
"sync"
"time"
)
func main() {
ExampleOne()
ExampleTwo()
ExampleThree()
fmt.Println("Hello, playground")
}
func ExampleOne() {
log.Println("start reg")
errs := make(chan error, 2)
go func() {
fn := callbackWithOpts("reg: so slow", 2*time.Second, nil)
errs <- fn()
}()
go func() {
fn := callbackWithOpts("reg: too fast", time.Millisecond, fmt.Errorf("broke!"))
errs <- fn()
}()
select {
case err := <-errs: // capture only one result,
// the fastest to finish.
if err != nil {
log.Println(err)
}
case <-time.After(time.Second): // or wait that many amount of time,
// in case they are all so slow.
}
log.Println("done reg")
}
func ExampleTwo() {
log.Println("start wait")
errs := waitAll(
withTimeout(time.Second,
callbackWithOpts("waitAll: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitAll: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done wait")
}
func ExampleThree() {
log.Println("start waitOne")
errs := waitOne(
withTimeout(time.Second,
callbackWithOpts("waitOne: so slow", 2*time.Second, nil),
),
withTimeout(time.Second,
callbackWithOpts("waitOne: too fast", time.Millisecond, nil),
),
)
for err := range trim(errs) {
if err != nil {
log.Println(err)
}
}
log.Println("done waitOne")
}
// a configurable callback for playing
func callbackWithOpts(msg string, tout time.Duration, err error) func() error {
return func() error {
<-time.After(tout)
fmt.Println(msg)
return err
}
}
// withTimeout return a function that returns first error or times out and return nil
func withTimeout(tout time.Duration, h func() error) func() error {
return func() error {
d := make(chan error, 1)
go func() {
d <- h()
}()
select {
case err := <-d:
return err
case <-time.After(tout):
}
return nil
}
}
// wait launches all func() and return their errors into the returned error channel; (merge)
// It is the caller responsability to drain the output error channel.
func waitAll(h ...func() error) chan error {
d := make(chan error, len(h))
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
wg.Wait()
close(d)
}()
return d
}
// wait launches all func() and return the first error into the returned error channel
// It is the caller responsability to drain the output error channel.
func waitOne(h ...func() error) chan error {
d := make(chan error, len(h))
one := make(chan error, 1)
var wg sync.WaitGroup
for i := 0; i < len(h); i++ {
wg.Add(1)
go func(h func() error) {
defer wg.Done()
d <- h()
}(h[i])
}
go func() {
for err := range d {
one <- err
close(one)
break
}
}()
go func() {
wg.Wait()
close(d)
}()
return one
}
func trim(err chan error) chan error {
out := make(chan error)
go func() {
for e := range err {
out <- e
}
close(out)
}()
return out
}
答案3
得分: 1
这是对我的评论的后续,是在你添加示例解决方案之后添加的。为了比评论中更清楚,你的示例代码实际上并不那么糟糕。这是你的原始示例:
// Busywait solution
for {
select {
case <-time.After(to):
if cbOneDone && cbTwoDone {
fmt.Println("Both CB executed (we could poll more often)")
return nil
}
fmt.Println("Timeout!")
return fmt.Errorf("Timeout")
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
这不是一个“忙等待”,但它确实有几个错误(包括需要对完成通道进行一次性发送语义,或者更简单且至少同样好的方法是,在完成后将它们关闭,可以使用sync.Once
)。我们想要做的是:
- 使用
to
作为超时时间启动一个计时器。 - 进入一个选择循环,使用计时器的通道和两个“完成”通道。
我们希望在以下事件中的第一个发生时退出选择循环:
- 计时器触发,或者
- 两个“完成”通道都已发出信号。
如果我们要close
这两个完成通道,我们还希望将Ch
变量清除(设置为nil
),以便选择不会旋转-这将使其成为真正的忙等待-但是暂时让我们假设我们在回调时仅发送一次,并且在其他情况下只泄漏通道,以便我们可以按原样使用你的代码,因为这些选择只会返回一次。以下是更新后的代码:
t := timer.NewTimer(to)
for !cbOneDone || !cbTwoDone {
select {
case <-t.C:
fmt.Println("Timeout!")
return fmt.Errorf("timeout")
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
// 在此处插入t.Stop()和接收以排空t.C(如果需要)
fmt.Println("Both CB executed")
return nil
请注意,我们最多会执行两次循环:
-
如果我们从两个完成通道分别接收到,循环将在没有超时的情况下停止。没有旋转/忙等待:我们从
t.C
中没有接收到任何内容。我们返回nil(无错误)。 -
如果我们从一个完成通道接收到,循环将继续,但会阻塞等待计时器或另一个完成通道。
-
如果我们从
t.C
接收到任何内容,这意味着我们尚未收到两个回调。我们可能已经收到了一个,但已经超时,我们选择放弃,这是我们的目标。我们返回一个错误,而不再通过循环。
一个真实的版本需要更多的工作来进行适当的清理并避免泄漏“完成”通道(以及计时器通道及其goroutine;请参见注释),但这是一般的思路。你已经将回调转换为通道操作,并且已经有了一个带有其通道的计时器。
英文:
This is a followup to my comment, added after you added your example solution. To be clearer than I can in comments, your example code is actually not that bad. Here is your original example:
> // Busywait solution
> for {
> select {
> case <-time.After(to):
> if cbOneDone && cbTwoDone {
> fmt.Println("Both CB executed (we could poll more often)")
> return nil
> }
> fmt.Println("Timeout!")
> return fmt.Errorf("Timeout")
> case <-cbOneDoneCh:
> cbOneDone = true
> case <-cbTwoDoneCh:
> cbTwoDone = true
> }
> }
This isn't a "busy wait" but it does have several bugs (including the fact that you need an only-once send semantic for the done channels, or maybe easier and at least as good, to just close them once when done, perhaps using sync.Once
). What we want to do is:
- Start a timer with
to
as the timeout. - Enter a select loop, using the timer's channel and the two "done" channels.
We want to exit the select loop when the first of the following events occurs:
- the timer fires, or
- both "done" channels have been signaled.
If we're going to close
the two done channels we'll want to have the Ch
variables cleared (set to nil
) as well so that the selects don't spin—that would turn this into a true busy-wait—but for the moment let's just assume instead that we send exactly once on them on callback, and otherwise just leak the channels, so that we can use your code as written as those selects will only ever return once. Here's the updated code:
t := timer.NewTimer(to)
for !cbOneDone || !cbTwoDone {
select {
case <-t.C:
fmt.Println("Timeout!")
return fmt.Errorf("timeout")
}
case <-cbOneDoneCh:
cbOneDone = true
case <-cbTwoDoneCh:
cbTwoDone = true
}
}
// insert t.Stop() and receive here to drain t.C if desired
fmt.Println("Both CB executed")
return nil
Note that we will go through the loop at most two times:
-
If we receive from both Done channels, once each, the loop stops without a timeout. There's no spinning/busy-waiting: we never received anything from
t.C
. We return nil (no error). -
If we receive from one Done channel, the loop resumes but blocks waiting for the timer or the other Done channel.
-
If we ever receive from
t.C
, it means we didn't get both callbacks yet. We may have had one, but there's been a timeout and we choose to give up, which was our goal. We return an error, without going back through the loop.
A real version needs a bit more work to clean up properly and avoid leaking "done" channels (and the timer channel and its goroutine; see comment), but this is the general idea. You're already turning the callbacks into channel operations, and you already have a timer with its channel.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论