Golang goroutine不会在通道内运行

huangapple go评论118阅读模式
英文:

Golang goroutine doesn't run with channel inside

问题

我正在尝试实现一个单词计数程序,但在第一步遇到了一些问题。

这是我的代码:

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "bufio"
  6. "sync"
  7. )
  8. // 将数据加载到通道中
  9. func laodData(arr []string,channel chan string,wg sync.WaitGroup) {
  10. for _,path := range arr {
  11. file,err := os.Open(path)
  12. fmt.Println("开始加载数据 ", path)
  13. if err != nil {
  14. fmt.Println(err)
  15. os.Exit(-1)
  16. }
  17. defer file.Close()
  18. reader := bufio.NewReaderSize(file, 32*10*1024)
  19. i := 0
  20. for {
  21. line,err := reader.ReadString('\n')
  22. channel <- line
  23. if err != nil {
  24. break
  25. }
  26. i++
  27. if i%200 == 0 {
  28. fmt.Println(i, "行已解析")
  29. }
  30. }
  31. fmt.Println("完成数据加载 ", path)
  32. }
  33. wg.Done()
  34. }
  35. // 将数据行分发给不同的映射器
  36. func dispatcher(channel chan string,wg sync.WaitGroup){
  37. fmt.Println("拉取数据 11")
  38. line,ok := <- channel
  39. fmt.Println(ok)
  40. for ok {
  41. fmt.Println(line)
  42. line,ok = <- channel
  43. }
  44. fmt.Println("拉取数据 22")
  45. wg.Done()
  46. }
  47. func main() {
  48. path := os.Args
  49. if len(path) < 2 {
  50. fmt.Println("需要输入文件")
  51. os.Exit(0)
  52. }
  53. var wg sync.WaitGroup
  54. wg.Add(2)
  55. channel := make(chan string)
  56. defer close(channel)
  57. fmt.Println("在分发器之前")
  58. go laodData(path[1:],channel,wg)
  59. go dispatcher(channel,wg)
  60. wg.Wait()
  61. fmt.Println("在分发器之后")
  62. }

这是我的输出:

  1. ...
  2. 完成数据加载 result.txt
  3. 抛出异常:所有的goroutine都处于休眠状态 - 死锁!
  4. goroutine 1 [semacquire]:
  5. sync.runtime_Semacquire(0x42154100, 0x42154100)
  6. /usr/local/go/src/pkg/runtime/zsema_amd64.c:146 +0x25
  7. sync.(*WaitGroup).Wait(0x4213b440, 0x0)
  8. /usr/local/go/src/pkg/sync/waitgroup.go:79 +0xf2
  9. main.main()
  10. /Users/kuankuan/go/src/mreasy/main.go:66 +0x238
  11. goroutine 2 [syscall]:
  12. created by runtime.main
  13. /usr/local/go/src/pkg/runtime/proc.c:221
  14. goroutine 4 [chan receive]:
  15. main.dispatcher(0x42115a50, 0x0, 0x2, 0x0)
  16. /Users/kuankuan/go/src/mreasy/main.go:45 +0x223
  17. created by main.main
  18. /Users/kuankuan/go/src/mreasy/main.go:65 +0x228
  19. 退出状态 2

谢谢!

英文:

I'm trying to implement a word count program, but with the first step i got some problem.

Here's my code:

  1. package main
  2. import (
  3. &quot;fmt&quot;
  4. &quot;os&quot;
  5. &quot;bufio&quot;
  6. &quot;sync&quot;
  7. )
  8. // Load data into channel
  9. func laodData(arr []string,channel chan string,wg sync.WaitGroup) {
  10. for _,path := range arr {
  11. file,err := os.Open(path)
  12. fmt.Println(&quot;begin to laodData &quot;, path)
  13. if err != nil {
  14. fmt.Println(err)
  15. os.Exit(-1)
  16. }
  17. defer file.Close()
  18. reader := bufio.NewReaderSize(file, 32*10*1024)
  19. i := 0
  20. for {
  21. line,err := reader.ReadString(&#39;\n&#39;)
  22. channel &lt;- line
  23. if err != nil {
  24. break
  25. }
  26. i++
  27. if i%200 == 0 {
  28. fmt.Println(i,&quot; lines parsed&quot;)
  29. }
  30. }
  31. fmt.Println(&quot;finish laodData &quot;, path)
  32. }
  33. wg.Done()
  34. }
  35. // dispatch data lines into different mappers
  36. func dispatcher(channel chan string,wg sync.WaitGroup){
  37. fmt.Println(&quot;pull data 11&quot;)
  38. line,ok := &lt;- channel
  39. fmt.Println(ok)
  40. for ok {
  41. fmt.Println(line)
  42. line,ok = &lt;- channel
  43. }
  44. fmt.Println(&quot;pull data 22&quot;)
  45. wg.Done()
  46. }
  47. func main() {
  48. path := os.Args
  49. if len(path) &lt; 2 {
  50. fmt.Println(&quot;Need Input Files&quot;)
  51. os.Exit(0)
  52. }
  53. var wg sync.WaitGroup
  54. wg.Add(2)
  55. channel := make(chan string)
  56. defer close(channel)
  57. fmt.Println(&quot;before dispatcher&quot;)
  58. go laodData(path[1:],channel,wg)
  59. go dispatcher(channel,wg)
  60. wg.Wait()
  61. fmt.Println(&quot;after dispatcher&quot;)
  62. }

And here's my output:

  1. ...
  2. finish laodData result.txt
  3. throw: all goroutines are asleep - deadlock!
  4. goroutine 1 [semacquire]:
  5. sync.runtime_Semacquire(0x42154100, 0x42154100)
  6. /usr/local/go/src/pkg/runtime/zsema_amd64.c:146 +0x25
  7. sync.(*WaitGroup).Wait(0x4213b440, 0x0)
  8. /usr/local/go/src/pkg/sync/waitgroup.go:79 +0xf2
  9. main.main()
  10. /Users/kuankuan/go/src/mreasy/main.go:66 +0x238
  11. goroutine 2 [syscall]:
  12. created by runtime.main
  13. /usr/local/go/src/pkg/runtime/proc.c:221
  14. goroutine 4 [chan receive]:
  15. main.dispatcher(0x42115a50, 0x0, 0x2, 0x0)
  16. /Users/kuankuan/go/src/mreasy/main.go:45 +0x223
  17. created by main.main
  18. /Users/kuankuan/go/src/mreasy/main.go:65 +0x228
  19. exit status 2

Thanks !

答案1

得分: 9

程序在主goroutine退出时终止,所以dispatcher()没有时间执行任何操作。你需要在main()中阻塞,直到dispatcher()完成。可以使用通道来实现这一点:

  1. package main
  2. import (
  3. "fmt"
  4. "os"
  5. "bufio"
  6. )
  7. var done = make(chan bool) // 创建通道
  8. // 加载文件并将它们发送到通道中供mappers读取。
  9. func dispatcher(arr []string,channel chan string) {
  10. for _,path := range arr {
  11. file,err := os.Open(path)
  12. fmt.Println("开始分发 ", path)
  13. if err != nil {
  14. fmt.Println(err)
  15. os.Exit(-1)
  16. }
  17. defer file.Close()
  18. reader := bufio.NewReaderSize(file, 32*10*1024)
  19. i := 0
  20. for {
  21. line,_ := reader.ReadString('\n')
  22. channel <- line
  23. i++
  24. if i%200 == 0 {
  25. fmt.Println(i, " 行已解析")
  26. }
  27. }
  28. fmt.Println("完成分发 ", path)
  29. }
  30. done <- true // 通知main()完成
  31. }
  32. func main() {
  33. path := os.Args
  34. if len(path) < 2 {
  35. fmt.Println("需要输入文件")
  36. os.Exit(0)
  37. }
  38. channel := make(chan string)
  39. fmt.Println("在dispatcher之前")
  40. go dispatcher(path[1:],channel)
  41. <-done // 等待dispatcher()
  42. fmt.Println("在dispatcher之后")
  43. }
英文:

Program terminates when main goroutine exits, so that dispatcher() has no time to do anything. You need to block in main() until dispatcher() completes. Channel can be used for this:

  1. package main
  2. import (
  3. &quot;fmt&quot;
  4. &quot;os&quot;
  5. &quot;bufio&quot;
  6. )
  7. var done = make(chan bool) // create channel
  8. // Load files and send them into a channel for mappers reading.
  9. func dispatcher(arr []string,channel chan string) {
  10. for _,path := range arr {
  11. file,err := os.Open(path)
  12. fmt.Println(&quot;begin to dispatch &quot;, path)
  13. if err != nil {
  14. fmt.Println(err)
  15. os.Exit(-1)
  16. }
  17. defer file.Close()
  18. reader := bufio.NewReaderSize(file, 32*10*1024)
  19. i := 0
  20. for {
  21. line,_ := reader.ReadString(&#39;\n&#39;)
  22. channel &lt;- line
  23. i++
  24. if i%200 == 0 {
  25. fmt.Println(i,&quot; lines parsed&quot;)
  26. }
  27. }
  28. fmt.Println(&quot;finish dispatch &quot;, path)
  29. }
  30. done &lt;- true // notify main() of completion
  31. }
  32. func main() {
  33. path := os.Args
  34. if len(path) &lt; 2 {
  35. fmt.Println(&quot;Need Input Files&quot;)
  36. os.Exit(0)
  37. }
  38. channel := make(chan string)
  39. fmt.Println(&quot;before dispatcher&quot;)
  40. go dispatcher(path[1:],channel)
  41. &lt;-done // wait for dispatcher()
  42. fmt.Println(&quot;after dispatcher&quot;)
  43. }

答案2

得分: 2

我修改了你的示例,使其在Go playground上运行,因为那里没有文件I/O;它会在通道上发送随机数。

Victor Deryagin的解释和他建议使用“done”通道是正确的。你遇到死锁的原因是你的goroutine在通道上发送数据,但没有人从中读取,所以程序在这一点上被卡住了。在上面的链接中,我添加了一个消费者goroutine。然后程序按预期并发运行。

请注意,要等待多个goroutine,使用sync.WaitGroup更清晰和更容易。

英文:

I modified your example to run on the Go playground where there's no file I/O; it sends random numbers on the channel instead.

@Victor Deryagin's explanation and his suggestion of using a "done" channel is correct. The reason you get a deadlock is that your goroutine sends on channel, but no one reads from it, so the program is stuck at this point. In the above link I added a consumer goroutine. The program then runs concurrently as intended.

Note that to wait for several goroutines, it's clearer and easier to use sync.WaitGroup.

答案3

得分: 1

在原始问题中有两个问题需要解决。

  1. 在发送完所有数据后,必须关闭通道。在laodData函数中,请在发送完所有数据后使用close(channel)
  2. sync.Waitgroup作为引用传递。你在以下函数的参数中将wg作为值发送... laodData和dispatcher函数。

修复这两个问题将解决死锁问题。你的代码死锁的原因如下:

  • 未关闭发送通道将导致下游通道等待时间过长。
  • sync.Waitgroup的参数作为值发送。应该将其作为引用发送,否则将创建一个新的对象副本。
英文:

Two issues needs to be fixed in the original question.

  1. You have to close the channel once you're done sending all the data. In func laodData, please use close(channel) post sending all data.
  2. Pass the sync.Waitgroup as a reference.you are sending wg as a value in the argument to the following functions... laodData and dispatcher functions.

Fixing these two issues will fix your problem of deadlock. The reasons for the deadlock in your code follow:

  • Leaving the sending channel unclosed will cause the downstream channel to wait for prolonged time.
  • sending the argument of sync.Waitgroup as a value . It should be sent as a reference otherwise it will create a new copy of the object which you are sending.

huangapple
  • 本文由 发表于 2012年11月22日 23:28:26
  • 转载请务必保留本文链接:https://go.coder-hub.com/13515846.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定