Dask map_partitions does not use all workers on client
Question
I have a very CPU-heavy process and would like to use as many workers as possible in Dask.
When I read the CSV file using read_csv from dask and then process the dataframe using map_partitions, only one worker is used. If I use read_csv from pandas and then convert the file to a Dask dataframe, all my workers are used. See the code below.
Could someone explain the difference in behavior?
Ideally, I would like to use read_csv from Dask so that I don't have to have a conversion step. Could anyone help me with that?
import dask as d
import dask.dataframe  # import the submodule so d.dataframe is available
import pandas as pd

def fWrapper(x):
    p = doSomething(x.ADDRESS, param)
    return pd.DataFrame(p, columns=["ADDRESS", "DATA", "TOKEN", "CLASS"])

# only uses 1 worker instead of the available 8
dask_df = d.dataframe.read_csv('path\to\file')
dask_df.set_index('UID', npartitions=8, drop=False)
ddf2 = dask_df.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()

# uses all 8 workers
df = pd.read_csv('path\to\file')
df.set_index('UID', drop=False)
dask_df2 = d.dataframe.from_pandas(df, npartitions=dask_params['df_npartitions'], sort=True)
ddf3 = dask_df2.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()
Answer 1
Score: 0
The DataFrame.set_index method in both dask.dataframe and pandas returns the updated dataframe, so it must be assigned to a label. pandas does have a convenience kwarg inplace, but that's not available in dask. This means that in your snippet, the first approach should look like this:
dask_df = dask_df.set_index('UID', npartitions=8, drop=False)
This will make sure that the newly indexed dask dataframe has 8 partitions, so downstream work should be allocated across multiple workers.
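For reference, here is a minimal sketch of the first approach with that fix applied. The CSV path is a placeholder, the column names (UID, ADDRESS, etc.) are assumed from the question, and fWrapper's body is a dummy stand-in because doSomething and param are not shown:

import dask.dataframe as dd
import pandas as pd

def fWrapper(part):
    # placeholder for the CPU-heavy per-partition work; swap in the real doSomething call
    p = [(a, None, None, None) for a in part.ADDRESS]
    return pd.DataFrame(p, columns=["ADDRESS", "DATA", "TOKEN", "CLASS"])

dask_df = dd.read_csv("path/to/file.csv")                       # read directly with dask
dask_df = dask_df.set_index("UID", npartitions=8, drop=False)   # assign the returned dataframe

print(dask_df.npartitions)  # should report 8, so map_partitions can fan out across workers

ddf2 = dask_df.map_partitions(
    fWrapper,
    meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object},
).compute()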