Simple worker pool in Go
Question
I am trying to implement a simple worker pool in Go and keep running into issues. All I want is a fixed number of workers that each finish a set amount of work before picking up more. The code I am using looks similar to:
    // Both channels have a buffer of 1.
    jobs := make(chan imageMessage, 1)
    results := make(chan imageMessage, 1)

    // Start 2 workers.
    for w := 0; w < 2; w++ {
        go worker(jobs, results)
    }

    // Send every job, then close the jobs channel.
    for j := 0; j < len(images); j++ {
        jobs <- imageMessage{path: paths[j], img: images[j]}
    }
    close(jobs)

    // Only now start receiving results.
    for r := 0; r < len(images); r++ {
        <-results
    }
} // closes the enclosing function, whose opening is not shown

func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
    for j := range jobs {
        processImage(j.path, j.img)
        results <- j
    }
}
My understanding is that this should create 2 workers that each handle 1 job at a time and keep picking up more work as they finish, until there is nothing left to do. However, I get:
fatal error: all goroutines are asleep - deadlock!
If I set the buffer to something huge like 100, this works, but I want to be able to limit how much work is in progress at a time.
I feel like I'm close, but I'm obviously missing something.
Answer 1
Score: 4
The problem is that you only start "draining" the results channel once you have successfully sent all jobs on the jobs channel. But to be able to send all jobs, either the jobs channel must have a big enough buffer, or the worker goroutines must be able to consume jobs from it.
But a worker goroutine, after consuming a job and before it can take the next one, sends the result on the results channel. If the buffer of the results channel is full, sending the result will block.
And that last part, a worker goroutine blocked on sending its result, can only be "unblocked" by receiving from the results channel, which you don't do until you have sent all the jobs. The outcome is a deadlock whenever the buffers of the jobs channel and the results channel (plus the one job each worker can hold in flight) cannot absorb all your jobs. This also explains why it works if you increase the buffer size to a big value: if the jobs fit into the buffers, no deadlock occurs, and after all jobs are sent successfully, your final loop drains the results channel.
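To make that limit concrete, here is a minimal sketch (not the asker's code) that reproduces the send-everything-then-drain structure with plain int jobs and instantaneous work. The helper name runSequential, the int payloads, and the 500 ms timeout are assumptions made for illustration; the timeout stands in for the runtime's deadlock report so a stalled run can be observed instead of crashing the program. Under these assumptions, a run can absorb at most workers + jobs buffer + results buffer jobs before the sender blocks for good.

```go
package main

import (
	"fmt"
	"time"
)

// runSequential (hypothetical helper) sends every job first and only then
// drains the results, as in the question. It reports whether the run
// finished before the timeout.
func runSequential(numJobs, workers, jobsBuf, resultsBuf int) bool {
	jobs := make(chan int, jobsBuf)
	results := make(chan int, resultsBuf)

	for w := 0; w < workers; w++ {
		go func() {
			for j := range jobs {
				results <- j // blocks once the results buffer is full
			}
		}()
	}

	done := make(chan struct{})
	go func() {
		for j := 0; j < numJobs; j++ {
			jobs <- j // blocks once buffers and workers are all occupied
		}
		close(jobs)
		for r := 0; r < numJobs; r++ {
			<-results
		}
		close(done)
	}()

	select {
	case <-done:
		return true
	case <-time.After(500 * time.Millisecond):
		// Stalled: the goroutines above stay blocked (and leak) in this sketch.
		return false
	}
}

func main() {
	fmt.Println(runSequential(4, 2, 1, 1)) // true: 4 jobs fit (2 workers + 1 + 1)
	fmt.Println(runSequential(5, 2, 1, 1)) // false: the 5th send can never complete
}
```

With 2 workers and both buffers set to 1, the fifth job is the first one that has nowhere to go, which matches the behaviour described in the question for any larger batch of images.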
The solution? Generate and send the jobs in their own goroutine, so you can start receiving from the results channel "immediately" without having to wait until all jobs are sent. That way the worker goroutines will not get blocked forever trying to send results:
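// Producer: runs concurrently with the loop that receives results.
go func() {
    for j := 0; j < len(images); j++ {
        jobs <- imageMessage{path: paths[j], img: images[j]}
    }
    close(jobs) // lets the workers' range loops end
}()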
Also check out a similar implementation in https://stackoverflow.com/questions/38170852/is-this-an-idiomatic-worker-thread-pool-in-go/38172204#38172204
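Putting the pieces together, a complete program might look like the sketch below. The imageMessage fields and the worker function follow the question, but the string payload for img, the empty processImage body, and the runPool helper are assumptions, since the question does not show those parts. The channel buffers stay at 1, so at most a handful of jobs are in flight at any time.

```go
package main

import "fmt"

// imageMessage mirrors the question's type. The img field is a string here
// only to keep the sketch self-contained; the real type is not shown.
type imageMessage struct {
	path string
	img  string
}

// processImage is a hypothetical stand-in for the real image processing.
func processImage(path, img string) {
	_ = path
	_ = img
}

// worker processes jobs until the jobs channel is closed and drained.
func worker(jobs <-chan imageMessage, results chan<- imageMessage) {
	for j := range jobs {
		processImage(j.path, j.img)
		results <- j
	}
}

// runPool (hypothetical helper) processes all images with numWorkers workers
// and returns the results in completion order.
func runPool(paths, images []string, numWorkers int) []imageMessage {
	jobs := make(chan imageMessage, 1)
	results := make(chan imageMessage, 1)

	for w := 0; w < numWorkers; w++ {
		go worker(jobs, results)
	}

	// Send jobs from a separate goroutine so the loop below can start
	// receiving results right away.
	go func() {
		for j := range images {
			jobs <- imageMessage{path: paths[j], img: images[j]}
		}
		close(jobs)
	}()

	// Exactly one result arrives per job, so count them.
	out := make([]imageMessage, 0, len(images))
	for r := 0; r < len(images); r++ {
		out = append(out, <-results)
	}
	return out
}

func main() {
	var paths, images []string
	for i := 0; i < 10; i++ {
		paths = append(paths, fmt.Sprintf("img%d.png", i))
		images = append(images, fmt.Sprintf("data%d", i))
	}
	done := runPool(paths, images, 2)
	fmt.Printf("%d images processed\n", len(done)) // prints "10 images processed"
}
```

Results come back in whatever order the workers finish, so the path carried in each imageMessage is what ties a result to its input.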
Answer 2
Score: 0
You can use this worker library. It is simple and efficient: https://github.com/tamnguyenvt/go-worker
NewWorkerManager(WorkerManagerParams{
    WorkerSize:    <number of workers>,
    RelaxAfter:    <sleep for a while to relax the server after the given duration>,
    RelaxDuration: <relax duration>,
    WorkerFunc:    <your worker function here>,
    LogEnable:     <enable log or not>,
    StopTimeout:   <timeout all workers after the given duration>,
})