Implementing a job worker pool in Go

Question

Since Go does not have generics, all the premade solutions use type casting, which I do not like very much. I also want to implement it on my own, so I tried the following code. However, sometimes it does not wait for all goroutines. Am I closing the jobs channel prematurely? I do not need to fetch any results from the workers. I could have used a dummy output channel and waited to receive the exact number of results from it, but I believe the following code should work too. What am I missing?

    func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
        wg.Add(1)
        defer wg.Done()
        for job := range jobs {
            item := ParseItem(job)
            item.SaveItem()
            MarkJobCompleted(item.ID)
            log.Println("Saved", item.Title)
        }
    }

    // ProcessJobs processes the jobs from the list and deletes them
    func ProcessJobs() {
        jobs := make(chan string)
        list := GetJobs()
        // Start workers
        var wg sync.WaitGroup
        for w := 0; w < 10; w++ {
            go jobWorker(w, jobs, wg)
        }
        for _, url := range list {
            jobs <- url
        }
        close(jobs)
        wg.Wait()
    }

Answer 1

Score: 1

Call wg.Add outside of the goroutine and pass a pointer to the wait group.

If Add is called from inside the goroutine, it's possible for the main goroutine to call Wait before the goroutines get a chance to run. If Add has not been called, then Wait will return immediately.

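Pass a pointer to the wait group to each goroutine. Otherwise, every goroutine receives its own copy of the wait group, and calling Done on a copy has no effect on the wait group that ProcessJobs waits on.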

    func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for job := range jobs {
            item := ParseItem(job)
            item.SaveItem()
            MarkJobCompleted(item.ID)
            log.Println("Saved", item.Title)
        }
    }

    // ProcessJobs processes the jobs from the list and deletes them
    func ProcessJobs() {
        jobs := make(chan string)
        list := GetJobs()
        // Start workers
        var wg sync.WaitGroup
        for w := 0; w < 10; w++ {
            wg.Add(1)
            go jobWorker(w, jobs, &wg)
        }
        for _, url := range list {
            jobs <- url
        }
        close(jobs)
        wg.Wait()
    }
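
The snippet above depends on ParseItem, GetJobs, and the other helpers from the question, so it cannot be run on its own. The following is a minimal self-contained sketch of the same pattern. The names processJob, worker, and runPool are hypothetical stand-ins and not part of the original code, and the mutex-protected results slice is an assumption added only to make the outcome observable.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// processJob is a hypothetical stand-in for ParseItem, SaveItem, and
// MarkJobCompleted from the question; here it only upper-cases the input.
func processJob(job string) string {
	return strings.ToUpper(job)
}

// worker drains the jobs channel and records each result under the mutex.
// It receives pointers, so it shares the caller's WaitGroup and Mutex.
func worker(jobs <-chan string, wg *sync.WaitGroup, mu *sync.Mutex, done *[]string) {
	defer wg.Done()
	for job := range jobs {
		result := processJob(job)
		mu.Lock()
		*done = append(*done, result)
		mu.Unlock()
	}
}

// runPool starts numWorkers goroutines (assumed to be at least 1), feeds
// them every job, and returns only after all workers have finished.
func runPool(list []string, numWorkers int) []string {
	jobs := make(chan string)
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		done []string
	)
	for w := 0; w < numWorkers; w++ {
		wg.Add(1) // counted before the goroutine starts, so Wait cannot miss it
		go worker(jobs, &wg, &mu, &done)
	}
	for _, job := range list {
		jobs <- job
	}
	close(jobs) // ends each worker's range loop once the channel is drained
	wg.Wait()
	return done
}

func main() {
	results := runPool([]string{"a", "b", "c", "d"}, 3)
	fmt.Println(len(results), "jobs processed") // prints "4 jobs processed"
}
```

The order of the results is not deterministic, because the workers pick up jobs concurrently; only the count is guaranteed.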

Answer 2

Score: 1

You need to pass a pointer to the wait group, or else every worker goroutine receives its own copy.

    func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
        defer wg.Done()
        for job := range jobs {
            item := ParseItem(job)
            item.SaveItem()
            MarkJobCompleted(item.ID)
            log.Println("Saved", item.Title)
        }
    }

    // ProcessJobs processes the jobs from the list and deletes them
    func ProcessJobs() {
        jobs := make(chan string)
        list := GetJobs()
        // Start workers
        var wg sync.WaitGroup
        for w := 0; w < 10; w++ {
            // Call Add before starting the goroutine; calling it inside
            // jobWorker could let Wait run before the counter is raised.
            wg.Add(1)
            go jobWorker(w, jobs, &wg)
        }
        for _, url := range list {
            jobs <- url
        }
        close(jobs)
        wg.Wait()
    }

See the difference here: without pointer, with pointer.
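
The question was written when Go had no generics. Type parameters have been available since Go 1.18, so the same pool can now be written once for any job type without type assertions. The following is only a sketch under that assumption: RunPool is a hypothetical helper name, not something from the question or the answers, and it assumes numWorkers is at least 1.

```go
package main

import (
	"fmt"
	"sync"
)

// RunPool is a hypothetical helper. It applies fn to every job using
// numWorkers goroutines and returns once all of them have finished.
// It requires Go 1.18 or newer and assumes numWorkers >= 1.
func RunPool[T any](jobs []T, numWorkers int, fn func(T)) {
	ch := make(chan T)
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1) // raise the counter before the goroutine starts
		go func() {
			defer wg.Done()
			for job := range ch {
				fn(job)
			}
		}()
	}
	for _, job := range jobs {
		ch <- job
	}
	close(ch) // lets every worker leave its range loop
	wg.Wait()
}

func main() {
	var (
		mu    sync.Mutex
		total int
	)
	RunPool([]int{1, 2, 3, 4}, 2, func(n int) {
		mu.Lock()
		total += n
		mu.Unlock()
	})
	fmt.Println("sum:", total) // prints "sum: 10"
}
```

Because the workers are closures, they capture the wait group directly, and the question of passing it by value or by pointer does not arise.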

huangapple
  • Posted on February 22, 2016, 07:35:37
  • When reposting, please keep the link to this article: https://go.coder-hub.com/35543545.html