Run Computations on Dask Dataframe Without Collecting Results
Question
I would like to run computations on a partitioned dataframe but not collect results at the end. In the simplified example below, I'm doing a simple shuffle, collecting rows with alike shuffle_values. I then want to output something from each partition independently. The output of ddf.compute() collects the full Pandas dataframe, which I don't want because A) I don't need the full dataframe in one place and B) it potentially won't fit in memory anyway. In small test cases something like the code below works for me, but it won't scale.
How do I trigger computation on each partition but not send any results back to the calling process?
import dask.dataframe as dd

ddf = dd.read_parquet(...)
ddf["shuffle_value"] = ddf.map_partitions(calc_shuffle_value,
                                          meta=("shuffle_value", "int64"))
ddf = ddf.shuffle(ddf.shuffle_value, ignore_index=True, npartitions=N)
# Per-partition output step; this call is lazy and its result is not assigned.
ddf.map_partitions(output_func, meta=("unused_value", "int64"))
df = ddf.compute()  # collects the full dataframe, which is what I want to avoid
Thanks!
Answer 1
Score: 0
In general, assuming that output_func mostly creates output by side effects and doesn't return anything useful, there is no problem with executing your process with .compute(), as the dataframe returned will be tiny.
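As a rough illustration of that pattern (the output path, the uuid-based file name, and returning len(part) are placeholders I'm assuming here, not part of the original code), a side-effecting output_func might look like:

import uuid
import pandas as pd

def output_func(part):
    # Hypothetical side effect: write this partition out under a unique name.
    part.to_parquet(f"output/part-{uuid.uuid4().hex}.parquet")
    # Return a tiny Series matching the declared meta so the collected result stays small.
    return pd.Series([len(part)], name="unused_value", dtype="int64")

small = ddf.map_partitions(output_func, meta=("unused_value", "int64")).compute()
# `small` ends up with one element per partition, not the full data.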
Assuming you are using distributed, other options include .persist()-ing the final dataframe, which will execute the process (asynchronously) and store the not-useful return values of each partition in the cluster but not gather them. You can then del it to remove those data without collecting. The client.submit API would do a very similar thing.
df_out = ddf.persist()
# wait to finish - look at dashboard or use progress bar
del df_out # release
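For the futures-style route, here is a minimal sketch assuming a running distributed Client and the same side-effecting output_func; it uses to_delayed() with client.compute() to get one task per partition, a close relative of submitting the partitions yourself with client.submit:

from dask.distributed import Client, wait

client = Client()  # assumes a distributed scheduler is available

# One lazy task per partition for the side-effecting step.
out = ddf.map_partitions(output_func, meta=("unused_value", "int64"))

# Run each partition's task on the cluster; only small futures come back.
futures = client.compute(out.to_delayed())
wait(futures)  # block until every partition has been processed
del futures    # drop the tiny per-partition results without gathering them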