Golang工作池 – 在作业中排队新作业

huangapple go评论82阅读模式
英文:

Golang worker pools - Queue new jobs from within jobs

问题

我正在尝试从一组桶中并发地构建一棵树,鉴于在Go语言中,工作模式似乎非常流行,我尝试将其应用到我的问题上。基本上,我启动了一定数量的工作线程,并让它们监听一个共享的作业通道。第一个工作线程接收树的根节点作为第一个作业,并填充相关信息,然后分支并创建2个更多的作业。然后这些作业应该分配给其他工作线程,这些工作线程将递归生成更多的作业,直到整个树被构建完成。

我天真的方法的简化表示类似于这样:

func workers(count int) {

	wg := sync.WaitGroup{}
	wg.Add(count)

	jobs := make(chan job)
	for i := 0; i < count; i++ {
		go func() {
			// 工作线程等待作业并执行它
			for j := range jobs {
				processJob(j, jobs)
			}
			wg.Done()
		}()
	}

	// 从一些初始作业开始
	jobs <- job{}

	wg.Wait()

}

func processJob(j job, jobs chan job) {

	// 当树完成时,关闭作业通道
	if done {
		close(jobs)
	}
	// 做一些其他无关紧要的事情

	// 有时这个作业会产生2个新的作业
	jobs <- job{}
	jobs <- job{}
	// 但是如果所有工作线程都试图发送作业而没有一个工作线程在接收端,这样是行不通的

}

问题是,我不能在一个作业中添加2个新的作业,因为在某个时刻,每个工作线程都会忙于尝试向通道发送作业,而没有工作线程在接收端。

有人能指导我一个优雅的解决方案,或者我的整个方法都是错误的吗?

英文:

I am trying to concurrently build a tree from a collection of buckets and given that the worker pattern seems to be very popular in go, I tried applying it to my problem. Basically, I start a given number of workers and make them listen to a shared jobs channel. The first worker then receives the trees root node as the first job and fills it with relevant information, before branching and creating 2 more jobs. These jobs are then supposed to be distributed among the other workers, which would then recursively generate even more jobs until the whole tree is constructed.
A simplified representation of my naive approach looks similar to this:

func workers(count int) {

	wg := sync.WaitGroup{}
	wg.Add(count)

	jobs := make(chan job)
	for i := 0; i &lt; count; i++ {
		go func() {
			// worker waits for job and then executes it
			for j := range jobs {
				processJob(j, jobs)
			}
			wg.Done()
		}()
	}

	// start with some initial job
	jobs &lt;- job{}

	wg.Wait()

}

func processJob(j job, jobs chan job) {

	// jobs channel is closed when tree is finished
	if done {
		close(jobs)
	}
	// Do some more irrelevant stuff

	// sometimes 2 new jobs result from this one
	jobs &lt;- job{}
	jobs &lt;- job{}
	// but that doesn&#39;t work, if all workers try to send and no one receives

}

The problem is, that I can't add 2 new jobs from within 1, because at some point every worker would be busy trying to send jobs to the channel and no worker would be on the receiving end.

Can anyone point me into the direction of an elegant solution, or is my whole approach to the problem wrong?

答案1

得分: 3

如果没有其他工作人员准备好处理工作,请使用当前工作人员:

func doJob(j job, jobs chan job) {
    select {
    case jobs <- j:
    default:
        // 发送到 jobs 通道不可用,直接在当前工作人员中执行任务
        processJob(j, jobs)
    }
}

将发送语句 jobs <- job{} 替换为调用 doJob(job{}, jobs)

使用带缓冲的通道来保持工作人员忙碌:

jobs := make(chan job, N)

调整 N 的值,直到找到工作人员大部分时间都在忙碌的值。N 的一个好的起始值是 count。这个调整不是为了防止死锁。当 N 等于零时,程序不会发生死锁。

英文:

Use the current worker if no other worker is ready to process a job:

func doJob(j job, jobs chan job) {
	select {
	case jobs &lt;- j:
	default:
        // Send to jobs was not ready, do the job
        // in the current worker.
		processJob(j, jobs)
	}
}

Replace the send statement jobs &lt;- job{} with the call doJob(job{}, jobs).

Use a buffered channel to keep workers busy:

jobs := make(chan job, N)

Tune N up until you find a value where the workers are mostly busy. A good starting value for N is count. This tuning is not required to prevent deadlock. The program does not deadlock when N is equal to zero.

huangapple
  • 本文由 发表于 2021年6月6日 04:25:04
  • 转载请务必保留本文链接:https://go.coder-hub.com/67853647.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定