英文:
Go: Reusable barrier like Java's CyclicBarrier?
问题
使用Google Go,我正在尝试在图像上同步多个执行迭代滤波的线程。我的代码基本上按照这里的描述工作:
func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier ??) {
    for i:= 0; i < runs; i++ {
        // ... 进行图像处理 ...
        // barrier.Await() 可以在这里使用
        if start == 1 {
            // 第一个线程切换图像以进行下一次迭代步骤
            switchImgs(src, dest)
        }
        
        // 再次使用 barrier.Await()
     }
}
func main() {
    //...
    barrier := sync.BarrierNew(numberOfThreads)
    for i := 0; i < numberOfThreads; i++ {
        go filter(..., barrier)
    }
}
问题是,我需要一个可重用的屏障,类似于Java的CyclicBarrier,将线程数设置为其计数器值。不幸的是,我找到的唯一类似于屏障的实现是sync.WaitGroup。然而,WaitGroup无法原子地重置为其先前的计数器值。它只提供了一个普通的Wait()函数,不会重置计数器值。
有没有任何"Go惯用"的方法来实现我想要的,还是我应该自己实现一个CyclicBarrier?非常感谢你的帮助!
英文:
Using Google Go, I'm trying to sync multiple threads performing an iterative filter on an image. My code basically works like outlined here:
func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier ??) {
    for i:= 0; i < runs; i++ {
        // ... do image manipulation ...
        // barrier.Await() would work here
        if start == 1 {
            // the first thread switches the images for the next iteration step
            switchImgs(src, dest)
        }
        
        // barrier.Await() again
     }
}
func main() {
    //...
    barrier := sync.BarrierNew(numberOfThreads)
    for i := 0; i < numberOfThreads; i++ {
        go filter(..., barrier)
    }
The problem is that I would need a reusable barrier quite like Java's CyclicBarrier, setting the number of threads as its counter value. Unfortunately, the only implementation similar to a barrier I have found is sync.WaitGroup. The WaitGroup however cannot be reset atomically to it's previous counter value. It only offers a normal Wait() function that does not reset the counter value.
Is there any "Go idiomatic" way of achieving what I want or should I rather implement my own CyclicBarrier? Thanks a lot for your help!
答案1
得分: 0
我不完全理解CyclicBarrier的工作原理,如果我理解错了,请原谅。
一个非常简单的SyncGroup的包装器应该可以完成工作,例如:
type Barrier struct {
    NumOfThreads int
    wg           sync.WaitGroup
}
func NewBarrier(num int) (b *Barrier) {
    b = &Barrier{NumOfThreads: num}
    b.wg.Add(num)
    return
}
func (b *Barrier) Await() {
    b.wg.Wait()
    b.wg.Add(b.NumOfThreads)
}
func (b *Barrier) Done() {
    b.wg.Done()
}
func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier *Barrier) {
    for i := 0; i < runs; i++ {
        // ... 进行图像处理 ...
        // 通过 b.Done() 表示该过滤器已完成
        b.Done()
        b.Await()
        if start == 1 {
            // 第一个线程切换图像以进行下一次迭代步骤
            //switchImgs(src, dest)
        }
        b.Done()
        b.Await()
    }
}
func main() {
    barrier := NewBarrier(5)
    for i := 0; i < barrier.NumOfThreads; i++ {
        go filter(1, barrier)
    }
}
以上是对代码的翻译。
英文:
I don't fully understand how CyclicBarrier works, so excuse me if I'm way off.
A very simple wrapper around SyncGroup should do the job, for example:
type Barrier struct {
    NumOfThreads int
	wg           sync.WaitGroup
}
func NewBarrier(num int) (b *Barrier) {
	b = &Barrier{NumOfThreads: num}
	b.wg.Add(num)
	return
}
func (b *Barrier) Await() {
	b.wg.Wait()
	b.wg.Add(b.NumOfThreads)
}
func (b *Barrier) Done() {
	b.wg.Done()
}
func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier *Barrier) {
	for i := 0; i < runs; i++ {
		// ... do image manipulation ...
		//this filter is done, say so by using b.Done()
		b.Done()
		b.Await()
		if start == 1 {
			// the first thread switches the images for the next iteration step
			//switchImgs(src, dest)
		}
		b.Done()
		b.Await()
	}
}
func main() {
	barrier := NewBarrier(5)
	for i := 0; i < barrier.NumOfThreads; i++ {
		go filter(1, barrier)
	}
}
答案2
得分: 0
你可以使用sync.Cond来实现CyclicBarrier,参考Java的CyclicBarrier源代码。
以下是一个简化的Go版本的CyclicBarrier(没有超时,没有线程中断):
type CyclicBarrier struct {
    generation int
    count      int
    parties    int
    trip       *sync.Cond
}
func (b *CyclicBarrier) nextGeneration() {
    // signal completion of last generation
    b.trip.Broadcast()
    b.count = b.parties
    // set up next generation
    b.generation++
}
func (b *CyclicBarrier) Await() {
    b.trip.L.Lock()
    defer b.trip.L.Unlock()
    
    generation := b.generation
    b.count--
    index := b.count
    //println(index)
    if index == 0 {
        b.nextGeneration()
    } else {
        for generation == b.generation {
            //wait for current generation complete
            b.trip.Wait()
        }
    }
}
func NewCyclicBarrier(num int) *CyclicBarrier {
    b := CyclicBarrier{}
    b.count = num
    b.parties = num
    b.trip = sync.NewCond(&sync.Mutex{})
    return &b
}
希望对你有帮助!
英文:
You can use sync.Cond to implement CyclicBarrier, see source code of java's CyclicBarrier
Here is a minimized go version of CyclicBarrier (no timeout, no thread interrupts):
http://play.golang.org/p/5JSNTm0BLe
type CyclicBarrier struct {
	generation int
	count      int
	parties    int
	trip       *sync.Cond
}
func (b *CyclicBarrier) nextGeneration() {
	// signal completion of last generation
	b.trip.Broadcast()
	b.count = b.parties
	// set up next generation
	b.generation++
}
func (b *CyclicBarrier) Await() {
	b.trip.L.Lock()
	defer b.trip.L.Unlock()
	
	generation := b.generation
	b.count--
	index := b.count
	//println(index)
	if index == 0 {
		b.nextGeneration()
	} else {
		for generation == b.generation {
			//wait for current generation complete
			b.trip.Wait()
		}
	}
}
func NewCyclicBarrier(num int) *CyclicBarrier {
	b := CyclicBarrier{}
	b.count = num
	b.parties = num
	b.trip = sync.NewCond(&sync.Mutex{})
	return &b
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论