Goroutines, Channels and Deadlock
Question
I'm trying to understand more about Go's channels and goroutines, so I decided to write a small program that counts the words in a file, read through a `bufio.NewScanner` object:

    nCPUs := flag.Int("cpu", 2, "number of CPUs to use")
    flag.Parse()
    runtime.GOMAXPROCS(*nCPUs)

    scanner := bufio.NewScanner(file)
    lines := make(chan string)
    results := make(chan int)

    for i := 0; i < *nCPUs; i++ {
        go func() {
            for line := range lines {
                fmt.Printf("%s\n", line)
                results <- len(strings.Split(line, " "))
            }
        }()
    }

    for scanner.Scan() {
        lines <- scanner.Text()
    }
    close(lines)

    acc := 0
    for i := range results {
        acc += i
    }
    fmt.Printf("%d\n", acc)

Now, in most examples I've found so far, both the `lines` and `results` channels are buffered, such as `make(chan int, NUMBER_OF_LINES_IN_FILE)`. Still, after running this code, my program exits with a `fatal error: all goroutines are asleep - deadlock!` error message.
Basically, my thinking is that I need two channels: one to pass the lines of the file to the goroutines (since the file can be of any size, I'd rather not have to specify a size in the `make(chan)` call), and another to collect the results from the goroutines, which the main function would use to, e.g., calculate an accumulated result.
What is the best way to program in this manner with goroutines and channels? Any help is much appreciated.
# Answer 1
**Score**: 7
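As @AndrewN has pointed out, the problem is that each goroutine reaches the point where it tries to send to the `results` channel, but that send blocks because `results` is unbuffered and nothing reads from it until the `for i := range results` loop. You never get to that loop, because you first need to finish the `for scanner.Scan()` loop, which is trying to send every line down the `lines` channel. That send is blocked in turn, because the goroutines never loop back to `range lines` while they're stuck sending to `results`.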
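To make the blocking behaviour concrete, here is a minimal sketch (the helper name `trySend` is made up for illustration) that uses a `select` with a `default` case to check whether a send could proceed right away. With no receiver waiting, a send on an unbuffered channel cannot proceed, while a channel with a free buffer slot accepts it immediately:

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a non-blocking send and
// reports whether the value was accepted immediately.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// No receiver is ready and there is no free buffer slot.
		return false
	}
}

func main() {
	unbuffered := make(chan int)
	buffered := make(chan int, 1)

	fmt.Println(trySend(unbuffered, 1)) // false: nobody is receiving
	fmt.Println(trySend(buffered, 1))   // true: one free buffer slot
	fmt.Println(trySend(buffered, 2))   // false: the buffer is now full
}
```

The workers in the question use a plain blocking send, so instead of getting `false` back they simply wait, and because the main goroutine is waiting as well, the runtime reports the deadlock.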
The first thing you might try is to put the `scanner.Scan()` loop in a goroutine, so that something can start reading from the `results` channel right away. The next problem is then knowing when to end the `for i := range results` loop. Something has to close the `results` channel, but only after the worker goroutines are done reading from the `lines` channel. You could close `results` right after closing `lines`, but that would race with workers that are still sending, and a send on a closed channel panics. The safest thing is to also wait for the original two goroutines to finish before closing `results` ([playground link](https://play.golang.org/p/OnQRT9ie5U)):

    package main

    import (
        "bufio"
        "fmt"
        "runtime"
        "strings"
        "sync"
    )

    func main() {
        runtime.GOMAXPROCS(2)
        scanner := bufio.NewScanner(strings.NewReader(`
    hi mom
    hi dad
    hi sister
    goodbye`))
        lines := make(chan string)
        results := make(chan int)

        // Two workers: each counts the words on the lines it receives.
        var wg sync.WaitGroup
        for i := 0; i < 2; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for line := range lines {
                    fmt.Printf("%s\n", line)
                    results <- len(strings.Split(line, " "))
                }
            }()
        }

        // Feed the lines from a separate goroutine so that main can
        // start draining results immediately.
        go func() {
            for scanner.Scan() {
                lines <- scanner.Text()
            }
            close(lines)
            // Close results only after every worker has sent its last value.
            wg.Wait()
            close(results)
        }()

        acc := 0
        for i := range results {
            acc += i
        }
        fmt.Printf("%d\n", acc)
    }

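A side note on the counting itself, independent of the deadlock: `strings.Split(line, " ")` returns a one-element slice for an empty line and yields empty strings between consecutive spaces, so it can overcount. `strings.Fields` splits on runs of whitespace instead. A small comparison, where `countSplit` and `countFields` are names made up for this sketch:

```go
package main

import (
	"fmt"
	"strings"
)

// countSplit mirrors the counting used in the question and the answer.
func countSplit(line string) int { return len(strings.Split(line, " ")) }

// countFields is a hypothetical alternative based on strings.Fields.
func countFields(line string) int { return len(strings.Fields(line)) }

func main() {
	for _, line := range []string{"", "hi mom", "hi  mom "} {
		fmt.Printf("%q: Split=%d Fields=%d\n", line, countSplit(line), countFields(line))
	}
}
```

Whether an empty line should count as a word is up to the program. Swapping `strings.Fields` into the workers changes only the per-line count, not the channel logic.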
# Answer 2
**Score**: 5
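Channels in Go are unbuffered by [default][1], which means that none of the anonymous goroutines you spawn can send to the **results** channel until something is ready to receive from it. The receiving loop in the main goroutine doesn't start until the **scanner.Scan()** loop is done feeding the **lines** channel, and that loop is blocked until your anonymous functions can send to the **results** channel and restart their loops. Deadlock.
The other problem in your code, even after trivially fixing the above by buffering the channels, is that **for i := range results** will also deadlock once no more results are being fed into it, since the channel is never closed.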
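The second point can be seen in isolation: a `range` over a channel ends only once the channel has been closed and drained, buffered or not. A minimal sketch, with `sumAll` as a made-up helper name:

```go
package main

import "fmt"

// sumAll is a hypothetical helper: it buffers every value, closes the
// channel, and then ranges over it.
func sumAll(values []int) int {
	results := make(chan int, len(values)) // room for every value
	for _, v := range values {
		results <- v // does not block: the buffer is large enough
	}
	// Without this close, the range below would block after the last
	// value, and in this single-goroutine program the runtime would
	// report a deadlock.
	close(results)

	acc := 0
	for v := range results {
		acc += v
	}
	return acc
}

func main() {
	fmt.Println(sumAll([]int{2, 2, 2, 1})) // prints 7
}
```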
Edit: Here's one potential [solution][2], if you want to *avoid* buffered channels. Basically, the first issue is avoided by performing the send to the **results** channel from a new goroutine, which lets the lines loop complete. The second issue (not knowing when to stop reading from a channel) is avoided by counting the goroutines as they are created and explicitly closing the channel once every goroutine is accounted for. It's probably better to do something similar with wait groups, but this is a quick way to show how it can be done unbuffered.

[1]: https://golang.org/ref/spec#Channel_types
[2]: http://play.golang.org/p/UAH5aW_4hQ
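The linked playground code is not reproduced here, so the following is only a sketch of the idea as described, not the answer author's program. It uses a made-up function `countWords` and takes a slightly different route for the second issue: instead of closing `results`, the caller counts the lines it dispatches and receives exactly that many values. Both channels stay unbuffered.

```go
package main

import (
	"bufio"
	"fmt"
	"strings"
)

// countWords is a hypothetical helper written for this sketch.
// It assumes workers >= 1; with no workers the feeding loop would block.
func countWords(input string, workers int) int {
	scanner := bufio.NewScanner(strings.NewReader(input))
	lines := make(chan string) // unbuffered
	results := make(chan int)  // unbuffered

	for i := 0; i < workers; i++ {
		go func() {
			for line := range lines {
				// Hand the send off to a new goroutine so this worker
				// goes straight back to receiving from lines.
				go func(l string) {
					results <- len(strings.Split(l, " "))
				}(line)
			}
		}()
	}

	// Count the lines as they are dispatched: one result is owed per line.
	pending := 0
	for scanner.Scan() {
		lines <- scanner.Text()
		pending++
	}
	close(lines)

	// Receive exactly as many results as lines were dispatched.
	acc := 0
	for ; pending > 0; pending-- {
		acc += <-results
	}
	return acc
}

func main() {
	fmt.Println(countWords("hi mom\nhi dad\nhi sister\ngoodbye", 2)) // prints 7
}
```

This spawns one short-lived goroutine per line, which is fine for a demonstration. For large inputs, the `sync.WaitGroup` approach keeps the number of goroutines fixed.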