How to vectorize a complex cumulative aggregation problem?


Question


Dataset:

date        time_index  identifier       value  cum_value  bar_index  desired_output
2023-06-01           1  stackoverflow        5          5        NaN               0
2023-06-01           2  stackoverflow       10         15        NaN               0
2023-06-01           3  stackoverflow       10         25        NaN               1
2023-06-01           1  cross_validated      4          4        NaN               0
2023-06-01           2  cross_validated      6         10        NaN               0
2023-06-01           3  cross_validated     20         30        NaN               1
2023-06-01           4  cross_validated      5         35        NaN               2
2023-06-02           1  stackoverflow        2          2        NaN               0
2023-06-02           2  stackoverflow       10         12        NaN               0
2023-06-02           1  cross_validated     20         20        NaN               0
2023-06-02           2  cross_validated      3         23        NaN               1
2023-06-02           3  cross_validated      3         26        NaN               1

Code that generates the dataset:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "date": ["2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-01", "2023-06-02", "2023-06-02", "2023-06-02", "2023-06-02", "2023-06-02"],
    "time_index": [1, 2, 3, 1, 2, 3, 4, 1, 2, 1, 2, 3],
    "identifier": ["stackoverflow", "stackoverflow", "stackoverflow", "cross_validated", "cross_validated", "cross_validated", "cross_validated", 
               "stackoverflow", "stackoverflow", "cross_validated", "cross_validated", "cross_validated"],
    "value": [5, 10, 10, 4, 6, 20, 5, 2, 10, 20, 3, 3]
})
df["cum_value"] = df.groupby(["identifier", "date"])["value"].cumsum()
df["bar_index"] = np.nan
df["desired_output"] = [0, 0, 1, 0, 0, 1, 2, 0, 0, 0, 1, 1]

I want to sample (assign) a bar_index for each identifier and date according to a fixed (for now) threshold τ = 10, using the value and/or cum_value columns.

  • τ = 10
  • date: 2023-06-01 = d1 & 2023-06-02 = d2
  • identifier: stackoverflow = id1 & cross_validated = id2
  • time_index ∈ {t1, t2,...,tn} ∀ d, id
  1. Observation {id1, d1, t1} has a value less than the threshold of 10, so we continue to the next entries. If we add the values of {id1, d1, t1} and {id1, d1, t2} together, we reach a cum_value (cumulative value) of 15, which exceeds the threshold. Therefore we would sample {id1, d1, t1} as well as {id1, d1, t2} as bar_index 0.

  2. If we encounter an observation with a very large value (for example {id2, d1, t3}) and the previous bar has already ended (its cumulative value exceeded the threshold at the last trade), we would sample this observation alone as its own bar_index. The next observation then starts a new accumulation (in theory). See the worked sketch below.
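
To make the rule concrete, here is a plain-Python restatement (not part of the original question) that loops over one group's values with the threshold of 10 and prints the expected indices for the groups above:

def bar_indices(values, threshold=10):
    # Plain-Python restatement of the sampling rule for a single (identifier, date) group.
    out, bar, running = [], 0, 0
    for v in values:
        running += v
        out.append(bar)           # the row that reaches the threshold still belongs to the current bar
        if running >= threshold:  # ...but it closes that bar and resets the accumulator
            bar += 1
            running = 0
    return out

print(bar_indices([5, 10, 10]))    # stackoverflow, 2023-06-01   -> [0, 0, 1]
print(bar_indices([4, 6, 20, 5]))  # cross_validated, 2023-06-01 -> [0, 0, 1, 2]
print(bar_indices([20, 3, 3]))     # cross_validated, 2023-06-02 -> [0, 1, 1]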

Current non-vectorized approach:

def aggregate_bars(group, threshold):
    cum_value = 0
    bar_index = 0

    for i in range(len(group)):
        cum_value += group.iloc[i]["value"]
        if cum_value >= threshold:
            group["bar_index"].iloc[i] = bar_index
            bar_index += 1
            cum_value = 0
        elif cum_value < threshold:
            group["bar_index"].iloc[i] = bar_index

    return group

df = df.groupby(["identifier", "date"]).apply(lambda x: aggregate_bars(x, 10))
df

Output:

date        time_index  identifier       value  cum_value  bar_index  desired_output
2023-06-01           1  stackoverflow        5          5        0.0               0
2023-06-01           2  stackoverflow       10         15        0.0               0
2023-06-01           3  stackoverflow       10         25        1.0               1
2023-06-01           1  cross_validated      4          4        0.0               0
2023-06-01           2  cross_validated      6         10        0.0               0
2023-06-01           3  cross_validated     20         30        1.0               1
2023-06-01           4  cross_validated      5         35        2.0               2
2023-06-02           1  stackoverflow        2          2        0.0               0
2023-06-02           2  stackoverflow       10         12        0.0               0
2023-06-02           1  cross_validated     20         20        0.0               0
2023-06-02           2  cross_validated      3         23        1.0               1
2023-06-02           3  cross_validated      3         26        1.0               1

How can I vectorize this code so it can efficiently process trillions of rows?

Answer 1

Score: 2


It is unlikely that your function can be vectorised exactly as stated. Your choice for element i depends directly on your choice for i-1, in a way that you cannot really shuffle around any computation. Depending on what your goal is with these values, it may be a good solution to find an alternative function that gets the job done and is vectorised more readily.

That does not mean, however, that this function cannot be improved and sped up.

Initial idea: binary search

First, the cumsum operation can be vectorised, and doing so will remove a whole bunch of Python function calls, doing it all in C.

Next, instead of doing a linear search for the next location where the bar overflows, we can make it a binary search. NumPy has an implementation of that: np.searchsorted.
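
As a toy illustration (using the running totals of the stackoverflow / 2023-06-01 group from the question), searchsorted returns the first position at which the running total reaches the current bar's threshold:

import numpy as np

cumsum = np.array([5, 15, 25])     # running total of one group's values
np.searchsorted(cumsum, 0 + 10)    # -> 1: the bar that opened at total 0 closes at row 1
np.searchsorted(cumsum, 15 + 10)   # -> 2: the next bar closes at row 2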

def _binary_search_get_bar_index(cumsum, threshold):
  bar_index = 0
  covered_count = 0   # number of rows already assigned to a bar
  covered_sum = 0     # cumulative value at the end of the last closed bar
  result = np.zeros_like(cumsum)
  l = len(cumsum)
  while True:
    # first index whose cumulative value reaches the current bar's threshold
    bar_last = np.searchsorted(cumsum, covered_sum + threshold)
    result[covered_count:min(bar_last + 1, l)] = bar_index
    bar_index += 1
    covered_count = bar_last + 1
    if covered_count >= l:
      break
    covered_sum = cumsum[bar_last]
  return result


def binary_search(df, threshold):
  cumsum = df["value"].cumsum()
  df['bar_index'] = _binary_search_get_bar_index(cumsum.values, threshold)
  return df
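
As a quick check (my addition, assuming the df and threshold defined in the question), running it on one group reproduces the desired indices:

one_group = df[(df["identifier"] == "cross_validated") & (df["date"] == "2023-06-01")].copy()
print(binary_search(one_group, 10)[["value", "cum_value", "bar_index", "desired_output"]])
# bar_index comes out as [0, 0, 1, 2], matching desired_output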

Numba

Before jumping into benchmarking, I thought I'd also implement your original function (shuffled around a bit) and decorate it with numba.njit. This compiles machine code from the Python function, almost as if pandas had implemented it for you in C.

from numba import njit

@njit
def _numba_get_bar_index(cumsum, threshold):
  covered_sum = 0   # cumulative value at the end of the last closed bar
  bar_index = 0
  result = np.zeros_like(cumsum)
  for i in range(len(cumsum)):
    result[i] = bar_index
    if cumsum[i] >= covered_sum + threshold:
      # this row closes the current bar; the next row starts a new one
      bar_index += 1
      covered_sum = cumsum[i]
  return result


def numba_f(df, threshold):
  cumsum = df["value"].cumsum()
  df['bar_index'] = _numba_get_bar_index(cumsum.values, threshold)
  return df
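
The answer benchmarks the ungrouped case only; as a sketch (my addition, assuming the df from the question and a threshold of 10), the kernel can be wired back into the grouped problem with a grouped transform, which reproduces desired_output on the question's data:

df["bar_index"] = (
    df.groupby(["identifier", "date"])["value"]
      .transform(lambda s: _numba_get_bar_index(s.cumsum().to_numpy(), 10))
)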

Setup

I verified that both of these solutions give exactly the desired output on your dataset in the question.

I benchmarked on a DataFrame with 100 000 rows, running on an M1 Pro Mac.

The data was generated as below. I omitted the irrelevant columns, and I also omitted the grouping, as it is not relevant to the question: your original function simply receives a DataFrame and only reads the value column.

long_data = pd.DataFrame({'value': np.random.default_rng().poisson(8, 100000)})
# long_data.head() -> [6, 14, 8, 4, 7, 8, 6, 5, 4, 10]
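
The benchmark below calls original, which is not shown in the answer; presumably it is the question's aggregate_bars loop applied to the whole frame at once. A hypothetical reconstruction, for anyone reproducing the numbers:

# Hypothetical stand-in for `original` (not part of the original answer):
# the question's loop, applied to the ungrouped DataFrame.
def original(df, threshold):
  df = df.copy()
  df["bar_index"] = np.nan
  return aggregate_bars(df, threshold)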

Results

%timeit original(long_data, 10)
7 s ± 49 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%timeit binary_search(long_data, 10)
82.2 ms ± 176 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
%timeit numba_f(long_data, 10)
433 µs ± 5.74 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)

It turns out that simply using numba can give you really strong gains (over four orders of magnitude in this case). That is the magic of keeping things nearby in cache and not jumping around in memory for different function calls and pointer dereferences.

Actually, as the complexity in big-O terms is always going to be dominated by computing the cumsum, I can't imagine a scenario where the binary search would win out. It is also unlikely that you could get anything significantly better than the numba result.
