Golang: consume items for multiple workers from channel in `case` statement



My consumer (run from main) supports context cancellation and reading from a channel via case statement. I can shutdown the consumer with the context, that works fine. However, when I spawn several workers in one case statement, every worker is given the same job (message) from jobsChan, which is not what I want:

  1. func (app *App) consumer() {
  2. for {
  3. select {
  4. case &lt;-app.ctx.Done():
  5. app.infoLog.Print(&quot;Caught SIGINT, stopping.&quot;)
  6. app.wg.Wait()
  7. app.doneChan &lt;- struct{}{} # main uses this channel to block itself until all goroutines are stopped
  8. app.infoLog.Print(&quot;Shutting down the consumer...&quot;)
  9. return
  10. case job := &lt;-app.jobsChan:
  11. // PROBLEM here: wrong, each worker is given the same job
  12. for workerNumber := 0; workerNumber &lt; app.config.workers; workerNumber++ {
  13. app.wg.Add(1)
  14. go app.workerFunc(workerNumber, job)
  15. }
  16. }
  17. }
  18. }
  19. func (app *App) workerFunc(id int, job Job) {
  20. defer app.wg.Done()
  21. ... actual worker code here ...
  22. }

How can I rewrite this code so that I can keep select for app.ctx.Done channel and at the same time can spawn workers so that each worker picks next message from the channel as a Job? I need to keep for/select to listen for ctx cancellation but at the same time I need to spawn X workers reading messages from jobsChan in the consumer. Is this possible?

The only alternative that comes to mind is passing channel directly into spawned workerFunc and have another for job := range app.jobsChan in the workerFunc. But then the whole case job := &lt;-app.jobsChan: in the consumer becomes pointless and I am not sure how to rewrite it.

To clarify: When I run the app, I expect every worker to have a new job id pulled from the jobsChan - but they all process the same, e.g. 1, then they all process the next one, e.g. 2

  1. #wrong
  2. Worker 0: start processing item 1
  3. Worker 2: start processing item 1
  4. Worker 1: start processing item 1


Your existing code explicitly assigns the same job to all the workers. If you have a fixed number of workers, create goroutines for them (during initialization), and have them listen to a channel:

  1. for workerNumber:0;workerNumber&lt;app.config.workers;workerNumber++ {
  2. go app.workerFunc(ctx,workerNumber,app.jobsChan)
  3. }

In each worker, simply check the jobQueue and the context cancellation.

In other words, you don't need the consumer, pass jobs directly to the workers.

