Polars - speedup by using partition_by and collect_all
Question

Example setup

*Warning: this creates a DataFrame using ~5 GB of memory*
```python
import time
import numpy as np
import polars as pl

rng = np.random.default_rng(1)
nrows = 50_000_000
df = pl.DataFrame(
    dict(
        id=rng.integers(1, 50, nrows),
        id2=rng.integers(1, 500, nrows),
        v=rng.normal(0, 1, nrows),
        v1=rng.normal(0, 1, nrows),
        v2=rng.normal(0, 1, nrows),
        v3=rng.normal(0, 1, nrows),
        v4=rng.normal(0, 1, nrows),
        v5=rng.normal(0, 1, nrows),
        v6=rng.normal(0, 1, nrows),
        v7=rng.normal(0, 1, nrows),
        v8=rng.normal(0, 1, nrows),
        v9=rng.normal(0, 1, nrows),
        v10=rng.normal(0, 1, nrows),
    )
)
```
I have a simple task at hand, as follows.
```python
start = time.perf_counter()
res = (
    df.lazy()
    .with_columns(
        [
            pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
            for i in range(1, 11)
        ]
    )
    .groupby(["id", "id2"])
    .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
    .collect()
)
time.perf_counter() - start
# 9.85
```
The task above completes in about 10 seconds on a 16-core machine.

However, if I first split/partition `df` by `id`, perform the same calculation as above on each partition, and call `collect_all` and `concat` at the end, I get nearly a 2x speedup.
```python
start = time.perf_counter()
res2 = pl.concat(
    pl.collect_all(
        [
            dfi.lazy()
            .with_columns(
                [
                    pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
                    for i in range(1, 11)
                ]
            )
            .groupby(["id", "id2"])
            .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
            for dfi in df.partition_by("id", maintain_order=False)
        ]
    )
)
time.perf_counter() - start
# 5.60
```
Moreover, if I partition by `id2` instead of `id`, it is even faster, taking about 4 seconds.

I also noticed that the second approach (whether partitioned by `id` or `id2`) has higher CPU utilization than the first. Perhaps that is why the second approach is faster.
My questions are:

- Why is the second approach faster and has higher CPU utilization?
- Shouldn't they perform the same? I thought window/groupby operations are always executed in parallel for each window/group, using as many available resources as possible.
# Answer 1

**Score**: 1
An initial disclaimer: this is somewhat speculative, as I haven't looked through the source, but I think it's well founded.

I ran both tests in several modes.
| Test                         | Time  |
|------------------------------|-------|
| test1 (as-is)                | 20.44 |
| test2 (as-is)                | 16.07 |
| test1 w/ pow in agg          | 22.41 |
| test2 w/ pow in agg          | 19.99 |
| test1 w/ pow in with_columns | 52.08 |
| test2 w/ pow in with_columns | 21.81 |
| test1 w/ pow in agg, eager   | 47.90 |
| test2 w/ pow in agg, eager   | 61.49 |
On the first run I got 20.44 and 16.07 for test1 and test2, so I am replicating the same direction you see, though the effect was less severe on my machine.

I had htop running during both tests. Memory seemed to peak at roughly the same level, but the difference was that in test1 the cores were not fully loaded, whereas in test2 I could see all cores staying consistently near their maximum.

To explore this further, I added `.pow(1.2).pow(0.3888)` in the with_columns, and in another round put it in the agg.

With those expensive operations in the agg (specifically, after the `sum()`), test1 took 22.41 and test2 took 19.99.

I then took it out of the agg and put it in the with_columns (specifically on `pl.col(f"v{i}")`, the raw column, not the mean). With the expensive operation there, the difference was staggering: test1 was 52.08 and test2 was 21.81.

During test1 I could see lulls where the cores were nearly idle, presumably while it was doing the much cheaper agg. During test2 they stayed consistently maxed out. I also ran both tests in eager mode, and the results flipped: in eager mode test1 was 47.90 while test2 was 61.49.
Based on these results, my guess is that in `collect_all` mode it hands one frame to each core, and that frame is worked on only by that core, as opposed to each expression getting a core. Because of that, it does not matter that the pre-group operations are much more expensive than the agg; every core just keeps working hard the whole time.

In single-frame mode, it cannot know in advance how expensive the operations will be, so it simply parallelizes according to the natural groups. It takes the first group, does the pre-group operations, then takes that to the agg. During the agg it works less hard, and I think that is why I see the cores go nearly idle in waves; it is not until the agg is done that it starts on the next group and core intensity ramps up again.

I'm sure my guess isn't exactly right, but I think it's close.