How to correctly use sync.Cond?

huangapple go评论91阅读模式
英文:

How to correctly use sync.Cond?

问题

我在使用sync.Cond时遇到了困难。据我所知,在锁定Locker和调用条件的Wait方法之间存在竞态条件。以下示例在主goroutine的两行代码之间添加了人为延迟,以模拟竞态条件:

package main

import (
	"sync"
	"time"
)

func main() {
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	go func() {
		time.Sleep(1 * time.Second)
		c.Broadcast()
	}()
	m.Lock()
	time.Sleep(2 * time.Second)
	c.Wait()
}

这会导致立即发生panic:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
	/usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
	/usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
	/tmp/sandbox301865429/main.go:17 +0x1a0

我做错了什么?我如何避免这个明显的竞态条件?有没有更好的同步构造我应该使用?


**编辑:**我意识到我应该更好地解释一下我在这里尝试解决的问题。我有一个长时间运行的goroutine,它下载一个大文件,还有一些其他的goroutine需要在HTTP头可用时访问它们。这个问题比听起来要困难。

我不能使用通道,因为只有一个goroutine会接收到值。而其他一些goroutine可能会在头部已经可用之后仍然尝试检索它们。

下载器goroutine可以简单地将HTTP头存储在一个变量中,并使用互斥锁来保护对它们的访问。然而,这并不能提供一种让其他goroutine“等待”它们变为可用的方法。

我曾认为sync.Mutexsync.Cond结合起来可以实现这个目标,但似乎这是不可能的。

英文:

I'm having trouble figuring out how to correctly use sync.Cond. From what I can tell, a race condition exists between locking the Locker and invoking the condition's Wait method. This example adds an artificial delay between the two lines in the main goroutine to simulate the race condition:

package main

import (
	"sync"
	"time"
)

func main() {
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	go func() {
		time.Sleep(1 * time.Second)
		c.Broadcast()
	}()
	m.Lock()
	time.Sleep(2 * time.Second)
	c.Wait()
}

<sup>[Run on the Go Playground]</sup>

This causes an immediate panic:

<pre>fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Syncsemacquire(0x10330208, 0x1)
/usr/local/go/src/runtime/sema.go:241 +0x2e0
sync.(*Cond).Wait(0x10330200, 0x0)
/usr/local/go/src/sync/cond.go:63 +0xe0
main.main()
/tmp/sandbox301865429/main.go:17 +0x1a0</pre>

What am I doing wrong? How do I avoid this apparent race condition? Is there a better synchronization construct I should be using?


Edit: I realize I should have better explained the problem I'm trying to solve here. I have a long-running goroutine that downloads a large file and a number of other goroutines that need access to the HTTP headers when they are available. This problem is harder than it sounds.

I can't use channels since only one goroutine would then receive the value. And some of the other goroutines would be trying to retrieve the headers long after they are already available.

The downloader goroutine could simply store the HTTP headers in a variable and use a mutex to safeguard access to them. However, this doesn't provide a way for the other goroutines to "wait" for them to become available.

I had thought that both a sync.Mutex and sync.Cond together could accomplish this goal but it appears that this is not possible.

答案1

得分: 47

OP回答了自己的问题,但没有直接回答原始问题,我将发布如何正确使用sync.Cond的内容。

如果每个写入和读取操作都有一个goroutine,实际上不需要使用sync.Cond - 一个单独的sync.Mutex就足够在它们之间进行通信了。sync.Cond在多个读取器等待共享资源可用的情况下非常有用。

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        // 这个goroutine等待sharedRsc的变化
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // 这个goroutine等待sharedRsc的变化
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // 这个goroutine将更改写入sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

Playground

话虽如此,如果情况允许,仍然推荐使用通道来传递数据。

注意:这里的sync.WaitGroup仅用于等待goroutine完成其执行。

英文:

OP answered his own, but did not directly answer the original question, I am going to post how to correctly use sync.Cond.

You do not really need sync.Cond if you have one goroutine for each write and read - a single sync.Mutex would suffice to communicate between them. sync.Cond could useful in situations where multiple readers wait for the shared resources to be available.

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&amp;m)
    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc[&quot;rsc1&quot;])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc[&quot;rsc2&quot;])
        c.L.Unlock()
        wg.Done()
    }()

    // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc[&quot;rsc1&quot;] = &quot;foo&quot;
    sharedRsc[&quot;rsc2&quot;] = &quot;bar&quot;
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

Playground

Having said that, using channels is still the recommended way to pass data around if the situation permitting.

Note: sync.WaitGroup here is only used to wait for the goroutines to complete their executions.

答案2

得分: 20

你需要确保在调用c.Wait之后调用c.Broadcast。你的程序的正确版本应该是:

package main

import (
	"fmt"
	"sync"
)

func main() {
	m := &sync.Mutex{}
	c := sync.NewCond(m)
	m.Lock()
	go func() {
		m.Lock() // 等待c.Wait()
		c.Broadcast()
		m.Unlock()
	}()
	c.Wait() // 解锁m,等待,然后再次锁定m
    m.Unlock()
}

你可以在这里查看代码:https://play.golang.org/p/O1r8v8yW6h

英文:

You need to make sure that c.Broadcast is called after your call to c.Wait. The correct version of your program would be:

package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
)

func main() {
	m := &amp;sync.Mutex{}
	c := sync.NewCond(m)
	m.Lock()
	go func() {
		m.Lock() // Wait for c.Wait()
		c.Broadcast()
		m.Unlock()
	}()
	c.Wait() // Unlocks m, waits, then locks m again
    m.Unlock()
}

https://play.golang.org/p/O1r8v8yW6h

答案3

得分: 3

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	m := sync.Mutex{}
	m.Lock() // 主 goroutine 拥有锁
	c := sync.NewCond(&m)
	go func() {
		m.Lock() // 获取锁
		defer m.Unlock()
		fmt.Println("3. goroutine 拥有锁")
		time.Sleep(2 * time.Second) // 长时间计算 - 因为你是拥有者,可以改变状态变量
		c.Broadcast()               // 状态已改变,通知等待的 goroutine
		fmt.Println("4. goroutine 即将释放锁(延迟解锁)")
	}()
	fmt.Println("1. 主 goroutine 拥有锁")
	time.Sleep(1 * time.Second) // 初始化
	fmt.Println("2. 主 goroutine 仍然拥有锁")
	c.Wait() // Wait 会在等待期间临时释放互斥锁,并给其他 goroutine 改变状态的机会
	// 因为你不知道是否等待的是你期望的状态,通常会在循环中调用
	m.Unlock()
	fmt.Println("完成")
}

点击此处查看代码。

英文:
package main

import (
	&quot;fmt&quot;
	&quot;sync&quot;
	&quot;time&quot;
)

func main() {
	m := sync.Mutex{}
	m.Lock() // main gouroutine is owner of lock
	c := sync.NewCond(&amp;m)
	go func() {
		m.Lock() // obtain a lock
		defer m.Unlock()
		fmt.Println(&quot;3. goroutine is owner of lock&quot;)
		time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
		c.Broadcast()               // State has been changed, publish it to waiting goroutines
		fmt.Println(&quot;4. goroutine will release lock soon (deffered Unlock&quot;)
	}()
	fmt.Println(&quot;1. main goroutine is owner of lock&quot;)
	time.Sleep(1 * time.Second) // initialization
	fmt.Println(&quot;2. main goroutine is still lockek&quot;)
	c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
	// Because you don&#39;t know, whether this is state, that you are waiting for, is usually called in loop.
	m.Unlock()
	fmt.Println(&quot;Done&quot;)
}

http://play.golang.org/p/fBBwoL7_pm

答案4

得分: 1

看起来你在等待一个广播,但是根据你的时间间隔,这个广播永远不会发生。

使用以下代码片段:

time.Sleep(3 * time.Second) // 在等待之后进行广播
c.Broadcast()

你的代码片段似乎可以工作,你是不是有什么特定的目标?

英文:

Looks like you c.Wait for Broadcast which would never happens with your time intervals.
With

time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()

your snippet seems to work http://play.golang.org/p/OE8aP4i6gY .Or am I missing something that you try to achive?

答案5

得分: 1

我终于找到了一种方法来实现这个,而且完全不涉及sync.Cond,只需要使用互斥锁。

type Task struct {
    m       sync.Mutex
    headers http.Header
}

func NewTask() *Task {
    t := &Task{}
    t.m.Lock()
    go func() {
        defer t.m.Unlock()
        // ...做一些操作...
    }()
    return t
}

func (t *Task) WaitFor() http.Header {
    t.m.Lock()
    defer t.m.Unlock()
    return t.headers
}

这个方法是如何工作的呢?

在任务开始时,互斥锁被锁定,确保调用WaitFor()的任何操作都会被阻塞。一旦 headers 可用并且 goroutine 解锁了互斥锁,每次调用WaitFor()都会依次执行。所有未来的调用(即使在 goroutine 结束后)都不会有问题地锁定互斥锁,因为它始终会保持解锁状态。

英文:

I finally discovered a way to do this and it doesn't involve sync.Cond at all - just the mutex.

type Task struct {
    m       sync.Mutex
    headers http.Header
}

func NewTask() *Task {
    t := &amp;Task{}
    t.m.Lock()
    go func() {
        defer t.m.Unlock()
        // ...do stuff...
    }()
    return t
}

func (t *Task) WaitFor() http.Header {
    t.m.Lock()
    defer t.m.Unlock()
    return t.headers
}

How does this work?

The mutex is locked at the beginning of the task, ensuring that anything calling WaitFor() will block. Once the headers are available and the mutex unlocked by the goroutine, each call to WaitFor() will execute one at a time. All future calls (even after the goroutine ends) will have no problem locking the mutex, since it will always be left unlocked.

答案6

得分: 1

是的,你可以使用一个通道将 Header 传递给多个 Go 协程。

headerChan := make(chan http.Header)

go func() { // 这个协程可以被多次启动
    header := <-headerChan // 等待接收 header
    // 对 header 进行处理
}()

// 将 header 提供给所有等待的协程
for more := true; more; {
    select {
    case headerChan <- r.Header:
    default:
        more = false
    }
}

以上是代码的翻译部分。

英文:

Yes you can use one channel to pass Header to multiple Go routines.

headerChan := make(chan http.Header)

go func() { // This routine can be started many times
	header := &lt;-headerChan	// Wait for header
	// Do things with the header
}()

// Feed the header to all waiting go routines
for more := true; more; {
	select {
	case headerChan &lt;- r.Header:
	default: more = false
	}
}

答案7

得分: 1

这可以通过使用通道来轻松实现,并且代码将会很清晰。以下是示例代码。希望对你有帮助!

package main

import (
	"fmt"
	"net/http"
	"sync"
)

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup
	// fork required number of goroutines
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			<-done
			fmt.Println("从这里读取HTTP头部")
		}()
	}
	time.Sleep(1) // 在这里下载你的大文件
	fmt.Println("解除阻塞的goroutines...")
	close(done) // 这将解除所有goroutines的阻塞
	wg.Wait()
}
英文:

This can be done with channels pretty easily and the code will be clean. Below is the example. Hope this helps!

package main

import (
	&quot;fmt&quot;
	&quot;net/http&quot;
	&quot;sync&quot;
)

func main() {
	done := make(chan struct{})
	var wg sync.WaitGroup
	// fork required number of goroutines
	for i := 0; i &lt; 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			&lt;-done
			fmt.Println(&quot;read the http headers from here&quot;)
		}()
	}
	time.Sleep(1) //download your large file here
	fmt.Println(&quot;Unblocking goroutines...&quot;)
	close(done) // this will unblock all the goroutines
	wg.Wait()
}

答案8

得分: 0

你的代码中的问题是,你的信号只被发送了一次,而接收的go例程还没有准备好,所以信号被错过了。你应该在一个循环中进行广播。

package main

import (
	"sync"
	"time"
)

func main() {
	m := sync.Mutex{}
	c := sync.NewCond(&m)
	go func() {
		time.Sleep(1 * time.Second)
		for range time.Tick(time.Millisecond) {
			c.Broadcast()
		}
	}()
	m.Lock()
	time.Sleep(2 * time.Second)
	c.Wait()
	m.Unlock()

	//do stuff
}
英文:

The problem in your code was that your signal was emitted once and a receiving go routine was not ready for it so the signal was missed. You should do broadcasting in a loop.

   package main
    
    import (
    	&quot;sync&quot;
    	&quot;time&quot;
    )
    
    func main() {
    	m := sync.Mutex{}
    	c := sync.NewCond(&amp;m)
    	go func() {
    		time.Sleep(1 * time.Second)
    		for range time.Tick(time.Millisecond) {
    			c.Broadcast()
    		}
    	}()
    	m.Lock()
    	time.Sleep(2 * time.Second)
    	c.Wait()
    	m.Unlock()
    
    	//do stuff
    }

答案9

得分: -1

在优秀的书籍《Go并发编程》中,他们提供了以下简单的解决方案,利用了关闭的通道会释放所有等待的客户端的特性。

package main
import (
    "fmt"
    "time"
)
func main() {
    httpHeaders := []string{}
    headerChan := make(chan interface{})
    var consumerFunc = func(id int, stream <-chan interface{}, funcHeaders *[]string) {
        <-stream
        fmt.Println("Consumer ", id, " got headers:", funcHeaders)
    }
    for i := 0; i < 3; i++ {
        go consumerFunc(i, headerChan, &httpHeaders)
    }
    fmt.Println("Getting headers...")
    time.Sleep(2 * time.Second)
    httpHeaders = append(httpHeaders, "test1")
    fmt.Println("Publishing headers...")
    close(headerChan)
    time.Sleep(5 * time.Second)
}

https://play.golang.org/p/cE3SiKWNRIt

英文:

In the excellent book "Concurrency in Go" they provide the following easy solution while leveraging the fact that a channel that is closed will release all waiting clients.

package main
import (
    &quot;fmt&quot;
	&quot;time&quot;
)
func main() {
	httpHeaders := []string{}
	headerChan := make(chan interface{})
	var consumerFunc= func(id int, stream &lt;-chan interface{}, funcHeaders *[]string)         
    {
		&lt;-stream
		fmt.Println(&quot;Consumer &quot;,id,&quot; got headers:&quot;, funcHeaders )	
	}
	for i:=0;i&lt;3;i++ {
		go consumerFunc(i, headerChan, &amp;httpHeaders)
	}
	fmt.Println(&quot;Getting headers...&quot;)
	time.Sleep(2*time.Second)
	httpHeaders=append(httpHeaders, &quot;test1&quot;);
	fmt.Println(&quot;Publishing headers...&quot;)
	close(headerChan )
	time.Sleep(5*time.Second)
}

https://play.golang.org/p/cE3SiKWNRIt

huangapple
  • 本文由 发表于 2016年4月26日 14:43:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/36857167.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定