Run Computations on Dask Dataframe Without Collecting Results

Question

I would like to run computations on a partitioned dataframe without collecting the results at the end. In the simplified example below, I do a simple shuffle to group rows with similar shuffle_value together. I then want to output something from each partition independently. ddf.compute() collects the full Pandas dataframe, which I don't want: A) I don't need the full dataframe in one place, and B) it might not fit in memory anyway. Something like the code below works for me in small test cases, but it won't scale.

How do I trigger computation on each partition without sending any results back to the calling process?

import dask.dataframe as dd

ddf = dd.read_parquet(...)
ddf["shuffle_value"] = ddf.map_partitions(calc_shuffle_value,
                                          meta=("shuffle_value", "int64"))
ddf = ddf.shuffle(ddf.shuffle_value, ignore_index=True, npartitions=N)
ddf = ddf.map_partitions(output_func, meta=("unused_value", "int64"))
df = ddf.compute()  # gathers everything into one Pandas dataframe

Thanks!

Answer 1

Score: 0

In general, assuming that output_func creates its output mostly via side effects and doesn't return anything useful, there is no problem with executing your process with .compute(), as the dataframe returned will be tiny.
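For illustration, here is a minimal sketch of what such a side-effecting output_func might look like; the output path and the partition_info plumbing are illustrative, not part of the original question:

import pandas as pd

def output_func(df, partition_info=None):
    # Side effect: write this partition to its own file on the worker
    # (hypothetical naming scheme; dask fills in partition_info at runtime).
    n = partition_info["number"] if partition_info is not None else 0
    df.to_parquet(f"output/part-{n}.parquet")
    # Return something tiny so the frame that .compute() gathers stays small.
    return pd.Series([len(df)], name="unused_value", dtype="int64")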

Assuming you are using distributed, another option is to .persist() the final dataframe, which will execute the process (asynchronously) and store the not-useful per-partition return values on the cluster without gathering them. You can then del the persisted dataframe to drop those data without ever collecting them. The client.submit API would do a very similar thing; see the sketch after the snippet below.

df_out = ddf.persist()
# wait for it to finish - watch the dashboard or use a progress bar
del df_out  # release the per-partition results held on the cluster
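And a sketch of the futures-based variant mentioned above, assuming a dask.distributed Client is available (the explicit wait is optional - the dashboard or a progress bar works just as well):

from dask.distributed import Client, wait

client = Client()  # or connect to an existing cluster
futures = client.compute(ddf.to_delayed())  # one future per partition
wait(futures)  # block until every partition has executed
del futures    # release the tiny per-partition results without gathering them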
