How can I speed up the computation time for applying data cleaning and transformation on a list of dataframes in Python using polars/pandas?


Question

You can try using the concurrent.futures library to process multiple dataframes in parallel and speed up the computation. Here is a modified example:

import polars as pl
import concurrent.futures
from functools import reduce

def convertdf(input):
    return pl.read_csv(input)

def applycast(df, col):
    # Explode the list column, cast each element to Float64
    # (strict=False maps non-numeric tokens such as 'X' to null),
    # then re-aggregate the values into a list per Index.
    dfcast = (
        df.explode(col)
        .select('Index', pl.col(col))
        .with_columns(pl.col(col).cast(pl.Float64, strict=False))
        .groupby('Index')
        .agg(pl.col(col))
        .sort('Index')
    )
    df = df.replace(col, dfcast[col])
    return df

def datatransform(df):
    df = df.select(df.columns[1:]).select([pl.col(pl.Utf8).str.split(' ')])
    df = df.pipe(lambda df: reduce(lambda df, col: applycast(df, col), df.columns, df))
    return df

csvfiles = ['input1.csv', 'input2.csv', 'input3.csv', ... 'input1000.csv']

# Process in parallel with concurrent.futures
df_files = []
with concurrent.futures.ThreadPoolExecutor() as executor:
    df_files = list(executor.map(convertdf, csvfiles))  # read the CSVs in parallel to speed this step up

dftransformedfiles = [df.pipe(datatransform) for df in df_files]  # unchanged: transform the dataframes one by one

# dftransformedfiles now holds the transformed dataframes

This code uses concurrent.futures.ThreadPoolExecutor to read the CSV files in parallel, which can significantly speed up that part of the work. You can then keep the list comprehension to transform the dataframes one by one, just as in your original code. This approach can meaningfully reduce the total computation time.
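One detail worth noting: the `reduce` call inside `datatransform` folds `applycast` over the column names, threading the dataframe through one column at a time. The pattern itself can be shown with a hypothetical stand-in function operating on a plain dict:

```python
from functools import reduce

def applycast_stub(df, col):
    # Hypothetical stand-in for applycast: mark the column as processed
    # (the real version casts the column's lists to List[f64]).
    return {**df, col: f"cast({df[col]})"}

df = {"A": "raw", "B": "raw"}
result = reduce(lambda acc, col: applycast_stub(acc, col), ["A", "B"], df)
# result == {"A": "cast(raw)", "B": "cast(raw)"}
```

Each step receives the dataframe produced by the previous step, which is why the per-column cost adds up linearly across all 400 columns.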

English:

How can I iterate over a large list of polars/pandas dataframes in python with a very fast computation time while simultaneously performing data cleaning/transformation on each dataframe?

My problem: I have a huge list of csv files (~1K, for example), each file being approx. 20MB. I have converted each of these csv files into dataframes (I have tried both pandas & polars just to see any difference in computation time) & applied some transformation for data cleaning per dataframe.

What's an efficient way of doing this? Currently, whether I use list comprehension, map, or for loops, the total computation time is ~3 mins to convert all ~1K csv files to dataframes, plus ~2 mins of transformation per dataframe (i.e. 2 x 1K = 2K mins for all 1K dataframes). (I am using Python 3.11.)

Below are more details of what I have tried so far.

A snippet of my csv (only a few rows & columns are shown here to give an idea), converted to a dataframe, looks like this (my actual csv has ~10K rows & 400 columns per file):

Index (Dtype: [i64])  A (Dtype: [str])   B (Dtype: [str])
0                     '203 X 345 457'    '346 X X 457 45 46'
0                     '11 22 44 890'     '22 33 44 88 90 100'
0                     'X X 456 44 90'    null
1                     null               '33 456 99 10 10 11'
1                     null               null

So basically I want to transform them into something like this:

Index (Dtype: [i64])  A (Dtype: List[f64])       B (Dtype: List[f64])
0                     [203, null, 345, 457]      [346, null, null, 457, 45, 46]
0                     [11, 22, 44, 890]          [22, 33, 44, 88, 90, 100]
0                     [null, null, 456, 44, 90]  null
1                     null                       [33, 456, 99, 10, 10, 11]
1                     null                       null

My code (this is in polars) so far looks like this:

import polars as pl
from functools import reduce

def convertdf(input):
    return pl.read_csv(input)

def applycast(df, col):
    dfcast = (
        df.explode(col)
        .select('Index', pl.col(col))
        .with_columns(
            pl.when(pl.col(col).is_not_null())
            .then(pl.col(col).cast(pl.Float64, strict=False))
        )
        .groupby('Index')
        .agg(pl.col(col))
        .sort('Index')
    )
    df = df.replace(col, dfcast[col])
    return df

def datatransform(df):
    df = df.select(df.columns[1:]).select([pl.col(pl.Utf8).str.split(' ')])
    df = df.pipe(lambda df: reduce(lambda df, col: applycast(df, col), df.columns, df))
    return df

csvfiles = ['input1.csv', 'input2.csv', 'input3.csv', ... 'input1000.csv']
df_files = list(map(convertdf, csvfiles))  # Time taken: 3 mins
dftransformedfiles = [df.pipe(datatransform) for df in df_files]  # Time taken: ~2K mins

Basically, as you can see, I am using a list comprehension to loop over each csv file. Is there any way to execute this in parallel?

Is there any way to apply the "applycast()" function to all columns in one shot? Currently I am looping over each column, which seems to be why it takes so long. The contents of each column vary, but the datatype is List[str], which needs to be transformed to List[f64].

I tried to concatenate all dataframes before applying datatransform(), but the concatenation took even longer. I thought of using the "Ray" API for parallel execution, but it doesn't support the latest Python 3.11. How can I reduce the computation time, or what is the best possible way to iterate over multiple columns or the list of dataframes/csvs?

Answer 1

Score: 1


It looks like the transformation is:

(df.with_columns(
   pl.col(pl.Utf8).str.strip("'").str.split(" ")
     .cast(pl.List(pl.Float64))
))
shape: (5, 3)
┌──────────────────────┬────────────────────────┬───────────────────────┐
│ Index (Dtype: [i64]) ┆ A (Dtype: [str])       ┆ B (Dtype: [str])      │
│ ---                  ┆ ---                    ┆ ---                   │
│ i64                  ┆ list[f64]              ┆ list[f64]             │
╞══════════════════════╪════════════════════════╪═══════════════════════╡
│ 0                    ┆ [203.0, null, … 457.0] ┆ [346.0, null, … 46.0] │
│ 0                    ┆ [11.0, 22.0, … 890.0]  ┆ [22.0, 33.0, … 100.0] │
│ 0                    ┆ [null, null, … 90.0]   ┆ null                  │
│ 1                    ┆ null                   ┆ [33.0, 456.0, … 11.0] │
│ 1                    ┆ null                   ┆ null                  │
└──────────────────────┴────────────────────────┴───────────────────────┘
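For intuition, the per-cell semantics of that expression can be sketched in plain Python (a stand-in for illustration, not how polars implements it): strip the surrounding quotes, split on spaces, and map non-numeric tokens like 'X' to null:

```python
def parse_cell(cell):
    # Mirrors strip("'") -> split(" ") -> element-wise cast to float,
    # with None propagated and non-numeric tokens becoming None.
    if cell is None:
        return None
    out = []
    for tok in cell.strip("'").split(" "):
        try:
            out.append(float(tok))
        except ValueError:
            out.append(None)
    return out

print(parse_cell("'203 X 345 457'"))  # [203.0, None, 345.0, 457.0]
```

The polars expression does all of this column-wide in native code, which is why it is so much faster than looping per column.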

As for creating a dataframe from many csv files, you can use .scan_csv instead of .read_csv.

df = pl.concat(pl.scan_csv(file) for file in csvfiles)

Or, if your filenames follow a glob pattern:

df = pl.scan_csv("input*.csv")

df = df.with_columns(
   pl.col(pl.Utf8).str.strip("'").str.split(" ")
     .cast(pl.List(pl.Float64))
)

As .scan_csv returns a LazyFrame you can use .collect() to generate the result.

Answer 2

Score: 0


I think pipe in general with UDFs is guaranteed to be slow. And replace is not needed here.

I would do the operation you're looking for with arr.eval, which treats every list in the column as its own Series:

df = pl.DataFrame({"x": [0, 1], "y": ["3 X 4.0", None], "z": ["1 X 2", "X X 5"]})
df.with_columns(
    pl.col(pl.Utf8)
    .str.split(" ")
    .arr.eval(pl.element().cast(pl.Int64, strict=False), parallel=True)
)
shape: (2, 3)
┌─────┬─────────────────┬─────────────────┐
│ x   ┆ y               ┆ z               │
│ --- ┆ ---             ┆ ---             │
│ i64 ┆ list[i64]       ┆ list[i64]       │
╞═════╪═════════════════╪═════════════════╡
│ 0   ┆ [3, null, null] ┆ [1, null, 2]    │
│ 1   ┆ null            ┆ [null, null, 5] │
└─────┴─────────────────┴─────────────────┘

So wrapping that with_columns operation to a method, see what the speedup is then:

def data_load_transform(filename):
    return pl.read_csv(filename).with_columns(
        pl.col(pl.Utf8)
        .str.split(" ")
        .arr.eval(pl.element().cast(pl.Int64, strict=False), parallel=True)
    )

csvfiles = ['input1.csv', 'input2.csv', 'input3.csv',....'input1000.csv'] 
dftransformedfiles = [data_load_transform(fn) for fn in csvfiles]

If you need further speedup, the list comprehension at the end can be done in parallel in various multiprocessing ways.
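As a minimal sketch of one such option (with a hypothetical stand-in for `data_load_transform`, since running the real pipeline needs polars and the CSV files), `concurrent.futures.ThreadPoolExecutor` keeps the shape of the list comprehension; threads can help here because polars releases the GIL during many of its native operations:

```python
import concurrent.futures

def data_load_transform(filename):
    # Hypothetical stand-in for the real read-and-transform pipeline,
    # so the parallel pattern can run without the actual CSV files.
    return f"transformed:{filename}"

csvfiles = [f"input{i}.csv" for i in range(1, 6)]

# executor.map preserves input order, so the result lines up with
# csvfiles exactly like the original list comprehension does.
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
    dftransformedfiles = list(executor.map(data_load_transform, csvfiles))
```

For CPU-bound pure-Python work a ProcessPoolExecutor would be the alternative, at the cost of pickling each dataframe between processes.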

huangapple
  • Published on 2023-05-24 21:30:46
  • Please retain this link when reposting: https://go.coder-hub.com/76324094.html