Job queue where workers can add jobs, is there an elegant solution to stop the program when all workers are idle?
Question
I find myself in a situation where I have a queue of jobs where workers can add new jobs when they are done processing one.
For illustration, in the code below, a job consists in counting up to JOB_COUNTING_TO
and, randomly, 1/5 of the time a worker will add a new job to the queue.
Because my workers can add jobs to the queue, my understanding is that I cannot use a channel as my job queue. Sending to a channel blocks and, even with a buffered channel, this code, due to its recursive nature (jobs adding jobs), could easily reach a situation where all the workers are blocked sending to the channel and no worker is available to receive.
This is why I decided to use a shared queue protected by a mutex.
Now, I would like the program to halt when all the workers are idle. Unfortunately this cannot be spotted just by checking for len(jobQueue) == 0, as the queue could be empty while some workers are still doing their job and might add new jobs afterwards.
The solution I came up with feels a bit clunky: it uses the variables var idleWorkerCount int and var isIdle [NB_WORKERS]bool to keep track of idle workers, and the code stops when idleWorkerCount == NB_WORKERS.
My question is: is there a concurrency pattern that I could use to make this logic more elegant?
Also, for some reason I don't understand, the technique I currently use (code below) becomes really inefficient when the number of workers gets quite big (such as 300000 workers): for the same number of jobs, the code is more than 10x slower with NB_WORKERS = 300000 than with NB_WORKERS = 3000.
Thank you very much in advance!
package main

import (
	"math/rand"
	"sync"
)

const NB_WORKERS = 3000
const NB_INITIAL_JOBS = 300
const JOB_COUNTING_TO = 10000000

var jobQueue []int
var mu sync.Mutex
var idleWorkerCount int
var isIdle [NB_WORKERS]bool

func doJob(workerId int) {
	mu.Lock()
	if len(jobQueue) == 0 {
		if !isIdle[workerId] {
			idleWorkerCount += 1
		}
		isIdle[workerId] = true
		mu.Unlock()
		return
	}
	if isIdle[workerId] {
		idleWorkerCount -= 1
	}
	isIdle[workerId] = false
	var job int
	job, jobQueue = jobQueue[0], jobQueue[1:]
	mu.Unlock()
	for i := 0; i < job; i += 1 {
	}
	if rand.Intn(5) == 0 {
		mu.Lock()
		jobQueue = append(jobQueue, JOB_COUNTING_TO)
		mu.Unlock()
	}
}

func main() {
	// Filling up the queue with initial jobs
	for i := 0; i < NB_INITIAL_JOBS; i += 1 {
		jobQueue = append(jobQueue, JOB_COUNTING_TO)
	}
	var wg sync.WaitGroup
	for i := 0; i < NB_WORKERS; i += 1 {
		wg.Add(1)
		go func(workerId int) {
			for idleWorkerCount != NB_WORKERS {
				doJob(workerId)
			}
			wg.Done()
		}(i)
	}
	wg.Wait()
}
Answer 1
Score: 1
> Because my workers can add jobs to the queue
A re-entrant channel always deadlocks. This is easy to demonstrate using this code:
package main

import (
	"fmt"
)

func main() {
	out := make(chan string)
	c := make(chan string)
	go func() {
		for v := range c {
			c <- v + " 2"
			out <- v
		}
	}()
	go func() {
		c <- "hello world!" // pass OK
		c <- "hello world!" // no pass, the routine is blocked pushing to itself
	}()
	for v := range out {
		fmt.Println(v)
	}
}
While the program tries to push at c <- v + " 2", it can not read at for v := range c {, push at c <- "hello world!", or read at for v := range out {; thus, it deadlocks.
If you want to get past that situation, you must overflow somewhere: onto goroutines, or somewhere else.
package main

import (
	"fmt"
	"time"
)

func main() {
	out := make(chan string)
	c := make(chan string)
	go func() {
		for v := range c {
			go func(v string) { // use goroutines on the stack as a bank for the required overflow
				<-time.After(time.Second) // simulate slowness
				c <- v + " 2"
			}(v) // pass v explicitly so the closure does not capture the loop variable
			out <- v
		}
	}()
	go func() {
		for {
			c <- "hello world!"
		}
	}()
	exit := time.After(time.Second * 60)
	for v := range out {
		fmt.Println(v)
		select {
		case <-exit:
			return
		default:
		}
	}
}
But now you have a new problem.
You created a memory bomb by overflowing onto the stack without limits. Technically, the blow-up depends on the time needed to finish a job, the available memory, the speed of your CPUs, and the shape of the data (a job might or might not generate a new one). So there is an upper limit, but it is so hard to make sense of it that, in practice, this ends up being a bomb.
Consider not overflowing onto the stack without limits.
If you don't have any arbitrary limit at hand, you can use a semaphore to cap the overflow.
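The playground code behind the link is not reproduced here, but the semaphore idea can be sketched as follows: a buffered channel acts as a counting semaphore, and each re-entrant push must acquire a slot before it is parked on its own goroutine. The cap of 8 and the seed value are arbitrary illustrative choices, not values taken from the playground.

```go
package main

import "fmt"

// drain runs a consumer over a re-entrant channel, capping the number of
// goroutines parked on re-entrant pushes with a counting semaphore, and
// returns every value read from the channel.
func drain(seed int) []int {
	const maxOverflow = 8 // arbitrary cap on parked re-entrant pushes
	sem := make(chan struct{}, maxOverflow)

	c := make(chan int)
	out := make(chan int)

	go func() {
		for v := range c {
			if v > 0 {
				sem <- struct{}{} // acquire: blocks once maxOverflow pushes are parked
				go func(v int) {
					c <- v // the re-entrant push waits on its own goroutine
					<-sem  // release the slot once the push has landed
				}(v - 1)
			}
			out <- v
		}
	}()

	go func() { c <- seed }() // one seed job that re-queues itself seed times

	got := make([]int, 0, seed+1)
	for i := 0; i <= seed; i++ { // the seed produces seed+1 values in total
		got = append(got, <-out)
	}
	return got
}

func main() {
	fmt.Println(drain(3)) // [3 2 1 0]
}
```

Unlike the unbounded version above, once maxOverflow pushes are parked, the consumer itself blocks on the acquire, which throttles the whole pipeline instead of growing the heap.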
https://play.golang.org/p/5JWPQiqOYKz
My bombs did not explode with a work timeout of 1s or 2s, but they took a large chunk of memory.
In another round, with modified code, it exploded.
Of course, because you use if rand.Intn(5) == 0 { in your code, the problem is largely mitigated. Still, when you meet such a pattern, think twice about the code.
> Also, for some reason I don't understand, the technique that I currently use (code below) becomes really inefficient when the number of workers becomes quite big (such as 300000 workers): for the same number of jobs, the code will be > 10x slower for NB_WORKERS = 300000 vs NB_WORKERS = 3000.
In the big picture, you have a limited number of CPU cycles. All the allocations and instructions needed to spawn and synchronize have to be executed too. Concurrency is not free.
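Since the jobs here are pure CPU work (counting loops), a pool much larger than the number of cores mostly adds scheduling and synchronization cost. Below is a minimal sketch of sizing the pool with runtime.NumCPU() (that function is the real API; the trivial summing job is just an illustrative stand-in for the counting job):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// process fans jobs out to one worker per CPU core: enough to keep every
// core busy on CPU-bound work, without paying for hundreds of thousands of
// goroutines that mostly contend on the same mutex.
func process(jobs []int) int64 {
	var total int64
	in := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < runtime.NumCPU(); w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for n := range in {
				atomic.AddInt64(&total, int64(n)) // the "job": trivial CPU work
			}
		}()
	}
	for _, j := range jobs {
		in <- j
	}
	close(in)
	wg.Wait()
	return total
}

func main() {
	fmt.Println(process([]int{1, 2, 3, 4})) // 10
}
```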
> Now, I would like the program to halt when all the workers are idle.
I came up with the following, but I find it very difficult to reason about and to convince myself that it won't end up in a write on closed channel panic.
The idea is to use a sync.WaitGroup to count in-flight items and rely on it to properly close the input channel and finish the job.
package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	var wg sync.WaitGroup
	var wgr sync.WaitGroup
	out := make(chan string)
	c := make(chan string)
	go func() {
		for v := range c {
			if rand.Intn(5) == 0 {
				wgr.Add(1)
				go func(v string) {
					<-time.After(time.Microsecond)
					c <- v + " 2"
				}(v)
			}
			wgr.Done()
			out <- v
		}
		close(out)
	}()
	var sent int
	wg.Add(1)
	go func() {
		for i := 0; i < 300; i++ {
			wgr.Add(1)
			c <- "hello world!"
			sent++
		}
		wg.Done()
	}()
	go func() {
		wg.Wait()
		wgr.Wait()
		close(c)
	}()
	var rcv int
	for v := range out {
		// fmt.Println(v)
		_ = v
		rcv++
	}
	log.Println("sent", sent)
	log.Println("rcv", rcv)
}
I ran it with while go run -race .; do :; done and it worked fine for a reasonable number of iterations.
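As a complementary pattern (a sketch, not part of the original answer), the "all workers idle" condition can also be expressed as a single counter of outstanding jobs (queued plus in flight) guarded by a sync.Cond: workers sleep on the condition instead of spinning, and when the counter reaches zero everyone is woken up and exits. All names below are illustrative, and a deterministic "every job numbered below 5 spawns one child" rule stands in for the original rand.Intn(5) == 0.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a job queue whose termination condition is a single counter of
// outstanding jobs (queued + in flight). When it drops to zero, every
// worker blocked in take is woken up and told to exit.
type pool struct {
	mu          sync.Mutex
	cond        *sync.Cond
	queue       []int
	outstanding int
}

func newPool() *pool {
	p := &pool{}
	p.cond = sync.NewCond(&p.mu)
	return p
}

func (p *pool) add(job int) {
	p.mu.Lock()
	p.queue = append(p.queue, job)
	p.outstanding++
	p.mu.Unlock()
	p.cond.Signal() // wake one sleeping worker
}

// take blocks until a job is available; ok is false once all work is done.
func (p *pool) take() (job int, ok bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	for len(p.queue) == 0 && p.outstanding > 0 {
		p.cond.Wait()
	}
	if len(p.queue) == 0 { // outstanding == 0: nothing left, nothing coming
		return 0, false
	}
	job, p.queue = p.queue[0], p.queue[1:]
	return job, true
}

// done marks one job finished; the last one wakes everybody up to exit.
func (p *pool) done() {
	p.mu.Lock()
	p.outstanding--
	last := p.outstanding == 0
	p.mu.Unlock()
	if last {
		p.cond.Broadcast()
	}
}

// run processes `initial` seed jobs with `workers` goroutines; every job
// numbered below 5 deterministically spawns one child job.
func run(initial, workers int) int64 {
	p := newPool()
	for i := 0; i < initial; i++ {
		p.add(i)
	}
	var processed int64
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				job, ok := p.take()
				if !ok {
					return
				}
				atomic.AddInt64(&processed, 1)
				if job < 5 {
					p.add(job + 100) // child must be added before done()
				}
				p.done()
			}
		}()
	}
	wg.Wait()
	return processed
}

func main() {
	fmt.Println(run(10, 4)) // 10 initial jobs + 5 children = 15
}
```

Adding the child before calling done() keeps the invariant that the counter never falsely hits zero while a worker might still enqueue more work.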