Golang worker pools - Queue new jobs from within jobs
Question
I am trying to concurrently build a tree from a collection of buckets, and given that the worker pattern seems to be very popular in Go, I tried applying it to my problem. Basically, I start a given number of workers and make them listen to a shared jobs channel. The first worker then receives the tree's root node as the first job and fills it with relevant information, before branching and creating 2 more jobs. These jobs are then supposed to be distributed among the other workers, which would then recursively generate even more jobs until the whole tree is constructed.
A simplified representation of my naive approach looks similar to this:
func workers(count int) {
	wg := sync.WaitGroup{}
	wg.Add(count)
	jobs := make(chan job)
	for i := 0; i < count; i++ {
		go func() {
			// worker waits for job and then executes it
			for j := range jobs {
				processJob(j, jobs)
			}
			wg.Done()
		}()
	}
	// start with some initial job
	jobs <- job{}
	wg.Wait()
}

func processJob(j job, jobs chan job) {
	// jobs channel is closed when tree is finished
	if done {
		close(jobs)
		return // sending on a closed channel would panic
	}
	// Do some more irrelevant stuff
	// sometimes 2 new jobs result from this one
	jobs <- job{}
	jobs <- job{}
	// but this deadlocks if all workers try to send and none is receiving
}
The problem is that I can't add 2 new jobs from within one job, because at some point every worker would be busy trying to send jobs to the channel and no worker would be on the receiving end.
Can anyone point me in the direction of an elegant solution, or is my whole approach to the problem wrong?
Answer 1
Score: 3
Use the current worker if no other worker is ready to process a job:
func doJob(j job, jobs chan job) {
	select {
	case jobs <- j:
	default:
		// Send to jobs was not ready, do the job
		// in the current worker.
		processJob(j, jobs)
	}
}
Replace each send statement jobs <- job{} with the call doJob(job{}, jobs).
Use a buffered channel to keep workers busy:
jobs := make(chan job, N)
Tune N upward until you find a value where the workers are mostly busy. A good starting value for N is count. This tuning is not required to prevent deadlock: the program does not deadlock when N is equal to zero, because doJob never blocks on a send.
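The small sketch below shows what the buffer changes. It counts how many non-blocking sends succeed while no worker is receiving. trySend is a hypothetical helper that mirrors the select/default in doJob. With N = 0, every send falls through to the default branch, so the job would run inline. With N = 2, the first two jobs are queued for other workers to pick up later.

```go
package main

import "fmt"

// trySend attempts n non-blocking sends on ch while nothing is receiving,
// using the same select/default shape as doJob, and returns how many succeeded.
func trySend(ch chan int, n int) int {
	sent := 0
	for i := 0; i < n; i++ {
		select {
		case ch <- i:
			sent++
		default:
			// Not ready: doJob would run the job in the current worker here.
		}
	}
	return sent
}

func main() {
	fmt.Println(trySend(make(chan int), 3))    // 0: unbuffered and no receiver
	fmt.Println(trySend(make(chan int, 2), 3)) // 2: the buffer holds two jobs
}
```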