How can I speed up the computation time for applying data cleaning and transformation on a list of dataframes in Python using polars/pandas?
You can try using the concurrent.futures library to process several dataframes in parallel and speed up the computation. Here is a modified version of the example code:
import polars as pl
import concurrent.futures
from functools import reduce

def convertdf(input):
    return pl.read_csv(input)

def applycast(df, col):
    dfcast = df.explode(col).select('Index', pl.col(col)).with_columns(
        pl.when(pl.col(col).is_not_null()).then(pl.col(col).cast(pl.Float64, strict=False))
    ).groupby('Index').agg(pl.col(col)).sort('Index')
    df = df.replace(col, dfcast[col])
    return df

def datatransform(df):
    df = df.select(df.columns[1:]).select([pl.col(pl.Utf8).str.split(' ')])
    df = df.pipe(lambda df: reduce(lambda df, col: applycast(df, col), df.columns, df))
    return df

csvfiles = ['input1.csv', 'input2.csv', 'input3.csv', ... 'input1000.csv']

# Read the CSV files in parallel with concurrent.futures
df_files = []
with concurrent.futures.ThreadPoolExecutor() as executor:
    df_files = list(executor.map(convertdf, csvfiles))  # parallel CSV reads

dftransformedfiles = [df.pipe(datatransform) for df in df_files]  # transform each dataframe as before

# dftransformedfiles now contains the transformed dataframes
This code uses concurrent.futures.ThreadPoolExecutor to parallelise reading the CSV files, which can significantly speed up that part. You can then keep using a list comprehension to transform the dataframes one by one, exactly as in your original code. This should noticeably reduce the overall computation time.
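If you also want to parallelise the transformation step, one option (a sketch only, not part of the original answer; load_and_transform is a hypothetical helper) is to let each worker read and transform a file in one call:

def load_and_transform(path):
    # read one CSV and run the same cleaning pipeline inside the worker
    return datatransform(convertdf(path))

with concurrent.futures.ThreadPoolExecutor() as executor:
    dftransformedfiles = list(executor.map(load_and_transform, csvfiles))

For CPU-bound transforms, a ProcessPoolExecutor may scale better than threads because of the GIL, at the cost of pickling each dataframe between processes.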
Question
How can I iterate over a large list of polars/pandas dataframes in python with a very fast computation time while simultaneously performing data cleaning/transformation on each dataframe?
My problem: I have a huge list of csv files (~1K for example), each file being approximately 20MB. I have converted each of these csv files into dataframes (I have tried both pandas & polars just to see any difference in computation time) and I apply some transformations for data cleaning to each dataframe.
What's the efficient way of doing this? Currently my total computation time, if I use a list comprehension or map or even for loops, is ~3 mins to convert all ~1K csv files to dataframes and ~2 mins of transformation per dataframe (i.e. 2 x 1K = 2K mins for all 1K dataframes). (I am using Python 3.11)
Below are more details of what I have tried so far.
A snippet of my csv (only a few rows & columns are shown here to give an idea), which is converted to a dataframe, looks like this (my actual csv has ~10K rows & 400 columns per file):
Index (Dtype: [i64]) | A (Dtype: [str]) | B (Dtype: [str]) |
---|---|---|
0 | '203 X 345 457' | '346 X X 457 45 46' |
0 | '11 22 44 890' | '22 33 44 88 90 100' |
0 | 'X X 456 44 90' | null |
1 | null | '33 456 99 10 10 11' |
1 | null | null |
So basically I want to transform them into something like this:
Index (Dtype: [i64]) | A (Dtype: List[f64]) | B (Dtype: List[f64]) |
---|---|---|
0 | [203, null, 345, 457] | [346, null, null, 457, 45, 46] |
0 | [11, 22, 44, 890] | [22, 33, 44, 88, 90, 100] |
0 | [null, null, 456, 44, 90] | null |
1 | null | [33, 456, 99, 10, 10, 11] |
1 | null | null |
My code (this is in polars) so far looks like this:
import polars as pl
from functools import reduce

def convertdf(input):
    return pl.read_csv(input)

def applycast(df, col):
    dfcast = df.explode(col).select('Index', pl.col(col)).with_columns(
        pl.when(pl.col(col).is_not_null()).then(pl.col(col).cast(pl.Float64, strict=False))
    ).groupby('Index').agg(pl.col(col)).sort('Index')
    df = df.replace(col, dfcast[col])
    return df

def datatransform(df):
    df = df.select(df.columns[1:]).select([pl.col(pl.Utf8).str.split(' ')])
    df = df.pipe(lambda df: reduce(lambda df, col: applycast(df, col), df.columns, df))
    return df
csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv'] 
df_files = list(map(convertdf, csvfiles)) #Time taken: 3mins 
dftransformedfiles = [df.pipe(datatransform) for df in df_files] #Time taken: ~2K mins 
Basically, as you can see, I am using a list comprehension to loop over each csv file. Is there any way to do parallel execution for this?
Is there any way to apply the applycast() function to all columns in one shot? Currently I am looping over each column, which seems to me to be the reason why it is taking so long. The contents of each column vary, but the datatype is List[str], which needs to be transformed to List[f64].
I tried to concatenate all the dataframes before applying datatransform(), but the concatenation took even longer. I thought of using the "Ray" API for parallel execution, but it doesn't support the latest Python 3.11. How can I reduce the computation time, and what is the best way to iterate over multiple columns or over the list of dataframes/csvs?
Answer 1
Score: 1
It looks like the transformation is:
df.with_columns(
    pl.col(pl.Utf8).str.strip("'").str.split(" ")
    .cast(pl.List(pl.Float64))
)
shape: (5, 3)
┌──────────────────────┬────────────────────────┬───────────────────────┐
│ Index (Dtype: [i64]) ┆ A (Dtype: [str]) ┆ B (Dtype: [str]) │
│ --- ┆ --- ┆ --- │
│ i64 ┆ list[f64] ┆ list[f64] │
╞══════════════════════╪════════════════════════╪═══════════════════════╡
│ 0 ┆ [203.0, null, … 457.0] ┆ [346.0, null, … 46.0] │
│ 0 ┆ [11.0, 22.0, … 890.0] ┆ [22.0, 33.0, … 100.0] │
│ 0 ┆ [null, null, … 90.0] ┆ null │
│ 1 ┆ null ┆ [33.0, 456.0, … 11.0] │
│ 1 ┆ null ┆ null │
└──────────────────────┴────────────────────────┴───────────────────────┘
As for creating a dataframe from many csv files, you can use .scan_csv instead of .read_csv.
df = pl.concat(pl.scan_csv(file) for file in csvfiles)
Or, if your filenames follow a glob pattern:
df = pl.scan_csv("input*.csv")
df = df.with_columns(
pl.col(pl.Utf8).str.strip("'").str.split(" ")
.cast(pl.List(pl.Float64))
)
As .scan_csv returns a LazyFrame, you can use .collect() to generate the result.
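Putting those pieces together, a minimal end-to-end sketch of the lazy pipeline (using the same glob pattern and cast shown above) could look like this:

import polars as pl

result = (
    pl.scan_csv("input*.csv")
    .with_columns(
        pl.col(pl.Utf8).str.strip("'").str.split(" ")
        .cast(pl.List(pl.Float64))
    )
    .collect()  # nothing is read or computed until this call
)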
Answer 2
Score: 0
I think pipe in general with UDFs is guaranteed to be slow. And replace is not needed here.
I would do the operation you're looking for with arr.eval, which treats every list in the column as its own Series:
df = pl.DataFrame({"x": [0, 1], "y": ["3 X 4.0", None], "z": ["1 X 2", "X X 5"]})
df.with_columns(
    pl.col(pl.Utf8)
    .str.split(" ")
    .arr.eval(pl.element().cast(pl.Int64, strict=False), parallel=True)
)
shape: (2, 3)
┌─────┬─────────────────┬─────────────────┐
│ x ┆ y ┆ z │
│ --- ┆ --- ┆ --- │
│ i64 ┆ list[i64] ┆ list[i64] │
╞═════╪═════════════════╪═════════════════╡
│ 0 ┆ [3, null, null] ┆ [1, null, 2] │
│ 1 ┆ null ┆ [null, null, 5] │
└─────┴─────────────────┴─────────────────┘
So, wrapping that with_columns operation into a method, see what the speedup is then:
def data_load_transform(filename):
    return pl.read_csv(filename).with_columns(
        pl.col(pl.Utf8)
        .str.split(" ")
        .arr.eval(pl.element().cast(pl.Int64, strict=False), parallel=True)
    )
csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv']
dftransformedfiles = [data_load_transform(fn) for fn in csvfiles]
If you need further speedup, the list comprehension at the end can be done in parallel in various multiprocessing ways.
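For instance, a minimal sketch using the standard library (assuming the data_load_transform function and csvfiles list defined above, and that all per-file results fit in memory):

from concurrent.futures import ProcessPoolExecutor

# a __main__ guard is needed on platforms that spawn worker processes (e.g. Windows)
if __name__ == "__main__":
    with ProcessPoolExecutor() as executor:
        dftransformedfiles = list(executor.map(data_load_transform, csvfiles))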
Comments