Why do I need a wg.Wait() and close() in a separate goroutine?
Question
I have a large number of directories, each containing hundreds to thousands of files. I want to loop through the list of directories and start a goroutine for each one that scans the directory for files and adds the path of each file to a job queue, to be processed by a pool of workers.
This is what I have so far:
```go
type AppConfig struct {
	UploadPath string `mapstructure:"upload_path"`
	LocalPath  string `mapstructure:"local_path"`
	Bucket     string `mapstructure:"bucket"`
}

// consumer processes jobs until the jobs channel is closed.
func consumer(wg *sync.WaitGroup, i int, jobs <-chan *ops.Job) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker: %v is processing file: %v\n", i, job.Work)
	}
}

// producer sends one job per file path.
func producer(jobs chan<- *ops.Job, filesToTransfer []string) {
	for i, file := range filesToTransfer {
		jobs <- &ops.Job{Id: i, Work: file}
	}
}

func main() {
	var (
		appconfigs map[string]*ops.AppConfig // loaded from config; loading code not shown
		wg         sync.WaitGroup
	)
	jobs := make(chan *ops.Job)

	// setting up workers
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go consumer(&wg, i, jobs)
	}

	// adding jobs
	for _, values := range appconfigs {
		filesToTransfer := ops.ScanUploadPath(values.LocalPath)
		go producer(jobs, filesToTransfer)
	}

	go func() {
		wg.Wait()
		close(jobs)
	}()
}
```
Before, when I had the close(jobs) call in my producer function, I ran into a deadlock and a panic on the closed channel. I read that I should put this in main() instead:

```go
go func() {
	wg.Wait()
	close(jobs)
}()
```

I don't really understand why I need a separate goroutine outside of my producer. I was hoping someone could explain why.
Answer 1
Score: 2
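Because the program exits when the main function returns, moving the calls to a goroutine masks the problem: main starts that goroutine and returns immediately, so the program exits without waiting for the producers or the workers, and the deadlock never shows up. The deadlock is still there. The goroutine calls wg.Wait() before close(jobs), but each worker only calls wg.Done() after its range over jobs ends, and that only happens once jobs is closed.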
Use this code:
```go
// Start workers.
// consumer is assumed to take the WaitGroup as its first argument
// and to call wg.Done() once jobs is closed and drained.
var wg sync.WaitGroup
jobs := make(chan *ops.Job)
for i := 0; i < 10; i++ {
	wg.Add(1)
	go consumer(&wg, i, jobs)
}

// Send jobs to the workers.
for _, values := range appconfigs {
	filesToTransfer := ops.ScanUploadPath(values.LocalPath)
	// Send the jobs from the main goroutine.
	// Nothing is gained by using a goroutine
	// as in the question.
	for i, file := range filesToTransfer {
		jobs <- &ops.Job{Id: i, Work: file}
	}
}

// Close the channel to signal that all jobs are sent.
close(jobs)

// Wait for the workers to complete.
wg.Wait()
```
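If you do want one producer goroutine per directory (for example, because scanning is slow), a common variant gives the separate goroutine a job that works: it waits on the producers, not the workers, before closing jobs. This is a minimal sketch, assuming a hypothetical Job type in place of ops.Job and an in-memory map of directory to file paths in place of ops.ScanUploadPath; the run helper and the results channel are illustrative additions.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Job is a hypothetical stand-in for ops.Job from the question.
type Job struct {
	Id   int
	Work string
}

// run starts one producer per directory and numWorkers workers, and returns
// every file path the workers processed, sorted for stable output.
// dirs is a hypothetical in-memory stand-in for ops.ScanUploadPath results.
func run(dirs map[string][]string, numWorkers int) []string {
	jobs := make(chan *Job)
	results := make(chan string)

	// Workers: each one exits when jobs is closed.
	var workers sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for job := range jobs {
				results <- job.Work
			}
		}()
	}

	// Producers: one goroutine per directory.
	var producers sync.WaitGroup
	for _, files := range dirs {
		producers.Add(1)
		go func(files []string) {
			defer producers.Done()
			for i, file := range files {
				jobs <- &Job{Id: i, Work: file}
			}
		}(files)
	}

	// Close jobs only after every producer has finished sending,
	// then close results once every worker has returned.
	go func() {
		producers.Wait()
		close(jobs)
		workers.Wait()
		close(results)
	}()

	var processed []string
	for r := range results {
		processed = append(processed, r)
	}
	sort.Strings(processed)
	return processed
}

func main() {
	dirs := map[string][]string{
		"a": {"a/1.txt", "a/2.txt"},
		"b": {"b/1.txt"},
	}
	fmt.Println(run(dirs, 3)) // prints "[a/1.txt a/2.txt b/1.txt]"
}
```

Here main blocks on the results channel, which is closed only after the producers and then the workers have finished, so the program cannot exit early the way the question's version does. No producer ever calls close, which avoids the closed-channel panic from the question.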