Parallel algorithm in golang to sum of elements in a vector

huangapple go评论88阅读模式
英文:

Parallel algorithm in golang to sum of elements in a vector

问题

我正在使用Golang实现一些并行算法作为练习。现在我正在尝试对一个向量中的所有元素求和,但是为了做到这一点,我需要一个屏障。我在Google上搜索了一下,但是找不到任何可以帮助我的东西。

这是我的代码:

package main

import (
	"fmt"
	"math"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	sumWorkerFunc := func(k int, a []int, br *sync.WaitGroup) {
		bound := int(math.Ceil(math.Log2(float64(k))))
		for i := 1; i < bound; i++ {
			if k%int(math.Pow(2, float64(i))) == 0 {
				a[k] = a[k-int(math.Pow(2, float64(i-1)))] + a[k]
			}

            /* 屏障在这里 */
		}

		wg.Done()
	}

	a := []int{0, 1, 2, 3, 4, 5, 6, 7}

	fmt.Println("Before:")
	fmt.Println(a)

	workers := 8
	wg.Add(workers)

	for k := 0; k < workers; k++ {
		go sumWorkerFunc(k, a, br)
	}
	wg.Wait()

	fmt.Println("After:")
	fmt.Println(a)
}

我需要等待所有的工作线程完成后再开始下一次迭代,因为它们需要下一次迭代的结果。这是我尝试做的:

package main

import (
	"fmt"
	"math"
	"sync"
)

func main() {
	var wg sync.WaitGroup

	sumWorkerFunc := func(k int, a []int, br *sync.WaitGroup) {
		bound := int(math.Ceil(math.Log2(float64(k))))
		for i := 1; i < bound; i++ {
			if k%int(math.Pow(2, float64(i))) == 0 {
				a[k] = a[k-int(math.Pow(2, float64(i-1)))] + a[k]
			}

			br.Done()
			br.Wait() // 这里不应该有这个
			br.Add(1)
		}

		wg.Done()
	}

	a := []int{0, 1, 2, 3, 4, 5, 6, 7}

	fmt.Println("Before:")
	fmt.Println(a)

	workers := 8
	wg.Add(workers)

	var barrier sync.WaitGroup
	barrier.Add(workers)

	for k := 0; k < workers; k++ {
		go sumWorkerFunc(k, a, &barrier)
	}
	wg.Wait()

	fmt.Println("After:")
	fmt.Println(a)
}

但是我不能在那里放置一个Wait(),因为它将被所有的工作线程调用。在那里实现一个正确的屏障的方法是什么?我开始觉得这个问题可能更适合于共享内存模型,而不适合于Golang。

谢谢!

编辑:

我添加了一个我想要实现的示例:

5      2      1      3      5       8      1      1
|      |      |      |      |       |      |      |
|_ _ _ 7      |_ _ _ 4      |_ _ _ 13      |_ _ _ 2
       |             |              |             |
       |_ _ _ _ _ _ 11              |_ _ _ _ _ _ 15
                     |                            |
                     |_ _ _ _ _ _ _ _ _ _ _ _ _  26

其中每个工作线程负责数组的一个元素。

英文:

I am implementing some parallel algorithms as an exercise in Golang. Right now I am trying to sum all the elements in a vector, but to do that, I need a barrier. I googled around but I could not find anything that could help me.

This is what my code looks like:

package main

import (
	&quot;fmt&quot;
	&quot;math&quot;
	&quot;sync&quot;
)

func main() {
	var wg sync.WaitGroup

	sumWorkerFunc := func(k int, a []int, br *sync.WaitGroup) {
		bound := int(math.Ceil(math.Log2(float64(k))))
		for i := 1; i &lt; bound; i++ {
			if k%int(math.Pow(2, float64(i))) == 0 {
				a[k] = a[k-int(math.Pow(2, float64(i-1)))] + a[k]
			}

            /* barrier here */
		}

		wg.Done()
	}

	a := []int{0, 1, 2, 3, 4, 5, 6, 7}

	fmt.Println(&quot;Before:&quot;)
	fmt.Println(a)

	workers := 8
	wg.Add(workers)

	for k := 0; k &lt; workers; k++ {
		go sumWorkerFunc(k, a, br)
	}
	wg.Wait()

	fmt.Println(&quot;After:&quot;)
	fmt.Println(a)
}

I need to wait for all the workers to be done before starting the next iteration as they need the results for the next iteration. This is what I tried to do:

package main

import (
	&quot;fmt&quot;
	&quot;math&quot;
	&quot;sync&quot;
)

func main() {
	var wg sync.WaitGroup

	sumWorkerFunc := func(k int, a []int, br *sync.WaitGroup) {
		bound := int(math.Ceil(math.Log2(float64(k))))
		for i := 1; i &lt; bound; i++ {
			if k%int(math.Pow(2, float64(i))) == 0 {
				a[k] = a[k-int(math.Pow(2, float64(i-1)))] + a[k]
			}

			br.Done()
			br.Wait() // this should not be here
			br.Add(1)
		}

		wg.Done()
	}

	a := []int{0, 1, 2, 3, 4, 5, 6, 7}

	fmt.Println(&quot;Before:&quot;)
	fmt.Println(a)

	workers := 8
	wg.Add(workers)

	var barrier sync.WaitGroup
	barrier.Add(workers)

	for k := 0; k &lt; workers; k++ {
		go sumWorkerFunc(k, a, &amp;barrier)
	}
	wg.Wait()

	fmt.Println(&quot;After:&quot;)
	fmt.Println(a)
}

But I cannot place a Wait() there because it will be called by all the workers. What would be a correct way of implementing a barrier there? I am starting to think that maybe this problem is oriented more towards the shared memory model which may not suitable for Golang.

Thanks!

EDIT:

I added an example of what I am trying to achieve:

5      2      1      3      5       8      1      1
|      |      |      |      |       |      |      |
|_ _ _ 7      |_ _ _ 4      |_ _ _ 13      |_ _ _ 2
       |             |              |             |
       |_ _ _ _ _ _ 11              |_ _ _ _ _ _ 15
                     |                            |
                     |_ _ _ _ _ _ _ _ _ _ _ _ _  26

where each worker is responsible for one element of the array.

答案1

得分: 0

你需要不止一个WaitGroup来协调这个例程。看一下这个模式:

func main() {
	const size = 100
	var (
		wg sync.WaitGroup
		a  [size]int
	)

	// 用1填充数组a
	for i := 0; i < size; i++ {
		go func(x int) {
			wg.Add(1)
			a[x] = 1
			wg.Done()
		}(i)
	}

	wg.Wait()

	fmt.Println(a)
}

每个工作线程在执行工作之前都会将自己添加到WaitGroup中,然后在完成工作后将自己移除。
你看到问题了吗?尝试自己运行几次并查看输出。

只有在所有预期的wg.Add(x)调用完成之后,wg.Wait()才是有效的。由于wg.Add(1)在goroutine中没有任何其他同步,我们无法确定在main执行wg.Wait()时有多少个工作线程已经被添加。例如,可能有100个工作线程中的50个都调用了wg.Add(1)wg.Done(),然后剩下的50个什么都没做。因此,wg.Wait()会继续执行,而其中50个工作线程仍未完成。这就是说,wg.Add(x)必须同步!

var barrier sync.WaitGroup
barrier.Add(workers)

for k := 0; k < workers; k++ {
    go sumWorkerFunc(k, a, &barrier)
}

在这种情况下,Add是同步的,因为它发生在工作线程开始执行之前。

br.Done()
br.Wait()
br.Add(1)

在所有工作线程调用Wait并没有问题,问题在于Add没有同步。在这里,你不能使用WaitGroup来进行同步。你需要一些额外的功能(另一个WaitGroup、锁、通道等)来在工作线程之间的每一轮中创建同步。

这是我能想到的最简单的解决方案,它为每一轮创建了一个WaitGroup:

func main() {
	const (
		workers = 3
		rounds  = 5
	)

	work := func(i int, roundWgs []sync.WaitGroup) {
		for r := 0; r < rounds; r++ {
			// 这是每一轮的工作
			fmt.Printf("round: %v, worker %v\n", r, i)

			// 我们完成了当前轮的工作,并等待组完成。
			roundWgs[r].Done()
			roundWgs[r].Wait()
		}
	}

	// 每一轮的工作都有自己的WaitGroup,其中每个工作线程都必须完成。
	var roundWgs = make([]sync.WaitGroup, rounds)
	for i := 0; i < rounds; i++ {
		roundWgs[i].Add(workers)
	}

	// wg是最外层的WaitGroup,它等待所有工作完成。
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i < workers; i++ {
		go func(j int) {
			defer wg.Done()
			work(j, roundWgs)
		}(i)
	}
	wg.Wait()
}

输出:

round: 0, worker 2
round: 0, worker 0
round: 0, worker 1
round: 1, worker 1
round: 1, worker 0
round: 1, worker 2
round: 2, worker 2
round: 2, worker 1
round: 2, worker 0
round: 3, worker 0
round: 3, worker 2
round: 3, worker 1
round: 4, worker 1
round: 4, worker 2
round: 4, worker 0

再次说明,Add是同步的,因为它们都发生在任何工作线程开始之前。

英文:

You need more than just the one WaitGroup to coordinate this routine. Look at this pattern:

func main() {
	const size = 100
	var (
		wg sync.WaitGroup
		a  [size]int
	)

	// Fill array a with all ones
	for i := 0; i &lt; size; i++ {
		go func(x int) {
			wg.Add(1)
			a[x] = 1
			wg.Done()
		}(i)
	}

	wg.Wait()

	fmt.Println(a)
}

Each worker adds itself to the WaitGroup before doing work, then removes itself when it's done.
Do you see the problem? Try running it yourself a few times and see the output.

wg.Wait() is only valid if you call it after all of the expected wg.Add(x) have been called. Since the wg.Add(1) are inside the goroutine without any other synchronization, we don't know for sure how many workers were added by the time main goes to wg.Wait(). For example, it's possible that out of the 100 workers, 50 of them all called wg.Add(1) and then wg.Done() before the remaining 50 did anything. So, wg.Wait() continues while 50 of the workers still have not completed. All of this is to say that wg.Add(x) must be synchronized!

var barrier sync.WaitGroup
barrier.Add(workers)

for k := 0; k &lt; workers; k++ {
    go sumWorkerFunc(k, a, &amp;barrier)
}

In this case, the Add is synchronized because it happens before the workers begin to execute.

br.Done()
br.Wait()
br.Add(1)

There's no problem necessarily with having all of your workers call Wait, the problem is that the Add is not synchronized. You cannot use the WaitGroup to synchronize itself here. You need some additional feature (another WaitGroup, a lock, a channel, etc.) to create synchronization across the workers between rounds.

Here's the easiest solution I could come up with, which makes one WaitGroup per each round:

func main() {
	const (
		workers = 3
		rounds  = 5
	)

	work := func(i int, roundWgs []sync.WaitGroup) {
		for r := 0; r &lt; rounds; r++ {
			// This is the &quot;work&quot; we do each round
			fmt.Printf(&quot;round: %v, worker %v\n&quot;, r, i)

			// We are finished the current round, and will wait for the group.
			roundWgs[r].Done()
			roundWgs[r].Wait()
		}
	}

	// Each round of work has it&#39;s own WaitGroup, in which each worker must finish.
	var roundWgs = make([]sync.WaitGroup, rounds)
	for i := 0; i &lt; rounds; i++ {
		roundWgs[i].Add(workers)
	}

	// wg is our outermost WaitGroup, which waits until all work is done.
	var wg sync.WaitGroup
	wg.Add(workers)
	for i := 0; i &lt; workers; i++ {
		go func(j int) {
			defer wg.Done()
			work(j, roundWgs)
		}(i)
	}
	wg.Wait()
}

Output:

round: 0, worker 2
round: 0, worker 0
round: 0, worker 1
round: 1, worker 1
round: 1, worker 0
round: 1, worker 2
round: 2, worker 2
round: 2, worker 1
round: 2, worker 0
round: 3, worker 0
round: 3, worker 2
round: 3, worker 1
round: 4, worker 1
round: 4, worker 2
round: 4, worker 0

Again, the Adds are synchronized because they all happen before any workers begin.

答案2

得分: -1

我认为你试图做的更像是一个共享内存的场景。如果你能在启动 goroutine 的循环之后立即放置等待操作,效果会更好。而且你应该使用互斥锁来确保互斥访问。

我认为在这个问题中顺序并不重要。

英文:

I think what you are trying to do is more like a shared memory scenario. It would be better if instead of deferring the wait you can put it just after the loop that starts the goroutines.
And you should use a mutex to ensure mutual exclusion.

And I don't think order matters for this problem.

huangapple
  • 本文由 发表于 2022年6月11日 05:45:26
  • 转载请务必保留本文链接:https://go.coder-hub.com/72580118.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定