Dask `map_partitions` does not use all workers on the client


Question

I have a very CPU-heavy process and would like to use as many workers as possible in Dask.

When I read the CSV file using `read_csv` from Dask and then process the dataframe with `map_partitions`, only one worker is used. If I use `read_csv` from pandas and then convert the resulting pandas dataframe to a Dask dataframe, all my workers are used. See the code below.

Could someone explain the difference in behavior?

Ideally, I would like to use `read_csv` from Dask so that I don't need the conversion step. Could anyone help me with that?

import dask as d
import pandas as pd

def fWrapper(x):
    p = doSomething(x.ADDRESS, param)
    return pd.DataFrame(p, columns=["ADDRESS", "DATA", "TOKEN", "CLASS"])

# only uses 1 worker instead of the available 8
dask_df = d.dataframe('path\to\file')
dask_df.set_index(UID, npartitions=8, drop=False)
ddf2 = dask_df.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()

# uses all 8 workers
df = pd.read_csv('path\to\file')
df.set_index('UID', drop=False)
dask_df2 = d.dataframe.from_pandas(df, npartitions=dask_params['df_npartitions'], sort=True)
ddf3 = dask_df2.map_partitions(fWrapper, meta={"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}).compute()

Answer 1

Score: 0

The `DataFrame.set_index` method in both `dask.dataframe` and `pandas` returns the updated dataframe, so the result must be assigned to a label. `pandas` does have a convenience kwarg `inplace`, but that is not available in Dask. This means that in your snippet, the first approach should look like this:

dask_df = dask_df.set_index(UID, npartitions=8, drop=False)

This will make sure that the newly indexed Dask dataframe has 8 partitions, so downstream work should be allocated across multiple workers.
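
For completeness, here is a minimal sketch of how the first approach could look with the assignment in place. It assumes the file can be read with `dask.dataframe.read_csv`, and it reuses `doSomething` and `param` from the question as placeholders (they are not defined here); the path and the `"UID"` column name are likewise stand-ins taken from the question.

import dask.dataframe as dd
import pandas as pd

def fWrapper(x):
    # x is one pandas partition of the Dask dataframe
    p = doSomething(x.ADDRESS, param)  # placeholders from the question
    return pd.DataFrame(p, columns=["ADDRESS", "DATA", "TOKEN", "CLASS"])

# read_csv may already produce several partitions, depending on blocksize
dask_df = dd.read_csv('path/to/file')

# set_index returns a new dataframe, so assign the result back
dask_df = dask_df.set_index("UID", npartitions=8, drop=False)
print(dask_df.npartitions)  # expect 8

meta = {"ADDRESS": object, "DATA": object, "TOKEN": object, "CLASS": object}
result = dask_df.map_partitions(fWrapper, meta=meta).compute()

With more than one partition, the scheduler can assign the per-partition `fWrapper` calls to different workers; with a single partition there is only one task to run, which matches the behaviour described in the question.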
