英文:
Goroutines channels and "stopping short"
问题
我正在阅读/研究《Go并发模式:管道和取消》中的“停止短路”部分,但我对其理解有困难。我们有以下函数:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // 足够容纳未读取的输入的空间
// 为cs中的每个输入通道启动一个输出goroutine。output将值从c复制到out,直到c关闭,然后调用wg.Done。
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// 在所有输出goroutine完成后,启动一个goroutine来关闭out。这必须在wg.Add调用之后启动。
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := gen(2, 3)
// 将sq的工作分布到两个同时从in读取的goroutine中。
c1 := sq(in)
c2 := sq(in)
// 从输出中消费第一个值。
out := merge(c1, c2)
fmt.Println(<-out) // 4或9
return
// 显然,如果我们没有将merge的out缓冲区大小设置为1,那么我们将有一个挂起的goroutine。
}
现在,如果你注意到merge
函数中的第2行,它说我们使用缓冲区大小为1来创建out
通道,因为这足够容纳未读取的输入。然而,我几乎可以肯定我们应该使用缓冲区大小为2的通道。根据这个代码示例:
c := make(chan int, 2) // 缓冲区大小为2
c <- 1 // 立即成功
c <- 2 // 立即成功
c <- 3 // 阻塞,直到另一个goroutine执行<-c并接收到1
因为这一部分暗示了缓冲区大小为3的通道不会阻塞。有人可以澄清/帮助我理解吗?
英文:
I'm reading/working through [Go Concurrency Patterns: Pipelines and cancellation][1], but i'm having trouble understanding the Stopping short section. We have the following functions:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Apparently if we had not set the merge out buffer size to 1
// then we would have a hanging go routine.
}
Now, if you notice line 2
in merge
, it says we make the out chan
with buffer
size 1, because this is enough space for the unread inputs. However, I'm almost positive that we should allocate a chan
with buffer
size 2. In accordance with this code sample:
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
Because this section implies that a chan
of buffer
size 3 would not block. Can anyone please clarify/assist my understanding?
[1]: https://blog.golang.org/pipelines
答案1
得分: 1
该程序向通道 out
发送两个值,并从通道 out
读取一个值。其中一个值没有被接收。
如果通道是无缓冲的(容量为0),那么其中一个发送的 goroutine 将会阻塞,直到程序退出。这是一个泄漏。
如果通道的容量为1,则两个 goroutine 都可以向通道发送并退出。第一个发送到通道的值将被 main
接收。第二个值将保留在通道中。
如果主函数没有从通道 out
接收到值,则需要一个容量为2的通道来防止 goroutine 无限期地阻塞。
英文:
The program sends two values to the channel out
and reads one value from the channel out
. One of the values is not received.
If the channel is unbuffered (capacity 0), then one of the sending goroutines will block until the program exits. This is a leak.
If the channel is created with a capacity of 1, then both goroutines can send to the channel and exit. The first value sent to the channel is received by main
. The second value remains in the channel.
If the main function does not receive a value from the channel out
, then a channel of capacity 2 is required to prevent the goroutines from blocking indefinitely.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论