Polars - speedup by using partition_by and collect_all

# Question

Example setup

*Warning: this creates a DataFrame of roughly 5 GB in memory*

```python
import time

import numpy as np
import polars as pl

rng = np.random.default_rng(1)

nrows = 50_000_000
df = pl.DataFrame(
    dict(
        id=rng.integers(1, 50, nrows),
        id2=rng.integers(1, 500, nrows),
        v=rng.normal(0, 1, nrows),
        v1=rng.normal(0, 1, nrows),
        v2=rng.normal(0, 1, nrows),
        v3=rng.normal(0, 1, nrows),
        v4=rng.normal(0, 1, nrows),
        v5=rng.normal(0, 1, nrows),
        v6=rng.normal(0, 1, nrows),
        v7=rng.normal(0, 1, nrows),
        v8=rng.normal(0, 1, nrows),
        v9=rng.normal(0, 1, nrows),
        v10=rng.normal(0, 1, nrows),
    )
)
```

I have a simple task on hand, as follows.

```python
start = time.perf_counter()
res = (
    df.lazy()
    .with_columns(
        [
            pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
            for i in range(1, 11)
        ]
    )
    .groupby(["id", "id2"])
    .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
    .collect()
)
time.perf_counter() - start
# 9.85
```

On a 16-core machine, the task above completes in about 10 seconds.

However, if I first split/partition `df` by `id`, perform the same calculation as above on each partition, and call `collect_all` and `concat` at the end, I get a nearly 2x speedup.

```python
start = time.perf_counter()
res2 = pl.concat(
    pl.collect_all(
        [
            dfi.lazy()
            .with_columns(
                [
                    pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
                    for i in range(1, 11)
                ]
            )
            .groupby(["id", "id2"])
            .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
            for dfi in df.partition_by("id", maintain_order=False)
        ]
    )
)
time.perf_counter() - start
# 5.60
```

In addition, if I partition by `id2` instead of `id`, it is even faster, at around 4 seconds.
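The only change for that variant is the key passed to `partition_by`; a minimal sketch (the name `res3` is just for illustration):

```python
res3 = pl.concat(
    pl.collect_all(
        [
            dfi.lazy()
            .with_columns(
                [
                    pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
                    for i in range(1, 11)
                ]
            )
            .groupby(["id", "id2"])
            .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
            # roughly 500 partitions on "id2" instead of roughly 50 on "id"
            for dfi in df.partition_by("id2", maintain_order=False)
        ]
    )
)
```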

I also noticed that the second approach (whether partitioning by `id` or `id2`) has better CPU utilization than the first one. Maybe that is why it is faster.

My questions are:

1. Why is the second approach faster, and why does it have better CPU utilization?
2. Shouldn't they perform the same, since I would expect window/groupby operations to always be executed in parallel across windows/groups and to use as many of the available resources as possible?
  1. <details>
  2. <summary>英文:</summary>
  3. Example setup
  4. *Warning: 5gb memory df creation*

import time

import numpy as np
import polars as pl

rng = np.random.default_rng(1)

nrows = 50_000_000
df = pl.DataFrame(
dict(
id=rng.integers(1, 50, nrows),
id2=rng.integers(1, 500, nrows),
v=rng.normal(0, 1, nrows),
v1=rng.normal(0, 1, nrows),
v2=rng.normal(0, 1, nrows),
v3=rng.normal(0, 1, nrows),
v4=rng.normal(0, 1, nrows),
v5=rng.normal(0, 1, nrows),
v6=rng.normal(0, 1, nrows),
v7=rng.normal(0, 1, nrows),
v8=rng.normal(0, 1, nrows),
v9=rng.normal(0, 1, nrows),
v10=rng.normal(0, 1, nrows),
)
)

  1. I have a simple task on hand as follows.

start = time.perf_counter()
res = (
df.lazy()
.with_columns(
[
pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
for i in range(1, 11)
]
)
.groupby(["id", "id2"])
.agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
.collect()
)
time.perf_counter() - start

9.85

  1. This task above completes in ~10s on a 16-core machine.
  2. However, if I first split/partition the `df` by `id` and then perform the same calculation as above and call `collect_all` and `concat` at the end, I can get a nearly 2x speedup.

start = time.perf_counter()
res2 = pl.concat(
pl.collect_all(
[
dfi.lazy()
.with_columns(
[
pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
for i in range(1, 11)
]
)
.groupby(["id", "id2"])
.agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
for dfi in df.partition_by("id", maintain_order=False)
]
)
)
time.perf_counter() - start

5.60

  1. In addition, if I do the partition by `id2` instead of `id`, the time it takes will be even faster ~4s.
  2. I also noticed the second approach (either partition by `id` or `id2`) has better CPU utilization rate than the first one. Maybe this is the reason why the second approach is faster.
  3. My question is:
  4. 1. Why the second approach is faster and has better CPU utilization?
  5. 2. Shouldn&#39;t they be the same in terms of performance, since I think window/groupby operations will always be executed in parallel for each window/group and use as many available resources as possible?
  6. </details>
# Answer 1

**Score**: 1

Initial disclaimer: this is somewhat speculative, as I haven't looked through the source, but I think it's well founded.

I ran both tests in several modes.

| test                          | time (s) |
|-------------------------------|----------|
| test1 (as-is)                 | 20.44    |
| test2 (as-is)                 | 16.07    |
| test1 w/ pow in agg           | 22.41    |
| test2 w/ pow in agg           | 19.99    |
| test1 w/ pow in with_columns  | 52.08    |
| test2 w/ pow in with_columns  | 21.81    |
| test1 w/ pow in agg, eager    | 47.90    |
| test2 w/ pow in agg, eager    | 61.49    |
On the first run I got 20.44 and 16.07 for test1 and test2 respectively, so I'm replicating the same direction you saw, just less severely on my machine.

I had htop running during both tests. Memory usage seemed to peak at roughly the same level, but the difference was that in test1 the cores weren't fully loaded, whereas in test2 I could see all the cores stay close to fully loaded almost the whole time.

To explore this further, I added `.pow(1.2).pow(0.3888)` inside the with_columns, and did another round with it inside the agg.

With those expensive operations in the agg (specifically, applied after the `sum()`), test1 took 22.41 and test2 took 19.985.

I then took that out of the agg and put it in the with_columns (specifically on `pl.col(f"v{i}")` itself, not on the mean, just the raw first term). With the expensive operation there, the difference was really staggering: test1 was 52.08 and test2 was 21.81.
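Concretely, the two placements look roughly like this (a sketch of the variants described above, not the exact code that was timed; the list names are just for illustration):

```python
import polars as pl

# (a) "pow in agg": applied after the sum() inside .agg(...)
agg_with_pow = [
    (pl.col(f"v{i}") * pl.col("v")).sum().pow(1.2).pow(0.3888)
    for i in range(1, 11)
]

# (b) "pow in with_columns": applied to the raw column, not to the mean
with_columns_with_pow = [
    pl.col(f"v{i}").pow(1.2).pow(0.3888)
    - pl.col(f"v{i}").mean().over(["id", "id2"])
    for i in range(1, 11)
]
```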
During test1 I could see lulls where the cores were nearly idle, presumably while it was doing the much cheaper agg. During test2 they stayed pretty consistently maxed out. I also ran both tests in eager mode, and the results flipped: in eager mode, test1 was 47.90 while test2 was 61.49.
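By eager mode I mean running the same expressions without `lazy()`/`collect()`/`collect_all()`; roughly the following sketch (one plausible translation, not the exact code that was timed, and the `*_eager` names are just for illustration):

```python
# Eager test1: same expressions, evaluated immediately on the DataFrame.
res_eager = (
    df.with_columns(
        [
            pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
            for i in range(1, 11)
        ]
    )
    .groupby(["id", "id2"])
    .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
)

# Eager test2: evaluate each partition immediately, then concatenate.
res2_eager = pl.concat(
    [
        dfi.with_columns(
            [
                pl.col(f"v{i}") - pl.col(f"v{i}").mean().over(["id", "id2"])
                for i in range(1, 11)
            ]
        )
        .groupby(["id", "id2"])
        .agg([(pl.col(f"v{i}") * pl.col("v")).sum() for i in range(1, 11)])
        for dfi in df.partition_by("id", maintain_order=False)
    ]
)
```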
Based on the results above, I'm guessing that in `collect_all` mode it hands one frame to each core, and that frame is worked on only by that core, as opposed to each expression getting a core. Because of that, it doesn't matter that the pre-group operations are much more expensive than the agg; the cores just keep working hard the whole time.

In single-frame mode, it can't know in advance how expensive the operations will be, so it just splits the work according to its natural groups. As a result, it takes the first group, does the pre-group operations, and then takes that to the agg. During the agg it doesn't work as hard and, I think, that's why I'm seeing the cores go nearly idle in waves. It's not until the agg is done that it starts on the next group, and core utilization ramps up again.
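One way to probe this guess would be to pin the Polars thread pool with the `POLARS_MAX_THREADS` environment variable (in a fresh process) and watch how the gap between the two tests changes as the pool shrinks. A sketch of that setup (my suggestion only, not something that was run for the numbers above):

```python
# Sketch only: POLARS_MAX_THREADS is read when polars is first imported,
# so this has to run in a fresh process.
import os

os.environ["POLARS_MAX_THREADS"] = "4"  # set before importing polars

import polars as pl

# Confirm the pool size polars is actually using
# (newer releases expose this as `thread_pool_size()`).
print(pl.threadpool_size())
```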
I'm sure my guess isn't exactly right, but I think it's close.
  29. <details>
  30. <summary>英文:</summary>
  31. Initial disclaimer, this is somewhat speculative as I haven&#39;t looked through the source but I think it&#39;s well founded
  32. I did both tests in several modes
  33. |test |time |
  34. |--------------------|-----|
  35. |test1 (as-is) |20.44|
  36. |test2 (as-is) |16.07|
  37. |test1 w/aggpow |22.41|
  38. |test2 w/aggpow |19.99|
  39. |test1 w/w/pow |52.08|
  40. |test2 w/w/pow |21.81|
  41. |test1 w/aggpow eager|47.90|
  42. |test2 w/aggpow eager|61.49|
  43. and on the first run I got 20.44 and 16.07 from test1 to test2 so I am replicating the same direction that you are but the severity on my computer was less.
  44. I had htop running during both tests. The memory seemed to peak at roughly the same usage but what was different was that in test1 the cores weren&#39;t fully loaded whereas in test2 I could see all the cores were pretty consistently near the top.
  45. To explore this further I added `.pow(1.2).pow(0.3888)` in the with_columns and another round with them in the agg.
  46. With those expensive operations in the agg (specifically, it was after the `sum()` test1 took 22.41 and test2 was 19.985.
  47. I took that out of the agg and put it in the with_columns (specifically on `pl.col(f&quot;v{i}&quot;)`, not the mean just the raw first one). With the expensive operation there, the difference was really staggering. test1 was 52.08 and test2 was 21.81.
  48. During test1 I could see lulls where the cores were nearly idle while it was presumably doing the much cheaper agg. During test2 they were pretty consistently maxed out. I also did both tests in eager mode and the results flipped. In eager mode test1 was 47.90 while test2 was 61.49.
  49. Based on the aforementioned results, I&#39;m guessing that in `collect_all` mode it hands one frame to each core and that frame is worked on only by that core as opposed to each expression getting a core. Because of that, it doesn&#39;t matter that the pre-group operations are much more expensive than the agg. It&#39;ll just keep working hard the whole time.
  50. In single frame mode, it can&#39;t know in advance how expensive the operations will be so it just groups them according to their natural groups. As a result it takes the first group, does the pre-group operations then it takes that to the agg. During the agg it doesn&#39;t work so hard and, I think, that&#39;s why I&#39;m seeing the cores go to near idle in waves. It&#39;s not until after the agg is done that it starts on the next group and the cores will ramp up in intensity again.
  51. I&#39;m sure my guess isn&#39;t exactly right but I think it&#39;s close.
  52. </details>

huangapple
  • Posted on 2023-07-12 23:57:26
  • Please keep this link when reposting: https://go.coder-hub.com/76672449.html