
huangapple go评论68阅读模式

Concurrency with multiple producers/multiple consumers




package main

import (

var seq uint64 = 0
var generatorChan chan uint64
var requestChan chan uint64

func makeTimestamp() int64 {
    return time.Now().UnixNano() / int64(time.Millisecond)

func generateStuff(genId int) {
    var crap uint64
    for {
        crap = <-requestChan
        seq = seq+1
        fmt.Println("Gen ", genId, " - From : ", crap, " @", makeTimestamp())
        generatorChan <- uint64(seq)

func concurrentPrint(id int, work *sync.WaitGroup) {
    defer work.Done()

    for i := 0; i < 5; i++ {
        fmt.Println("Conc", id, ":", <-generatorChan)

func main() {
    generatorChan = make(chan uint64)
    requestChan = make(chan uint64)
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        go generateStuff(i)
    maximumWorker := 200
    for i := 0; i < maximumWorker; i++ {
        go concurrentPrint(i, &wg)




编辑 在playgolang上的代码:http://play.golang.org/p/eRzNXjdxtZ


I'm probably missing something, or not understanding something in how Go handles concurrency (or in my knowledge of concurrency itself), i've devised a little bit of code to understand a multiple producer/consumer.

This is the code:

package main

import (
    // &quot;math/rand&quot;

var seq uint64 = 0
var generatorChan chan uint64
var requestChan chan uint64

func makeTimestamp() int64 {
    return time.Now().UnixNano() / int64(time.Millisecond)

func generateStuff(genId int) {
    var crap uint64
    for {
        crap = &lt;-requestChan
        // &lt;- requestChan
        seq = seq+1
        fmt.Println(&quot;Gen &quot;, genId, &quot; - From : &quot;, crap, &quot; @&quot;, makeTimestamp())
        generatorChan &lt;- uint64(seq)

func concurrentPrint(id int, work *sync.WaitGroup) {
    defer work.Done()

    for i := 0; i &lt; 5; i++ {
        fmt.Println(&quot;Conc&quot;, id, &quot;: &quot;, &lt;-generatorChan)

func main() {
    generatorChan = make(chan uint64)
    requestChan = make(chan uint64)
    var wg sync.WaitGroup
    for i := 0; i &lt; 20; i++ {
        go generateStuff(i)
    maximumWorker := 200
    for i := 0; i &lt; maximumWorker; i++ {
        go concurrentPrint(i, &amp;wg)

When run it prints (mostly in order) all the numbers from 1 to 1000 (200 consumers getting a number 5 times each).
I would've expected that some consumer would print the exact same number but it seems that the requestChan is working like a barrier preventing this even if there are 20 goroutines serving the generateStuff that generate the number by increasing a global variable.

What am i getting wrong about Go or Concurrency in general?

I would've expected a situation in like two go routines of type generateStuff would've been woke up together and increase seq at the same time thus having something like two consumers printing the same number two times.

EDIT Code on playgolang: http://play.golang.org/p/eRzNXjdxtZ


得分: 3




  • 一个生成器可能处理所有的请求。
  • 一个生成器可能在其他生成器有机会运行之前抢占一个请求并递增seq
  • 所有的生成器可能同时抢占请求,并且都想在完全相同的时间递增seq,从而引发各种问题。
  • 工作线程可能从它们发送请求的生成器或完全不同的生成器获取响应。



对于实验,你可以通过增加GOMAXPROCS来改变情况。可以通过环境变量(例如env GOMAXPROCS=16 go run foo.goenv GOMAXPROCS=16 ./foo)或在程序中调用runtime.GOMAXPROCS(16)来实现。默认值为1,这意味着数据竞争或其他“奇怪”的行为可能被隐藏起来。


如果使用竞争检测器(例如使用go run -race foo.gogo build -race),你还可以看到数据竞争。程序退出时应该显示“Found 1 data race(s)”的信息,并在第一次检测到数据竞争时输出大量详细信息和堆栈跟踪。


package main

import (

var seq uint64 = 0
var generatorChan = make(chan uint64)
var requestChan = make(chan uint64)

func generator(genID int) {
	for reqID := range requestChan {
		// If you want to see a data race:
		//seq = seq + 1
		// Else:
		s := atomic.AddUint64(&seq, 1)
		log.Printf("Gen: %2d, from %3d", genID, reqID)
		generatorChan <- s

func worker(id int, work *sync.WaitGroup) {
	defer work.Done()

	for i := 0; i < 5; i++ {
		requestChan <- uint64(id)
		log.Printf("\t\t\tWorker: %3d got %4d", id, <-generatorChan)

func main() {
	const (
		numGen    = 20
		numWorker = 200
	var wg sync.WaitGroup
	for i := 0; i < numGen; i++ {
		go generator(i)
	for i := 0; i < numWorker; i++ {
		go worker(i, &wg)




You have multiple workers that can all run at the same time and all try and make requests at the same time. Since requestChan is unbuffered they all block waiting for a reader to synchronize with and take their request.

You have multiple generators that will synchronize with a requester via requestChan, produce a result, and then block on the unbuffered generatorChan until a worker reads the result. Note it may be a different worker.

There is no additional synchronization so everything else is non-deterministic.

  • One generator could field all the requests.
  • A generator could grab a request and get through incrementing seq
    before any other generator happens to get a chance to run. With only one processor this may even be likely.
  • All the generators could grab requests and all end up wanting to increment seq at exactly the same time causing all kinds of problems.
  • The workers can get responses from the same generator they happened to send to or from a completely different one.

In general, without adding synchronization to force one of these behaviors there is no way you can ensure any of these actually happen.

Note that with the data race, that itself is another non-deterministic event. It's possible to get arbitrary values, program crashes, etc. It's not safe to assume that under race conditions the value may just be off by one or some such relatively innocuous result.

For experimenting, the best you may be able to do is crank up GOMAXPROCS. Either through an environment variable (e.g. something like env GOMAXPROCS=16 go run foo.go or env GOMAXPROCS=16 ./foo after go build) or by calling runtime.GOMAXPROCS(16) from your program. The default is 1 and this means that data races or other "strange" behavior may be hidden.

You can also influence things a little by adding calls to runtime.Gosched or time.Sleep at various points.

You can also see the data race if you use the race detector (e.g. with go run -race foo.goo or go build -race). Not only should the program show "Found 1 data race(s)" on exit but it should also dump out a lot of details with stack traces when the race is first detected.

Here is a "cleaned up" version of your code for experimentation:

package main

import (

var seq uint64 = 0
var generatorChan = make(chan uint64)
var requestChan = make(chan uint64)

func generator(genID int) {
	for reqID := range requestChan {
		// If you want to see a data race:
		//seq = seq + 1
		// Else:
		s := atomic.AddUint64(&amp;seq, 1)
		log.Printf(&quot;Gen: %2d, from %3d&quot;, genID, reqID)
		generatorChan &lt;- s

func worker(id int, work *sync.WaitGroup) {
	defer work.Done()

	for i := 0; i &lt; 5; i++ {
		requestChan &lt;- uint64(id)
		log.Printf(&quot;\t\t\tWorker: %3d got %4d&quot;, id, &lt;-generatorChan)

func main() {
	const (
		numGen    = 20
		numWorker = 200
	var wg sync.WaitGroup
	for i := 0; i &lt; numGen; i++ {
		go generator(i)
	for i := 0; i &lt; numWorker; i++ {
		go worker(i, &amp;wg)

<kbd>Playground</kbd> (but note that the timestamps on the playground won't be useful and calling runtime.MAXPROCS may not do anything). Further note that the playground caches results so re-running the exact same program will always show the same output, you need to make some small change or just run it on your own machine.

Largely small changes like shunting down the generator, using log versus fmt since the former makes concurrency guarantees, removing the data race, making the output look nicer, etc.


得分: 0




make(chan int, 100)




generatorChan = make(chan uint64)
requestChan = make(chan uint64)

> Channel types
> A channel provides a mechanism for concurrently executing functions to
> communicate by sending and receiving values of a specified element
> type. The value of an uninitialized channel is nil.
> A new, initialized channel value can be made using the built-in
> function make, which takes the channel type and an optional capacity
> as arguments:
> make(chan int, 100)
> The capacity, in number of elements, sets the size of the buffer in
> the channel. If the capacity is zero or absent, the channel is
> unbuffered and communication succeeds only when both a sender and
> receiver are ready. Otherwise, the channel is buffered and
> communication succeeds without blocking if the buffer is not full
> (sends) or not empty (receives). A nil channel is never ready for
> communication.

You are throttling channel communications by using unbuffered channels.

For example,

generatorChan = make(chan uint64)
requestChan = make(chan uint64)

  • 本文由 发表于 2015年4月5日 03:55:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/29450820.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
