sync.Cond测试广播-为什么要循环检查?

huangapple go评论92阅读模式
英文:

sync.Cond Test Broadcast - why check in a loop?

问题

我正在尝试使用sync.Cond的Wait和Broadcast方法,但是有些部分我无法理解:

Wait方法的注释中写道:

   41    // 因为在Wait第一次恢复时,c.L没有被锁定,所以调用者
   42    // 通常不能假设在Wait返回时条件为真。相反,调用者应该使用循环等待:
   43    //
   44    //    c.L.Lock()
   45    //    for !condition() {
   46    //        c.Wait()
   47    //    }
   48    //    ... 使用条件 ...
   49    //    c.L.Unlock()

这是为什么需要这样做的原因?

这意味着以下程序可能不正确(尽管它可以工作):

package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"
)

type processor struct {
	n       int
	c       *sync.Cond
	started chan int
}

func newProcessor(n int) *processor {

	p := new(processor)
	p.n = n

	p.c = sync.NewCond(&sync.Mutex{})

	p.started = make(chan int, n)

	return p
}

func (p *processor) start() {

	for i := 0; i < p.n; i++ {
		go p.process(i)
	}

	for i := 0; i < p.n; i++ {
		<-p.started
	}
	p.c.L.Lock()
	p.c.Broadcast()
	p.c.L.Unlock()

}

func (p *processor) process(f int) {

	fmt.Printf("fork : %d\n", f)

	p.c.L.Lock()
	p.started <- f
	p.c.Wait()
	p.c.L.Unlock()
	fmt.Printf("process: %d - out of wait\n", f)
}

func main() {

	p := newProcessor(5)
	p.start()

	reader := bufio.NewReader(os.Stdin)
	_,_ =reader.ReadString('\n')

}
英文:

I was trying to use sync.Cond - Wait and Broadcast. I could not understand some parts of it:

The comment for Wait calls says:

   41    // Because c.L is not locked when Wait first resumes, the caller
   42    // typically cannot assume that the condition is true when
   43    // Wait returns.  Instead, the caller should Wait in a loop:
   44    //
   45    //    c.L.Lock()
   46    //    for !condition() {
   47    //        c.Wait()
   48    //    }
   49    //    ... make use of condition ...
   50    //    c.L.Unlock()

What is the reason this is required?

So this means the following program may not be correct (all though it works):

package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"
)

type processor struct {
	n       int
	c       *sync.Cond
	started chan int
}

func newProcessor(n int) *processor {

	p := new(processor)
	p.n = n

	p.c = sync.NewCond(&sync.Mutex{})

	p.started = make(chan int, n)

	return p
}

func (p *processor) start() {

	for i := 0; i < p.n; i++ {
		go p.process(i)
	}

	for i := 0; i < p.n; i++ {
		<-p.started
	}
	p.c.L.Lock()
	p.c.Broadcast()
	p.c.L.Unlock()

}

func (p *processor) process(f int) {

	fmt.Printf("fork : %d\n", f)

	p.c.L.Lock()
	p.started <- f
	p.c.Wait()
	p.c.L.Unlock()
	fmt.Printf("process: %d - out of wait\n", f)
}

func main() {

	p := newProcessor(5)
	p.start()

	reader := bufio.NewReader(os.Stdin)
	_,_ =reader.ReadString('\n')

}

答案1

得分: 1

条件变量不会保持信号状态,它们只会唤醒在.Wait()中阻塞的其他Go协程。因此,除非你有一个谓词来检查是否需要等待,或者你要等待的事情已经发生,否则会出现竞态条件。

在你的特定情况下,你通过使用p.started通道在调用.Wait()的Go协程和调用.Broadcast()的协程之间添加了同步,这样就不会出现我在这篇文章中描述的竞态条件。尽管我不敢肯定,但我个人会按照文档描述的惯用方式来做。

考虑你的start()函数在以下代码行中执行广播:

p.c.L.Lock()
p.c.Broadcast()

在那个特定的时间点上,考虑你的另一个Go协程已经到达了process()函数中的这个点:

fmt.Printf("fork : %d\n", f)

这个Go协程接下来要做的事情是锁定互斥锁(至少在start()中的Go协程释放该互斥锁之前,它不会拥有该互斥锁)并在条件变量上等待。

p.c.L.Lock()
p.started <- f
p.c.Wait()

但是Wait永远不会返回,因为此时没有人会发出/广播它——信号已经发生了。

因此,你需要另一个条件,你可以自己测试,这样当你已经知道条件已经发生时,就不需要调用Wait(),例如:

type processor struct {
    n       int
    c       *sync.Cond
    started chan int
    done    bool //添加的
}

...

func (p *processor) start() {

    for i := 0; i < p.n; i++ {
        go p.process(i)
    }

    for i := 0; i < p.n; i++ {
        <-p.started
    }
    p.c.L.Lock()
    p.done = true //添加的
    p.c.Broadcast()
    p.c.L.Unlock()

}

func (p *processor) process(f int) {

    fmt.Printf("fork : %d\n", f)

    p.c.L.Lock()
    p.started <- f
    for !p.done { //添加的
        p.c.Wait()
    }
    p.c.L.Unlock()
    fmt.Printf("process: %d - out of wait\n", f)
}
英文:

Condition variables doesn't stay signaled, they only wake up other go routines that are blocking in .Wait(). So this presents a race condition unless you have a predicate where you check if you even need to wait, or if the thing you want to wait for has already happened.

In your particular case, you have added synchronization between the go routines calling .Wait() and the one calling .BroadCast() by using your p.started channel, in a manner that as far as I can tell should not present the race condition I'm describing further on in this post. Though I wouldn't bet on it, and personally I would just do it the idiomatic way like the documentation describes.

Consider your start() function is executing the broadcast in these lines:

p.c.L.Lock()
p.c.Broadcast()

At that specific point in time consider that one of your other go routines have come to this point in your process() function

fmt.Printf(&quot;fork : %d\n&quot;, f)

The next thing that go routine will do is lock the mutex (which it isn't going to own at least until the go routine in start() releases that mutex) and wait on the condition variable.

p.c.L.Lock()
p.started &lt;- f
p.c.Wait()

But Wait is never going to return, since at this point there's noone that will signal/broadcast it - the signal has already happened.

So you need another condition that you can test yourself so you don't need to call Wait() when you already know the condition has happened, e.g.

type processor struct {
    n       int
    c       *sync.Cond
    started chan int
    done    bool //added
}

...

func (p *processor) start() {

    for i := 0; i &lt; p.n; i++ {
        go p.process(i)
    }

    for i := 0; i &lt; p.n; i++ {
        &lt;-p.started
    }
    p.c.L.Lock()
    p.done = true //added
    p.c.Broadcast()
    p.c.L.Unlock()

}

func (p *processor) process(f int) {

    fmt.Printf(&quot;fork : %d\n&quot;, f)

    p.c.L.Lock()
    p.started &lt;- f
    for !p.done { //added
        p.c.Wait()
    }
    p.c.L.Unlock()
    fmt.Printf(&quot;process: %d - out of wait\n&quot;, f)
}

huangapple
  • 本文由 发表于 2015年11月21日 17:02:47
  • 转载请务必保留本文链接:https://go.coder-hub.com/33841585.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定