How to vectorize a complex cumulative aggregation problem?
Question
Dataset:
date | time_index | identifier | value | cum_value | bar_index | desired_output |
---|---|---|---|---|---|---|
2023-06-01 | 1 | stackoverflow | 5 | 5 | NaN | 0 |
2023-06-01 | 2 | stackoverflow | 10 | 15 | NaN | 0 |
2023-06-01 | 3 | stackoverflow | 10 | 25 | NaN | 1 |
2023-06-01 | 1 | cross_validated | 4 | 4 | NaN | 0 |
2023-06-01 | 2 | cross_validated | 6 | 10 | NaN | 0 |
2023-06-01 | 3 | cross_validated | 20 | 30 | NaN | 1 |
2023-06-01 | 4 | cross_validated | 5 | 35 | NaN | 2 |
2023-06-02 | 1 | stackoverflow | 2 | 2 | NaN | 0 |
2023-06-02 | 2 | stackoverflow | 10 | 12 | NaN | 0 |
2023-06-02 | 1 | cross_validated | 20 | 20 | NaN | 0 |
2023-06-02 | 2 | cross_validated | 3 | 23 | NaN | 1 |
2023-06-02 | 3 | cross_validated | 3 | 26 | NaN | 1 |
Code that generates the dataset:
import numpy as np
import pandas as pd
df = pd.DataFrame({
"date": ["2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-02", "2023-06-02", "2023-06-02", "2023-06-02", "2023-06-02"],
"time_index": [1, 2, 3, 1, 2, 3, 4, 1, 2, 1, 2, 3],
"identifier": ["stackoverflow", "stackoverflow", "stackoverflow", "cross_validated", "cross_validated", "cross_validated", "cross_validated",
"stackoverflow", "stackoverflow", "cross_validated", "cross_validated", "cross_validated"],
"value": [5, 10, 10, 4, 6, 20, 5, 2, 10, 20, 3, 3]
})
df["cum_value"] = df.groupby(["identifier", "date"])["value"].cumsum()
df["bar_index"] = np.nan
df["desired_output"] = [0, 0, 1, 0, 0, 1, 2, 0, 0, 0, 1, 1]
I want to sample `bar_index` for each `identifier` and `date` according to a fixed (for now) threshold τ = 10, using the columns `value` and/or `cum_value`.

- τ = 10
- date: 2023-06-01 = d1 & 2023-06-02 = d2
- identifier: stackoverflow = id1 & cross_validated = id2
- time_index ∈ {t1, t2, ..., tn} ∀ d, id

- Observation {id1, d1, t1} has a `value` below the threshold of 10, so we continue to the next entries. If we add the `value` of {id1, d1, t1} and {id1, d1, t2} together, we reach a `cum_value` (cumulative value) of 15, which exceeds the threshold. Therefore we would sample {id1, d1, t1} as well as {id1, d1, t2} as `bar_index` 0.
- If we encounter an observation with a very large `value` (for example {id2, d1, t3}) and the previous bar has ended (its cumulative value exceeded the threshold at the last trade), we would sample this observation alone as its own `bar_index`. The next observation starts a new accumulation (in theory).
Current non-vectorized approach:
def aggregate_bars(group, threshold):
    cum_value = 0
    bar_index = 0
    for i in range(len(group)):
        cum_value += group.iloc[i]["value"]
        # Every row is assigned to the current bar; once the running sum
        # reaches the threshold, close the bar and reset the accumulator.
        group.loc[group.index[i], "bar_index"] = bar_index
        if cum_value >= threshold:
            bar_index += 1
            cum_value = 0
    return group

df = df.groupby(["identifier", "date"], group_keys=False).apply(lambda x: aggregate_bars(x, 10))
df
Output:
date | time_index | identifier | value | cum_value | bar_index | desired_output |
---|---|---|---|---|---|---|
2023-06-01 | 1 | stackoverflow | 5 | 5 | 0.0 | 0 |
2023-06-01 | 2 | stackoverflow | 10 | 15 | 0.0 | 0 |
2023-06-01 | 3 | stackoverflow | 10 | 25 | 1.0 | 1 |
2023-06-01 | 1 | cross_validated | 4 | 4 | 0.0 | 0 |
2023-06-01 | 2 | cross_validated | 6 | 10 | 0.0 | 0 |
2023-06-01 | 3 | cross_validated | 20 | 30 | 1.0 | 1 |
2023-06-01 | 4 | cross_validated | 5 | 35 | 2.0 | 2 |
2023-06-02 | 1 | stackoverflow | 2 | 2 | 0.0 | 0 |
2023-06-02 | 2 | stackoverflow | 10 | 12 | 0.0 | 0 |
2023-06-02 | 1 | cross_validated | 20 | 20 | 0.0 | 0 |
2023-06-02 | 2 | cross_validated | 3 | 23 | 1.0 | 1 |
2023-06-02 | 3 | cross_validated | 3 | 26 | 1.0 | 1 |
How to vectorize the code so it can effectively process trillions of rows?
Answer 1

Score: 2
It is unlikely that your function can be vectorised exactly as stated. Your choice for element `i` depends directly on your choice for element `i-1`, in a way that does not let you reorder any of the computation. Depending on what your goal is with these values, a good option may be an alternative function that gets the job done and is more readily vectorised.
That does not mean, however, that this function cannot be improved and sped up.
Initial ideas: Binary search
First, the `cumsum` operation can be vectorised, and doing so removes a whole bunch of Python function calls by doing all the work in C.
Next, instead of doing a linear search for the next location where the bar overflows, we can make it a binary search. NumPy has an implementation of that: `np.searchsorted`.
def _binary_search_get_bar_index(cumsum, threshold):
    bar_index = 0
    covered_count = 0  # rows already assigned to a bar
    covered_sum = 0    # cumulative value consumed by closed bars
    result = np.zeros_like(cumsum)
    l = len(cumsum)
    while True:
        # First position whose cumulative sum reaches the next threshold.
        # searchsorted requires cumsum to be non-decreasing, which holds
        # as long as the values are non-negative.
        bar_last = np.searchsorted(cumsum, covered_sum + threshold)
        result[covered_count:min(bar_last + 1, l)] = bar_index
        bar_index += 1
        covered_count = bar_last + 1
        if covered_count >= l:
            break
        covered_sum = cumsum[bar_last]
    return result

def binary_search(df, threshold):
    cumsum = df["value"].cumsum()
    df['bar_index'] = _binary_search_get_bar_index(cumsum.values, threshold)
    return df
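To mirror the per-(identifier, date) behaviour from the question, the helper can be applied groupwise. A minimal sketch of my own (the groupby and the check against desired_output are my additions, not part of the original answer, which deliberately leaves grouping out):
out = df.groupby(["identifier", "date"], group_keys=False).apply(
    lambda g: binary_search(g.copy(), 10)
)
assert (out["bar_index"] == out["desired_output"]).all()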
Numba
Before jumping into benchmarking, I thought I'd also implement your original function (shuffled around a bit) and decorate it with `numba.njit`. This compiles machine code from the Python function, almost as if pandas had implemented it for you in C.
from numba import njit

@njit
def _numba_get_bar_index(cumsum, threshold):
    covered_sum = 0  # cumulative value already consumed by closed bars
    bar_index = 0
    result = np.zeros_like(cumsum)
    for i in range(len(cumsum)):
        result[i] = bar_index
        # Close the current bar once the running total reaches the threshold.
        if cumsum[i] >= covered_sum + threshold:
            bar_index += 1
            covered_sum = cumsum[i]
    return result

def numba_f(df, threshold):
    cumsum = df["value"].cumsum()
    df['bar_index'] = _numba_get_bar_index(cumsum.values, threshold)
    return df
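A practical numba detail worth knowing (my addition, not from the original answer): the first call to an @njit function compiles it for the given argument types, so a warm-up call keeps compilation time out of any timing:
numba_f(df.copy(), 10)  # first call pays the one-off JIT compilation cost
numba_f(df.copy(), 10)  # subsequent calls reuse the cached machine code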
Setup
I verified that both of these solutions give exactly the desired output on the dataset in your question.
I benchmarked on a DataFrame with 100,000 rows, running on an M1 Pro Mac.
The data was generated as below. I omitted irrelevant columns. I also omitted grouping, as it is not relevant to the question either: your original function simply receives a DataFrame and only reads the `value` column. (`original` in the results below is your aggregate_bars approach.)
long_data = pd.DataFrame({'value': np.random.default_rng().poisson(8, 100000)})
# long_data.head() -> [6, 14, 8, 4, 7, 8, 6, 5, 4, 10]
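As an extra cross-check on the benchmark data (my sketch, assuming the long_data frame above), the two fast implementations can be compared against each other:
# Both implementations should produce identical bar indices on the same input.
a = binary_search(long_data.copy(), 10)["bar_index"].to_numpy()
b = numba_f(long_data.copy(), 10)["bar_index"].to_numpy()
assert (a == b).all()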
Results
%timeit original(long_data, 10)
7 s ± 49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit binary_search(long_data, 10)
82.2 ms ± 176 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit numba_f(long_data, 10)
433 µs ± 5.74 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)
It turns out that simply using numba gives you really strong gains here (over four orders of magnitude in this case). That is the magic of keeping data close together in cache and not jumping around memory for different function calls and pointer dereferences.
Actually, since the big-O complexity is always going to be dominated by computing the cumsum, I can't imagine a scenario where the binary search would win out. It is also unlikely that you could get anything significantly better than the numba result.