Golang:如何捕获大规模并行基准测试(> 100万个任务)的返回值?

huangapple go评论76阅读模式
英文:

Golang: How to capture return values of massively parallel benchmark (> 1 million tasks)?

问题

我正在构建一个参数优化器,它基本上生成配置,对所有配置进行基准测试,收集所有结果,对它们进行排序,然后选择相对于基准测试结果表现最好的配置。

基准测试本身工作正常,但每次运行的时间取决于配置,介于50毫秒到2秒之间。关键在于,优化器生成了非常大量的配置,最低端约为10万个,最高端约为4000万个,正常范围在100万到500万之间。显然,单线程版本需要很长时间,而且CPU负载实际上非常低,因为任务相对较轻。

我已经设计了基准测试,以便与并发性良好地配合使用,也就是说,运行器封装在一个单独的结构体中(称为代理),基准测试本质上是一个纯函数,将所有状态作为参数。每次运行都会创建自己的状态,然后独立于其他所有运行进行,但所有函数都使用相同(引用的)共享数据集。以下是该函数的示例。

然而,我在处理每个基准测试的返回值时遇到了困难。在过去,我们在Scale中使用异步/等待进行任务并行处理,并让结果自动返回。据我所知,Go协程只适用于没有返回值的函数。实际上,通道是从Go协程中获取值的最自然方式。这就是我正在思考的关键问题:

考虑到我通常有超过100万个任务,如何正确高效地获取返回值?

与此相关的是,是否有适用于Golang的非常快速的参数优化器?对于Python,我记得optuna提供了出色的结果。

谢谢。

func (a *Agent) runOptimization(strategyConfigs []cmdb.Config) (result *bmx.OptimizeResult) {

	scores := make([]bmx.BackTestResult, len(strategyConfigs))

	println("Run all benchmarks")

	for i, config := range strategyConfigs {
		state := newState(&config)
		score := a.runBenchmark(state)
		scores[i] = *score // sort only works on actual values
	}

	println("Sort results")
	sort.Sort(bmx.ByTotalPnL(scores))

	println("Select best config")
	best := scores[len(scores)-1]

	println("Generate best strategy config")
	stratConf := a.getStrategyConfig(best.PatternConfig)

	println("Return optimization results")
	result = &bmx.OptimizeResult{
		Symbol:          best.Symbol,
		StrategyType:    best.StrategyType,
		OptimizedConfig: &stratConf,
		...

	}
	return result
}
英文:

I'm building a parameter optimizer that essentially generates configurations,
benchmarks all of them, collects all results, sorts them, and then picks the best performing configuration relative to the benchmark result.

The benchmark by itself works fine, but takes between 50 ms an 2 sec per run depending on the configuration. The crux is, the optimizer generates a very large number of configuration, that means, between 100k on the lowest end and about 40 million on the higher end with about 1 - 5 million as a good normal range. Obviously, the single threaded version takes forever and CPU load is actually very low as the task is relatively light.

I already designed the benchmark in a way to make it play nicely with concurrency, that is, the runner is encapsulated in a separate struct (called agent) and the benchmark is essentially a pure function that takes all state as a parameter. Essentially, each run creates its own state and then runs independently of all others, but all functions using the same (referenced) shared dataset. The function is shown below.

However, I struggle with dealing with the return value per Benchmark. Back in the days, in Scale we used Async / Await for task parallelism and just let the results roll on. Go Routines, afaik only work well with functions that have no return value. In practice, channels are the most natural way to fetch a value from a goroutine. And that's the crux I'm mulling over:

Considering that I usually have > 1 million tasks, how do I catch the return values correctly and efficiently?

Related to that, is there actually a very fast parameter optimizer for Golang?
For python, I remember optuna delivering excellent results.

Thank you

func (a *Agent) runOptimization(strategyConfigs []cmdb.Config) (result *bmx.OptimizeResult) {

scores := make([]bmx.BackTestResult, len(strategyConfigs))

println("Run all benchmarks")

for i, config := range strategyConfigs {
	state := newState(&config)
	score := a.runBenchmark(state)
	scores[i] = *score // sort only works on actual values
}


println("Sort results")
sort.Sort(bmx.ByTotalPnL(scores))

println("Select best config")
best := scores[len(scores)-1]

println("Generate best strategy config")
stratConf := a.getStrategyConfig(best.PatternConfig)

println("Return optimization results ")
result = &bmx.OptimizeResult{
	Symbol:          best.Symbol,
	StrategyType:    best.StrategyType,
	OptimizedConfig: &stratConf,
    ...

}
 	return result
 }

答案1

得分: 3

有多种方法可以实现这个。

一个“教科书”式的方法如下:

results := make(chan *score)

for i, config := range strategyConfigs {
    state := newState(&config)
    go a.runBenchmark(state, results)
}

for i := 0; i < len(strategyConfigs); i++ {
  scores[i] = <-results
}

然后修改你的 runBenchmark 方法,不返回任何值,并接受类型为 chan *score 的第二个参数。

这段代码的执行流程如下:

  1. 创建一个用于交换类型为 *score 的值的通道。
  2. 启动多个 goroutine 来运行 runBenchmark 方法,我猜测这是一个“代理”。
    该方法将计算得到的 score 对象(指针)通过提交给它的通道发送,并退出。
  3. 另一个循环从通道接收与启动的 goroutine 数量相同的值,并将每个接收到的值放入结果切片中。

注意事项:

  • 这假设 a 可以同时执行多个并发运行的 runBenchmark

    如果不可以,你可能需要为每个独立的 goroutine 创建一个单独的 a
    鉴于你的示例不是太简单,很难对此做出准确的猜测。
    如果你需要帮助,请提出一个单独的具体问题。

  • 如果你有成百上千万个“策略配置”,这种方法过于简单,因为所有的 goroutine 将一次性启动,这既浪费资源,也可能在数量过大时失败。
    一个经典的解决方法是使用所谓的“扇出”——当你有一个单独的 goroutine 通过通道接收“任务”,并将它们分发给一定数量的工作 goroutine,始终保持在某个限制之下。
    你可以从这里开始阅读:https://blog.golang.org/pipelines

另一种方法是利用 Go 语言中的一个特性,即数组(以及切片)的每个元素被视为一个独立的变量。
这意味着可以同时从工作 goroutine 并发更新结果切片的各个元素,只要在此过程进行时预先分配并且不重新分配(使用 append、重新切片等)该切片。

为了演示,让我们使用“等待组”(https://golang.org/pkg/sync#WaitGroup):

import "sync"

var wg sync.WaitGroup

scores := make([]*score, len(strategyConfigs))

wg.Add(len(strategyConfigs))
for i, config := range strategyConfigs {
    state := newState(&config)
    go a.runBenchmark(state, &scores[i], &wg)
}

wg.Wait()

runBenchmark 方法应该修改为:

defer wg.Done()

作为其第一条语句,并接受两个额外的参数,类型分别为 *score*sync.WaitGroup

在这里,runBenchmark 在一个单独的 goroutine 中启动运行,并传递一个要更新其结果的元素的地址,以及一个用于表示“任务完成”的等待组的地址。

请注意,基本上与第一种情况相同的注意事项也适用于这里。

正如你所看到的,goroutine 实际上并不返回任何值。
这主要是因为在它可以返回值的时候,生成它的 goroutine 可能已经不存在了,没有地方可以返回该值。

因此,基本上有两种方法可以从 goroutine 中“获取数据”:

  • 将该值发送到一个通道上(并让其他 goroutine 从该通道接收)。

    这是 Go 语言的基本操作。
    我建议从这种方法开始,并在你完全掌握它之前一直使用它。

  • 更新内存中的某个位置,前提是没有其他 goroutine 进行相同的操作(否则会出现数据竞争)。

    在某些情况下,这种方法可能更快(对某些人来说甚至更简单),但是这种代码背后的推理可能更难理解。

你可以从这里开始了解基本概念:https://blog.golang.org/codelab-sharehttps://blog.golang.org/waza-talk

总之,有几点建议:

英文:

There are multiple ways to do this.

A "textbook" one is like:

results := make(chan *score)

for i, config := range strategyConfigs {
    state := newState(&amp;config)
    go a.runBenchmark(state, results)
}

for i := 0; i &lt; len(strategyConfigs); i++ {
  scores[i] = &lt;-results
}

…and then modify your runBenchmark method to not return any values
and accept the second argument of type chan *score.

The snippet rolls like this:

  1. Creates a channel to exchange values of type *score.
  2. Starts as many goroutines running the runBenchmark metod of — I suppose — "an agent".
    The method sends the (pointer to) a score object it computed over the channel submitted to it and exits.
  3. Another loop performs as many receives from the channel as there were goroutines spawned and places each received values into the resulting slice.

Caveats:

  • This presupposes a is okay with executing its runBenchmark with multiple goroutines running concurrently.

    If it is not okay, you will supposedly need to create a separate a to run each separate goroutine.
    Given that your example is not too minimal, it's hard for me to make an educated guess about how hard/simple it would be.
    If you will need help on this, please ask a separate narrow question.

  • If you will have, like, multiple hundred millions "strategy configs", this approach would be too simplistic as all the goroutines will be spawned at once, which a) is a waste of resources; b) may even fail if the number will be too big.
    A textbook fix is to use the so-called "fan-out" — when you have a single goroutine receiving "tasks" over a channel and distributing them onto a limited number of worker goroutines which is kept below certain limit at all times. You can start here.

Another approach is to employ the fact that in Go, each element of an array (and a slice — by extension) is considered a separate variable.
This means that it's fine to concurrently update individual elements of the resulting slice from the worker goroutines — as long as the slice is preallocated and never reallocated (manipulated with append, resliced etc) while this process is ongoing.

To demonstrate, let's use "wait groups":

import &quot;sync&quot;

var wg sync.WaitGroup

scores := make([]*score, len(strategyConfigs))

wg.Add(len(strategyConfigs))
for i, config := range strategyConfigs {
    state := newState(&amp;config)
    go a.runBenchmark(state, &amp;scores[i], &amp;wg)
}

wg.Wait()

The runBenchmark method should be modified to have

defer wg.Done()

as its first statement and to accept two additional arguments —
of type *score and *sync.WaitGroup.

Here, runBenchmark is started to run in a separate goroutine and is passed an address of the element to update with its result and an address of a wait group to signal "task completion" on.

Note that basically the same caveats as in the first case apply.

As you can see, a goroutine does not indeed return any value.
This is mostly because by the time it could, the goroutine which spawned it may be long gone, and there is nowhere to return that value to.

Hence there are basically two methods to "get data out" of a goroutine:

  • Send that value on a channel (and have some other goroutine receive from that channel).

    This is the bread and butter of Go.
    I would recommend to start with this approach and work with it until you feel fully comfortable with it.

  • Update some place in memory providing no other goroutine does the same (otherwise you'd have a data race).

    This approach may be faster (and even simpler, to some) in certain cases but the reasoning behind such code might be harder to see.

You might start with this an this to get the basic concepts covered.


All-in-all, a couple of pointers.

  • I recommend to not embark on writing even barely serious code which involves concurrency before taking a grasp on its basics.

    Please start with the relevant part of The Tour of Go and then move on to the Go blog:

    • <https://blog.golang.org/pipelines>
    • <https://blog.golang.org/io2013-talk-concurrency>
    • <https://blog.golang.org/concurrency-timeouts>
  • Try playing with simpler examples first.

huangapple
  • 本文由 发表于 2021年8月10日 13:44:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/68721598.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定