英文:
Worker thread pool
问题
在提供的示例中,代码段展示了一个使用Golang处理每分钟100万个请求的调度器。你的问题是关于工作池(worker pool)的补充。
在代码中,工作池是一个通道(chan chan job),用于存储可用的工作通道。当一个任务到达时,调度器会从工作池中获取一个可用的工作通道,并将任务分派给该通道。这样,任务就会被发送到一个空闲的工作通道中进行处理。
你的疑问是,当调度器处理了MaxWorker
数量的任务后,工作池是否会被耗尽,因为在第一次调用dispatcher.Run()
之后,工作通道没有被重新填充。你是否漏掉了什么或者理解错误?工作池是如何重新填充可用的工作通道的?
实际上,代码中并没有展示工作池如何重新填充可用的工作通道。这段代码只是展示了如何将任务分派给工作通道进行处理。如果工作通道被耗尽,即没有可用的工作通道,那么调度器将会阻塞在jobChannel := <-d.WorkerPool
这一行,直到有一个工作通道可用为止。
在实际应用中,你需要确保在调度器运行期间,工作池中始终有足够的可用工作通道。这可以通过在调度器启动之前初始化工作池,并在每个工作通道完成任务后将其放回工作池中来实现。这样,工作池就能够不断地提供可用的工作通道,以供调度器使用。
希望这能解答你的问题!如果还有其他疑问,请随时提出。
英文:
In the example provided at http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ which has been cited in a lot of places.
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
Wouldn't the worker pool (chan chan job) get depleted after MaxWorker
number of jobs have been serviced by the dispatch? Since <-d.WorkerPool
is pulling from the channel and job channels are not being replenished after the first type dispatcher.Run()
is invoked the first time? Or am I missing/misreading something ? How is the WorkerPool getting replenished with available job channels ?
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
答案1
得分: 3
如果你仔细阅读worker的代码,你会注意到以下部分:
w.WorkerPool <- w.JobChannel
每次循环开始时,worker自身的通道都会被放回。
我将整个函数复制如下:
func (w Worker) Start() {
go func() {
for {
// 将当前worker注册到worker队列中。
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// 我们收到了一个工作请求。
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("上传到S3时出错:%s", err.Error())
}
case <-w.quit:
// 我们收到了停止信号。
return
}
}
}()
}
英文:
if you read the code of worker carefully, you will notice
w.WorkerPool <- w.JobChannel
each time a loop begin, the channel of worker itself has been put back
I copy the whole function below:
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论