How to send off goroutines in a worker pool

Question

I'm writing an algorithm that breaks an image into segments and manipulates them; however, the way I'm currently using goroutines isn't optimal.

I'd like to split it into a worker pool, firing off routines and having each worker take a new job until the image is completed.

I currently split the image into eight segments, like so:

var bounds = img.Bounds()
var halfHeight = bounds.Max.Y / 2
var eighthOne = halfHeight / 4
var eighthTwo = eighthOne + eighthOne
var eighthThree = eighthOne + eighthTwo
var eighthFive = halfHeight + eighthOne
var eighthSix = halfHeight + eighthTwo
var eighthSeven = halfHeight + eighthThree

elapsed := time.Now()
go Threshold(pic, c2, 0, eighthOne)
go Threshold(pic, c5, eighthOne, eighthTwo)
go Threshold(pic, c6, eighthTwo, eighthThree)
go Threshold(pic, c7, eighthThree, halfHeight)
go Threshold(pic, c8, halfHeight, eighthFive)
go Threshold(pic, c9, eighthFive, eighthSix)
go Threshold(pic, c10, eighthSix, eighthSeven)
go Threshold(pic, c11, eighthSeven, bounds.Max.Y)

I then fire off the goroutines one after another, as shown above. How can I optimise this into a worker system?

Thanks

Answer 1

Score: 2

Here is a generic pattern for implementing concurrent image processors. It gives the caller control over how the image is partitioned (the work is split into n parts) and over the concurrency level of the execution (the number of worker goroutines that execute the n processing jobs, which may be a different number).

See the pprocess func, which implements the whole pattern. It takes a Partitioner and a Processor: the former is a func that returns the n image partitions to operate on, and the latter is a func that is used to process each partition.
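Stripped of the image types, the pattern described above boils down to a fixed number of goroutines draining one shared channel. The following is only a minimal sketch of that idea; `runPool` and its parameters are hypothetical names used for illustration and are not part of the answer's code.

```go
package main

import (
	"fmt"
	"sync"
)

// runPool is a hypothetical helper: it starts `workers` goroutines that each
// pull job indices from a shared channel until the channel is closed.
func runPool(workers int, jobs []int, work func(int) int) []int {
	if workers < 1 {
		workers = 1 // with no workers the sends below would block forever
	}
	results := make([]int, len(jobs))
	idx := make(chan int)
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := range idx {
				// Each index is received by exactly one worker,
				// so no two goroutines write the same slot.
				results[i] = work(jobs[i])
			}
		}()
	}
	for i := range jobs {
		idx <- i // hand out the next job to whichever worker is free
	}
	close(idx) // lets the workers' range loops finish
	wg.Wait()
	return results
}

func main() {
	squares := runPool(3, []int{1, 2, 3, 4, 5}, func(n int) int { return n * n })
	fmt.Println(squares)
}
```

The number of jobs (five here) and the number of workers (three) are independent, which is the property the question asks for: a worker that finishes early simply takes the next job.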

I implemented the vertical splitting from your code example in the func splitVert, which returns a function that splits an image into n sections stacked vertically.

To do some actual work I implemented the gray func, a Processor that transforms pixel colors to gray levels (luminance).

Here's the working code:

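package main

import (
	"image"
	"image/color"
	"image/draw"
	"image/jpeg"
	"log"
	"os"
	"runtime"
	"sync"
)

// MutableImage is an image whose pixels can be overwritten in place.
type MutableImage interface {
	image.Image
	Set(x, y int, c color.Color)
}

// Processor processes one rectangular partition of an image in place.
type Processor func(MutableImage, image.Rectangle)

// Partitioner splits an image into the rectangles that become jobs.
type Partitioner func(image.Image) []image.Rectangle

// pprocess copies i into a new RGBA image and runs proc over every partition
// returned by part, using concurrency worker goroutines.
func pprocess(i image.Image, concurrency int, part Partitioner, proc Processor) image.Image {
	if concurrency < 1 {
		concurrency = 1 // with no workers the sends below would block forever
	}
	m := image.NewRGBA(i.Bounds())
	draw.Draw(m, i.Bounds(), i, i.Bounds().Min, draw.Src)
	var wg sync.WaitGroup
	c := make(chan image.Rectangle, concurrency*2)
	for n := 0; n < concurrency; n++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker keeps taking jobs until the channel is closed and drained.
			for r := range c {
				proc(m, r)
			}
		}()
	}
	for _, p := range part(i) {
		c <- p
	}
	close(c)
	wg.Wait()
	return m
}

// gray converts every pixel inside rect to its luminance.
func gray(i MutableImage, rect image.Rectangle) {
	// image.Rectangle is half-open: Max is exclusive, hence < rather than <=.
	for x := rect.Min.X; x < rect.Max.X; x++ {
		for y := rect.Min.Y; y < rect.Max.Y; y++ {
			r, g, b, _ := i.At(x, y).RGBA() // 16-bit channel values
			l := 0.299*float64(r) + 0.587*float64(g) + 0.114*float64(b)
			i.Set(x, y, color.Gray{Y: uint8(l / 256)})
		}
	}
}

// splitVert returns a Partitioner that cuts an image into c full-width
// sections stacked vertically. The sections are half-open, so each one
// ends exactly where the next one begins.
func splitVert(c int) Partitioner {
	return func(i image.Image) []image.Rectangle {
		b := i.Bounds()
		s := float64(b.Dy()) / float64(c)
		rs := make([]image.Rectangle, c)
		for n := 0; n < c; n++ {
			m := float64(n)
			y0 := b.Min.Y + int(0.5+m*s)
			y1 := b.Min.Y + int(0.5+(m+1)*s)
			rs[n] = image.Rect(b.Min.X, y0, b.Max.X, y1)
		}
		return rs
	}
}

func main() {
	i, err := jpeg.Decode(os.Stdin)
	if err != nil {
		log.Fatalf("decoding image: %v", err)
	}
	// Eight jobs, processed by as many workers as there are CPUs.
	o := pprocess(i, runtime.NumCPU(), splitVert(8), gray)
	err = jpeg.Encode(os.Stdout, o, nil)
	if err != nil {
		log.Fatalf("encoding image: %v", err)
	}
}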

Posted by huangapple on February 28, 2017 at 23:09:01
Source: https://go.coder-hub.com/42512293.html