等待具有超时的sync.Cond

huangapple go评论89阅读模式
英文:

Waiting on a sync.Cond with a timeout

问题

有没有一种简单的方法来实现类似于Java的wait(long timeMillis)的功能?它在一个监视器(互斥锁+条件变量)上等待指定的时间,并在未收到信号时返回。

我在文档和谷歌搜索中找不到相关信息,虽然可以通过创建WaitGroup并使用定时器goroutine来实现,但这似乎很繁琐/烦人/低效,只是为了获得这个简单的功能(顺便说一句,任何我遇到过的底层系统线程库都直接支持这个功能)。

编辑:是的,我们都读过http://www.golang-book.com/10/index.htm和https://blog.golang.org/pipelines - 再次强调,创建更多的线程是一个“不好的”(性能低下)解决方案,而且通道也不适合这种情况。想象一下一个典型的并发服务器Join()方法的用例...(请不要告诉我反转控制并使用监听器模式。你并不总是有幸改变你正在使用的API...)

英文:

Is it possible in some easy way to do the equivalent of Java's

wait(long timeMillis)

which waits on a monitor (mutex+cond, roughly) for a specified amount of time, and returns if it is not signalled?

I can't find anything in the docs or googling around on this, and although it's of course possible to play some games with making a WaitGroup and having a timer goroutine pop, that seems tedious/annoying/inefficient to just get this simple functionality (which is by the way directly supported by any underlying systems thread library I've ever encountered)

Edit: Yes we have all read http://www.golang-book.com/10/index.htm as well as https://blog.golang.org/pipelines - again, creating more threads is a "bad" (non-performant) solution to this, and channels are also not well suited to this. Imagine for a use case a typical concurrent server Join() method... (Please do not tell me to invert the control and use a Listener pattern instead. You don't always have the luxury to change the API you are working with...)

答案1

得分: 10

你可以实现一个只支持Broadcast(不支持Signal)的条件变量,使用一个通道。这里是一个快速的Gist示例:https://gist.github.com/zviadm/c234426882bfc8acba88f3503edaaa36#file-cond2-go

你也可以在你的代码中利用这种替换通道并关闭旧通道的技术。Gist中的代码使用了unsafe.Pointer和原子操作,以允许在不获取主要的sync.Locker的情况下调用“Broadcast”。然而,在你自己的代码中,通常情况下,你应该在获取锁的情况下进行Broadcast,所以你不需要进行任何不安全的/原子的操作。

虽然这种方法可以工作,但你可能还想查看一下:https://godoc.org/golang.org/x/sync/semaphore。如果你创建一个带有限制为1的加权信号量,那么它也会给你提供所需的所有功能,并且还会保证公平性。

英文:

You can implement a condition variable that supports Broadcast only (no Signal), with a channel. Here is a quick Gist of it:
https://gist.github.com/zviadm/c234426882bfc8acba88f3503edaaa36#file-cond2-go

You can also just utilize this technique of replacing a channel and closing the old one within your code. The code in Gist is using unsafe.Pointer and atomic operations to allow calling 'Broadcast' without acquiring the main sync.Locker. However in your own code, more often than not, you should be Broadcasting from within the acquire lock anyways so you don't need to do any of the unsafe/atomic stuff.

While this method works, you might want to also checkout: https://godoc.org/golang.org/x/sync/semaphore. If you make a weighted semaphore with a limit of 1, that will also give you all the abilities you need and it will also be fair.

答案2

得分: 2

不。没有简单的方法来做到这一点,并且根据那个线程,他们不打算添加一个。(尽管也许与他们讨论一下可能会有所进展)

但总是有一种困难的方法。有两个选择:

  1. 自己编写具有此功能的Cond。(参见https://golang.org/src/sync/cond.go)
  2. 通过系统调用使用操作系统级别的功能。(也许是futex?)

这里的挑战 - 以及为什么它不是微不足道的原因 - 是goroutine不是线程。Go有自己的自定义调度器。创建自己的Cond将涉及对运行时的一些部分进行调整,这些部分实际上并不打算进行调整。(但是,正如我所说,这是可能的)

如果这限制了你,我很抱歉。大多数Go都很简单 - 你通常可以轻松地跳到较低的层次。但调度器不是这样的。它是魔法。

这个魔法对大多数事情都有效,并且他们添加了sync中的一些内容来覆盖一些已知的不适用情况。如果你觉得你找到了另一个情况,也许你可以说服他们添加它。(但这不仅仅是复制另一种编程语言的API,或者暴露底层API)

英文:

No. There is no easy way to do this and based on that thread they aren't going to add one. (though perhaps discussing it with them may get you somewhere)

But there's always a hard way. 2 options:

  1. Roll your own Cond that has this capability. (see https://golang.org/src/sync/cond.go)
  2. Use OS-level capabilities via a syscall. (maybe a futex?)

The challenge here - and the reason why it's not trivial - is that goroutines aren't threads. Go has it's own custom scheduler. Creating your own Cond will involve tinkering with parts of runtime that aren't really meant to be tinkered with. (but, like I said, it's possible)

Sorry if that's limiting. Most of go is pretty straightforward - you can often jump down into the lower layer without too much trouble. But the scheduler is not like that. It's magic.

The magic works for most things, and they added the stuff in sync to cover some known cases where it doesn't. If you feel like you found another, maybe you can convince them to add it. (but it's not just a matter of reproducing an API from another programming language, or exposing an underlying API)

答案3

得分: 1

我在今年的GopherCon演讲中提出了几种可能的替代方案(请参见https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view)。"条件变量"部分从第37页开始,但这个特定的模式在备用幻灯片(101-105页)中有更详细的介绍。

正如zviadm所指出的,一种选择(https://play.golang.org/p/tWVvXOs87HX)是关闭一个通道。

另一种选择(https://play.golang.org/p/uRwV_i0v13T)是让每个等待者分配一个1缓冲通道,并让广播者向缓冲区发送一个令牌进行广播。

如果事件是一个持久的条件,比如"队列为空",第三种选择(https://play.golang.org/p/uvx8vFSQ2f0)是使用一个1缓冲通道,并让每个接收者在条件持续存在时重新填充缓冲区。

英文:

I sketched out a couple of possible alternatives in my GopherCon talk this year (see https://drive.google.com/file/d/1nPdvhB0PutEJzdCq5ms6UI58dp50fcAN/view). The “Condition Variables” section starts at slide 37, but this particular pattern is covered in more detail in the backup slides (101-105).

As zviadm notes, one option (https://play.golang.org/p/tWVvXOs87HX) is to close a channel.

Another option (https://play.golang.org/p/uRwV_i0v13T) is to have each waiter allocate a 1-buffered channel, and have the broadcaster send a token into the buffer to broadcast.

If the event is a persistent condition, such as “the queue is empty”, a third option (https://play.golang.org/p/uvx8vFSQ2f0) is to use a 1-buffered channel and have each receiver refill the buffer as long as the condition persists.

答案4

得分: 1

https://gitlab.com/jonas.jasas/condchan 可以实现在等待时设置超时。请参考以下示例:

package main

import (
	"fmt"
	"sync"
	"time"
	"gitlab.com/jonas.jasas/condchan"
)

func main() {
	cc := condchan.New(&sync.Mutex{})
	timeoutChan := time.After(time.Second)

	cc.L.Lock()
	// 传递一个函数,该函数获取通道c,该通道在CondChan上调用Signal或Broadcast时发出信号
	cc.Select(func(c <-chan struct{}) { // 使用select进行等待
		select {
		case <-c: // 永远等待
		case <-timeoutChan:
			fmt.Println("Hooray! Just escaped from eternal wait.")
		}
	})
	cc.L.Unlock()
}
英文:

https://gitlab.com/jonas.jasas/condchan makes it possible to timeout on wait. Please see an example:

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
	&quot;gitlab.com/jonas.jasas/condchan&quot;
)

func main() {
	cc := condchan.New(&amp;sync.Mutex{})
	timeoutChan := time.After(time.Second)

	cc.L.Lock()
	// Passing func that gets channel c that signals when
	// Signal or Broadcast is called on CondChan
	cc.Select(func(c &lt;-chan struct{}) { // Waiting with select
		select {
		case &lt;-c: // Never ending wait
		case &lt;-timeoutChan:
			fmt.Println(&quot;Hooray! Just escaped from eternal wait.&quot;)
		}
	})
	cc.L.Unlock()
}

答案5

得分: -2

我遇到了同样的问题,使用通道解决起来非常简单。

  • 信号是在通道上发送的。
  • 等待只是在通道上等待消息。
  • 带有超时的等待只是在定时器和消息上进行选择。
  • 广播是循环发送消息,直到没有人再监听为止。

与任何条件变量一样,在等待时需要持有互斥锁,并且在发出信号时强烈建议持有互斥锁。

我编写了一个遵循Cond协议并添加了WaitOrTimeout的实现。如果成功,它返回true,如果超时,则返回false。

以下是我的代码以及一些测试案例!免责声明:这似乎工作正常,但尚未经过彻底测试。此外,公平性不能保证。等待的线程按照调度程序认为合适的顺序释放,并不一定是先到先服务。

package main

import (
	"sync"
	"time"
	"fmt"
)

type TMOCond struct {
	L    sync.Locker
	ch   chan bool
}

func NewTMOCond(l sync.Locker) *TMOCond {
	return &TMOCond{ch: make(chan bool), L: l}
}

func (t *TMOCond) Wait() {
	t.L.Unlock()
	<-t.ch
	t.L.Lock()
}

func (t *TMOCond) WaitOrTimeout(d time.Duration) bool {
	tmo := time.NewTimer(d)
	t.L.Unlock()
	var r bool
	select {
	case <-tmo.C:
		r = false
	case <-t.ch:
		r = true
	}
	if !tmo.Stop() {
		select {
		case <-tmo.C:
		default:
		}
	}
	t.L.Lock()
	return r
}

func (t *TMOCond) Signal() {
	t.signal()
}

func (t *TMOCond) Broadcast() {
	for {
		// Stop when we run out of waiters
		//
		if !t.signal() {
			return
		}
	}
}

func (t *TMOCond) signal() bool {
	select {
	case t.ch <- true:
		return true
	default:
		return false
	}
}

// **** TEST CASES ****
func lockAndSignal(t *TMOCond) {
	t.L.Lock()
	t.Signal()
	t.L.Unlock()
}

func waitAndPrint(t *TMOCond, i int) {
	t.L.Lock()
	fmt.Println("Goroutine", i, "waiting...")
	ok := t.WaitOrTimeout(10 * time.Second)
	t.L.Unlock()
	fmt.Println("This is goroutine", i, "ok:", ok)
}

func main() {
	var m sync.Mutex
	t := NewTMOCond(&m)

	// Simple wait
	//
	t.L.Lock()
	go lockAndSignal(t)
	t.Wait()
	t.L.Unlock()
	fmt.Println("Simple wait finished.")

	// Wait that times out
	//
	t.L.Lock()
	ok := t.WaitOrTimeout(100 * time.Millisecond)
	t.L.Unlock()
	fmt.Println("Timeout wait finished. Timeout:", !ok)

	// Broadcast. All threads should finish.
	//
	for i := 0; i < 10; i++ {
		go waitAndPrint(t, i)
	}
	time.Sleep(1 * time.Second)
	t.L.Lock()
	fmt.Println("About to signal")
	t.Broadcast()
	t.L.Unlock()
	time.Sleep(10 * time.Second)
}

你可以在这里找到代码和一些测试案例:https://play.golang.org/p/K1veAOGbWZ

英文:

I ran into the same issue and it turned out to be pretty easy to solve using a channel.

  • A signal is a send on a channel
  • A wait is simply waiting for a message on a channel.
  • A wait with timeout is just a select on a timer and the message.
  • A broadcast is a loop sending messages until there's no one left who listens.

As with any condition variable, it's required to hold the mutex when you wait and highly recommended to hold it when you're signaling.

I wrote a in implementation that follows the Cond protocol and adds a WaitOrTimeout. It returns true if successful, false if timed out.

Here's my code along with some test cases! DISCLAIMER: This seems to work fine but hasn't been thoroughly tested. Also, fairness is not guaranteed. Threads waiting are released in the order the scheduler sees fit and not necessarily first come/first serve.

https://play.golang.org/p/K1veAOGbWZ

package main
import (
&quot;sync&quot;
&quot;time&quot;
&quot;fmt&quot;
)
type TMOCond struct {
L    sync.Locker
ch      chan bool
}
func NewTMOCond(l sync.Locker) *TMOCond {
return &amp;TMOCond{ ch: make(chan bool), L: l }
}
func (t *TMOCond) Wait() {
t.L.Unlock()
&lt;-t.ch
t.L.Lock()
}
func (t *TMOCond) WaitOrTimeout(d time.Duration) bool {
tmo := time.NewTimer(d)
t.L.Unlock()
var r bool
select {
case &lt;-tmo.C:
r = false
case &lt;-t.ch:
r = true
}
if !tmo.Stop() {
select {
case &lt;- tmo.C:
default:
}
}
t.L.Lock()
return r
}
func (t *TMOCond) Signal() {
t.signal()
}
func (t *TMOCond) Broadcast() {
for {
// Stop when we run out of waiters
//
if !t.signal() {
return
}
}
}
func (t *TMOCond) signal() bool {
select {
case t.ch &lt;- true:
return true
default:
return false
}
}
// **** TEST CASES ****
func lockAndSignal(t *TMOCond) {
t.L.Lock()
t.Signal()
t.L.Unlock()
}
func waitAndPrint(t *TMOCond, i int) {
t.L.Lock()
fmt.Println(&quot;Goroutine&quot;, i, &quot;waiting...&quot;)
ok := t.WaitOrTimeout(10 * time.Second)
t.L.Unlock()
fmt.Println(&quot;This is goroutine&quot;, i, &quot;ok:&quot;, ok)
}
func main() {
var m sync.Mutex
t := NewTMOCond(&amp;m)
// Simple wait
//
t.L.Lock()
go lockAndSignal(t)
t.Wait()
t.L.Unlock()
fmt.Println(&quot;Simple wait finished.&quot;)
// Wait that times out
//
t.L.Lock()
ok := t.WaitOrTimeout(100 * time.Millisecond)
t.L.Unlock()
fmt.Println(&quot;Timeout wait finished. Timeout:&quot;, !ok)
// Broadcast. All threads should finish.
//
for i := 0; i &lt; 10; i++ {
go waitAndPrint(t, i)
}
time.Sleep(1 * time.Second) 
t.L.Lock()
fmt.Println(&quot;About to signal&quot;)
t.Broadcast()
t.L.Unlock()
time.Sleep(10 * time.Second)
}

huangapple
  • 本文由 发表于 2015年4月28日 23:32:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/29923666.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定