Why do I need wg.Wait() and close() in a separate goroutine?

Question

I have a large number of directories, each containing hundreds to thousands of files. I want to loop through the list of directories and start a goroutine for each one that scans the directory for files and adds the path of each file to a job queue, which a pool of workers then processes.

This is what I have so far:

    type AppConfig struct {
        UploadPath string `mapstructure:"upload_path"`
        LocalPath  string `mapstructure:"local_path"`
        Bucket     string `mapstructure:"bucket"`
    }

    func consumer(i int, jobs <-chan *ops.Job) {
        defer wg.Done()
        for job := range jobs {
            fmt.Printf("Worker: %v is processing file: %v\n", i, job.Work)
        }
    }

    func producer(jobs chan<- *ops.Job, filesToTransfer []string) {
        for i, file := range filesToTransfer {
            jobs <- &ops.Job{Id: i, Work: file}
        }
    }

    func main() {
        var (
            appconfigs map[string]*ops.AppConfig
            wg         *sync.WaitGroup
        )
        jobs := make(chan *ops.Job)
        // setting up workers
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go consumer(i, jobs)
        }
        // adding jobs
        for _, values := range appconfigs {
            filesToTransfer := ops.ScanUploadPath(values.LocalPath)
            go producer(jobs, filesToTransfer)
        }
        go func() {
            wg.Wait()
            close(jobs)
        }()
    }

Earlier, when I had the close(jobs) call in my producer function, I ran into a deadlock and a panic on a closed channel. I read that I should put this in main() instead:

    go func() {
        wg.Wait()
        close(jobs)
    }()

I don't really understand why this needs to run in a separate goroutine instead of in my producer. Could someone explain why?

Answer 1

Score: 2

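The program exits when main returns, so moving the calls into a goroutine only masks the problem. In the question's code, wg counts the consumers, and each consumer returns only after jobs is closed, so wg.Wait() followed by close(jobs) blocks forever; main simply returns before that becomes visible. Close the channel once all jobs are sent, and only then wait for the workers.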

Use this code:

    // Start workers.
    var wg sync.WaitGroup
    jobs := make(chan *ops.Job)
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go consumer(&wg, i, jobs)
    }

    // Send jobs to the workers.
    for _, values := range appconfigs {
        filesToTransfer := ops.ScanUploadPath(values.LocalPath)

        // Send the jobs from the main goroutine.
        // Nothing is gained by using a goroutine
        // as in the question.
        for i, file := range filesToTransfer {
            jobs <- &ops.Job{Id: i, Work: file}
        }
    }

    // Close the channel to signal that all jobs are sent.
    close(jobs)

    // Wait for the workers to complete.
    wg.Wait()

huangapple
  • Posted on October 8, 2022, 10:06:01
  • If reposting, please keep the link to this article: https://go.coder-hub.com/73993915.html