在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。

huangapple go评论159阅读模式
英文:

Parallel saxpy implementation in Go isn't scaling well across cores

问题

所以我正在尝试实现一个同时具有阻塞和并行计算功能的saxpy实现,可以利用我机器上可用的8个核心进行计算。我开始假设向量x和y的大小较小,可以在我的机器的L1缓存中进行计算(分为256kB - 128kB数据,128kB代码)可以串行计算。为了测试这个假设,我编写了两个saxpy的实现,一个是saxpy的阻塞串行版本(BSS),另一个是saxpy的阻塞并行版本(BPS)。阻塞算法仅在向量的大小大于4096个元素时使用。以下是这些实现的代码:

const cachecap = 32*1024/8 // 4096
func blocked_serial_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    nblocks := zn/cachecap + 1
    for i := 0; i < nblocks; i++ {
        beg := i * cachecap
        end := (i + 1) * cachecap
        if end >= zn {
            end = zn
        }
        xb := x[beg:end]
        yb := y[beg:end]
        zb := z[beg:end]
        serial_saxpy(a, xb, incx, b, yb, incy, zb, incz)
    }
}
func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    nblocks := zn/cachecap + 1
    nworkers := runtime.GOMAXPROCS(0)
    if nblocks < nworkers {
        nworkers = nblocks
    }

    sendchan := make(chan block, nblocks)

    var wg sync.WaitGroup
    for i := 0; i < nworkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            a, b := a, b
            incx, incy, incz := incx, incy, incz
            for blk := range sendchan {
                beg, end := blk.beg, blk.end
                serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
            }
        }()
    }

    for i := 0; i < nblocks; i++ {
        beg := i * cachecap
        end := (i + 1) * cachecap
        if end >= zn {
            end = zn
        }
        sendchan <- block{beg, end}
    }
    close(sendchan)
    wg.Wait()
}

func serial_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    // 省略了函数的具体实现
}

然后,我为这三个函数blocked_serial_saxpyblocked_parallel_saxpyserial_saxpy编写了基准测试。下面的图片显示了使用向量大小为1e3、1e4、1e5、2e5、3e5、4e5、6e5、8e5和1e6的基准测试结果:

在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。

为了帮助我可视化blocked_parallel_saxpy实现的性能,我绘制了结果,以下是我得到的结果:

在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。

从图中可以看出,当在blocked_parallel_saxpy基准测试期间所有的CPU都被使用并且使用率达到100%时,我为什么没有看到并行加速。任务管理器的图像如下所示:

在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。

有人可以帮助我理解这里发生了什么吗?我看到的是问题的症状还是正常现象?如果是前者,有没有办法解决它?

编辑:我修改了blocked_parallel_saxpy的代码如下。我将总块数(nblocks)分成nworker个goroutine并行计算nworker个块。此外,我删除了通道。我对代码进行了基准测试,性能与使用通道的并行实现相同,所以我没有附上基准测试结果。

func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    nblocks := zn/cachecap + 1
    nworkers := runtime.GOMAXPROCS(0)
    if nblocks < nworkers {
        nworkers = nblocks
    }

    var wg sync.WaitGroup
    for i := 0; i < nworkers; i++ {
        for j := 0; j < nworkers && (i+j) < nblocks; j++ {
            wg.Add(1)
            go func(i, j int) {
                defer wg.Done()
                a, b := a, b
                incx, incy, incz := incx, incy, incz
                k := i + j
                beg := k * cachecap
                end := (k + 1) * cachecap
                if end >= zn {
                    end = zn
                }
                serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
            }(i, j)
        }
    }
    wg.Wait()
}

编辑2:我又写了一个没有使用通道的blocked_parallel_saxpy版本。这次,我生成了NumCPU个goroutine,每个goroutine处理nblocks/nworkers + 1个块,每个块的长度为cachecap个元素。即使在这里,代码的性能与前两个实现完全相同。

func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
    zn := len(z)
    if zn <= cachecap {
        serial_saxpy(a, x, incx, b, y, incy, z, incz)
        return
    }

    nblocks := zn/cachecap + 1
    nworkers := runtime.GOMAXPROCS(runtime.NumCPU())
    if nblocks < nworkers {
        nworkers = nblocks
    }

    k := nblocks/nworkers + 1
    var wg sync.WaitGroup
    wg.Add(nworkers)
    for i := 0; i < nworkers; i++ {
        go func(i int) {
            defer wg.Done()
            for j := 0; j < k && (j+i*k) < nblocks; j++ {
                beg := (j + i*k) * cachecap
                end := beg + cachecap
                if end > zn {
                    end = zn
                }
                serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
            }
        }(i)
    }

    wg.Wait()
}
英文:

So I'm trying to implement an implementation of saxpy that is both blocked and can be computed in parallel using the 8-cores available on my machine. I started with the assumption that small sizes of the vectors x and y which fit into the L1 cache of my machine (split 256kB - 128kB data, 128kB code), can be computed in serial. To test this assumption, I wrote two implementations of saxpy, one which is a blocked serial version of saxpy (BSS) and a blocked parallel version of saxpy (BPS). The blocking algorithm is used only when the sizes of the vectors are larger than 4096 elements long. The following are the implementations:

const cachecap = 32*1024/8 // 4096
func blocked_serial_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
zn := len(z)
//fmt.Println(&quot;zn: &quot;, zn)
if zn &lt;= cachecap {
serial_saxpy(a, x, incx, b, y, incy, z, incz)
return
}
nblocks := zn/cachecap + 1
//fmt.Println(&quot;nblocks: &quot;, nblocks)
for i := 0; i &lt; nblocks; i++ {
beg := i * cachecap
end := (i + 1) * cachecap
if end &gt;= zn {
end = zn
}
//fmt.Println(&quot;beg, end: &quot;, beg, end)
xb := x[beg:end]
yb := y[beg:end]
zb := z[beg:end]
serial_saxpy(a, xb, incx, b, yb, incy, zb, incz)
}
}
func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
zn := len(z)
if zn &lt;= cachecap {
//fmt.Println(&quot;zn &lt;= cachecap: using serial_saxpy&quot;)
serial_saxpy(a, x, incx, b, y, incy, z, incz)
return
}
nblocks := zn/cachecap + 1
//fmt.Println(&quot;nblocks: &quot;, nblocks)
nworkers := runtime.GOMAXPROCS(0)
if nblocks &lt; nworkers {
nworkers = nblocks
}
//fmt.Println(&quot;nworkers: &quot;, nworkers)
//buf := blockSize*nworkers
//if buf &gt; nblocks {
//	buf = nblocks
//}
//sendchan := make(chan block, buf)
sendchan := make(chan block, nblocks)
var wg sync.WaitGroup
for i := 0; i &lt; nworkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
a, b := a, b
incx, incy, incz := incx, incy, incz
for blk := range sendchan {
beg, end := blk.beg, blk.end
serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
}
}()
}
for i := 0; i &lt; nblocks; i++ {
beg := i * cachecap
end := (i + 1) * cachecap
if end &gt;= zn {
end = zn
}
//fmt.Println(&quot;beg:end&quot;, beg, end)
sendchan &lt;- block{beg, end}
}
close(sendchan)
wg.Wait()
}
func serial_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
if incx &lt;= 0 || incy &lt;= 0 || incz &lt;= 0 {
panic(&quot;AxpBy: zero or negative increments not supported&quot;)
}
n := len(z) / incz
if incx == 1 &amp;&amp; incy == 1 &amp;&amp; incz == 1 {
if a == 1 &amp;&amp; b == 1 {
for i := 0; i &lt; n; i++ {
z[i] = x[i] + y[i]
}
return
}
if a == 0 &amp;&amp; b == 1 {
copy(z, y)
//for i := 0; i &lt; n; i++ {
//	z[i] = y[i]
//}
return
}
if a == 1 &amp;&amp; b == 0 {
copy(z, x)
//for i := 0; i &lt; n; i++ {
//	z[i] = x[i]
//}
return
}
if a == 0 &amp;&amp; b == 0 {
return
}
for i := 0; i &lt; n; i++ {
z[i] = a*x[i] + b*y[i]
}
return
}
// unequal increments or equal increments != 1
ix, iy, iz := 0, 0, 0
if a == 1 &amp;&amp; b == 1 {
for i := 0; i &lt; n; i, ix, iy, iz = i+1, ix+incx, iy+incy, iz+incz {
z[iz] = x[ix] + y[iy]
}
return
}
if a == 0 &amp;&amp; b == 1 {
for i := 0; i &lt; n; i, ix, iy, iz = i+1, ix+incx, iy+incy, iz+incz {
z[iz] = y[iy]
}
return
}
if a == 1 &amp;&amp; b == 0 {
for i := 0; i &lt; n; i, ix, iy, iz = i+1, ix+incx, iy+incy, iz+incz {
z[iz] = x[ix]
}
return
}
if a == 0 &amp;&amp; b == 0 {
return
}
for i := 0; i &lt; n; i, ix, iy, iz = i+1, ix+incx, iy+incy, iz+incz {
z[iz] = a*x[ix] + b*y[iy]
}
}

I then wrote benchmarks for the three functions blocked_serial_saxpy, blocked_parallel_saxpy and serial_saxpy. The following image shows the results of the benchmarks with vector sizes 1e3, 1e4, 1e5, 2e5, 3e5, 4e5, 6e5, 8e5 and 1e6 respectively:
在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。

To help me visualize the performance of the blocked_parallel_saxpy implementation, I plotted the results and this is what I obtained:
在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。
Looking at the plot, makes me wonder, why am I not seeing a parallel speedup, when all the CPUs are being used and at 100% during the blocked_parallel_saxpy benchmark. The image from task manager is below:
在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。

Could someone help me understand what's going on here? Is what I'm seeing, symptom of a problem or the way it should be? If it's the former, is there a way to fix it?

Edit: I have modified the blocked_parallel_saxpy code to the following. I dividing the total no.of blocks (nblocks) such that there are nworker goroutines computing nworker no. of blocks, in parallel. In addition, I have removed the channel. I have benchmarked the code and it performs identically to the parallel implementation with the channel, hence why I haven't attached the benchmarks.

func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
zn := len(z)
if zn &lt;= cachecap {
serial_saxpy(a, x, incx, b, y, incy, z, incz)
return
}
nblocks := zn/cachecap + 1
nworkers := runtime.GOMAXPROCS(0)
if nblocks &lt; nworkers {
nworkers = nblocks
}
var wg sync.WaitGroup
for i := 0; i &lt; nworkers; i++ {
for j := 0; j &lt; nworkers &amp;&amp; (i+j) &lt; nblocks; j++ {
wg.Add(1)
go func(i, j int) {
defer wg.Done()
a, b := a, b
incx, incy, incz := incx, incy, incz
k := i + j
beg := k * cachecap
end := (k + 1) * cachecap
if end &gt;= zn {
end = zn
}
serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
}(i, j)
}
wg.Wait()
}

Edit.2: I have written another version of the blocked_parallel_saxpy, again, without channels. This time, I spawn NumCPU goroutines, each processing nblocks/nworkers + 1 blocks where each block is cachecap no. of elements in length. Even, here, the code performs identically to the previous two implementations.

func blocked_parallel_saxpy(a float64, x []float64, incx int, b float64, y []float64, incy int, z []float64, incz int) {
zn := len(z)
if zn &lt;= cachecap {
serial_saxpy(a, x, incx, b, y, incy, z, incz)
return
}
nblocks := zn/cachecap + 1
nworkers := runtime.GOMAXPROCS(runtime.NumCPU())
if nblocks &lt; nworkers {
nworkers = nblocks
}
k := nblocks/nworkers + 1
var wg sync.WaitGroup
wg.Add(nworkers)
for i := 0; i &lt; nworkers; i++ {
go func(i int) {
defer wg.Done()
for j := 0; j &lt; k &amp;&amp; (j+i*k) &lt; nblocks; j++ {
beg := (j + i*k) * cachecap
end := beg + cachecap
if end &gt; zn {
end = zn
}
//fmt.Printf(&quot;i:%d, j:%d, k:%d, [beg:end]=[%d:%d]\n&quot;, i, j, k, beg, end)
serial_saxpy(a, x[beg:end], incx, b, y[beg:end], incy, z[beg:end], incz)
}
}(i)
}
wg.Wait()
}

答案1

得分: 1

我会尝试一个没有通道的并行版本,其中每个工作进程计算每8个块,无需协调。

英文:

I'd try a parallel version without channels, where each worker computes every 8th block, without coordination.

答案2

得分: 1

FWIW,在Linux上可以运行:你可以看到BlockedSaxpyParrSaxpy的调用位置。

首先,你可以看到一个核心被最大化使用 - 这是blocked_serial_saxpy被调用时。然后你可以看到所有核心都在使用,但它们使用的CPU比单个核心少。这是blocked_parrallel_saxpy被调用时。SIZE是一个随机的5126。

这是一个在SIZE=2e06上运行的版本。同样,你可以看到所有核心在后面的基准测试中都在工作。现在我认为你的函数可能可以稍微改进一下,但我会在我不饿着肚子的时候留给你作为练习。

使用SIZE=2e8,我们最终看到了这个结果:

go test -bench=. -benchtime=10s -cpuprofile=test.prof
BenchmarkBlockedSaxpy-8   	      20	 662405409 ns/op
BenchmarkParrSaxpy-8      	      30	 474503967 ns/op

最后,三个注意事项:

  1. saxpy是用于float32的。你的意思是daxpy,但我有点迂腐。
  2. 另外:gonum拥有很好的d/saxpy功能。
  3. 只需使用runtime.NumCPU()而不是runtime.GOMAXPROCS(runtime.NumCPU())
英文:

FWIW, it works on linux: you can see where the BlockedSaxpy and ParrSaxpy is called.

First you see one core being maximized - that's when blocked_serial_saxpy is called. Then you see all cores being used, but they use less CPU than a single core. That's when blocked_parrallel_saxpy is being called. SIZE is a random 5126

在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。

Here's a version that runs on SIZE=2e06. Again you can see all cores working in the latter benchmark. Now I think your function could be improved slightly, but I'll leave that as an exercise for when I'm not starving for dinner.
在Go语言中并行实现的saxpy(向量乘法和向量加法)在多核上的扩展性不好。

With SIZE=2e8, we finally see this result:

go test -bench=. -benchtime=10s -cpuprofile=test.prof
BenchmarkBlockedSaxpy-8   	      20	 662405409 ns/op
BenchmarkParrSaxpy-8      	      30	 474503967 ns/op

Finally, three notes:

  1. saxpy is for float32. You mean daxpy, but I'm being pedantic.
  2. Additionally: gonum has great d/saxpy facilities.
  3. Just use runtime.NumCPU() instead of runtime.GOMAXPROCS(runtime.NumCPU()).

huangapple
  • 本文由 发表于 2017年9月10日 20:34:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/46140741.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定