英文:
Go: Reusable barrier like Java's CyclicBarrier?
问题
使用Google Go,我正在尝试在图像上同步多个执行迭代滤波的线程。我的代码基本上按照这里的描述工作:
func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier ??) {
for i:= 0; i < runs; i++ {
// ... 进行图像处理 ...
// barrier.Await() 可以在这里使用
if start == 1 {
// 第一个线程切换图像以进行下一次迭代步骤
switchImgs(src, dest)
}
// 再次使用 barrier.Await()
}
}
func main() {
//...
barrier := sync.BarrierNew(numberOfThreads)
for i := 0; i < numberOfThreads; i++ {
go filter(..., barrier)
}
}
问题是,我需要一个可重用的屏障,类似于Java的CyclicBarrier
,将线程数设置为其计数器值。不幸的是,我找到的唯一类似于屏障的实现是sync.WaitGroup
。然而,WaitGroup
无法原子地重置为其先前的计数器值。它只提供了一个普通的Wait()
函数,不会重置计数器值。
有没有任何"Go惯用"的方法来实现我想要的,还是我应该自己实现一个CyclicBarrier
?非常感谢你的帮助!
英文:
Using Google Go, I'm trying to sync multiple threads performing an iterative filter on an image. My code basically works like outlined here:
func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier ??) {
for i:= 0; i < runs; i++ {
// ... do image manipulation ...
// barrier.Await() would work here
if start == 1 {
// the first thread switches the images for the next iteration step
switchImgs(src, dest)
}
// barrier.Await() again
}
}
func main() {
//...
barrier := sync.BarrierNew(numberOfThreads)
for i := 0; i < numberOfThreads; i++ {
go filter(..., barrier)
}
The problem is that I would need a reusable barrier quite like Java's CyclicBarrier
, setting the number of threads as its counter value. Unfortunately, the only implementation similar to a barrier I have found is sync.WaitGroup
. The WaitGroup
however cannot be reset atomically to it's previous counter value. It only offers a normal Wait()
function that does not reset the counter value.
Is there any "Go idiomatic" way of achieving what I want or should I rather implement my own CyclicBarrier
? Thanks a lot for your help!
答案1
得分: 0
我不完全理解CyclicBarrier的工作原理,如果我理解错了,请原谅。
一个非常简单的SyncGroup
的包装器应该可以完成工作,例如:
type Barrier struct {
NumOfThreads int
wg sync.WaitGroup
}
func NewBarrier(num int) (b *Barrier) {
b = &Barrier{NumOfThreads: num}
b.wg.Add(num)
return
}
func (b *Barrier) Await() {
b.wg.Wait()
b.wg.Add(b.NumOfThreads)
}
func (b *Barrier) Done() {
b.wg.Done()
}
func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier *Barrier) {
for i := 0; i < runs; i++ {
// ... 进行图像处理 ...
// 通过 b.Done() 表示该过滤器已完成
b.Done()
b.Await()
if start == 1 {
// 第一个线程切换图像以进行下一次迭代步骤
//switchImgs(src, dest)
}
b.Done()
b.Await()
}
}
func main() {
barrier := NewBarrier(5)
for i := 0; i < barrier.NumOfThreads; i++ {
go filter(1, barrier)
}
}
以上是对代码的翻译。
英文:
I don't fully understand how CyclicBarrier works, so excuse me if I'm way off.
A very simple wrapper around SyncGroup
should do the job, for example:
type Barrier struct {
NumOfThreads int
wg sync.WaitGroup
}
func NewBarrier(num int) (b *Barrier) {
b = &Barrier{NumOfThreads: num}
b.wg.Add(num)
return
}
func (b *Barrier) Await() {
b.wg.Wait()
b.wg.Add(b.NumOfThreads)
}
func (b *Barrier) Done() {
b.wg.Done()
}
func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier *Barrier) {
for i := 0; i < runs; i++ {
// ... do image manipulation ...
//this filter is done, say so by using b.Done()
b.Done()
b.Await()
if start == 1 {
// the first thread switches the images for the next iteration step
//switchImgs(src, dest)
}
b.Done()
b.Await()
}
}
func main() {
barrier := NewBarrier(5)
for i := 0; i < barrier.NumOfThreads; i++ {
go filter(1, barrier)
}
}
答案2
得分: 0
你可以使用sync.Cond来实现CyclicBarrier,参考Java的CyclicBarrier源代码。
以下是一个简化的Go版本的CyclicBarrier(没有超时,没有线程中断):
type CyclicBarrier struct {
generation int
count int
parties int
trip *sync.Cond
}
func (b *CyclicBarrier) nextGeneration() {
// signal completion of last generation
b.trip.Broadcast()
b.count = b.parties
// set up next generation
b.generation++
}
func (b *CyclicBarrier) Await() {
b.trip.L.Lock()
defer b.trip.L.Unlock()
generation := b.generation
b.count--
index := b.count
//println(index)
if index == 0 {
b.nextGeneration()
} else {
for generation == b.generation {
//wait for current generation complete
b.trip.Wait()
}
}
}
func NewCyclicBarrier(num int) *CyclicBarrier {
b := CyclicBarrier{}
b.count = num
b.parties = num
b.trip = sync.NewCond(&sync.Mutex{})
return &b
}
希望对你有帮助!
英文:
You can use sync.Cond to implement CyclicBarrier, see source code of java's CyclicBarrier
Here is a minimized go version of CyclicBarrier (no timeout, no thread interrupts):
http://play.golang.org/p/5JSNTm0BLe
type CyclicBarrier struct {
generation int
count int
parties int
trip *sync.Cond
}
func (b *CyclicBarrier) nextGeneration() {
// signal completion of last generation
b.trip.Broadcast()
b.count = b.parties
// set up next generation
b.generation++
}
func (b *CyclicBarrier) Await() {
b.trip.L.Lock()
defer b.trip.L.Unlock()
generation := b.generation
b.count--
index := b.count
//println(index)
if index == 0 {
b.nextGeneration()
} else {
for generation == b.generation {
//wait for current generation complete
b.trip.Wait()
}
}
}
func NewCyclicBarrier(num int) *CyclicBarrier {
b := CyclicBarrier{}
b.count = num
b.parties = num
b.trip = sync.NewCond(&sync.Mutex{})
return &b
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论