Concurrency with multiple producers/multiple consumers
Question
I'm probably missing something, or misunderstanding how Go handles concurrency (or concurrency itself). I've written a small piece of code to understand the multiple producer/consumer case.
This is the code:
package main

import (
	"fmt"
	"time"
	// "math/rand"
	"sync"
)

var seq uint64 = 0
var generatorChan chan uint64
var requestChan chan uint64

func makeTimestamp() int64 {
	return time.Now().UnixNano() / int64(time.Millisecond)
}

func generateStuff(genId int) {
	var crap uint64
	for {
		crap = <-requestChan
		// <- requestChan
		seq = seq + 1
		fmt.Println("Gen ", genId, " - From : ", crap, " @", makeTimestamp())
		generatorChan <- uint64(seq)
	}
}

func concurrentPrint(id int, work *sync.WaitGroup) {
	defer work.Done()
	for i := 0; i < 5; i++ {
		requestChan <- uint64(id)
		fmt.Println("Conc", id, ": ", <-generatorChan)
	}
}

func main() {
	generatorChan = make(chan uint64)
	requestChan = make(chan uint64)
	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		go generateStuff(i)
	}
	maximumWorker := 200
	wg.Add(maximumWorker)
	for i := 0; i < maximumWorker; i++ {
		go concurrentPrint(i, &wg)
	}
	wg.Wait()
}
When run, it prints (mostly in order) all the numbers from 1 to 1000 (200 consumers, each getting a number 5 times).
I would have expected some consumers to print the exact same number. Instead, requestChan seems to act like a barrier that prevents this, even though 20 goroutines run generateStuff and produce the number by incrementing a global variable.
What am I getting wrong about Go, or about concurrency in general?
I expected a situation where two generateStuff goroutines are woken up together and increment seq at the same time, so that two consumers end up printing the same number.
EDIT: Code on the Go Playground: http://play.golang.org/p/eRzNXjdxtZ
Answer 1
Score: 3
You have multiple workers that can all run at the same time and all try to make requests at the same time. Since requestChan is unbuffered, they all block waiting for a reader to synchronize with and take their request.
You have multiple generators that synchronize with a requester via requestChan, produce a result, and then block on the unbuffered generatorChan until a worker reads the result. Note that it may be a different worker.
There is no additional synchronization, so everything else is non-deterministic.
- One generator could field all the requests.
- A generator could grab a request and get through incrementing seq before any other generator gets a chance to run. With only one processor this may even be likely.
- All the generators could grab requests and all end up wanting to increment seq at exactly the same time, causing all kinds of problems.
- The workers can get responses from the same generator they happened to send to, or from a completely different one.
In general, without adding synchronization to force one of these behaviors there is no way you can ensure any of these actually happen.
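The following sketch shows one way to add synchronization that forces a particular behavior. It is a common Go idiom and does not come from the answer. It makes the last bullet deterministic: each request carries its own private reply channel, so a worker always receives the value generated for its own request, whichever generator picks it up. The names request, response, generator and runRequestReply are hypothetical. The sketch also assumes the counter is incremented with sync/atomic, as in the cleaned-up code further down.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// request carries the worker's ID plus a private reply channel, so the
// generator that picks up the request answers exactly that worker.
type request struct {
	workerID int
	reply    chan response
}

// response echoes the requester's ID next to the generated sequence number.
type response struct {
	workerID int
	value    uint64
}

// generator serves requests until the requests channel is closed.
func generator(seq *uint64, requests <-chan request) {
	for req := range requests {
		v := atomic.AddUint64(seq, 1) // race-free increment
		req.reply <- response{workerID: req.workerID, value: v}
	}
}

// runRequestReply starts numGen generators and numWorker workers, each worker
// making perWorker requests, and returns the responses each worker received.
func runRequestReply(numGen, numWorker, perWorker int) map[int][]response {
	var seq uint64
	requests := make(chan request) // unbuffered, as in the question
	for i := 0; i < numGen; i++ {
		go generator(&seq, requests)
	}

	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		results = make(map[int][]response)
	)
	wg.Add(numWorker)
	for id := 0; id < numWorker; id++ {
		go func(id int) {
			defer wg.Done()
			reply := make(chan response) // one private reply channel per worker
			for i := 0; i < perWorker; i++ {
				requests <- request{workerID: id, reply: reply}
				r := <-reply
				mu.Lock()
				results[id] = append(results[id], r)
				mu.Unlock()
			}
		}(id)
	}
	wg.Wait()
	close(requests) // lets the generators' range loops finish
	return results
}

func main() {
	results := runRequestReply(20, 200, 5)
	fmt.Println("workers:", len(results))
	fmt.Println("worker 0 got:", results[0])
}
```

The cost is one extra channel per worker. In return, the pairing between a request and its answer no longer depends on scheduling.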
Note that the data race is itself another non-deterministic event. It's possible to get arbitrary values, program crashes, etc. It's not safe to assume that under race conditions the value will merely be off by one, or produce some similarly innocuous result.
For experimenting, the best you may be able to do is crank up GOMAXPROCS, either through an environment variable (e.g. something like env GOMAXPROCS=16 go run foo.go, or env GOMAXPROCS=16 ./foo after go build) or by calling runtime.GOMAXPROCS(16) from your program. When this answer was written the default was 1, which means that data races or other "strange" behavior may be hidden. Since Go 1.5 the default is the number of available CPUs.
You can also influence things a little by adding calls to runtime.Gosched or time.Sleep at various points.
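To see the "one generator could field all the requests" effect, you can count how many requests each generator served. The sketch below does that, with an optional runtime.Gosched call after each request is taken. It uses the question's topology and unbuffered channels. The function servedPerGenerator and its yield parameter are hypothetical. The per-generator counts are non-deterministic and vary between runs and machines; only their total (1000) is fixed.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// servedPerGenerator counts how many requests each of numGen generators
// fielded while numWorker workers each made perWorker requests.
// If yield is true, a generator calls runtime.Gosched after taking a request.
func servedPerGenerator(numGen, numWorker, perWorker int, yield bool) []int {
	requestChan := make(chan int)
	generatorChan := make(chan int)
	counts := make([]int, numGen)

	var genWG sync.WaitGroup
	genWG.Add(numGen)
	for g := 0; g < numGen; g++ {
		go func(g int) {
			defer genWG.Done()
			for range requestChan {
				counts[g]++ // only goroutine g writes counts[g]
				if yield {
					runtime.Gosched()
				}
				generatorChan <- g
			}
		}(g)
	}

	var wg sync.WaitGroup
	wg.Add(numWorker)
	for w := 0; w < numWorker; w++ {
		go func(w int) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				requestChan <- w
				<-generatorChan
			}
		}(w)
	}
	wg.Wait()
	close(requestChan)
	genWG.Wait() // all generators have exited, so reading counts is race-free
	return counts
}

func main() {
	fmt.Println("without Gosched:", servedPerGenerator(20, 200, 5, false))
	fmt.Println("with Gosched:   ", servedPerGenerator(20, 200, 5, true))
}
```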
You can also see the data race if you use the race detector (e.g. with go run -race foo.go or go build -race). Not only should the program show "Found 1 data race(s)" on exit, but it should also dump out a lot of details with stack traces when the race is first detected.
Here is a "cleaned up" version of your code for experimentation:
package main

import (
	"log"
	"sync"
	"sync/atomic"
)

var seq uint64 = 0
var generatorChan = make(chan uint64)
var requestChan = make(chan uint64)

func generator(genID int) {
	for reqID := range requestChan {
		// If you want to see a data race:
		//seq = seq + 1
		// Else:
		s := atomic.AddUint64(&seq, 1)
		log.Printf("Gen: %2d, from %3d", genID, reqID)
		generatorChan <- s
	}
}

func worker(id int, work *sync.WaitGroup) {
	defer work.Done()
	for i := 0; i < 5; i++ {
		requestChan <- uint64(id)
		log.Printf("\t\t\tWorker: %3d got %4d", id, <-generatorChan)
	}
}

func main() {
	log.SetFlags(log.Lmicroseconds)
	const (
		numGen    = 20
		numWorker = 200
	)
	var wg sync.WaitGroup
	for i := 0; i < numGen; i++ {
		go generator(i)
	}
	wg.Add(numWorker)
	for i := 0; i < numWorker; i++ {
		go worker(i, &wg)
	}
	wg.Wait()
	close(requestChan)
}
Playground (but note that the timestamps on the playground won't be useful and calling runtime.GOMAXPROCS may not do anything). Further note that the playground caches results, so re-running the exact same program will always show the same output; you need to make some small change or just run it on your own machine.
The changes are largely small: shutting down the generators, using log instead of fmt (since the former makes concurrency guarantees), removing the data race, making the output look nicer, etc.
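The cleaned-up version removes the race with atomic.AddUint64. A sync.Mutex around the increment is the other usual way to do it. The sketch below swaps in a mutex and keeps the question's topology (20 generators, 200 workers, 5 requests each, unbuffered channels). It collects every value the workers receive so you can check that none repeats. The type counter and the functions next and collect are hypothetical names for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// counter guards the shared sequence number with a mutex.
type counter struct {
	mu  sync.Mutex
	seq uint64
}

// next increments and returns seq under the lock, so the read-modify-write
// of seq = seq + 1 cannot interleave between goroutines.
func (c *counter) next() uint64 {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.seq++
	return c.seq
}

// collect runs the question's topology and returns every value workers saw.
func collect() []uint64 {
	const numGen, numWorker, perWorker = 20, 200, 5
	var c counter
	requestChan := make(chan uint64)
	generatorChan := make(chan uint64)

	for i := 0; i < numGen; i++ {
		go func() {
			for range requestChan {
				generatorChan <- c.next()
			}
		}()
	}

	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		seen []uint64
	)
	wg.Add(numWorker)
	for id := 0; id < numWorker; id++ {
		go func(id int) {
			defer wg.Done()
			for i := 0; i < perWorker; i++ {
				requestChan <- uint64(id)
				v := <-generatorChan
				mu.Lock()
				seen = append(seen, v)
				mu.Unlock()
			}
		}(id)
	}
	wg.Wait()
	close(requestChan)
	return seen
}

func main() {
	vals := collect()
	distinct := make(map[uint64]bool, len(vals))
	for _, v := range vals {
		distinct[v] = true
	}
	fmt.Println(len(vals), "values,", len(distinct), "distinct") // prints "1000 values, 1000 distinct"
}
```

Running it with go run -race should report no races, whereas reverting next to an unguarded c.seq++ should.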
Answer 2
Score: 0
> Channel types
>
> A channel provides a mechanism for concurrently executing functions to
> communicate by sending and receiving values of a specified element
> type. The value of an uninitialized channel is nil.
>
> A new, initialized channel value can be made using the built-in
> function make, which takes the channel type and an optional capacity
> as arguments:
>
> make(chan int, 100)
>
> The capacity, in number of elements, sets the size of the buffer in
> the channel. If the capacity is zero or absent, the channel is
> unbuffered and communication succeeds only when both a sender and
> receiver are ready. Otherwise, the channel is buffered and
> communication succeeds without blocking if the buffer is not full
> (sends) or not empty (receives). A nil channel is never ready for
> communication.
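The three cases in the quoted spec text can be observed directly with a select that has a default case, which turns a send into a non-blocking probe. This is a minimal sketch; the helper canSend is a hypothetical name.

```go
package main

import "fmt"

// canSend reports whether a send on ch can proceed right now. The default
// case makes the select non-blocking, so the probe itself never waits.
func canSend(ch chan int) bool {
	select {
	case ch <- 1:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan int)    // capacity 0: needs a ready receiver
	buffered := make(chan int, 100) // capacity 100: sends succeed while not full
	var nilChan chan int            // nil: never ready for communication

	fmt.Println("unbuffered, no receiver:", canSend(unbuffered)) // false
	fmt.Println("buffered, not full:", canSend(buffered))        // true
	fmt.Println("nil channel:", canSend(nilChan))                // false
	fmt.Println("buffered len/cap:", len(buffered), cap(buffered))
}
```

In the question's program, no goroutine can hand off a value on requestChan or generatorChan until its partner is ready, which is the throttling described below.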
You are throttling channel communications by using unbuffered channels.
For example,
generatorChan = make(chan uint64)
requestChan = make(chan uint64)