Golang:在`case`语句中从通道中为多个工作器消费项目。

huangapple go评论86阅读模式
英文:

Golang: consume items for multiple workers from channel in `case` statement

问题

我的消费者(从main运行)支持上下文取消和通过case语句从通道读取。我可以使用上下文关闭消费者,这个功能正常工作。然而,当我在一个case语句中生成多个工作线程时,每个工作线程都会从jobsChan接收到相同的任务(消息),这不是我想要的:

func (app *App) consumer() {
	for {
		select {
		case <-app.ctx.Done():
			app.infoLog.Print("Caught SIGINT, stopping.")
			app.wg.Wait()
			app.doneChan <- struct{}{} // main使用此通道阻塞自身,直到所有goroutine停止
			app.infoLog.Print("Shutting down the consumer...")
			return
		case job := <-app.jobsChan:
			// 这里有问题:每个工作线程都得到相同的任务
			for workerNumber := 0; workerNumber < app.config.workers; workerNumber++ {
				app.wg.Add(1)
				go app.workerFunc(workerNumber, job)
			}
		}
	}
}

func (app *App) workerFunc(id int, job Job) {
	defer app.wg.Done()
	
    ... 实际的工作代码在这里 ...
}

我该如何重写这段代码,以便我可以保持select监听app.ctx.Done通道,同时可以生成工作线程,使每个工作线程从通道中选择下一个消息作为任务?我需要保持for/select监听ctx取消,但同时我需要在消费者中生成X个从jobsChan读取消息的工作线程。这可能吗?

我能想到的唯一替代方法是直接将通道传递给生成的workerFunc,并在workerFunc中使用另一个for job := range app.jobsChan。但是这样一来,消费者中的整个case job := <-app.jobsChan:就变得没有意义了,我不确定如何重写它。

澄清一下:当我运行应用程序时,我希望每个工作线程都从jobsChan中获取一个新的任务ID - 但它们都处理相同的任务,例如1,然后它们都处理下一个任务,例如2

# 错误的输出
Worker 0: 开始处理项目1
Worker 2: 开始处理项目1
Worker 1: 开始处理项目1
英文:

My consumer (run from main) supports context cancellation and reading from a channel via case statement. I can shutdown the consumer with the context, that works fine. However, when I spawn several workers in one case statement, every worker is given the same job (message) from jobsChan, which is not what I want:

func (app *App) consumer() {
	for {
		select {
		case &lt;-app.ctx.Done():
			app.infoLog.Print(&quot;Caught SIGINT, stopping.&quot;)
			app.wg.Wait()
			app.doneChan &lt;- struct{}{} # main uses this channel to block itself until all goroutines are stopped
			app.infoLog.Print(&quot;Shutting down the consumer...&quot;)
			return
		case job := &lt;-app.jobsChan:
			// PROBLEM here: wrong, each worker is given the same job
			for workerNumber := 0; workerNumber &lt; app.config.workers; workerNumber++ {
				app.wg.Add(1)
				go app.workerFunc(workerNumber, job)
			}
		}
	}
}

func (app *App) workerFunc(id int, job Job) {
	defer app.wg.Done()
	
    ... actual worker code here ...
}

How can I rewrite this code so that I can keep select for app.ctx.Done channel and at the same time can spawn workers so that each worker picks next message from the channel as a Job? I need to keep for/select to listen for ctx cancellation but at the same time I need to spawn X workers reading messages from jobsChan in the consumer. Is this possible?

The only alternative that comes to mind is passing channel directly into spawned workerFunc and have another for job := range app.jobsChan in the workerFunc. But then the whole case job := &lt;-app.jobsChan: in the consumer becomes pointless and I am not sure how to rewrite it.

To clarify: When I run the app, I expect every worker to have a new job id pulled from the jobsChan - but they all process the same, e.g. 1, then they all process the next one, e.g. 2

#wrong
Worker 0: start processing item 1
Worker 2: start processing item 1
Worker 1: start processing item 1

答案1

得分: 0

你现有的代码明确地将相同的工作分配给所有的工作者。如果你有固定数量的工作者,可以在初始化期间为它们创建 goroutine,并让它们监听一个通道:

for workerNumber := 0; workerNumber < app.config.workers; workerNumber++ {
   go app.workerFunc(ctx, workerNumber, app.jobsChan)
}

在每个工作者中,只需检查 jobQueue 和上下文的取消。

换句话说,你不需要 consumer,直接将工作传递给工作者即可。

英文:

Your existing code explicitly assigns the same job to all the workers. If you have a fixed number of workers, create goroutines for them (during initialization), and have them listen to a channel:

for workerNumber:0;workerNumber&lt;app.config.workers;workerNumber++ {
   go app.workerFunc(ctx,workerNumber,app.jobsChan)
}

In each worker, simply check the jobQueue and the context cancellation.

In other words, you don't need the consumer, pass jobs directly to the workers.

huangapple
  • 本文由 发表于 2021年10月30日 08:42:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/69775990.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定