在Go中实现一个作业工作池

huangapple go评论86阅读模式
英文:

Implementing a job worker pool in Go

问题

由于Go语言没有泛型,所有预制的解决方案都使用类型转换,而我并不太喜欢这种方法。我也想自己实现,尝试了以下代码。然而,有时它不会等待所有的goroutine,我是不是过早地关闭了jobs通道?我没有从中获取任何内容。我可能也使用了一个伪输出通道,并等待从中获取确切的数量,但我相信以下代码也应该可以工作。我漏掉了什么?

func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) {
	wg.Add(1)
	defer wg.Done()

	for job := range jobs {
		item := ParseItem(job)
		item.SaveItem()
		MarkJobCompleted(item.ID)
		log.Println("Saved", item.Title)
	}
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

	jobs := make(chan string)

	list := GetJobs()
	// Start workers
	var wg sync.WaitGroup
	for w := 0; w < 10; w++ {
		go jobWorker(w, jobs, wg)
	}

	for _, url := range list {
		jobs <- url
	}

	close(jobs)
	wg.Wait()
}
英文:

Since Go does not have generics, all the premade solutions use type casting which I do not like very much. I also want to implement it on my own and tried the following code. However, sometimes it does not wait for all goroutines, am I closing the jobs channel prematurely? I do not have anything to fetch from them. I might have used a pseudo output channel too and waited to fetch the exact amount from them however I believe the following code should work too. What am I missing?

func jobWorker(id int, jobs &lt;-chan string, wg sync.WaitGroup) {
	wg.Add(1)
	defer wg.Done()

	for job := range jobs {
		item := ParseItem(job)
		item.SaveItem()
		MarkJobCompleted(item.ID)
		log.Println(&quot;Saved&quot;, item.Title)
	}
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

	jobs := make(chan string)

	list := GetJobs()
	// Start workers
	var wg sync.WaitGroup
	for w := 0; w &lt; 10; w++ {
		go jobWorker(w, jobs, wg)
	}

	for _, url := range list {
		jobs &lt;- url
	}

	close(jobs)
	wg.Wait()
}

答案1

得分: 1

在goroutine之外调用wg.Add,并传递一个指向等待组的指针。

如果在goroutine内部调用Add,主goroutine有可能在goroutine有机会运行之前调用Wait。如果没有调用Add,那么Wait将立即返回。

传递一个指向goroutine的指针。否则,goroutine将使用它们自己的等待组副本。

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {

	defer wg.Done()

	for job := range jobs {
		item := ParseItem(job)
		item.SaveItem()
		MarkJobCompleted(item.ID)
		log.Println("Saved", item.Title)
	}
}

// ProcessJobs处理列表中的作业并删除它们
func ProcessJobs() {

	jobs := make(chan string)

	list := GetJobs()
	// 启动工作线程
	var wg sync.WaitGroup
	for w := 0; w < 10; w++ {
		wg.Add(1)
		go jobWorker(w, jobs, &wg)
	}

	for _, url := range list {
		jobs <- url
	}

	close(jobs)
	wg.Wait()
}
英文:

Call wg.Add outside of the goroutine and pass a pointer to the wait group.

If Add is called from inside the goroutine, it's possible for the main goroutine to call Wait before the goroutines get a chance to run. If Add has not been called, then Wait will return immediately.

Pass a pointer to the goroutine. Otherwise, the goroutines use their own copy of the wait group.

func jobWorker(id int, jobs &lt;-chan string, wg *sync.WaitGroup) {

	defer wg.Done()

	for job := range jobs {
		item := ParseItem(job)
		item.SaveItem()
		MarkJobCompleted(item.ID)
		log.Println(&quot;Saved&quot;, item.Title)
	}
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

	jobs := make(chan string)

	list := GetJobs()
	// Start workers
	var wg sync.WaitGroup
	for w := 0; w &lt; 10; w++ {
        wg.Add(1)
		go jobWorker(w, jobs, &amp;wg)
	}

	for _, url := range list {
		jobs &lt;- url
	}

	close(jobs)
	wg.Wait()
}

答案2

得分: 1

你需要传递一个指向waitgroup的指针,否则每个作业都会收到自己的副本。

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println("Saved", item.Title)
    }
}

// ProcessJobs处理列表中的作业并删除它们
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // 启动工作线程
    var wg sync.WaitGroup
    for w := 0; w < 10; w++ {
        go jobWorker(w, jobs, &wg)
    }

    for _, url := range list {
        jobs <- url
    }

    close(jobs)
    wg.Wait()
}

在这里可以看到区别:没有指针有指针

英文:

You need to pass a pointer to the waitgroup, or else every job receives it's own copy.

func jobWorker(id int, jobs &lt;-chan string, wg *sync.WaitGroup) {
    wg.Add(1)
    defer wg.Done()

    for job := range jobs {
        item := ParseItem(job)
        item.SaveItem()
        MarkJobCompleted(item.ID)
        log.Println(&quot;Saved&quot;, item.Title)
    }
}

// ProcessJobs processes the jobs from the list and deletes them
func ProcessJobs() {

    jobs := make(chan string)

    list := GetJobs()
    // Start workers
    var wg sync.WaitGroup
    for w := 0; w &lt; 10; w++ {
        go jobWorker(w, jobs, &amp;wg)
    }

    for _, url := range list {
        jobs &lt;- url
    }

    close(jobs)
    wg.Wait()
}

See the difference here: without pointer, with pointer.

huangapple
  • 本文由 发表于 2016年2月22日 07:35:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/35543545.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定