Golang:生产者/消费者并发模型,但结果是串行化的。

huangapple go评论154阅读模式
英文:

Golang: Producer/Consumer concurrency model but with serialized results

问题

主函数中的代码是将作业推送到jobChan,goroutine将从jobChan中取出作业并并发执行,并将结果推送到resultsChan中。然后我们将从resultsChan中取出结果。

问题1:

在我的代码中,没有对结果进行序列化/线性化处理。虽然作业按照job1、job2、job3的顺序进入,但结果可能按照job3、job1、job2的顺序出来,这取决于哪个作业花费的时间最长。

我仍然希望并发执行作业,但我需要确保结果按照作业的顺序从resultsChan中出来。

问题2:

我大约有30万个作业,这意味着代码将生成多达30万个goroutine。拥有这么多goroutine是否高效,或者将作业分组到一个包含100个作业的切片中,每个goroutine处理100个作业,是否更好?

英文:
func main() {
	jobs := []Job{job1, job2, job3}
	numOfJobs := len(jobs)
	resultsChan := make(chan *Result, numOfJobs)
	jobChan := make(chan *job, numOfJobs)
	go consume(numOfJobs, jobChan, resultsChan)
	for i := 0; i < numOfJobs; i++ {
		jobChan <- jobs[i]
	}
	close(jobChan)

	for i := 0; i < numOfJobs; i++ {
		<-resultsChan
	}
	close(resultsChan)
}

func (b *Blockchain) consume(num int, jobChan chan *Job, resultsChan chan *Result) {
	for i := 0; i < num; i++ {
		go func() {
			job := <-jobChan
			resultsChan <- doJob(job)
		}()
	}
}

In the above example, jobs are pushed into the jobChan and goroutines will pull it off the jobChan and execute the jobs concurrently and push results into resultsChan. We will then pull results out of resultsChan.

Question 1:

In my code, there is no serialized/linearilized results. Although jobs go in the order of job1, job2, job3. The results might come out as job3, job1, job2, depending which one takes the longest.

I would still like to execute the jobs concurrently, however, I need to make sure that results come out of the resultsChan in the same order that it went in as jobs.

Question2:

I have approximately 300k jobs, this means the code will generate up to 300k goroutines. Is this efficient to have so many goroutines or would I be better off group the jobs together in a slice of 100 or so and have each goroutine go through 100 rather than 1.

答案1

得分: 1

这是我处理序列化(以及设置有限数量的工作线程)的一种方法。我设置了一些具有输入和输出字段以及同步通道的工作线程对象,然后我循环遍历它们,获取它们完成的任何工作并给它们一个新的任务。然后,我最后再次遍历它们,获取任何剩余的已完成任务。请注意,您可能希望工作线程的数量略大于核心数量,这样即使有一个异常长的任务,也可以使所有资源保持繁忙一段时间。代码在http://play.golang.org/p/PM9y4ieMxw中。

这段代码有些复杂(比我记得的写一个示例时更复杂!)- 我很想看看其他人有什么更好的实现方法,或者是实现您目标的完全不同的方法。

请注意,这个示例中只有inout承载实际数据 - 所有其他通道仅用于同步。

英文:

Here's a way I've handled serialization (and also setting a limited number of workers). I set some worker objects with input and output fields and synchronization channels, then I go round-robin through them, picking up any work they've done and giving them a new job. Then I make one final pass through them to pick up any completed jobs that are left over. Note you might want the worker count to exceed your core count somewhat, so that you can keep all resources busy for a bit even when there's one unusually long job. Code is at http://play.golang.org/p/PM9y4ieMxw and below.

This is hairy (hairier than I remember it being before sitting down to write an example!)--would love to see what anyone else has, either just better implementations or a whole different way to accomplish your goal.

package main

import (
	"fmt"
	"math/rand"
	"runtime"
	"time"
)

type Worker struct {
	in     int
	out    int
	inited bool

	jobReady chan bool
	done     chan bool
}

func (w *Worker) work() {
	time.Sleep(time.Duration(rand.Float32() * float32(time.Second)))
	w.out = w.in + 1000
}
func (w *Worker) listen() {
	for <-w.jobReady {
		w.work()
		w.done <- true
	}
}
func doSerialJobs(in chan int, out chan int) {
	concurrency := 23
	workers := make([]Worker, concurrency)
	i := 0
	// feed in and get out items
	for workItem := range in {
		w := &workers[i%
			concurrency]
		if w.inited {
			<-w.done
			out <- w.out
		} else {
			w.jobReady = make(chan bool)
			w.done = make(chan bool)
			w.inited = true
			go w.listen()
		}
		w.in = workItem
		w.jobReady <- true
		i++
	}
	// get out any job results left over after we ran out of input
	for n := 0; n < concurrency; n++ {
		w := &workers[i%concurrency]
		if w.inited {
			<-w.done
			out <- w.out
		}
        close(w.jobReady)
		i++
	}
	close(out)
}
func main() {
	runtime.GOMAXPROCS(10)
	in, out := make(chan int), make(chan int)
	allFinished := make(chan bool)
	go doSerialJobs(in, out)
	go func() {
		for result := range out {
			fmt.Println(result)
		}
		allFinished <- true
	}()
	for i := 0; i < 100; i++ {
		in <- i
	}
	close(in)
	<-allFinished
}

Note that only in and out in this example carry actual data--all the other channels are just for synchronization.

huangapple
  • 本文由 发表于 2014年1月8日 02:11:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/20978778.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定