强制优先级的go select语句

huangapple go评论69阅读模式
英文:

Force priority of go select statement

问题

我有以下一段代码:

func sendRegularHeartbeats(ctx context.Context) {
    for {
        select {
        case <-ctx.Done():
            return
        case <-time.After(1 * time.Second):
            sendHeartbeat()
        }
    }
}

这个函数在一个专用的go协程中执行,并且每秒发送一条心跳消息。当上下文被取消时,整个过程应立即停止。

现在考虑以下情况:

ctx, cancel := context.WithCancel(context.Background())
cancel()
go sendRegularHeartbeats(ctx)

这样使用一个已关闭的上下文启动了心跳协程。在这种情况下,我不希望发送任何心跳消息。因此,应立即进入select语句中的第一个case块。

然而,似乎不能保证case块的评估顺序,并且有时代码会发送心跳消息,即使上下文已经被取消。

实现这种行为的正确方法是什么?

我可以在第二个case中添加一个“isContextClosed”检查,但这看起来更像是对问题的丑陋解决方法。

英文:

I have the following piece of code:

func sendRegularHeartbeats(ctx context.Context) {
	for {
		select {
		case &lt;-ctx.Done():
			return
		case &lt;-time.After(1 * time.Second):
			sendHeartbeat()
		}
	}
}

This function is executed in a dedicated go-routine and sends a heartbeat-message every second. The whole process should stop immediately when the context is canceled.

Now consider the following scenario:

ctx, cancel := context.WithCancel(context.Background())
cancel()
go sendRegularHeartbeats(ctx)

This starts the heartbeat-routine with a closed context. In such a case, I don't want any heartbeats to be transmitted. So the first case block in the select should be entered immediately.

However, it seems that the order in which case blocks are evaluated is not guaranteed, and that the code sometimes sends a heartbeat message, even though the context is already canceled.

What is the correct way to implement such a behaviour?

I could add a "isContextclosed"-check in the second case, but that looks more like an ugly workaround for the problem.

答案1

得分: 14

被接受的答案有一个错误的建议:

func sendRegularHeartbeats(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        // 第一个选择
        select {
        case <-ctx.Done():
            return
        default:
        }

        // 第二个选择
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            sendHeartbeat()
        }
    }
}

这个方法并没有帮助,因为存在以下情况:

  1. 两个通道都是空的
  2. 第一个选择运行
  3. 两个通道同时接收到消息
  4. 你面临的概率问题与在第一个选择中什么都不做时一样

另一种但仍然不完美的方法是在消费 ticker 事件后防止并发的 Done() 事件(即"错误的选择"):

func sendRegularHeartbeats(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        // 像往常一样进行选择
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // 以非阻塞的方式优先处理可能的并发 Done() 事件
            select {
              case <-ctx.Done():
              return
            default:
            }
            sendHeartbeat()
        }
    }
}

注意:这种方法的问题是它允许"足够接近"的事件被混淆 - 例如,即使 ticker 事件先到达,Done 事件也足够快地抢占了心跳。目前还没有完美的解决方案。

英文:

The accepted answer has a wrong suggestion:

func sendRegularHeartbeats(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        //first select 
        select {
        case &lt;-ctx.Done():
            return
        default:
        }

        //second select
        select {
        case &lt;-ctx.Done():
            return
        case &lt;-ticker.C:
            sendHeartbeat()
        }
    }
}

This doesn't help, because of the following scenario:

  1. both channels are empty
  2. first select runs
  3. both channels get a message concurrently
  4. you are in the same probability game as if you haven't done anything in the first select

An alternative but still imperfect way is to guard against concurrent Done() events (the "wrong select") after consuming the ticker event i.e.

func sendRegularHeartbeats(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {            
        //select as usual
        select {
        case &lt;-ctx.Done():
            return
        case &lt;-ticker.C:
            //give priority to a possible concurrent Done() event non-blocking way
            select {
              case &lt;-ctx.Done():
              return
            default:
            }
            sendHeartbeat()
        }
    }
}

Caveat: the problem with this one is that it allows for "close enough" events to be confused - e.g. even though a ticker event arrived earlier, the Done event came soon enough to preempt the heartbeat. There is no perfect solution as of now.

答案2

得分: 8

事先注意:

如果在调用sendRegularHeartbeats()时上下文已经取消,那么case <-ctx.Done()通信将是唯一准备就绪并被选择的通信。另一个case <-time.After(1 * time.Second)将在1秒后准备就绪,因此一开始不会被选择。但是,如果要明确处理多个准备就绪的情况下的优先级,请继续阅读。


switch语句case分支(其中评估顺序是它们列出的顺序)不同,select语句case分支没有优先级或任何顺序保证。

引用自规范:Select语句

> 如果一个或多个通信可以进行,将通过均匀的伪随机选择选择一个可以进行的通信。否则,如果有一个默认的情况,那个情况将被选择。如果没有默认情况,"select"语句将阻塞,直到至少有一个通信可以进行。

如果有多个通信可以进行,将随机选择一个。就是这样。

如果要保持优先级,您必须自己手动处理。您可以使用多个select语句(后续的,而不是嵌套的),在_较早_的select中列出具有较高优先级的通信,还要确保添加一个default分支以避免阻塞,如果它们没有准备就绪。您的示例需要2个select语句,第一个检查<-ctx.Done(),因为这是您希望具有较高优先级的通信。

我还建议使用单个time.Ticker而不是在每次迭代中调用time.After()time.After()在内部也使用time.Ticker,但它不会重用它,只是在下一次调用时“丢弃”它并创建一个新的)。

这是一个示例实现:

func sendRegularHeartbeats(ctx context.Context) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            sendHeartbeat()
        }
    }
}

如果在调用sendRegularHeartbeats()时上下文已经取消,将不会发送心跳,您可以在Go Playground上检查/验证它。

如果将cancel()调用延迟2.5秒,那么将发送确切地2个心跳:

ctx, cancel := context.WithCancel(context.Background())
go sendRegularHeartbeats(ctx)
time.Sleep(time.Millisecond * 2500)
cancel()
time.Sleep(time.Second * 2)

Go Playground上尝试这个。

英文:

Note beforehand:

Your example will work as you intend it to, as if the context is already cancelled when sendRegularHeartbeats() is called, the case &lt;-ctx.Done() communication will be the only one ready to proceed and therefore chosen. The other case &lt;-time.After(1 * time.Second) will only be ready to proceed after 1 second, so it will not be chosen at first. But to explicitly handle priorities when multiple cases might be ready, read on.


Unlike the case branches of a switch statement (where the evaluation order is the order they are listed), there is no priority or any order guaranteed in the case branches of a select statement.

Quoting from Spec: Select statements:

> If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.

If more communications can proceed, one is chosen randomly. Period.

If you want to maintain priority, you have to do that yourself (manually). You may do it using multiple select statements (subsequent, not nested), listing ones with higher priority in an earlier select, also be sure to add a default branch to avoid blocking if those are not ready to proceed. Your example requires 2 select statements, first one checking &lt;-ctx.Done() as that is the one you want higher priority for.

I also recommend using a single time.Ticker instead of calling time.After() in each iteration (time.After() also uses a time.Ticker under the hood, but it doesn't reuse it just "throws it away" and creates a new one on the next call).

Here's an example implementation:

func sendRegularHeartbeats(ctx context.Context) {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case &lt;-ctx.Done():
			return
		default:
		}

		select {
		case &lt;-ctx.Done():
			return
		case &lt;-ticker.C:
			sendHeartbeat()
		}
	}
}

This will send no heartbeat if the context is already cancelled when sendRegularHeartbeats() is called, as you can check / verify it on the Go Playground.

If you delay the cancel() call for 2.5 seconds, then exactly 2 heartbeats will be sent:

ctx, cancel := context.WithCancel(context.Background())
go sendRegularHeartbeats(ctx)
time.Sleep(time.Millisecond * 2500)
cancel()
time.Sleep(time.Second * 2)

Try this one on the Go Playground.

答案3

得分: 4

如果绝对关键要保持操作的优先级,你可以采取以下措施:

  • 在单独的goroutine中从每个通道消费数据
  • 每个goroutine向一个共享的第三个通道写入消息,指示其类型
  • 第三个goroutine从该通道中消费数据,读取接收到的消息以确定是否为tick并应该执行sendHeartbeat,或者是否为cancel并应该退出

这样,其他通道接收到的消息(可能)会按照它们被触发的顺序进入第三个通道,从而使你能够适当地处理它们。

然而,值得注意的是,这可能是不必要的。select语句不能保证如果多个case同时成功,哪个case会执行。这可能是一个罕见的事件;在select处理之前,cancel和ticker都必须触发。大多数情况下,每次循环迭代时只会触发其中一个,因此它的行为与预期完全一致。如果你可以容忍在取消后多触发一个心跳的罕见情况,最好保留你现有的代码,因为它更高效且更易读。

英文:

If it is absolutely critical to maintain that priority of operations, you can:

  • Consume from each channel in a separate goroutine
  • Have each of those goroutines write a message to a shared third channel indicating its type
  • Have a third goroutine consume from that channel, reading the messages it receives to determine if it is a tick and should sendHeartbeat or if it is a cancel and it should exit

This way, messages received on the other channels will (probably - you can't guarantee order of execution of concurrent routines) come in on the third channel in the order they're triggered, allowing you to handle them appropriately.

However, it's worth noting that this is probably not necessary. A select does not guarantee which case will execute if multiple cases succeed simultaneously. That is probably a rare event; the cancel and ticker would both have to fire before either was handled by the select. The vast majority of the time, only one or the other will fire at any given loop iteration, so it will behave exactly as expected. If you can tolerate rare occurrences of firing one additional heartbeat after a cancellation, you're better off keeping the code you have, as it is more efficient and more readable.

huangapple
  • 本文由 发表于 2017年9月13日 22:35:58
  • 转载请务必保留本文链接:https://go.coder-hub.com/46200343.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定