英文:
Do go channels preserve order when blocked?
问题
我有一组通道,它们都接收相同的消息:
func broadcast(c <-chan string, chans []chan<- string) {
for msg := range c {
for _, ch := range chans {
ch <- msg
}
}
}
然而,由于chans
中的每个通道可能以不同的速率被读取,当我遇到一个慢的消费者时,我不想阻塞其他通道。我通过使用goroutine解决了这个问题:
func broadcast(c <-chan string, chans []chan<- string) {
for msg := range c {
for _, ch := range chans {
go func() { ch <- msg }()
}
}
}
然而,将消息传递给每个通道的顺序很重要。我查看了规范,看看通道在被阻塞时是否保留顺序,但我只找到了这个:
如果容量大于零,则通道是异步的:如果缓冲区不满(发送)或不为空(接收),通信操作将成功而不阻塞,并且元素按发送的顺序接收。
对我来说,如果写入被阻塞,那么它并没有被“发送”,而是等待被发送。基于这个假设,上述内容并没有说明在多个goroutine在写入时被阻塞时发送的顺序。
在通道解除阻塞后,是否有任何关于发送顺序的保证?
英文:
I have a slice of channels that all receive the same message:
func broadcast(c <-chan string, chans []chan<- string) {
for msg := range c {
for _, ch := range chans {
ch <- msg
}
}
}
However, since each of the channels in chans
are potentially being read at a different rate, I don't want to block the other channels when I get a slow consumer. I've solved this with goroutines:
func broadcast(c <-chan string, chans []chan<- string) {
for msg := range c {
for _, ch := range chans {
go func() { ch <- msg }()
}
}
}
However, the order of the messages that get passed to each channel is important. I looked to the spec to see if channels preserve order when blocked, and all I found was this:
> If the capacity is greater than zero, the channel is asynchronous: communication operations succeed without blocking if the buffer is not full (sends) or not empty (receives), and elements are received in the order they are sent.
To me, if a write is blocked, then it is not "sent", but waiting to be sent. With that assumption, the above says nothing about order of sending when multiple goroutines are blocked on writing.
Are there any guarantees about the order of sends after a channel becomes unblocked?
答案1
得分: 9
不,没有任何保证。
即使通道没有满,如果两个 goroutine 在大约相同的时间开始向其发送消息,我认为没有任何保证先启动的 goroutine 实际上会先执行。因此,您不能指望消息按顺序到达。
英文:
No, there are no guarantees.
Even when the channel is not full, if two goroutines are started at about the same time to send to it, I don't think there is any guarantee that the goroutine that was started first would actually execute first. So you can't count on the messages arriving in order.
答案2
得分: 4
如果通道已满,您可以丢弃消息(然后设置一个标志来暂停客户端并向他们发送一条消息,告诉他们正在丢弃消息或其他内容)。
类似以下代码(未经测试):
type Client struct {
Name string
ch chan<- string
}
func broadcast(c <-chan string, chans []*Client) {
for msg := range c {
for _, ch := range chans {
select {
case ch.ch <- msg:
// 一切正常
default:
log.Printf("通道已满,将消息 '%s' 发送给客户端 %s", msg, ch.Name)
}
}
}
}
英文:
You can drop the message if the channel is full (and then set a flag to pause the client and send them a message that they're dropping messages or whatever).
Something along the lines of (untested):
type Client struct {
Name string
ch chan<-string
}
func broadcast(c <-chan string, chans []*Client) {
for msg := range c {
for _, ch := range chans {
select {
case ch.ch <- msg:
// all okay
default:
log.Printf("Channel was full sending '%s' to client %s", msg, ch.Name)
}
}
}
}
答案3
得分: 1
在这段代码中,没有任何保证。
给定示例代码的主要问题不在于通道行为,而在于创建的大量goroutine。所有的goroutine都在同一个嵌套循环中“启动”,没有进一步的同步,所以即使在它们开始发送消息之前,我们也不知道哪些会先执行。
然而,这在一般情况下引发了一个合理的问题:如果我们以某种方式保证了多个阻塞发送指令的顺序,我们能保证它们以相同的顺序接收吗?
发送的“happens-before”属性很难创建。我担心这是不可能的,因为:
- 在发送指令之前,任何事情都可能发生:例如,其他goroutine执行它们自己的发送或不发送
- 在发送中被阻塞的goroutine无法同时管理其他类型的同步
例如,如果我有10个编号为1到10的goroutine,我无法让它们以正确的顺序并发地将自己的编号发送到通道中。我能做的只是使用各种顺序技巧,比如在一个单独的goroutine中进行排序。
英文:
In this code, no guarantees.
The main problem with the given sample code lies not in the channel behavior, but rather in the numerous created goroutines. All the goroutines are "fired" inside the same imbricated loop without further synchronization, so even before they start to send messages, we simply don't know which ones will execute first.
However this rises a legitimate question in general : if we somehow garantee the order of several blocking send instructions, are we guaranteed to receive them in the same order?
The "happens-before" property of the sendings is difficult to create. I fear it is impossible because :
- Anything can happen before the sending instruction : for example, other goroutines performing their own sendings or not
- A goroutine being blocked in a sending cannot simultaneously manage other sorts of synchronization
For example, if I have 10 goroutines numbered 1 to 10, I have no way of letting them send their own number to the channel, concurrently, in the right order. All I can do is use various kinds of sequential tricks like doing the sorting in 1 single goroutine.
答案4
得分: 0
这是对已发布答案的补充。
正如几乎每个人都指出的那样,问题在于goroutine的执行顺序,你可以通过使用通道来轻松协调goroutine的执行,通过传递要运行的goroutine的编号:
func coordinated(coord chan int, num, max int, work func()) {
for {
n := <-coord
if n == num {
work()
coord <- (n+1) % max
} else {
coord <- n
}
}
}
coord := make(chan int)
go coordinated(coord, 0, 3, func() { println("0"); time.Sleep(1 * time.Second) })
go coordinated(coord, 1, 3, func() { println("1"); time.Sleep(1 * time.Second) })
go coordinated(coord, 2, 3, func() { println("2"); time.Sleep(1 * time.Second) })
coord <- 0
或者通过使用一个中央goroutine以有序的方式执行工作:
func executor(funs chan func()) {
for {
worker := <-funs
worker()
funs <- worker
}
}
funs := make(chan func(), 3)
funs <- func() { println("0"); time.Sleep(1 * time.Second) }
funs <- func() { println("1"); time.Sleep(1 * time.Second) }
funs <- func() { println("2"); time.Sleep(1 * time.Second) }
go executor(funs)
这些方法当然会消除所有的并行性,因为需要同步。然而,你的程序仍然具有并发性。
英文:
This is an addition to the already posted answers.
As practically everyone stated, that the problem is the order of execution of the goroutines,
you can easily coordinate goroutine execution using channels by passing around the number of the
goroutine you want to run:
func coordinated(coord chan int, num, max int, work func()) {
for {
n := <-coord
if n == num {
work()
coord <- (n+1) % max
} else {
coord <- n
}
}
}
coord := make(chan int)
go coordinated(coord, 0, 3, func() { println("0"); time.Sleep(1 * time.Second) })
go coordinated(coord, 1, 3, func() { println("1"); time.Sleep(1 * time.Second) })
go coordinated(coord, 2, 3, func() { println("2"); time.Sleep(1 * time.Second) })
coord <- 0
or by using a central goroutine which executes the workers in a ordered manner:
func executor(funs chan func()) {
for {
worker := <-funs
worker()
funs <- worker
}
}
funs := make(chan func(), 3)
funs <- func() { println("0"); time.Sleep(1 * time.Second) }
funs <- func() { println("1"); time.Sleep(1 * time.Second) }
funs <- func() { println("2"); time.Sleep(1 * time.Second) }
go executor(funs)
These methods will, of course, remove all parallelism due to synchronization. However,
the concurrent aspect of your program remains.
答案5
得分: 0
我认为问题标题非常有趣,当我在我的OS X机器上运行它(也在Ubuntu Jammy上运行)时,以下测试一次又一次地通过。我对此很感兴趣,因为我正在使用如这里所描述的锁。对我来说,问题标题的答案似乎是“是”,尽管我明白问题正文描述了一个情景,其中通道排序无法阻止慢消费者。
import (
"strconv"
"time"
"testing"
)
type Empty struct{}
func TestOrder(t *testing.T) {
lock := make(chan Empty, 1)
// 用于由goroutine构建的整数数组
list := &[]int{}
// 长时间的初始暂停以强制goroutine排队
initialPause, _ := time.ParseDuration("100ms")
doneCh := make(chan Empty)
// 工作函数
add := func(i int) {
lock <- Empty{}
if i == 0 { time.Sleep(initialPause) }
*list = append(*list, i)
<-lock
doneCh <- Empty{}
if i == 9 { close(doneCh) }
}
// 间隔10个goroutine
launchInterval, _ := time.ParseDuration("10ms")
for i := 0; i < 10; i++ {
t.Logf("Launching %d\n", i)
go add(i)
if i < 9 {
time.Sleep(launchInterval)
}
}
// 等待所有goroutine完成
for _ = range doneCh { }
result := ""
for _, n := range *list {
result = result + strconv.Itoa(n)
}
if result != "0123456789" { t.Fatalf("Didn't work") }
}
英文:
I think the question title is really interesting, and the following test passes again, and again, and again when I run it on my OS X machine (and also on Ubuntu Jammy). I am interested in this because I'm using locks as described here. To me it looks like the answer to the question title is "yes", although I see why the question body describes a scenario where channel ordering won't save the OP from slow consumers.
import (
"strconv"
"time"
"testing"
)
type Empty struct{}
func TestOrder(t *testing.T) {
lock := make(chan Empty, 1)
// array of ints to build by goroutines
list := &[]int{}
// long initial pause to force goroutines to queue up
initialPause, _ := time.ParseDuration("100ms")
doneCh := make(chan Empty)
// work func
add := func(i int) {
lock <- Empty{}
if i == 0 { time.Sleep(initialPause) }
*list = append(*list, i)
<-lock
doneCh <- Empty{}
if i == 9 { close(doneCh) }
}
// stagger 10 goroutines
launchInterval, _ := time.ParseDuration("10ms")
for i := 0; i < 10; i++ {
t.Logf("Launching %d\n", i)
go add(i)
if i < 9 {
time.Sleep(launchInterval)
}
}
// wait for all goroutines
for _ = range doneCh { }
result := ""
for _, n := range *list {
result = result + strconv.Itoa(n)
}
if result != "0123456789" { t.Fatalf("Didn't work") }
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论