多个生产者/多个消费者的并发性

huangapple go评论68阅读模式
英文:

Concurrency with multiple producers/multiple consumers

问题

我可能遗漏了一些东西,或者对Go语言处理并发的方式(或者对并发本身的了解)有所误解,我设计了一小段代码来理解多个生产者/消费者的情况。

这是代码:

package main

import (
    "fmt"
    "time"
    "sync"
)

var seq uint64 = 0
var generatorChan chan uint64
var requestChan chan uint64

func makeTimestamp() int64 {
    return time.Now().UnixNano() / int64(time.Millisecond)
}

func generateStuff(genId int) {
    var crap uint64
    for {
        crap = <-requestChan
        seq = seq+1
        fmt.Println("Gen ", genId, " - From : ", crap, " @", makeTimestamp())
        generatorChan <- uint64(seq)
    }
}

func concurrentPrint(id int, work *sync.WaitGroup) {
    defer work.Done()

    for i := 0; i < 5; i++ {
        requestChan<-uint64(id)
        fmt.Println("Conc", id, ":", <-generatorChan)
    }
}

func main() {
    generatorChan = make(chan uint64)
    requestChan = make(chan uint64)
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        go generateStuff(i)
    }
    maximumWorker := 200
    wg.Add(maximumWorker)
    for i := 0; i < maximumWorker; i++ {
        go concurrentPrint(i, &wg)
    }
    wg.Wait()
}

运行时,它会打印(大部分按顺序)从1到1000的所有数字(每个消费者获取一个数字5次)。
我本来期望某些消费者会打印相同的数字,但似乎requestChan像一个屏障一样起作用,即使有20个goroutine在为递增一个全局变量的generateStuff服务,也能防止这种情况发生。

我对Go语言或并发有什么误解吗?

我本来期望的情况是,两个类型为generateStuff的goroutine会同时被唤醒并同时增加seq,从而导致两个消费者打印相同的数字两次。

编辑 在playgolang上的代码:http://play.golang.org/p/eRzNXjdxtZ

英文:

I'm probably missing something, or not understanding something in how Go handles concurrency (or in my knowledge of concurrency itself), i've devised a little bit of code to understand a multiple producer/consumer.

This is the code:

package main

import (
    &quot;fmt&quot;
    &quot;time&quot;
    // &quot;math/rand&quot;
    &quot;sync&quot;
)

var seq uint64 = 0
var generatorChan chan uint64
var requestChan chan uint64

func makeTimestamp() int64 {
    return time.Now().UnixNano() / int64(time.Millisecond)
}

func generateStuff(genId int) {
    var crap uint64
    for {
        crap = &lt;-requestChan
        // &lt;- requestChan
        seq = seq+1
        fmt.Println(&quot;Gen &quot;, genId, &quot; - From : &quot;, crap, &quot; @&quot;, makeTimestamp())
        generatorChan &lt;- uint64(seq)
    }
}

func concurrentPrint(id int, work *sync.WaitGroup) {
    defer work.Done()

    for i := 0; i &lt; 5; i++ {
        requestChan&lt;-uint64(id)
        fmt.Println(&quot;Conc&quot;, id, &quot;: &quot;, &lt;-generatorChan)
    }
}

func main() {
    generatorChan = make(chan uint64)
    requestChan = make(chan uint64)
    var wg sync.WaitGroup
    for i := 0; i &lt; 20; i++ {
        go generateStuff(i)
    }
    maximumWorker := 200
    wg.Add(maximumWorker)
    for i := 0; i &lt; maximumWorker; i++ {
        go concurrentPrint(i, &amp;wg)
    }
    wg.Wait()
}

When run it prints (mostly in order) all the numbers from 1 to 1000 (200 consumers getting a number 5 times each).
I would've expected that some consumer would print the exact same number but it seems that the requestChan is working like a barrier preventing this even if there are 20 goroutines serving the generateStuff that generate the number by increasing a global variable.

What am i getting wrong about Go or Concurrency in general?

I would've expected a situation in like two go routines of type generateStuff would've been woke up together and increase seq at the same time thus having something like two consumers printing the same number two times.

EDIT Code on playgolang: http://play.golang.org/p/eRzNXjdxtZ

答案1

得分: 3

你有多个工作线程,它们可以同时运行并尝试同时发出请求。由于requestChan是无缓冲的,它们都会阻塞等待读取器进行同步并接收它们的请求。

你有多个生成器,它们通过requestChan与请求者进行同步,生成结果,然后在无缓冲的generatorChan上阻塞,直到工作线程读取结果。请注意,可能是不同的工作线程读取结果。

除此之外,没有其他的同步机制,所以其他所有的操作都是不确定的。

  • 一个生成器可能处理所有的请求。
  • 一个生成器可能在其他生成器有机会运行之前抢占一个请求并递增seq
  • 所有的生成器可能同时抢占请求,并且都想在完全相同的时间递增seq,从而引发各种问题。
  • 工作线程可能从它们发送请求的生成器或完全不同的生成器获取响应。

一般来说,如果不添加同步机制来强制其中一种行为,你无法确保任何一种行为实际发生。

请注意,在数据竞争的情况下,数据竞争本身也是另一个不确定的事件。可能会得到任意的值、程序崩溃等。在竞争条件下,不能安全地假设值可能只是偏差了一个或其他相对无害的结果。

对于实验,你可以通过增加GOMAXPROCS来改变情况。可以通过环境变量(例如env GOMAXPROCS=16 go run foo.goenv GOMAXPROCS=16 ./foo)或在程序中调用runtime.GOMAXPROCS(16)来实现。默认值为1,这意味着数据竞争或其他“奇怪”的行为可能被隐藏起来。

你还可以通过在各个位置添加runtime.Goschedtime.Sleep的调用来稍微影响情况。

如果使用竞争检测器(例如使用go run -race foo.gogo build -race),你还可以看到数据竞争。程序退出时应该显示“Found 1 data race(s)”的信息,并在第一次检测到数据竞争时输出大量详细信息和堆栈跟踪。

以下是经过“清理”的用于实验的代码版本:

package main

import (
	"log"
	"sync"
	"sync/atomic"
)

var seq uint64 = 0
var generatorChan = make(chan uint64)
var requestChan = make(chan uint64)

func generator(genID int) {
	for reqID := range requestChan {
		// If you want to see a data race:
		//seq = seq + 1
		// Else:
		s := atomic.AddUint64(&seq, 1)
		log.Printf("Gen: %2d, from %3d", genID, reqID)
		generatorChan <- s
	}
}

func worker(id int, work *sync.WaitGroup) {
	defer work.Done()

	for i := 0; i < 5; i++ {
		requestChan <- uint64(id)
		log.Printf("\t\t\tWorker: %3d got %4d", id, <-generatorChan)
	}
}

func main() {
	log.SetFlags(log.Lmicroseconds)
	const (
		numGen    = 20
		numWorker = 200
	)
	var wg sync.WaitGroup
	for i := 0; i < numGen; i++ {
		go generator(i)
	}
	wg.Add(numWorker)
	for i := 0; i < numWorker; i++ {
		go worker(i, &wg)
	}
	wg.Wait()
	close(requestChan)
}

Playground(但请注意,Playground上的时间戳可能没有用,调用runtime.MAXPROCS可能不起作用)。此外,请注意,Playground会缓存结果,因此再次运行完全相同的程序将始终显示相同的输出,你需要进行一些小的更改或在自己的计算机上运行它。

这个版本的代码进行了一些小的更改,如关闭生成器、使用log而不是fmt(因为前者提供了并发保证)、移除了数据竞争、使输出看起来更好等等。

英文:

You have multiple workers that can all run at the same time and all try and make requests at the same time. Since requestChan is unbuffered they all block waiting for a reader to synchronize with and take their request.

You have multiple generators that will synchronize with a requester via requestChan, produce a result, and then block on the unbuffered generatorChan until a worker reads the result. Note it may be a different worker.

There is no additional synchronization so everything else is non-deterministic.

  • One generator could field all the requests.
  • A generator could grab a request and get through incrementing seq
    before any other generator happens to get a chance to run. With only one processor this may even be likely.
  • All the generators could grab requests and all end up wanting to increment seq at exactly the same time causing all kinds of problems.
  • The workers can get responses from the same generator they happened to send to or from a completely different one.

In general, without adding synchronization to force one of these behaviors there is no way you can ensure any of these actually happen.

Note that with the data race, that itself is another non-deterministic event. It's possible to get arbitrary values, program crashes, etc. It's not safe to assume that under race conditions the value may just be off by one or some such relatively innocuous result.

For experimenting, the best you may be able to do is crank up GOMAXPROCS. Either through an environment variable (e.g. something like env GOMAXPROCS=16 go run foo.go or env GOMAXPROCS=16 ./foo after go build) or by calling runtime.GOMAXPROCS(16) from your program. The default is 1 and this means that data races or other "strange" behavior may be hidden.

You can also influence things a little by adding calls to runtime.Gosched or time.Sleep at various points.

You can also see the data race if you use the race detector (e.g. with go run -race foo.goo or go build -race). Not only should the program show "Found 1 data race(s)" on exit but it should also dump out a lot of details with stack traces when the race is first detected.

Here is a "cleaned up" version of your code for experimentation:

package main

import (
	&quot;log&quot;
	&quot;sync&quot;
	&quot;sync/atomic&quot;
)

var seq uint64 = 0
var generatorChan = make(chan uint64)
var requestChan = make(chan uint64)

func generator(genID int) {
	for reqID := range requestChan {
		// If you want to see a data race:
		//seq = seq + 1
		// Else:
		s := atomic.AddUint64(&amp;seq, 1)
		log.Printf(&quot;Gen: %2d, from %3d&quot;, genID, reqID)
		generatorChan &lt;- s
	}
}

func worker(id int, work *sync.WaitGroup) {
	defer work.Done()

	for i := 0; i &lt; 5; i++ {
		requestChan &lt;- uint64(id)
		log.Printf(&quot;\t\t\tWorker: %3d got %4d&quot;, id, &lt;-generatorChan)
	}
}

func main() {
	log.SetFlags(log.Lmicroseconds)
	const (
		numGen    = 20
		numWorker = 200
	)
	var wg sync.WaitGroup
	for i := 0; i &lt; numGen; i++ {
		go generator(i)
	}
	wg.Add(numWorker)
	for i := 0; i &lt; numWorker; i++ {
		go worker(i, &amp;wg)
	}
	wg.Wait()
	close(requestChan)
}

<kbd>Playground</kbd> (but note that the timestamps on the playground won't be useful and calling runtime.MAXPROCS may not do anything). Further note that the playground caches results so re-running the exact same program will always show the same output, you need to make some small change or just run it on your own machine.

Largely small changes like shunting down the generator, using log versus fmt since the former makes concurrency guarantees, removing the data race, making the output look nicer, etc.

答案2

得分: 0

通道类型

通道提供了一种机制,用于并发执行函数之间通过发送和接收指定元素类型的值进行通信。未初始化的通道的值为nil。

可以使用内置函数make创建一个新的初始化通道值,该函数接受通道类型和可选容量作为参数:

make(chan int, 100)

容量(以元素数量表示)设置通道中缓冲区的大小。如果容量为零或不存在,则通道是无缓冲的,只有在发送方和接收方都准备好时通信才会成功。否则,通道是有缓冲的,如果缓冲区不满(发送)或不为空(接收),通信将成功而不会阻塞。nil通道永远不会准备好进行通信。

通过使用无缓冲通道,您可以限制通道通信的速度。

例如,

generatorChan = make(chan uint64)
requestChan = make(chan uint64)
英文:

> Channel types
>
> A channel provides a mechanism for concurrently executing functions to
> communicate by sending and receiving values of a specified element
> type. The value of an uninitialized channel is nil.
>
> A new, initialized channel value can be made using the built-in
> function make, which takes the channel type and an optional capacity
> as arguments:
>
> make(chan int, 100)
>
> The capacity, in number of elements, sets the size of the buffer in
> the channel. If the capacity is zero or absent, the channel is
> unbuffered and communication succeeds only when both a sender and
> receiver are ready. Otherwise, the channel is buffered and
> communication succeeds without blocking if the buffer is not full
> (sends) or not empty (receives). A nil channel is never ready for
> communication.

You are throttling channel communications by using unbuffered channels.

For example,

generatorChan = make(chan uint64)
requestChan = make(chan uint64)

huangapple
  • 本文由 发表于 2015年4月5日 03:55:53
  • 转载请务必保留本文链接:https://go.coder-hub.com/29450820.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定