Go:类似Java的CyclicBarrier的可重用屏障?

huangapple go评论122阅读模式
英文:

Go: Reusable barrier like Java's CyclicBarrier?

问题

使用Google Go,我正在尝试在图像上同步多个执行迭代滤波的线程。我的代码基本上按照这里的描述工作:

  1. func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier ??) {
  2. for i:= 0; i < runs; i++ {
  3. // ... 进行图像处理 ...
  4. // barrier.Await() 可以在这里使用
  5. if start == 1 {
  6. // 第一个线程切换图像以进行下一次迭代步骤
  7. switchImgs(src, dest)
  8. }
  9. // 再次使用 barrier.Await()
  10. }
  11. }
  12. func main() {
  13. //...
  14. barrier := sync.BarrierNew(numberOfThreads)
  15. for i := 0; i < numberOfThreads; i++ {
  16. go filter(..., barrier)
  17. }
  18. }

问题是,我需要一个可重用的屏障,类似于Java的CyclicBarrier,将线程数设置为其计数器值。不幸的是,我找到的唯一类似于屏障的实现是sync.WaitGroup。然而,WaitGroup无法原子地重置为其先前的计数器值。它只提供了一个普通的Wait()函数,不会重置计数器值。

有没有任何"Go惯用"的方法来实现我想要的,还是我应该自己实现一个CyclicBarrier?非常感谢你的帮助!

英文:

Using Google Go, I'm trying to sync multiple threads performing an iterative filter on an image. My code basically works like outlined here:

  1. func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier ??) {
  2. for i:= 0; i &lt; runs; i++ {
  3. // ... do image manipulation ...
  4. // barrier.Await() would work here
  5. if start == 1 {
  6. // the first thread switches the images for the next iteration step
  7. switchImgs(src, dest)
  8. }
  9. // barrier.Await() again
  10. }
  11. }
  12. func main() {
  13. //...
  14. barrier := sync.BarrierNew(numberOfThreads)
  15. for i := 0; i &lt; numberOfThreads; i++ {
  16. go filter(..., barrier)
  17. }

The problem is that I would need a reusable barrier quite like Java's CyclicBarrier, setting the number of threads as its counter value. Unfortunately, the only implementation similar to a barrier I have found is sync.WaitGroup. The WaitGroup however cannot be reset atomically to it's previous counter value. It only offers a normal Wait() function that does not reset the counter value.

Is there any "Go idiomatic" way of achieving what I want or should I rather implement my own CyclicBarrier? Thanks a lot for your help!

答案1

得分: 0

我不完全理解CyclicBarrier的工作原理,如果我理解错了,请原谅。

一个非常简单的SyncGroup的包装器应该可以完成工作,例如:

  1. type Barrier struct {
  2. NumOfThreads int
  3. wg sync.WaitGroup
  4. }
  5. func NewBarrier(num int) (b *Barrier) {
  6. b = &Barrier{NumOfThreads: num}
  7. b.wg.Add(num)
  8. return
  9. }
  10. func (b *Barrier) Await() {
  11. b.wg.Wait()
  12. b.wg.Add(b.NumOfThreads)
  13. }
  14. func (b *Barrier) Done() {
  15. b.wg.Done()
  16. }
  17. func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier *Barrier) {
  18. for i := 0; i < runs; i++ {
  19. // ... 进行图像处理 ...
  20. // 通过 b.Done() 表示该过滤器已完成
  21. b.Done()
  22. b.Await()
  23. if start == 1 {
  24. // 第一个线程切换图像以进行下一次迭代步骤
  25. //switchImgs(src, dest)
  26. }
  27. b.Done()
  28. b.Await()
  29. }
  30. }
  31. func main() {
  32. barrier := NewBarrier(5)
  33. for i := 0; i < barrier.NumOfThreads; i++ {
  34. go filter(1, barrier)
  35. }
  36. }

以上是对代码的翻译。

英文:

I don't fully understand how CyclicBarrier works, so excuse me if I'm way off.

A very simple wrapper around SyncGroup should do the job, for example:

  1. type Barrier struct {
  2. NumOfThreads int
  3. wg sync.WaitGroup
  4. }
  5. func NewBarrier(num int) (b *Barrier) {
  6. b = &amp;Barrier{NumOfThreads: num}
  7. b.wg.Add(num)
  8. return
  9. }
  10. func (b *Barrier) Await() {
  11. b.wg.Wait()
  12. b.wg.Add(b.NumOfThreads)
  13. }
  14. func (b *Barrier) Done() {
  15. b.wg.Done()
  16. }
  17. func filter(src *image.Image, dest *image.Image, start, end, runs int, barrier *Barrier) {
  18. for i := 0; i &lt; runs; i++ {
  19. // ... do image manipulation ...
  20. //this filter is done, say so by using b.Done()
  21. b.Done()
  22. b.Await()
  23. if start == 1 {
  24. // the first thread switches the images for the next iteration step
  25. //switchImgs(src, dest)
  26. }
  27. b.Done()
  28. b.Await()
  29. }
  30. }
  31. func main() {
  32. barrier := NewBarrier(5)
  33. for i := 0; i &lt; barrier.NumOfThreads; i++ {
  34. go filter(1, barrier)
  35. }
  36. }

答案2

得分: 0

你可以使用sync.Cond来实现CyclicBarrier,参考Java的CyclicBarrier源代码

以下是一个简化的Go版本的CyclicBarrier(没有超时,没有线程中断):

  1. type CyclicBarrier struct {
  2. generation int
  3. count int
  4. parties int
  5. trip *sync.Cond
  6. }
  7. func (b *CyclicBarrier) nextGeneration() {
  8. // signal completion of last generation
  9. b.trip.Broadcast()
  10. b.count = b.parties
  11. // set up next generation
  12. b.generation++
  13. }
  14. func (b *CyclicBarrier) Await() {
  15. b.trip.L.Lock()
  16. defer b.trip.L.Unlock()
  17. generation := b.generation
  18. b.count--
  19. index := b.count
  20. //println(index)
  21. if index == 0 {
  22. b.nextGeneration()
  23. } else {
  24. for generation == b.generation {
  25. //wait for current generation complete
  26. b.trip.Wait()
  27. }
  28. }
  29. }
  30. func NewCyclicBarrier(num int) *CyclicBarrier {
  31. b := CyclicBarrier{}
  32. b.count = num
  33. b.parties = num
  34. b.trip = sync.NewCond(&sync.Mutex{})
  35. return &b
  36. }

希望对你有帮助!

英文:

You can use sync.Cond to implement CyclicBarrier, see source code of java's CyclicBarrier

Here is a minimized go version of CyclicBarrier (no timeout, no thread interrupts):
http://play.golang.org/p/5JSNTm0BLe

  1. type CyclicBarrier struct {
  2. generation int
  3. count int
  4. parties int
  5. trip *sync.Cond
  6. }
  7. func (b *CyclicBarrier) nextGeneration() {
  8. // signal completion of last generation
  9. b.trip.Broadcast()
  10. b.count = b.parties
  11. // set up next generation
  12. b.generation++
  13. }
  14. func (b *CyclicBarrier) Await() {
  15. b.trip.L.Lock()
  16. defer b.trip.L.Unlock()
  17. generation := b.generation
  18. b.count--
  19. index := b.count
  20. //println(index)
  21. if index == 0 {
  22. b.nextGeneration()
  23. } else {
  24. for generation == b.generation {
  25. //wait for current generation complete
  26. b.trip.Wait()
  27. }
  28. }
  29. }
  30. func NewCyclicBarrier(num int) *CyclicBarrier {
  31. b := CyclicBarrier{}
  32. b.count = num
  33. b.parties = num
  34. b.trip = sync.NewCond(&amp;sync.Mutex{})
  35. return &amp;b
  36. }

huangapple
  • 本文由 发表于 2014年7月21日 23:41:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/24869114.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定