为什么我需要在单独的Go协程中使用wg.Wait()和close()函数?

huangapple go评论85阅读模式
英文:

Why do I need a wg.Wait() and close() in a separate go routine?

问题

我有很多包含数百到数千个文件的目录。我想循环遍历目录列表。然后为每个目录调用一个go例程,该例程将扫描目录中的文件,并将每个文件的路径添加到作业队列中以供一组工作处理。

这是我目前的代码:

type AppConfig struct {
    UploadPath string `mapstructure:"upload_path"`
    LocalPath  string `mapstructure:"local_path"`
    Bucket     string `mapstructure:"bucket"`
}

func consumer(i int, jobs <-chan *ops.Job) {
    defer wg.Done()
    for job := range jobs {
        fmt.Printf("Worker: %v is processing file: %v\n", i, job.Work)
    }
}

func producer(jobs chan<- *ops.Job, filesToTransfer []string) {
    for i, file := range filesToTransfer {
        jobs <- &ops.Job{Id: i, Work: file}
    }
}

func main() {
    var (
        appconfigs map[string]*ops.AppConfig
        wg *sync.WaitGroup
    )

    jobs := make(chan *ops.Job)

    // setting up workers
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go consumer(i, jobs)
    }

    // adding jobs
    for _, values := range appconfigs {
        filesToTransfer := ops.ScanUploadPath(values.LocalPath)
        go producer(jobs, filesToTransfer)
    }

    go func() {
        wg.Wait()
        close(jobs)
    }()
}

在我的producer函数中,当我在close(jobs)调用之前遇到死锁和关闭通道的恐慌问题。我读到应该将这个调用放在main()函数中:

go func() {
    wg.Wait()
    close(jobs)
}()

我不太明白为什么我需要在producer之外再开一个单独的go例程。我希望有人能解释一下为什么需要这样做。

英文:

I have a large number of directories all containing hundreds to thousands of files. I want to loop through the list of directories. Then call a go routine for each directory that will scan the directories for files and add the path of the each file to a job queue for a set of works to process.

This is what I have so far:

type AppConfig struct {
    UploadPath string `mapstructure:&quot;upload_path&quot;`
    LocalPath  string `mapstructure:&quot;local_path&quot;`
    Bucket     string `mapstructure:&quot;bucket&quot;`
}

func consumer(i int, jobs &lt;-chan *ops.Job) {
    defer wg.Done()
    for job := range jobs {
	    fmt.Printf(&quot;Worker: %v is processing file: %v\n&quot;, i, job.Work)
    }
}

func producer(jobs chan&lt;- *ops.Job, filesToTransfer []string) {
    for i, file := range filesToTransfer {
	    jobs &lt;- &amp;ops.Job{Id: i, Work: file}
    }
}

func main() {
    var (
        appconfigs map[string]*ops.AppConfig
        wg *sync.WaitGroup
    )

    jobs := make(chan *ops.Job)

    // setting up workers
    for i := 0; i &lt; 10; i++ {
	    wg.Add(1)
    	go consumer(i, jobs)
    }

    // adding jobs
    for _, values := range appconfigs {
	    filesToTransfer := ops.ScanUploadPath(values.LocalPath)
    	go producer(jobs, filesToTransfer)

    }

    go func() {
	    wg.Wait()
	    close(jobs)
    }()
}

I was running into a deadlock and panic on closed channel issue before when I had my close(jobs) call in my producer function. I read I should have this in my main() instead:

go func() {
	    wg.Wait()
	    close(jobs)
}()

I don't really understand why I need a separate go routine outside of my producer. I was hoping someone could explain why.

答案1

得分: 2

因为当主函数返回时程序会退出,将调用移动到goroutine中会掩盖这个问题。

使用以下代码:

// 启动工作线程。
var wg sync.WaitGroup
jobs := make(chan *ops.Job)
for i := 0; i < 10; i++ {
wg.Add(1)
go consumer(&wg, i, jobs)
}

// 将工作发送给工作线程。
for _, values := range appconfigs {
filesToTransfer := ops.ScanUploadPath(values.LocalPath)

// 从主goroutine发送工作。
// 与问题中使用goroutine相比,并没有获得任何好处。
for i, file := range filesToTransfer {
    jobs <- &ops.Job{Id: i, Work: file}
}

}

// 关闭通道以表示所有工作都已发送。
close(jobs)

// 等待工作线程完成。
wg.Wait()

英文:

Because the program exits when the main function returns, moving the calls to a goroutine masks the problem.

Use this code:

// Start workers.
var wg sync.WaitGroup
jobs := make(chan *ops.Job)
for i := 0; i &lt; 10; i++ {
	wg.Add(1)
	go consumer(&amp;wg, i, jobs)
}

// Send jobs to the workers.
for _, values := range appconfigs {
    filesToTransfer := ops.ScanUploadPath(values.LocalPath)

    // Send the jobs from the main goroutine.
    // Nothing is gained by using a goroutine
    // as in the question.
    for i, file := range filesToTransfer {
        jobs &lt;- &amp;ops.Job{Id: i, Work: file}
     }
}

// Close the channel to signal that all jobs are
// sent.
close(jobs)

// Wait for the workers to complete.
wg.Wait()

huangapple
  • 本文由 发表于 2022年10月8日 10:06:01
  • 转载请务必保留本文链接:https://go.coder-hub.com/73993915.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定