How to use a dedicated channel to signal the end of a crawl job in go

Question

This is a follow-up to my previous question.

I am trying to build a prototype for a web crawler, and I want to use a chan to block execution until all the jobs are done, just as in:

    func main() {
        stop := make(chan bool)
        go func() {
            do_stuff()
            stop <- true // signal that the work is finished
        }()
        fmt.Println(<-stop) // blocks until the goroutine sends
    }
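A self-contained version of this pattern might look like the sketch below. It assumes a hypothetical `doStuff` function standing in for the real work, and it closes a `chan struct{}` instead of sending a `bool`, a common Go idiom for a one-shot "done" signal (sending `true` as above works just as well for a single receiver).

```go
package main

import "fmt"

// doStuff is a hypothetical stand-in for the real crawling work.
func doStuff() int {
	sum := 0
	for i := 1; i <= 10; i++ {
		sum += i
	}
	return sum
}

func main() {
	done := make(chan struct{})
	var result int

	go func() {
		result = doStuff()
		// Closing the channel releases every receiver; no value is needed.
		close(done)
	}()

	<-done // blocks until the goroutine closes done
	fmt.Println("result:", result)
}
```

Because the write to `result` happens before `close(done)`, and the close happens before the receive returns, `main` can read `result` without a data race.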

There is a queue function that dispatches the jobs to the workers. When all jobs are finished, the function also closes the toWorkers channel and sends a signal on stop.

    package main

    import (
        "fmt"
        "time"
    )

    type Job int

    // max_workers is not defined in the original snippet; any positive value reproduces the problem.
    const max_workers = 4

    // simulating a worker that processes an HTML page and returns some more links
    func worker(in chan Job, out chan Job, num int) {
        for element := range in {
            if element%2 == 0 {
                out <- 100*element + 5
                out <- 100*element + 3
                out <- 100*element + 1
            }
        }
    }

    func queue(toWorkers chan<- Job, fromWorkers <-chan Job, init Job, stop chan bool) {
        var list []Job
        var currentJobs int
        currentJobs = 0
        list = append(list, init)
        done := make(map[Job]bool)
        for {
            var send chan<- Job
            var item Job
            if len(list) > 0 {
                send = toWorkers
                item = list[0]
            } else if currentJobs == 0 {
                close(toWorkers)
                // this messes up everything!
                stop <- true
                return
            }
            select {
            case send <- item:
                currentJobs += 1
                // We sent an item, remove it
                list = list[1:]
            case thing := <-fromWorkers:
                currentJobs -= 1
                // Got a new thing
                if !done[thing] {
                    list = append(list, thing)
                    done[thing] = true
                }
            }
        }
    }

    func main() {
        in := make(chan Job, 1)
        out := make(chan Job, 1)
        stop := make(chan bool)
        // dispatches jobs to workers
        go queue(in, out, 0, stop)
        for i := 0; i < max_workers; i++ {
            go worker(in, out, i)
        }
        duration := time.Second
        time.Sleep(duration)
        // this causes a deadlock
        fmt.Println(<-stop)
    }

Link to playground

If I understand correctly, the problem is with the stop channel: while the workers still have jobs, Go thinks that no one will send on that channel and declares a deadlock. The queue function will both close the toWorkers channel and send a signal on stop, but not while there are outstanding jobs.
What am I missing?
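The deadlock appears to come from the bookkeeping in queue rather than from the stop channel itself. currentJobs is incremented once per job sent but decremented once per link received, and these counts differ: job 0 yields three links while jobs 5, 3 and 1 yield none, so four jobs go out but only three values come back. Whenever the list is empty, currentJobs is therefore 1, queue waits forever on fromWorkers, the workers wait on in, main waits on stop, and the runtime reports that all goroutines are asleep. A minimal sketch of one fix, keeping the dedicated stop channel, is to have each worker send exactly one message per job (a possibly empty slice of links) so the counter tracks completed jobs. The `[]Job` result channel, the `seed` parameter and the `maxWorkers` constant are assumptions of this sketch, not part of the original code.

```go
package main

import "fmt"

type Job int

const maxWorkers = 4 // assumption: any positive number works

// worker sends exactly one slice per job it receives, containing the links
// found (possibly none), so the queue can count finished jobs.
func worker(in <-chan Job, out chan<- []Job) {
	for element := range in {
		var links []Job
		if element%2 == 0 {
			links = []Job{100*element + 5, 100*element + 3, 100*element + 1}
		}
		out <- links
	}
}

func queue(toWorkers chan<- Job, fromWorkers <-chan []Job, seed Job, stop chan<- bool) {
	list := []Job{seed}
	done := map[Job]bool{seed: true}
	currentJobs := 0 // jobs handed to a worker whose result has not come back
	for {
		var send chan<- Job // nil channel: the send case is disabled
		var item Job
		if len(list) > 0 {
			send = toWorkers
			item = list[0]
		} else if currentJobs == 0 {
			close(toWorkers) // ends the workers' range loops
			stop <- true
			return
		}
		select {
		case send <- item:
			currentJobs++
			list = list[1:]
		case links := <-fromWorkers:
			currentJobs-- // exactly one result per completed job
			for _, l := range links {
				if !done[l] {
					done[l] = true
					list = append(list, l)
				}
			}
		}
	}
}

func main() {
	in := make(chan Job, 1)
	out := make(chan []Job, 1)
	stop := make(chan bool)
	go queue(in, out, 0, stop)
	for i := 0; i < maxWorkers; i++ {
		go worker(in, out)
	}
	fmt.Println(<-stop) // prints true once every job has been processed
}
```

With this accounting the `time.Sleep` in main is no longer needed: `<-stop` simply blocks until the queue has seen a result for every job it dispatched.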

Answer 1

Score: 4

Use a sync.WaitGroup to wait for all the goroutines to finish.

http://golang.org/pkg/sync/#WaitGroup

http://blog.golang.org/pipelines

I made a small example here: http://play.golang.org/p/P30LdV0Gfe

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        routinesNo := 10
        wg.Add(routinesNo)
        for i := 0; i < routinesNo; i++ {
            go func(n int) {
                fmt.Printf("%d ", n)
                wg.Done()
            }(i)
        }
        wg.Wait()
        fmt.Println("\nThe end!")
    }
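Applied to the crawler from the question, one possible sketch (not the answerer's code) uses the WaitGroup to count jobs in flight: `wg.Add(1)` before a job starts and `wg.Done()` after it has registered its children, so `wg.Wait()` returns only when nothing is left to process. For brevity it swaps the fixed worker pool and the queue goroutine for one goroutine per job; `fetch` and `crawl` are hypothetical names, with `fetch` mirroring the question's worker.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

type Job int

// fetch is a hypothetical stand-in for processing a page: like the worker in
// the question, even jobs produce three new links and odd jobs produce none.
func fetch(j Job) []Job {
	if j%2 == 0 {
		return []Job{100*j + 5, 100*j + 3, 100*j + 1}
	}
	return nil
}

// crawl processes every job reachable from seed exactly once and returns the
// visited jobs in ascending order. wg counts jobs started but not finished.
func crawl(seed Job) []Job {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		visited = map[Job]bool{seed: true}
	)

	var visit func(j Job)
	visit = func(j Job) {
		defer wg.Done() // runs after all children have been registered
		for _, link := range fetch(j) {
			mu.Lock()
			seen := visited[link]
			visited[link] = true
			mu.Unlock()
			if !seen {
				wg.Add(1) // counter stays above zero while the parent is running
				go visit(link)
			}
		}
	}

	wg.Add(1)
	go visit(seed)
	wg.Wait() // every write to visited happens before the final Done

	out := make([]Job, 0, len(visited))
	for j := range visited {
		out = append(out, j)
	}
	sort.Slice(out, func(a, b int) bool { return out[a] < out[b] })
	return out
}

func main() {
	fmt.Println(crawl(0)) // [0 1 3 5]
}
```

The important ordering is that each child's `wg.Add(1)` happens before its parent's `wg.Done()`, so the counter cannot reach zero while work remains. A real crawler would probably also cap concurrency, for example with a buffered channel used as a semaphore.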

huangapple
  • Posted on April 9, 2015, 20:26:27
  • When reposting, please keep the link to this article: https://go.coder-hub.com/29538470.html