工作线程池

huangapple go评论68阅读模式
英文:

Worker thread pool

问题

在提供的示例中,代码段展示了一个使用Golang处理每分钟100万个请求的调度器。你的问题是关于工作池(worker pool)的补充。

在代码中,工作池是一个通道(chan chan job),用于存储可用的工作通道。当一个任务到达时,调度器会从工作池中获取一个可用的工作通道,并将任务分派给该通道。这样,任务就会被发送到一个空闲的工作通道中进行处理。

你的疑问是,当调度器处理了MaxWorker数量的任务后,工作池是否会被耗尽,因为在第一次调用dispatcher.Run()之后,工作通道没有被重新填充。你是否漏掉了什么或者理解错误?工作池是如何重新填充可用的工作通道的?

实际上,代码中并没有展示工作池如何重新填充可用的工作通道。这段代码只是展示了如何将任务分派给工作通道进行处理。如果工作通道被耗尽,即没有可用的工作通道,那么调度器将会阻塞在jobChannel := <-d.WorkerPool这一行,直到有一个工作通道可用为止。

在实际应用中,你需要确保在调度器运行期间,工作池中始终有足够的可用工作通道。这可以通过在调度器启动之前初始化工作池,并在每个工作通道完成任务后将其放回工作池中来实现。这样,工作池就能够不断地提供可用的工作通道,以供调度器使用。

希望这能解答你的问题!如果还有其他疑问,请随时提出。

英文:

In the example provided at http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/ which has been cited in a lot of places.

func (d *Dispatcher) dispatch() {
for {
	select {
	case job := &lt;-JobQueue:
		// a job request has been received
		go func(job Job) {
			// try to obtain a worker job channel that is available.
			// this will block until a worker is idle
			jobChannel := &lt;-d.WorkerPool

			// dispatch the job to the worker job channel
			jobChannel &lt;- job
		 }(job)
	}
}
}

Wouldn't the worker pool (chan chan job) get depleted after MaxWorker number of jobs have been serviced by the dispatch? Since &lt;-d.WorkerPool is pulling from the channel and job channels are not being replenished after the first type dispatcher.Run() is invoked the first time? Or am I missing/misreading something ? How is the WorkerPool getting replenished with available job channels ?

go func(job Job) {
			// try to obtain a worker job channel that is available.
			// this will block until a worker is idle
			jobChannel := &lt;-d.WorkerPool

			// dispatch the job to the worker job channel
			jobChannel &lt;- job
		}(job)

答案1

得分: 3

如果你仔细阅读worker的代码,你会注意到以下部分:

w.WorkerPool <- w.JobChannel

每次循环开始时,worker自身的通道都会被放回。

我将整个函数复制如下:

func (w Worker) Start() {
    go func() {
        for {
            // 将当前worker注册到worker队列中。
            w.WorkerPool <- w.JobChannel

            select {
            case job := <-w.JobChannel:
                // 我们收到了一个工作请求。
                if err := job.Payload.UploadToS3(); err != nil {
                    log.Errorf("上传到S3时出错:%s", err.Error())
                }

            case <-w.quit:
                // 我们收到了停止信号。
                return
            }
        }
    }()
}
英文:

if you read the code of worker carefully, you will notice

w.WorkerPool &lt;- w.JobChannel

each time a loop begin, the channel of worker itself has been put back

I copy the whole function below:

func (w Worker) Start() {
	go func() {
		for {
			// register the current worker into the worker queue.
			w.WorkerPool &lt;- w.JobChannel

			select {
			case job := &lt;-w.JobChannel:
				// we have received a work request.
				if err := job.Payload.UploadToS3(); err != nil {
					log.Errorf(&quot;Error uploading to S3: %s&quot;, err.Error())
				}

			case &lt;-w.quit:
				// we have received a signal to stop
				return
			}
		}
	}()
}

huangapple
  • 本文由 发表于 2017年8月12日 22:30:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/45651387.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定