英文:
How to send of GO routines in a worker pool
问题
我正在编写一个算法,将图像分割并进行处理,但是我目前使用的Go协程的方式并不是很优化。
我想将其拆分为一个工作池,启动协程并让每个工作器接受一个新的任务,直到图像完成。
我将其分为8个部分,如下所示:
var bounds = img.Bounds()
var halfHeight = bounds.Max.Y / 2
var eighthOne = halfHeight / 4
var eighthTwo = eighthOne + eighthOne
var eighthThree = eighthOne + eighthTwo
var eighthFive = halfHeight + eighthOne
var eighthSix = halfHeight + eighthTwo
var eighthSeven = halfHeight + eighthThree
elapsed := time.Now()
go Threshold(pic, c2, 0, eighthOne)
go Threshold(pic, c5, eighthOne, eighthTwo)
go Threshold(pic, c6, eighthTwo, eighthThree)
go Threshold(pic, c7, eighthThree, halfHeight)
go Threshold(pic, c8, halfHeight, eighthFive)
go Threshold(pic, c9, eighthFive, eighthSix)
go Threshold(pic, c10, eighthSix, eighthSeven)
go Threshold(pic, c11, eighthSeven, bounds.Max.Y)
然后我一个接一个地启动Go协程,如何将其优化为一个工作系统呢?
谢谢。
英文:
im writing an algorithm to break down an image into segments and manipulate it, however the way im currently using Go routines isn't quite optimal.
I'd like to split it into a worker pool, firing off routines and having each worker take a new job until the image is completed.
I have it split into 8 as such:
var bounds = img.Bounds()
var halfHeight = bounds.Max.Y / 2
var eighthOne = halfHeight / 4
var eighthTwo = eighthOne + eighthOne
var eighthThree = eighthOne + eighthTwo
var eighthFive = halfHeight + eighthOne
var eighthSix = halfHeight + eighthTwo
var eighthSeven = halfHeight + eighthThree
elapsed := time.Now()
go Threshold(pic, c2, 0, eighthOne)
go Threshold(pic, c5, eighthOne, eighthTwo)
go Threshold(pic, c6, eighthTwo, eighthThree)
go Threshold(pic, c7, eighthThree, halfHeight)
go Threshold(pic, c8, halfHeight, eighthFive)
go Threshold(pic, c9, eighthFive, eighthSix)
go Threshold(pic, c10, eighthSix, eighthSeven)
go Threshold(pic, c11, eighthSeven, bounds.Max.Y)
From which i then fire off Go routines one after another, how can i optimise this into a worker system?
Thanks
答案1
得分: 2
这是一个通用的并发图像处理器实现模式,允许调用者对图像分区进行控制,将工作分成n个部分,并控制执行的并发级别(即用于执行(可能不同)处理作业的工作协程数量)。
请查看pprocess
函数,它实现了整个模式,接受一个Partitioner
和一个Processor
作为参数,前者是一个函数,用于返回n个图像分区的作业,后者是一个函数,用于处理每个分区。
我在splitVert
函数中实现了你在代码示例中提到的垂直分割,它返回一个函数,可以将图像分割成n个垂直部分。
为了进行一些实际工作,我实现了gray
函数,它是一个Processor
,将像素颜色转换为灰度级(亮度)。
以下是工作代码:
type MutableImage interface {
image.Image
Set(x, y int, c color.Color)
}
type Processor func(MutableImage, image.Rectangle)
type Partitioner func(image.Image) []image.Rectangle
func pprocess(i image.Image, concurrency int, part Partitioner, proc Processor) image.Image {
m := image.NewRGBA(i.Bounds())
draw.Draw(m, i.Bounds(), i, i.Bounds().Min, draw.Src)
var wg sync.WaitGroup
c := make(chan image.Rectangle, concurrency*2)
for n := 0; n < concurrency; n++ {
wg.Add(1)
go func() {
for r := range c {
proc(m, r)
}
wg.Done()
}()
}
for _, p := range part(i) {
c <- p
}
close(c)
wg.Wait()
return m
}
func gray(i MutableImage, r image.Rectangle) {
for x := r.Min.X; x <= r.Max.X; x++ {
for y := r.Min.Y; y <= r.Max.Y; y++ {
c := i.At(x, y)
r, g, b, _ := c.RGBA()
l := 0.299*float64(r) + 0.587*float64(g) + 0.114*float64(b)
i.Set(x, y, color.Gray{uint8(l / 256)})
}
}
}
func splitVert(c int) Partitioner {
return func(i image.Image) []image.Rectangle {
b := i.Bounds()
s := float64(b.Dy()) / float64(c)
rs := make([]image.Rectangle, c)
for n := 0; n < c; n++ {
m := float64(n)
x0 := b.Min.X
y0 := b.Min.Y + int(0.5+m*s)
x1 := b.Max.X
y1 := b.Min.Y + int(0.5+(m+1)*s)
if n < c-1 {
y1--
}
rs[n] = image.Rect(x0, y0, x1, y1)
}
return rs
}
}
func main() {
i, err := jpeg.Decode(os.Stdin)
if err != nil {
log.Fatalf("decoding image: %v", err)
}
o := pprocess(i, runtime.NumCPU(), splitVert(8), gray)
err = jpeg.Encode(os.Stdout, o, nil)
if err != nil {
log.Fatalf("encoding image: %v", err)
}
}
希望对你有帮助!
英文:
Here you have a generic pattern for implementing concurrent image processors giving control to the caller over the image partitioning to split the work in n parts and over the concurrency level of the execution (i.e. the number of worker goroutines used for executing the (possibly different) number of processing jobs).
See the pprocess
func which implements the whole pattern taking a Partitioner
and a Processor
, the former being a func that takes the job of returning n image partitions to operate on, and the latter being a func which will be used for processing each partition.
I implemented the vertical splitting you expressed in your code example in the func splitVert
which returns a function which can split an image in n vertical sections.
For doing some actual work I implemented the gray
func which is a Processor
that transform pixel colors to gray levels (luminance).
Here's the working code:
type MutableImage interface {
image.Image
Set(x, y int, c color.Color)
}
type Processor func(MutableImage, image.Rectangle)
type Partitioner func(image.Image) []image.Rectangle
func pprocess(i image.Image, concurrency int, part Partitioner, proc Processor) image.Image {
m := image.NewRGBA(i.Bounds())
draw.Draw(m, i.Bounds(), i, i.Bounds().Min, draw.Src)
var wg sync.WaitGroup
c := make(chan image.Rectangle, concurrency*2)
for n := 0; n < concurrency; n++ {
wg.Add(1)
go func() {
for r := range c {
proc(m, r)
}
wg.Done()
}()
}
for _, p := range part(i) {
c <- p
}
close(c)
wg.Wait()
return m
}
func gray(i MutableImage, r image.Rectangle) {
for x := r.Min.X; x <= r.Max.X; x++ {
for y := r.Min.Y; y <= r.Max.Y; y++ {
c := i.At(x, y)
r, g, b, _ := c.RGBA()
l := 0.299*float64(r) + 0.587*float64(g) + 0.114*float64(b)
i.Set(x, y, color.Gray{uint8(l / 256)})
}
}
}
func splitVert(c int) Partitioner {
return func(i image.Image) []image.Rectangle {
b := i.Bounds()
s := float64(b.Dy()) / float64(c)
rs := make([]image.Rectangle, c)
for n := 0; n < c; n++ {
m := float64(n)
x0 := b.Min.X
y0 := b.Min.Y + int(0.5+m*s)
x1 := b.Max.X
y1 := b.Min.Y + int(0.5+(m+1)*s)
if n < c-1 {
y1--
}
rs[n] = image.Rect(x0, y0, x1, y1)
}
return rs
}
}
func main() {
i, err := jpeg.Decode(os.Stdin)
if err != nil {
log.Fatalf("decoding image: %v", err)
}
o := pprocess(i, runtime.NumCPU(), splitVert(8), gray)
err = jpeg.Encode(os.Stdout, o, nil)
if err != nil {
log.Fatalf("encoding image: %v", err)
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论