多个goroutine监听一个通道

huangapple go评论149阅读模式
英文:

Multiple goroutines listening on one channel

问题

我有多个goroutine同时尝试在同一个通道上接收。似乎最后一个开始在通道上接收的goroutine会得到值。这是语言规范中的某个地方,还是未定义的行为?

c := make(chan string)
for i := 0; i < 5; i++ {
	go func(i int) {
		<-c
		c <- fmt.Sprintf("goroutine %d", i)
	}(i)
}
c <- "hi"
fmt.Println(<-c)

输出:

goroutine 4

在Playground上的示例

编辑:

我刚意识到这比我想象的要复杂。消息在所有的goroutine之间传递。

c := make(chan string)
for i := 0; i < 5; i++ {
	go func(i int) {
		msg := <-c
		c <- fmt.Sprintf("%s, hi from %d", msg, i)
	}(i)
}
c <- "original"
fmt.Println(<-c)

输出:

original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4

注意:上述输出在较新版本的Go中已过时(请参见评论)

在Playground上的示例

英文:

I have multiple goroutines trying to receive on the same channel simultaneously. It seems like the last goroutine that starts receiving on the channel gets the value. Is this somewhere in the language spec or is it undefined behaviour?

c := make(chan string)
for i := 0; i &lt; 5; i++ {
	go func(i int) {
		&lt;-c
		c &lt;- fmt.Sprintf(&quot;goroutine %d&quot;, i)
	}(i)
}
c &lt;- &quot;hi&quot;
fmt.Println(&lt;-c)

Output:

goroutine 4

Example On Playground

EDIT:

I just realized that it's more complicated than I thought. The message gets passed around all the goroutines.

c := make(chan string)
for i := 0; i &lt; 5; i++ {
	go func(i int) {
		msg := &lt;-c
		c &lt;- fmt.Sprintf(&quot;%s, hi from %d&quot;, msg, i)
	}(i)
}
c &lt;- &quot;original&quot;
fmt.Println(&lt;-c)

Output:

original, hi from 0, hi from 1, hi from 2, hi from 3, hi from 4

NOTE: the above output is outdated in more recent versions of Go (see comments)

Example On Playground

答案1

得分: 94

是的,这很复杂,但有一些经验法则可以使事情变得更简单明了。

  • 优先使用形式参数来传递给go协程的通道,而不是在全局范围内访问通道。这样可以获得更多的编译器检查,并且也更具模块化。
  • 避免在特定的go协程(包括'main'协程)中同时读写同一个通道。否则,死锁的风险会更大。

这是你的程序的另一种版本,应用了这两个指导原则。这个例子演示了在一个通道上有多个写入者和一个读取者:

c := make(chan string)

for i := 1; i <= 5; i++ {
	go func(i int, co chan<- string) {
		for j := 1; j <= 5; j++ {
			co <- fmt.Sprintf("hi from %d.%d", i, j)
		}
	}(i, c)
}

for i := 1; i <= 25; i++ {
	fmt.Println(<-c)
}

http://play.golang.org/p/quQn7xePLw

它创建了五个写入到单个通道的go协程,每个协程写入五次。主协程读取所有25个消息 - 你可能会注意到它们出现的顺序通常不是连续的(即并发性是显而易见的)。

这个例子演示了Go通道的一个特性:可以有多个写入者共享一个通道;Go会自动交错处理消息。

同样,对于一个写入者和多个读取者共享一个通道的情况,如下面的第二个例子所示:

c := make(chan int)
var w sync.WaitGroup
w.Add(5)

for i := 1; i <= 5; i++ {
	go func(i int, ci <-chan int) {
		j := 1
		for v := range ci {
			time.Sleep(time.Millisecond)
			fmt.Printf("%d.%d got %d\n", i, j, v)
			j += 1
		}
		w.Done()
	}(i, c)
}

for i := 1; i <= 25; i++ {
	c <- i
}
close(c)
w.Wait()

这个第二个例子在主go协程上加了一个等待,否则它会立即退出并导致其他五个go协程提前终止(感谢olov提供的更正)。

在这两个例子中,不需要缓冲区。通常,将缓冲视为性能增强器是一个很好的原则。如果你的程序在没有缓冲的情况下不会死锁,那么在有缓冲的情况下也不会死锁(但反之不一定成立)。因此,作为另一个经验法则,先不使用缓冲,然后根据需要添加缓冲

英文:

Yes, it's complicated, But there are a couple of rules of thumb that should make things feel much more straightforward.

  • prefer using formal arguments for the channels you pass to go-routines instead of accessing channels in global scope. You can get more compiler checking this way, and better modularity too.
  • avoid both reading and writing on the same channel in a particular go-routine (including the 'main' one). Otherwise, deadlock is a much greater risk.

Here's an alternative version of your program, applying these two guidelines. This case demonstrates many writers & one reader on a channel:

c := make(chan string)

for i := 1; i &lt;= 5; i++ {
	go func(i int, co chan&lt;- string) {
		for j := 1; j &lt;= 5; j++ {
			co &lt;- fmt.Sprintf(&quot;hi from %d.%d&quot;, i, j)
		}
	}(i, c)
}

for i := 1; i &lt;= 25; i++ {
	fmt.Println(&lt;-c)
}

http://play.golang.org/p/quQn7xePLw

It creates the five go-routines writing to a single channel, each one writing five times. The main go-routine reads all twenty five messages - you may notice that the order they appear in is often not sequential (i.e. the concurrency is evident).

This example demonstrates a feature of Go channels: it is possible to have multiple writers sharing one channel; Go will interleave the messages automatically.

The same applies for one writer and multiple readers on one channel, as seen in the second example here:

c := make(chan int)
var w sync.WaitGroup
w.Add(5)

for i := 1; i &lt;= 5; i++ {
	go func(i int, ci &lt;-chan int) {
		j := 1
		for v := range ci {
			time.Sleep(time.Millisecond)
			fmt.Printf(&quot;%d.%d got %d\n&quot;, i, j, v)
			j += 1
		}
		w.Done()
	}(i, c)
}

for i := 1; i &lt;= 25; i++ {
	c &lt;- i
}
close(c)
w.Wait()

This second example includes a wait imposed on the main goroutine, which would otherwise exit promptly and cause the other five goroutines to be terminated early (thanks to olov for this correction).

In both examples, no buffering was needed. It is generally a good principle to view buffering as a performance enhancer only. If your program does not deadlock without buffers, it won't deadlock with buffers either (but the converse is not always true). So, as another rule of thumb, start without buffering then add it later as needed.

答案2

得分: 36

晚回复,但我希望这对其他人有所帮助,就像https://stackoverflow.com/q/19802037/143225那样。

《Effective Go》解释了这个问题:

接收器总是阻塞,直到有数据可接收。

这意味着你不能让多个goroutine监听一个通道并期望所有的goroutine都接收到相同的值。

运行这个代码示例

package main

import "fmt"

func main() {
    c := make(chan int)

    for i := 1; i <= 5; i++ {
        go func(i int) {
            for v := range c {
                fmt.Printf("count %d from goroutine #%d\n", v, i)
            }
        }(i)
    }

    for i := 1; i <= 25; i++ {
        c<-i
    }
    
    close(c)
}

即使有5个goroutine监听通道,你也不会看到“count 1”超过一次。这是因为当第一个goroutine阻塞通道时,所有其他goroutine必须排队等待。当通道解除阻塞时,计数已经被接收并从通道中删除,所以下一个排队的goroutine会得到下一个计数值。

英文:

Late reply, but I hope this helps others in the future like https://stackoverflow.com/q/19802037/143225

Effective Go explains the issue:

> Receivers always block until there is data to receive.

That means that you cannot have more than 1 goroutine listening to 1 channel and expect ALL goroutines to receive the same value.

Run this Code Example.

package main

import &quot;fmt&quot;

func main() {
	c := make(chan int)

	for i := 1; i &lt;= 5; i++ {
		go func(i int) {
		for v := range c {
				fmt.Printf(&quot;count %d from goroutine #%d\n&quot;, v, i)
			}
		}(i)
	}

	for i := 1; i &lt;= 25; i++ {
		c&lt;-i
	}
	
	close(c)
}

You will not see "count 1" more than once even though there are 5 goroutines listening to the channel. This is because when the first goroutine blocks the channel all other goroutines must wait in line. When the channel is unblocked, the count has already been received and removed from the channel so the next goroutine in line gets the next count value.

答案3

得分: 8

这是复杂的。

另外,看看GOMAXPROCS = NumCPU+1会发生什么。例如,

package main

import (
	"fmt"
	"runtime"
)

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU() + 1)
	fmt.Print(runtime.GOMAXPROCS(0))
	c := make(chan string)
	for i := 0; i < 5; i++ {
		go func(i int) {
			msg := <-c
			c <- fmt.Sprintf("%s, hi from %d", msg, i)
		}(i)
	}
	c <- ", original"
	fmt.Println(<-c)
}

输出:

5, original, hi from 4

并且,看看带缓冲通道会发生什么。例如,

package main

import "fmt"

func main() {
	c := make(chan string, 5+1)
	for i := 0; i < 5; i++ {
		go func(i int) {
			msg := <-c
			c <- fmt.Sprintf("%s, hi from %d", msg, i)
		}(i)
	}
	c <- "original"
	fmt.Println(<-c)
}

输出:

original

你应该能够解释这些情况。

英文:

It is complicated.

Also, see what happens with GOMAXPROCS = NumCPU+1. For example,

package main

import (
	&quot;fmt&quot;
	&quot;runtime&quot;
)

func main() {
	runtime.GOMAXPROCS(runtime.NumCPU() + 1)
	fmt.Print(runtime.GOMAXPROCS(0))
	c := make(chan string)
	for i := 0; i &lt; 5; i++ {
		go func(i int) {
			msg := &lt;-c
			c &lt;- fmt.Sprintf(&quot;%s, hi from %d&quot;, msg, i)
		}(i)
	}
	c &lt;- &quot;, original&quot;
	fmt.Println(&lt;-c)
}

Output:

5, original, hi from 4

And, see what happens with buffered channels. For example,

package main

import &quot;fmt&quot;

func main() {
	c := make(chan string, 5+1)
	for i := 0; i &lt; 5; i++ {
		go func(i int) {
			msg := &lt;-c
			c &lt;- fmt.Sprintf(&quot;%s, hi from %d&quot;, msg, i)
		}(i)
	}
	c &lt;- &quot;original&quot;
	fmt.Println(&lt;-c)
}

Output:

original

You should be able to explain these cases too.

答案4

得分: 8

我研究了现有的解决方案,并创建了一个简单的广播库https://github.com/grafov/bcast。

<!-- language-all: go -->

group := bcast.NewGroup() // 你创建了广播组
go bcast.Broadcasting(0) // 组接受消息并将其广播给所有成员

member := group.Join() // 然后你从其他goroutine中加入成员
member.Send("测试消息") // 或者发送任何类型的消息到组

member1 := group.Join() // 然后你从其他goroutine中加入成员
val := member1.Recv() // 例如监听消息

英文:

I've studied existing solutions and created simple broadcast library https://github.com/grafov/bcast.

<!-- language-all: go -->

    group := bcast.NewGroup() // you created the broadcast group
    go bcast.Broadcasting(0) // the group accepts messages and broadcast it to all members

    member := group.Join() // then you join member(s) from other goroutine(s)
    member.Send(&quot;test message&quot;) // or send messages of any type to the group 

    member1 := group.Join() // then you join member(s) from other goroutine(s)
    val := member1.Recv() // and for example listen for messages

答案5

得分: 3

对于多个goroutine监听一个channel,是可以的。关键点在于消息本身,你可以定义一些类似这样的消息:

package main

import (
    "fmt"
    "sync"
)

type obj struct {
    msg string
    receiver int
}

func main() {
    ch := make(chan *obj) // 阻塞或非阻塞都可以
    var wg sync.WaitGroup
    receiver := 25 // 指定接收者数量

    sender := func() {
        o := &obj {
            msg: "hello everyone!",
            receiver: receiver,
        }
        ch <- o
    }
    recv := func(idx int) {
        defer wg.Done()
        o := <-ch
        fmt.Printf("%d received at %d\n", idx, o.receiver)
        o.receiver--
        if o.receiver > 0 {
            ch <- o // 转发给其他接收者
        } else {
            fmt.Printf("last receiver: %d\n", idx)
        }
    }

    go sender()
    for i:=0; i<receiver; i++ {
        wg.Add(1)
        go recv(i)
    }

    wg.Wait()
}

输出是随机的:

5 received at 25
24 received at 24
6 received at 23
7 received at 22
8 received at 21
9 received at 20
10 received at 19
11 received at 18
12 received at 17
13 received at 16
14 received at 15
15 received at 14
16 received at 13
17 received at 12
18 received at 11
19 received at 10
20 received at 9
21 received at 8
22 received at 7
23 received at 6
2 received at 5
0 received at 4
1 received at 3
3 received at 2
4 received at 1
last receiver: 4
英文:

For multiple goroutine listen on one channel, yes, it's possible. the key point is the message itself, you can define some message like that:

package main

import (
    &quot;fmt&quot;
    &quot;sync&quot;
)

type obj struct {
    msg string
    receiver int
}

func main() {
    ch := make(chan *obj) // both block or non-block are ok
    var wg sync.WaitGroup
    receiver := 25 // specify receiver count

    sender := func() {
        o := &amp;obj {
            msg: &quot;hello everyone!&quot;,
            receiver: receiver,
        }
        ch &lt;- o
    }
    recv := func(idx int) {
        defer wg.Done()
        o := &lt;-ch
        fmt.Printf(&quot;%d received at %d\n&quot;, idx, o.receiver)
        o.receiver--
        if o.receiver &gt; 0 {
            ch &lt;- o // forward to others
        } else {
            fmt.Printf(&quot;last receiver: %d\n&quot;, idx)
        }
    }

    go sender()
    for i:=0; i&lt;reciever; i++ {
        wg.Add(1)
        go recv(i)
    }

    wg.Wait()
}

The output is random:

5 received at 25
24 received at 24
6 received at 23
7 received at 22
8 received at 21
9 received at 20
10 received at 19
11 received at 18
12 received at 17
13 received at 16
14 received at 15
15 received at 14
16 received at 13
17 received at 12
18 received at 11
19 received at 10
20 received at 9
21 received at 8
22 received at 7
23 received at 6
2 received at 5
0 received at 4
1 received at 3
3 received at 2
4 received at 1
last receiver 4

答案6

得分: 2

相当古老的问题,但是我认为没有人提到这一点。

首先,如果你多次运行代码,两个示例的输出可能会不同。这与Go的版本无关。

第一个示例的输出可以是goroutine 4goroutine 0goroutine 1,...实际上,所有的goroutine都可以是将字符串发送到主goroutine的那个。

主goroutine是其中一个goroutine,所以它也在等待来自通道的数据。
哪个goroutine应该接收数据?没有人知道。这不在语言规范中。

此外,第二个示例的输出也可以是任何内容:

(我只是为了清晰起见添加了方括号)

// [original, hi from 4]
// [[[[[original, hi from 4], hi from 0], hi from 2], hi from 1], hi from 3]
// [[[[[original, hi from 4], hi from 1], hi from 0], hi from 2], hi from 3]
// [[[[[original, hi from 0], hi from 2], hi from 1], hi from 3], hi from 4]
// [[original, hi from 4], hi from 1]
// [[original, hi from 0], hi from 4]
// [[[original, hi from 4], hi from 1], hi from 0]
// [[[[[original, hi from 4], hi from 1], hi from 0], hi from 3], hi from 2]
// [[[[original, hi from 0], hi from 2], hi from 1], hi from 3]
//
// ......任何内容都可能是输出。

这不是魔术,也不是神秘的现象。

如果有多个线程正在执行,没有人确切地知道哪个线程将获取资源。这不是语言决定的。相反,操作系统会处理这个问题。这就是为什么多线程编程非常复杂的原因。

Goroutine不是操作系统线程,但它的行为有些类似。

英文:

Quite an old question, but nobody mentioned this, I think.

First, the outputs of both examples can be different if you run the codes many times. This is not related to the Go version.

The output of the 1st example can be goroutine 4, goroutine 0, goroutine 1,... actually all the goroutine can be a one who sends the string to the main goroutine.

Main goroutine is one of the goroutines, so it's also waiting for data from the channel.
Which goroutine should receive the data? Nobody knows. It's not in the language spec.

Also, the output of the 2nd example also can be anything:

(I added the square brackets just for clarity)

// [original, hi from 4]
// [[[[[original, hi from 4], hi from 0], hi from 2], hi from 1], hi from 3]
// [[[[[original, hi from 4], hi from 1], hi from 0], hi from 2], hi from 3]
// [[[[[original, hi from 0], hi from 2], hi from 1], hi from 3], hi from 4]
// [[original, hi from 4], hi from 1]
// [[original, hi from 0], hi from 4]
// [[[original, hi from 4], hi from 1], hi from 0]
// [[[[[original, hi from 4], hi from 1], hi from 0], hi from 3], hi from 2]
// [[[[original, hi from 0], hi from 2], hi from 1], hi from 3]
//
// ......anything can be the output.

This is not magic, nor a mysterious phenomenon.

If there are multiple threads being executed, no one knows exactly which thread will acquire the resource. The language doesn't determine it. Rather, OS takes care of it. This is why multithread programming is quite complicated.

Goroutine is not OS thread, but it behaves somewhat similarly.

答案7

得分: 0

使用sync.Cond是一个不错的选择。

参考:https://pkg.go.dev/sync#Cond

英文:

Use sync.Cond is a good choice.

ref: https://pkg.go.dev/sync#Cond

huangapple
  • 本文由 发表于 2013年3月30日 14:14:10
  • 转载请务必保留本文链接:https://go.coder-hub.com/15715605.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定