Polars - speedup by using partition_by and collect_all


# Question

Example setup

*Warning: this creates a DataFrame of about 5 GB in memory*
```python
import time

import numpy as np
import polars as pl

rng = np.random.default_rng(1)

nrows = 50_000_000
df = pl.DataFrame(
    dict(
        id=rng.integers(1, 50, nrows),
        id2=rng.integers(1, 500, nrows),
        v=rng.normal(0, 1, nrows),
        v1=rng.normal(0, 1, nrows),
        v2=rng.normal(0, 1, nrows),
        v3=rng.normal(0, 1, nrows),
        v4=rng.normal(0, 1, nrows),
        v5=rng.normal(0, 1, nrows),
        v6=rng.normal(0, 1, nrows),
        v7=rng.normal(0, 1, nrows),
        v8=rng.normal(0, 1, nrows),
        v9=rng.normal(0, 1, nrows),
        v10=rng.normal(0, 1, nrows),
    )
)
```

I have a simple task on hand, as follows.

```python
start = time.perf_counter()
res = (
    df.lazy()
    .with_columns(
        [
            pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
            for i in range(1, 11)
        ]
    )
    .groupby(["id", "id2"])
    .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
    .collect()
)
time.perf_counter() - start
# 9.85
```

The task above completes in roughly 10 s on a 16-core machine.

However, if I first split/partition the `df` by `id`, perform the same calculation as above on each partition, and call `collect_all` and `concat` at the end, I get a nearly 2x speedup.

```python
start = time.perf_counter()
res2 = pl.concat(
    pl.collect_all(
        [
            dfi.lazy()
            .with_columns(
                [
                    pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
                    for i in range(1, 11)
                ]
            )
            .groupby(["id", "id2"])
            .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
            for dfi in df.partition_by("id", maintain_order=False)
        ]
    )
)
time.perf_counter() - start
# 5.60
```

In addition, if I partition by `id2` instead of `id`, it is even faster, taking about 4 s.
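
For reference, the only change in the `id2` variant is the key passed to `partition_by`; everything else is the same pipeline, reusing `df`, `pl`, and `time` from the setup above:

```python
# Same pipeline as above, but partitioning by "id2" (~500 partitions instead of ~50).
start = time.perf_counter()
res3 = pl.concat(
    pl.collect_all(
        [
            dfi.lazy()
            .with_columns(
                [
                    pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
                    for i in range(1, 11)
                ]
            )
            .groupby(["id", "id2"])
            .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
            for dfi in df.partition_by("id2", maintain_order=False)
        ]
    )
)
time.perf_counter() - start
# ~4 s on the same machine
```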

I also noticed that the second approach (whether partitioned by `id` or `id2`) has higher CPU utilization than the first one. Maybe that is why the second approach is faster.

My questions are:

1. Why is the second approach faster, and why does it have higher CPU utilization?
2. Shouldn't they perform the same, since I thought window/groupby operations are always executed in parallel for each window/group and use as many of the available resources as possible?



# Answer 1
**Score**: 1

Initial disclaimer: this is somewhat speculative, as I haven't looked through the source, but I think it's well founded.

I ran both tests in several modes:

|test                           |time (s)|
|-------------------------------|--------|
|test 1 (as-is)                 |20.44   |
|test 2 (as-is)                 |16.07   |
|test 1 with pow in agg         |22.41   |
|test 2 with pow in agg         |19.99   |
|test 1 with pow in with_columns|52.08   |
|test 2 with pow in with_columns|21.81   |
|test 1 with pow in agg, eager  |47.90   |
|test 2 with pow in agg, eager  |61.49   |

On the first run I got 20.44 for test 1 and 16.07 for test 2, so I am reproducing the same direction you see, although the gap is smaller on my machine.

I had htop running during both tests. Memory peaked at roughly the same level in both, but the difference was that in test 1 the cores weren't fully loaded, whereas in test 2 all cores stayed near full utilization almost the whole time.

To explore this further, I added `.pow(1.2).pow(0.3888)` in the with_columns, and did another round with it in the agg instead.

With those expensive operations in the agg (specifically, placed after the `sum()`), test 1 took 22.41 and test 2 took 19.99.

I then took that out of the agg and put it in the with_columns (specifically on the raw `pl.col(f"v{i}")`, not on the mean). With the expensive operation there, the difference was staggering: test 1 took 52.08 and test 2 took 21.81.
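
To make the two placements concrete, here is a minimal sketch of the modified expressions, assuming the exponents above and leaving everything else from the question unchanged (reusing `pl` from the question's setup):

```python
# Variant "pow in agg": the expensive op is applied after the sum() inside .agg(...)
agg_exprs_pow = [
    (pl.col(f"v{i}") * pl.col("v")).sum().pow(1.2).pow(0.3888)
    for i in range(1, 11)
]

# Variant "pow in with_columns": the expensive op is applied to the raw column
# (not to the windowed mean) inside .with_columns(...)
with_columns_exprs_pow = [
    pl.col(f"v{i}").pow(1.2).pow(0.3888)
    - pl.col(f"v{i}").mean().over(["id", "id2"])
    for i in range(1, 11)
]
```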

During test 1 I could see lulls where the cores went nearly idle, presumably while it was doing the much cheaper agg. During test 2 they stayed close to maxed out the whole time. I also ran both tests in eager mode, and the results flipped: in eager mode test 1 took 47.90 while test 2 took 61.49.
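
A minimal sketch of the eager variants, assuming they are simply the pow-in-agg pipelines with `.lazy()`, `.collect()`, and `pl.collect_all` dropped (matching the "eager" rows in the table above):

```python
# Eager test 1 (pow in agg): no lazy plan, each step materializes immediately.
res_eager = (
    df.with_columns(
        [
            pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
            for i in range(1, 11)
        ]
    )
    .groupby(["id", "id2"])
    .agg(
        [
            (pl.col(f"v{i}") * pl.col("v")).sum().pow(1.2).pow(0.3888)
            for i in range(1, 11)
        ]
    )
)

# Eager test 2 (pow in agg): each partition is processed eagerly, then concatenated.
res2_eager = pl.concat(
    [
        dfi.with_columns(
            [
                pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
                for i in range(1, 11)
            ]
        )
        .groupby(["id", "id2"])
        .agg(
            [
                (pl.col(f"v{i}") * pl.col("v")).sum().pow(1.2).pow(0.3888)
                for i in range(1, 11)
            ]
        )
        for dfi in df.partition_by("id", maintain_order=False)
    ]
)
```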

Based on these results, my guess is that in `collect_all` mode each frame is handed to one core and is worked on only by that core, as opposed to each expression getting a core. Because of that, it doesn't matter that the pre-group operations are much more expensive than the agg; every core just keeps working hard the whole time.

In single-frame mode it can't know in advance how expensive the operations will be, so it just splits the work according to its natural groups. As a result it takes the first group, does the pre-group operations, then takes that to the agg. During the agg it doesn't work as hard, and I think that's why I'm seeing the cores go to near idle in waves: it's not until after the agg is done that it starts on the next group and the cores ramp up in intensity again.

I'm sure my guess isn't exactly right, but I think it's close.



