英文:
Implementing a job worker pool in Go
问题
由于Go语言没有泛型,所有预制的解决方案都使用类型转换,而我并不太喜欢这种方法。我也想自己实现,尝试了以下代码。然而,有时它不会等待所有的goroutine,我是不是过早地关闭了jobs通道?我没有从中获取任何内容。我可能也使用了一个伪输出通道,并等待从中获取确切的数量,但我相信以下代码也应该可以工作。我漏掉了什么?
func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
英文:
Since Go does not have generics, all the premade solutions use type casting which I do not like very much. I also want to implement it on my own and tried the following code. However, sometimes it does not wait for all goroutines, am I closing the jobs channel prematurely? I do not have anything to fetch from them. I might have used a pseudo output channel too and waited to fetch the exact amount from them however I believe the following code should work too. What am I missing?
func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
答案1
得分: 1
在goroutine之外调用wg.Add,并传递一个指向等待组的指针。
如果在goroutine内部调用Add,主goroutine有可能在goroutine有机会运行之前调用Wait。如果没有调用Add,那么Wait将立即返回。
传递一个指向goroutine的指针。否则,goroutine将使用它们自己的等待组副本。
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs处理列表中的作业并删除它们
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// 启动工作线程
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
wg.Add(1)
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
英文:
Call wg.Add outside of the goroutine and pass a pointer to the wait group.
If Add is called from inside the goroutine, it's possible for the main goroutine to call Wait before the goroutines get a chance to run. If Add has not been called, then Wait will return immediately.
Pass a pointer to the goroutine. Otherwise, the goroutines use their own copy of the wait group.
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
wg.Add(1)
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
答案2
得分: 1
你需要传递一个指向waitgroup的指针,否则每个作业都会收到自己的副本。
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs处理列表中的作业并删除它们
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// 启动工作线程
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
英文:
You need to pass a pointer to the waitgroup, or else every job receives it's own copy.
func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
wg.Add(1)
defer wg.Done()
for job := range jobs {
item := ParseItem(job)
item.SaveItem()
MarkJobCompleted(item.ID)
log.Println("Saved", item.Title)
}
}
// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {
jobs := make(chan string)
list := GetJobs()
// Start workers
var wg sync.WaitGroup
for w := 0; w < 10; w++ {
go jobWorker(w, jobs, &wg)
}
for _, url := range list {
jobs <- url
}
close(jobs)
wg.Wait()
}
See the difference here: without pointer, with pointer.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论