Why do these goroutines not scale their performance from more concurrent executions?

Background

I am currently working on my bachelor thesis, and my task is essentially to optimise a given piece of Go code, i.e. make it run as fast as possible. First I optimised the serial function, then tried to introduce parallelism via goroutines. After researching on the internet, I now understand the difference between concurrency and parallelism thanks to the following slides from talks.golang. I attended some parallel programming courses where we parallelised C/C++ code with the help of pthreads/OpenMP, so I tried to apply these paradigms in Go. That said, in this particular case I am optimising a function which computes the moving average of a slice of length len := n + (window_size - 1) (it equals either 9393 or 10175); hence we have n windows for which we compute the corresponding arithmetic average and save it properly in the output slice.

Note that this task is inherently embarrassingly parallel.
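
All variants below exploit the same O(1) rolling update once the first full window has been averaged. Distilled to its core, the recurrence looks like this (a minimal sketch under the same conventions as the code below, i.e. the first window_size-1 outputs are NaN; the function name is mine, not from the original code):

import "math"

// movingAvg is a distilled sketch of the rolling-average recurrence used
// throughout this post: average the first full window once, then derive
// every following output from its predecessor in O(1).
func movingAvg(input []float64, windowSize int) []float64 {
	output := make([]float64, len(input))
	for i := 0; i < windowSize-1 && i < len(input); i++ {
		output[i] = math.NaN() // not enough samples for a full window yet
	}
	if len(input) < windowSize {
		return output
	}
	sum := 0.0
	for _, v := range input[:windowSize] {
		sum += v
	}
	output[windowSize-1] = sum / float64(windowSize)
	for i := windowSize; i < len(input); i++ {
		// the same "solution without loop" recurrence as in the code below
		output[i] = output[i-1] + (input[i]-input[i-windowSize])/float64(windowSize)
	}
	return output
}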

My optimisation attempts and results

In moving_avg_concurrent2 I split the slice into num_goroutines smaller pieces and ran each piece in its own goroutine. With one goroutine this function, for some reason I could not yet figure out (but that is tangential here), performed better than moving_avg_serial4; with more than one goroutine, however, it started to perform worse than moving_avg_serial4.
In moving_avg_concurrent3 I adopted the master/worker paradigm. The performance was worse than moving_avg_serial4 when using one goroutine. Here, at least, performance improved as I increased num_goroutines, but it still never beat moving_avg_serial4.
To compare the performance of moving_avg_serial4, moving_avg_concurrent2 and moving_avg_concurrent3, I wrote benchmarks and tabulated the results:

function & num_goroutines | timing in ns/op | percentage
---------------------------------------------------------
          serial4         |         4357893 |    100.00%
          concur2_1       |         5174818 |    118.75%
          concur2_4       |         9986386 |    229.16%
          concur2_8       |        18973443 |    435.38%
          concur2_32      |        75602438 |   1734.84%
          concur3_1       |        32423150 |    744.01%
          concur3_4       |        21083897 |    483.81%
          concur3_8       |        16427430 |    376.96%
          concur3_32      |        15157314 |    347.81%

Question

Since, as mentioned above, this problem is embarrassingly parallel, I was expecting to see a tremendous performance increase, but that was not the case.

Why does moving_avg_concurrent2 not scale at all?

And why is moving_avg_concurrent3 so much slower than moving_avg_serial4?

I know that goroutines are cheap, but they are still not free. Is it possible that this generates so much overhead that we end up even slower than moving_avg_serial4?
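
(As a rough sanity check of that suspicion, the fixed cost can be measured directly; the following micro-benchmark sketch — my code, not part of the original post — times one spawn-plus-join cycle, which is the minimum overhead moving_avg_concurrent3 pays per window on top of two channel operations:)

import (
	"sync"
	"testing"
)

var spawnSink int

// BenchmarkGoroutineSpawnJoin times spawning one goroutine with a trivial
// payload and joining it again via a WaitGroup.
func BenchmarkGoroutineSpawnJoin(b *testing.B) {
	for n := 0; n < b.N; n++ {
		var wg sync.WaitGroup
		wg.Add(1)
		go func() {
			defer wg.Done()
			spawnSink++ // trivial payload
		}()
		wg.Wait()
	}
}

Even at a fraction of a microsecond per cycle, paying it (plus channel sends and receives) once per window, i.e. thousands of times per call, adds up quickly.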

Code

Functions:

// returns a slice containing the moving average of the input (given, i.e. not optimised)
func moving_avg_serial(input []float64, window_size int) []float64 {
	first_time := true
	var output = make([]float64, len(input))
	if len(input) > 0 {
		var buffer = make([]float64, window_size)
		// initialise buffer with NaN
		for i := range buffer {
			buffer[i] = math.NaN()
		}
		for i, val := range input {
			old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
			buffer[int((math.Mod(float64(i), float64(window_size))))] = val
			if !NaN_in_slice(buffer) && first_time {
				sum := 0.0
				for _, entry := range buffer {
					sum += entry
				}
				output[i] = sum / float64(window_size)
				first_time = false
			} else if i > 0 && !math.IsNaN(output[i-1]) && !NaN_in_slice(buffer) {
				output[i] = output[i-1] + (val-old_val)/float64(window_size) // solution without loop
			} else {
				output[i] = math.NaN()
			}
		}
	} else { // empty input
		fmt.Println("moving_avg is panicking!")
		panic(fmt.Sprintf("%v", input))
	}
	return output
}

// returns a slice containing the moving average of the input
// reordering the control structures to exploit short-circuit evaluation
func moving_avg_serial4(input []float64, window_size int) []float64 {
	first_time := true
	var output = make([]float64, len(input))
	if len(input) > 0 {
		var buffer = make([]float64, window_size)
		// initialise buffer with NaN
		for i := range buffer {
			buffer[i] = math.NaN()
		}
		for i := range input {
			// fmt.Printf("in mvg_avg4: i=%v\n", i)
			old_val := buffer[int((math.Mod(float64(i), float64(window_size))))]
			buffer[int((math.Mod(float64(i), float64(window_size))))] = input[i]
			if first_time && !NaN_in_slice(buffer) {
				sum := 0.0
				for j := range buffer {
					sum += buffer[j]
				}
				output[i] = sum / float64(window_size)
				first_time = false
			} else if i > 0 && !math.IsNaN(output[i-1]) /* && !NaN_in_slice(buffer) */ {
				output[i] = output[i-1] + (input[i]-old_val)/float64(window_size) // solution without loop
			} else {
				output[i] = math.NaN()
			}
		}
	} else { // empty input
		fmt.Println("moving_avg is panicking!")
		panic(fmt.Sprintf("%v", input))
	}
	return output
}

// returns a slice containing the moving average of the input
// splits the slice into smaller pieces, one per goroutine, without reusing the serial version, i.e. we only have NaNs at the beginning, hoping to reduce some overhead
// still does not scale (decreasing performance with increasing size and num_goroutines)
func moving_avg_concurrent2(input []float64, window_size, num_goroutines int) []float64 {
	var output = make([]float64, window_size-1, len(input))
	for i := 0; i < window_size-1; i++ {
		output[i] = math.NaN()
	}
	if len(input) > 0 {
		num_items := len(input) - (window_size - 1)
		var barrier_wg sync.WaitGroup
		n := num_items / num_goroutines
		go_avg := make([][]float64, num_goroutines)
		for i := 0; i < num_goroutines; i++ {
			go_avg[i] = make([]float64, 0, num_goroutines)
		}

		for i := 0; i < num_goroutines; i++ {
			barrier_wg.Add(1)
			go func(go_id int) {
				defer barrier_wg.Done()

				// computing boundaries
				var start, stop int
				start = go_id*int(n) + (window_size - 1) // starting index
				// ending index
				if go_id != (num_goroutines - 1) {
					stop = start + n // Ending index
				} else {
					stop = num_items + (window_size - 1) // Ending index
				}

				loc_avg := moving_avg_serial4(input[start-(window_size-1):stop], window_size)

				loc_avg = make([]float64, stop-start)
				current_sum := 0.0
				for i := start - (window_size - 1); i < start+1; i++ {
					current_sum += input[i]
				}
				loc_avg[0] = current_sum / float64(window_size)
				idx := 1

				for i := start + 1; i < stop; i++ {
					loc_avg[idx] = loc_avg[idx-1] + (input[i]-input[i-(window_size)])/float64(window_size)
					idx++
				}

				go_avg[go_id] = append(go_avg[go_id], loc_avg...)

			}(i)
		}
		barrier_wg.Wait()

		for i := 0; i < num_goroutines; i++ {
			output = append(output, go_avg[i]...)
		}

	} else { // empty input
		fmt.Println("moving_avg is panicking!")
		panic(fmt.Sprintf("%v", input))
	}
	return output
}

// returns a slice containing the moving average of the input
// change of paradigm: we opt for a master/worker pattern and generate one work item per window, each computed by a worker goroutine
func compute_window_avg(input, output []float64, start, end int) {
	sum := 0.0
	size := end - start
	for _, val := range input[start:end] {
		sum += val
	}
	output[end-1] = sum / float64(size)
}

func moving_avg_concurrent3(input []float64, window_size, num_goroutines int) []float64 {
	var output = make([]float64, window_size-1, len(input))
	for i := 0; i < window_size-1; i++ {
		output[i] = math.NaN()
	}
	if len(input) > 0 {
		num_windows := len(input) - (window_size - 1)
		var output = make([]float64, len(input))
		for i := 0; i < window_size-1; i++ {
			output[i] = math.NaN()
		}

		pending := make(chan *Work)
		done := make(chan *Work)

		// creating work
		go func() {
			for i := 0; i < num_windows; i++ {
				pending <- NewWork(compute_window_avg, input, output, i, i+window_size)
			}
		}()

		// start goroutines which work through pending till there is nothing left
		for i := 0; i < num_goroutines; i++ {
			go func() {
				Worker(pending, done)
			}()
		}

		// wait till every work is done
		for i := 0; i < num_windows; i++ {
			<-done
		}

		return output

	} else { // empty input
		fmt.Println("moving_avg is panicking!")
		panic(fmt.Sprintf("%v", input))
	}
	return output
}
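
(Note: the Work, NewWork and Worker helpers referenced above are not included in the post; a minimal reconstruction consistent with how they are used — my sketch, not the original code — could look like this:)

type Work struct {
	fn         func(input, output []float64, start, end int)
	input      []float64
	output     []float64
	start, end int
}

func NewWork(fn func(input, output []float64, start, end int), input, output []float64, start, end int) *Work {
	return &Work{fn: fn, input: input, output: output, start: start, end: end}
}

// Worker executes jobs from pending and reports each finished one on done.
// Note that pending is never closed in the code above, so the workers block
// here forever once the producer goroutine has sent its last window.
func Worker(pending, done chan *Work) {
	for w := range pending {
		w.fn(w.input, w.output, w.start, w.end)
		done <- w
	}
}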

Benchmarks:

//############### BENCHMARKS ###############
var import_data_res11 []float64
func benchmarkMoving_avg_serial(b *testing.B, window int) {
	var r []float64
	for n := 0; n < b.N; n++ {
		r = moving_avg_serial(BackTest_res.F["Trading DrawDowns"], window)
	}
	import_data_res11 = r
}

var import_data_res14 []float64
func benchmarkMoving_avg_serial4(b *testing.B, window int) {
	var r []float64
	for n := 0; n < b.N; n++ {
		r = moving_avg_serial4(BackTest_res.F["Trading DrawDowns"], window)
	}
	import_data_res14 = r
}

var import_data_res16 []float64
func benchmarkMoving_avg_concurrent2(b *testing.B, window, num_goroutines int) {
	var r []float64
	for n := 0; n < b.N; n++ {
		r = moving_avg_concurrent2(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
	}
	import_data_res16 = r
}

var import_data_res17 []float64
func benchmarkMoving_avg_concurrent3(b *testing.B, window, num_goroutines int) {
	var r []float64
	for n := 0; n < b.N; n++ {
		r = moving_avg_concurrent3(BackTest_res.F["Trading DrawDowns"], window, num_goroutines)
	}
	import_data_res17 = r
}



func BenchmarkMoving_avg_serial_261x10(b *testing.B) {
	benchmarkMoving_avg_serial(b, 261*10)
}

func BenchmarkMoving_avg_serial4_261x10(b *testing.B) {
	benchmarkMoving_avg_serial4(b, 261*10)
}


func BenchmarkMoving_avg_concurrent2_261x10_1(b *testing.B) {
	benchmarkMoving_avg_concurrent2(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent2_261x10_8(b *testing.B) {
	benchmarkMoving_avg_concurrent2(b, 261*10, 8)
}


func BenchmarkMoving_avg_concurrent3_261x10_1(b *testing.B) {
	benchmarkMoving_avg_concurrent3(b, 261*10, 1)
}
func BenchmarkMoving_avg_concurrent3_261x10_8(b *testing.B) {
	benchmarkMoving_avg_concurrent3(b, 261*10, 8)
}
//############### BENCHMARKS end ###############

Remarks:
This is my very first post, I am still learning, so any constructive criticism is also welcome.

Answer 1

Score: 5

Fact #0: Premature optimisation efforts often have negative yields, showing they are just a waste of time & effort


Why?
A single "wrong" SLOC may devastate performance by more than about +37%,
or may improve performance to spend ~57% less than the baseline processing time:

51.151 µs on MA(200) [10000]float64    ~ 22.017 µs on MA(200) [10000]int
70.325 µs on MA(200) [10000]float64

Why []int-s?
You see it on your own above - this is the bread and butter for HPC/fintech efficient sub-[us] processing strategies ( and we still speak in terms of just [SERIAL] process scheduling ).

This one may test on any scale - but rather first test your own implementations on the very same scale - the MA(200) [10000]float64 setup - and post your baseline durations in [us], to view the initial process performance and to compare apples-to-apples, having the posted 51.2 [us] threshold to compare against.
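
( For such an apples-to-apples check, a minimal harness could look like the following sketch — the rolling-average helper and all names are illustrative, not code from the question; an identical twin with []float64 elements and float64 division gives the MA(200) [10000]float64 baseline to compare against the quoted 51.2 [us]: )

import "testing"

// maInt is an O(n) rolling moving average over ints (integer division).
func maInt(in []int, w int) []int {
	out := make([]int, len(in))
	sum := 0
	for i, v := range in {
		sum += v
		if i >= w {
			sum -= in[i-w] // drop the sample leaving the window
		}
		if i >= w-1 {
			out[i] = sum / w
		}
	}
	return out
}

var sinkInt []int

func BenchmarkMA200Int10000(b *testing.B) {
	data := make([]int, 10000)
	for i := range data {
		data[i] = i % 997
	}
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		sinkInt = maInt(data, 200)
	}
}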

Next comes the harder part:


Fact #1: This task is NOT embarrassingly parallel

Yes, one may go and implement a Moving Average calculation so that it indeed proceeds through the heaps of data using some intentionally indoctrinated "just"-[CONCURRENT] processing approach ( irrespective of whether that is due to some kind of error, some authority's "advice", professional blindness or just dual-Socrates-fair ignorance ). But that obviously does not mean that the nature of the convolutional stream-processing, present inside the Moving Average mathematical formulation, has forgotten to be a pure [SERIAL] process, just because of an attempt to enforce it to be calculated within some degree of "just"-[CONCURRENT] processing.

( Btw. The Hard Computer-Scientists and dual-domain nerds will also object here that the Go language is by design using Rob Pike's best skills for having a framework of concurrent coroutines, not any true-[PARALLEL] process-scheduling, even though Hoare's CSP-tools, available in the language concept, may add some salt and pepper and introduce a stop-block type of inter-process communication tools that will block "just"-[CONCURRENT] code sections into some hardwired CSP-p2p-synchronisation. )


Fact #2: Go distributed ( for any kind of speedup ) only AT THE END

Having a poor level of performance in [SERIAL] does not set any yardstick. Only with a reasonable amount of performance tuning in single-thread may one benefit from going distributed ( still having to pay additional serial costs, which makes Amdahl's Law ( rather the Overhead-strict Amdahl's Law ) get into the game ).

If one can introduce such a low level of additional setup-overheads and still achieve any remarkable parallelism, scaled into the non-[SEQ] part of the processing, there and only there comes a chance to increase the process effective-performance.

It is not hard to lose much more than to gain in this, so always benchmark the pure-[SEQ] against the potential tradeoffs between a non-[SEQ] / N[PAR]_processes theoretical, overhead-naive speedup, for which one will pay the cost of the sum of all add-on-[SEQ]-overheads, so that the move pays off if and only if:

(         pure-[SEQ]_processing      [ns]
+       add-on-[SEQ]-setup-overheads [ns]
+        ( non-[SEQ]_processing      [ns] / N[PAR]_processes )
  ) << (  pure-[SEQ]_processing      [ns]
       + ( non-[SEQ]_processing      [ns] / 1 )
         )

Not having this jet-fighter advantage of both surplus height and the Sun behind you, never attempt to go into any kind of HPC / parallelisation attempts - they will never pay for themselves, not being remarkably << better than a smart [SEQ]-process.


Epilogue: on overhead-strict Amdahl's Law interactive experiment UI

One animation is worth a million words.

An interactive animation is even better:

So,
assume a process-under-test, which has both a [SERIAL] and a [PARALLEL] part of the process schedule.

Let p be the [PARALLEL] fraction of the process duration ~ ( 0.0 .. 1.0 ) thus the [SERIAL] part does not last longer than ( 1 - p ), right?

So, let's start interactive experimentation from such a test-case, where the p == 1.0, meaning all such process duration is spent in just a [PARALLEL] part, and the both initial serial and the terminating parts of the process-flow ( that principally are always [SERIAL] ) have zero-durations ( ( 1 - p ) == 0. )

Assume the system does no particular magic and thus needs to spend some real steps on the initialisation of each of the [PARALLEL] parts, so as to run them on a different processor ( (1), 2, .., N ), so let's add some overheads for re-organising the process flow and for marshalling + distributing + un-marshalling all the necessary instructions and data, so that the intended process can now start and run on N processors in parallel.

These costs are called o ( here initially assumed for simplicity to be just constant and invariant to N, which is not always the case in real, on silicon / on NUMA / on distributed infrastructures ).

By clicking on the Epilogue headline above, an interactive environment opens and is free for one's own experimentation.

With p == 1. && o == 0. && N > 1 the performance grows steeply to the currently achievable [PARALLEL]-hardware O/S limits for a still monolithic O/S code-execution ( still with no additional distribution costs for MPI- and similar depeche-mode distributions of work-units, where one would immediately have to add indeed a big number of [ms], while our so-far best, just-[SERIAL] implementation has obviously done the whole job in less than just ~ 22.1 [us] ).

But except for such an artificially optimistic case, the job does not look so cheap to get efficiently parallelised.

  • Try having not zero, but just about ~ 0.01% setup overhead costs o, and the line starts to show a very different nature of overhead-aware scaling, even for the utmost extreme [PARALLEL] case ( still having p == 1.0 ), with the potential speedup somewhere near just half of the initially super-idealistic linear speedup case.

  • Now, turn p to something closer to reality, somewhere less artificially set than the initial super-idealistic case of == 1.00 --> { 0.99, 0.98, 0.95 } and ... bingo, this is the reality, where process-scheduling ought to be tested and pre-validated.

What does that mean?

As an example, if the overhead ( of launching + finally joining a pool of coroutines ) would take more than ~ 0.1% of the actual [PARALLEL] processing section duration, there would be no speedup bigger than 4x ( about 1/4 of the original duration ) for 5 coroutines ( having p ~ 0.95 ), and not more than 10x ( a 10-times faster duration ) for 20 coroutines ( all assuming the system has 5 CPU-cores, resp. 20 CPU-cores, free & available and ready ( best with O/S-level CPU-core-affinity mapped processes / threads ) for uninterrupted serving of all those coroutines during their whole life-span, so as to achieve any of the expected speedups ).
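
( Those 4x / 10x ceilings drop straight out of the overhead-strict formula above; a tiny numeric sketch — assuming, as the text does for simplicity, a constant overhead fraction o — reproduces them: )

import "fmt"

// speedup evaluates the overhead-strict Amdahl model:
// S(N) = 1 / ( (1-p) + o + p/N ), with p the parallel fraction of the
// original duration and o the setup/join overhead as a fraction of it.
func speedup(p, o float64, n int) float64 {
	return 1.0 / ((1.0 - p) + o + p/float64(n))
}

func main() {
	for _, n := range []int{5, 20} {
		fmt.Printf("p=0.95  o=0.000  N=%2d  ->  S = %5.2f x\n", n, speedup(0.95, 0.000, n))
		fmt.Printf("p=0.95  o=0.001  N=%2d  ->  S = %5.2f x\n", n, speedup(0.95, 0.001, n))
	}
	// prints roughly 4.17x / 4.15x for N=5 and 10.26x / 10.15x for N=20
}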

Not having such an amount of hardware resources free and ready for all of those task-units intended for implementing the [PARALLEL]-part of the process-schedule, the blocking/waiting states will introduce additional absolute wait-states, and the resulting performance adds these new-[SERIAL]-blocking/waiting sections to the overall process-duration; the initially wished-for speedups suddenly cease to exist and the performance factor falls well under << 1.00 ( meaning that, due to the blocking-states, the effective run-time was way slower than the non-parallelised, just-[SERIAL] workflow ).

This may sound complicated for new keen experimentators; however, we may put it in a reversed perspective. Given that the whole process of distributing the intended [PARALLEL] pool-of-tasks is known not to be shorter than, say, about 10 [us], the overhead-strict graphs show there needs to be at least about 1000 x 10 [us] of non-blocking, computing-intensive processing inside the [PARALLEL] section so as not to devastate the efficiency of the parallelised processing.

If there is not a sufficiently "fat" piece of processing, the overhead-costs ( going remarkably above the above-cited threshold of ~ 0.1% ) then brutally devastate the net efficiency of the successfully parallelised processing ( having performed at unjustifiably high relative costs of the setup vs. the limited net effects of any N processors, as demonstrated in the available live-graphs ).

There is no surprise for distributed-computing nerds that the overhead o also comes with additional dependencies - on N ( the more processes, the more effort is to be spent to distribute work-packages ), on the marshalled data-BLOBs' sizes ( the larger the BLOBs, the longer the MEM-/IO-devices remain blocked before serving the next process with a distributed BLOB across such device/resource, for each of the target 2..N-th receiving processes ), and on avoided / CSP-signalled, channel-mediated inter-process coordinations ( call it additional per-incident blocking, reducing p further and further below the ultimately nice ideal of 1. ).

So, the real-world reality is rather very far from the initially idealised, nice and promising p == 1.0, ( 1 - p ) == 0.0 and o == 0.0

As was obvious from the very beginning, rather try to beat the 22.1 [us] [SERIAL] threshold than to go [PARALLEL] and get worse and worse, where realistic overheads and scaling, on top of already under-performing approaches, do not help a single bit.
