Add date column on per-file basis with Polars when aggregating over multiple Parquet files


Question


I have a very large number of Parquet data files that I can nicely join and aggregate with Polars by doing something like this (note the glob in the filename):

(
    pl.scan_parquet('data/data-16828*.parquet')
    .groupby(['type_id', 'location_id'])
    .agg([
        pl.min('n').alias('n_min'),
        pl.max('n').alias('n_max')
    ])
    .collect()
)

Each file is the output of a script that runs every five minutes, and my goal is to make a single time-series DataFrame out of them. There is a date column of type datetime[μs, UTC]. However, I discovered that the values of this column are not equal within a single file; rather, they reflect the exact time during the run at which each row was created. Because of this, the date column, as it is, is useless for grouping.
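
For illustration, here is a minimal sketch of the situation described above (the column names come from this question; the values are made up):

import polars as pl
from datetime import datetime

# Toy stand-in for one five-minute output file. The real `date` column is
# datetime[μs, UTC]; the timezone is omitted here to keep the sketch short.
one_file = pl.DataFrame({
    'type_id':     [1, 1, 2],
    'location_id': [10, 11, 10],
    'n':           [5, 7, 3],
    'date': [
        datetime(2023, 6, 1, 4, 0, 1),
        datetime(2023, 6, 1, 4, 0, 3),   # a couple of seconds later
        datetime(2023, 6, 1, 4, 0, 6),
    ],
})

# Every row carries a slightly different timestamp, so `date` cannot serve as
# a per-file key: the number of distinct values equals the number of rows.
print(one_file['date'].n_unique(), one_file.height)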

The way I see it, I should probably add a new column and populate it with the date value of the first row on a per-file basis. Is it possible to achieve this with Polars' lazy API, or am I going to have to fix the files first before running the aggregation with Polars?

Please note that I need to use the lazy API as the dataset is way larger than memory.

Answer 1

Score: 1


The lazyframe doesn't have any information about the file from whence it came. For that reason you'll need to move the iteration out of polars so that you can feed the file info to the lazyframe yourself.

Something like this:

from pathlib import Path

import polars as pl

lazydf = []
basepath = Path('data/')
for myfile in basepath.iterdir():
    # only pick up the files the original glob would have matched
    if "data-16828" not in myfile.name or myfile.suffix != '.parquet':
        continue
    lazydf.append(
        pl.scan_parquet(myfile)
        .groupby(['type_id', 'location_id'])   # group_by in newer Polars releases
        .agg([
            pl.min('n').alias('n_min'),
            pl.max('n').alias('n_max')
        ])
        # tag each aggregated row with the file it came from
        .with_columns(source_file=pl.lit(myfile.name))
    )
pl.concat(lazydf)

This doesn't capture the first-row aspect. For that, you'd need to step away from the groupby/agg model and use window functions, so that each column gets its own grouping, like this:

from pathlib import Path

import polars as pl

lazydf = []
basepath = Path('data/')
for myfile in basepath.iterdir():
    if "data-16828" not in myfile.name or myfile.suffix != '.parquet':
        continue
    lazydf.append(
        pl.scan_parquet(myfile)
        .select(
            'type_id',
            'location_id',
            # per-group min/max computed as window expressions,
            # so no rows are collapsed at this point
            n_min=pl.col('n').min().over(['type_id', 'location_id']),
            n_max=pl.col('n').max().over(['type_id', 'location_id']),
            # the date of the file's first row, broadcast to every row
            date=pl.col('date').first(),
        )
        # collapse the duplicates back down to one row per group
        .unique(subset=['type_id', 'location_id', 'n_min', 'n_max', 'date'])
    )
pl.concat(lazydf)
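
For completeness, a minimal usage sketch (nothing here is required by the approach above): once the per-file aggregates are concatenated, only those small aggregate rows need to be materialised, so the result should fit in memory even though the raw data does not.

# Combine the per-file aggregates and materialise them as a single
# time-series frame, one row per (date, type_id, location_id).
timeseries = (
    pl.concat(lazydf)
    .sort('date')
    .collect()
)
print(timeseries.head())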
