Golang goroutine doesn't run with channel inside
Question
I'm trying to implement a word count program, but I ran into a problem at the first step.
Here's my code:
package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"
)

// Load data into channel
func laodData(arr []string, channel chan string, wg sync.WaitGroup) {
	for _, path := range arr {
		file, err := os.Open(path)
		fmt.Println("begin to laodData ", path)
		if err != nil {
			fmt.Println(err)
			os.Exit(-1)
		}
		defer file.Close()
		reader := bufio.NewReaderSize(file, 32*10*1024)
		i := 0
		for {
			line, err := reader.ReadString('\n')
			channel <- line
			if err != nil {
				break
			}
			i++
			if i%200 == 0 {
				fmt.Println(i, " lines parsed")
			}
		}
		fmt.Println("finish laodData ", path)
	}
	wg.Done()
}

// dispatch data lines into different mappers
func dispatcher(channel chan string, wg sync.WaitGroup) {
	fmt.Println("pull data 11")
	line, ok := <-channel
	fmt.Println(ok)
	for ok {
		fmt.Println(line)
		line, ok = <-channel
	}
	fmt.Println("pull data 22")
	wg.Done()
}

func main() {
	path := os.Args
	if len(path) < 2 {
		fmt.Println("Need Input Files")
		os.Exit(0)
	}
	var wg sync.WaitGroup
	wg.Add(2)
	channel := make(chan string)
	defer close(channel)
	fmt.Println("before dispatcher")
	go laodData(path[1:], channel, wg)
	go dispatcher(channel, wg)
	wg.Wait()
	fmt.Println("after dispatcher")
}
And here's my output:
...
finish laodData result.txt
throw: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x42154100, 0x42154100)
/usr/local/go/src/pkg/runtime/zsema_amd64.c:146 +0x25
sync.(*WaitGroup).Wait(0x4213b440, 0x0)
/usr/local/go/src/pkg/sync/waitgroup.go:79 +0xf2
main.main()
/Users/kuankuan/go/src/mreasy/main.go:66 +0x238
goroutine 2 [syscall]:
created by runtime.main
/usr/local/go/src/pkg/runtime/proc.c:221
goroutine 4 [chan receive]:
main.dispatcher(0x42115a50, 0x0, 0x2, 0x0)
/Users/kuankuan/go/src/mreasy/main.go:45 +0x223
created by main.main
/Users/kuankuan/go/src/mreasy/main.go:65 +0x228
exit status 2
Thanks!
Answer 1
Score: 9
A Go program terminates when the main goroutine exits, so dispatcher() has no time to do anything. You need to block in main() until dispatcher() completes. A channel can be used for this:
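package main

import (
	"bufio"
	"fmt"
	"os"
)

var done = make(chan bool) // signals main() that all work is finished

// dispatcher loads files and sends their lines into a channel for mappers to read.
func dispatcher(arr []string, channel chan string) {
	for _, path := range arr {
		fmt.Println("begin to dispatch ", path)
		file, err := os.Open(path)
		if err != nil {
			fmt.Println(err)
			os.Exit(-1)
		}
		reader := bufio.NewReaderSize(file, 32*10*1024)
		i := 0
		for {
			line, err := reader.ReadString('\n')
			if line != "" {
				channel <- line
			}
			if err != nil {
				break // io.EOF or a read error ends this file
			}
			i++
			if i%200 == 0 {
				fmt.Println(i, " lines parsed")
			}
		}
		file.Close() // close now instead of deferring inside the loop
		fmt.Println("finish dispatch ", path)
	}
	close(channel) // no more data: lets the receiver's range loop end
}

func main() {
	path := os.Args
	if len(path) < 2 {
		fmt.Println("Need Input Files")
		os.Exit(0)
	}
	channel := make(chan string)
	fmt.Println("before dispatcher")
	go dispatcher(path[1:], channel)
	// A receiver is required: a send on an unbuffered channel blocks until it is read.
	go func() {
		for line := range channel {
			fmt.Print(line)
		}
		done <- true // notify main() of completion
	}()
	<-done // wait until dispatcher() has finished and every line was consumed
	fmt.Println("after dispatcher")
}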
Answer 2
Score: 2
I modified your example to run on the Go playground, where there is no file I/O; it sends random numbers on the channel instead.
@Victor Deryagin's explanation and his suggestion of using a "done" channel are correct. The reason you get a deadlock is that your goroutine sends on the channel but no one reads from it, so the program is stuck at that point. In the version linked above I added a consumer goroutine. The program then runs concurrently as intended.
Note that to wait for several goroutines, it's clearer and easier to use sync.WaitGroup.
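The playground link did not survive here, so the following is only a sketch of the idea described in this answer, not the author's original code. It assumes a producer goroutine that sends random numbers on an unbuffered channel, a consumer goroutine that reads them, and a sync.WaitGroup that waits for both. The names produce, consume and run are made up for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
)

// produce (hypothetical helper) sends n random numbers on out, then closes
// the channel so the consumer's range loop can terminate.
func produce(n int, out chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(out) // deferred calls run last-in first-out: close, then Done
	for i := 0; i < n; i++ {
		out <- rand.Intn(100)
	}
}

// consume (hypothetical helper) reads from in until it is closed and counts
// the values it received.
func consume(in <-chan int, count *int, wg *sync.WaitGroup) {
	defer wg.Done()
	for v := range in {
		fmt.Println("received", v)
		*count++
	}
}

// run wires producer and consumer together and returns how many values
// the consumer saw.
func run(n int) int {
	var wg sync.WaitGroup
	ch := make(chan int) // unbuffered: every send needs a matching receive
	count := 0
	wg.Add(2)
	go produce(n, ch, &wg) // the WaitGroup is shared through a pointer
	go consume(ch, &count, &wg)
	wg.Wait() // blocks until both goroutines have called Done
	return count
}

func main() {
	fmt.Println("values consumed:", run(5))
}
```

Without the consume goroutine, the first send in produce would block forever and the runtime would report the same "all goroutines are asleep" deadlock.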
Answer 3
Score: 1
Two issues need to be fixed in the original question.
- You have to close the channel once you're done sending all the data. In func laodData, call close(channel) after sending all the data.
- Pass the sync.WaitGroup by reference, that is, as a pointer. You are passing wg by value to the laodData and dispatcher functions.
Fixing these two issues will fix the deadlock. The reasons for the deadlock in your code are:
- Leaving the channel unclosed makes the downstream receiver wait forever for more data.
- Passing sync.WaitGroup by value creates a new copy of the object in each function, so the Done() calls never reach the WaitGroup that main() is waiting on. It should be passed as a pointer.
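The two fixes above can be sketched as follows. This is one possible corrected version of the question's program, not the answer author's code. It keeps the question's laodData and dispatcher names; countLines and the extra lines parameter are hypothetical additions that make the result easy to check.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"
)

// laodData (name kept from the question) sends every line of every file on
// channel and closes the channel once all data has been sent.
func laodData(arr []string, channel chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()
	defer close(channel) // fix 1: the sender closes the channel when done
	for _, path := range arr {
		file, err := os.Open(path)
		if err != nil {
			fmt.Println(err)
			os.Exit(-1)
		}
		reader := bufio.NewReader(file)
		for {
			line, err := reader.ReadString('\n')
			if line != "" {
				channel <- line
			}
			if err != nil {
				break // io.EOF or a read error ends this file
			}
		}
		file.Close()
	}
}

// dispatcher receives lines until the channel is closed and counts them.
func dispatcher(channel <-chan string, lines *int, wg *sync.WaitGroup) {
	defer wg.Done()
	for line := range channel {
		fmt.Print(line)
		*lines++
	}
}

// countLines (hypothetical helper) runs both goroutines and waits for them.
func countLines(paths []string) int {
	var wg sync.WaitGroup
	wg.Add(2)
	channel := make(chan string)
	lines := 0
	go laodData(paths, channel, &wg) // fix 2: pass a pointer, not a copy
	go dispatcher(channel, &lines, &wg)
	wg.Wait()
	return lines
}

func main() {
	if len(os.Args) < 2 {
		fmt.Println("Need Input Files")
		os.Exit(0)
	}
	fmt.Println("lines:", countLines(os.Args[1:]))
}
```

Note that once laodData closes the channel, the defer close(channel) in the question's main() has to go, because closing an already closed channel panics. Running go vet on the original code also reports the WaitGroup being passed by value.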