Add date column on per-file basis with Polars when aggregating over multiple Parquet files
Question
I have a very large number of Parquet data files that I can nicely join and aggregate with Polars by doing something like this (note the glob in the filename):

(
    pl.scan_parquet('data/data-16828*.parquet')
    .groupby(['type_id', 'location_id'])
    .agg([
        pl.min('n').alias('n_min'),
        pl.max('n').alias('n_max')
    ])
    .collect()
)
Each file is the output of a script that runs every five minutes, and my goal is to make a single timeseries DataFrame out of them. There is a date column of type datetime[μs, UTC]. However, I discovered that the values of this column are not equal within a single file; rather, they reflect the exact time during the run when each row was created. Because of this, the date column, as it is, is useless for grouping.

The way I see this, I probably should add a new column and populate it with the date value of the first row, on a per-file basis. Is it possible to achieve this with Polars' lazy API, or am I going to have to fix the files first before running the aggregation with Polars?

Please note that I need to use the lazy API, as the dataset is way larger than memory.
Answer 1
Score: 1
The lazyframe doesn't have any information about the file it came from. For that reason you'll need to move the iteration out of Polars, so that you can feed the file info to each lazyframe yourself.

Something like this:
from pathlib import Path

import polars as pl

lazydf = []
basepath = Path('data/')

for myfile in basepath.iterdir():
    # Only pick up the Parquet files that match the original glob.
    if "data-16828" not in myfile.name or myfile.suffix != '.parquet':
        continue
    lazydf.append(
        pl.scan_parquet(myfile)
        .groupby(['type_id', 'location_id'])
        .agg([
            pl.min('n').alias('n_min'),
            pl.max('n').alias('n_max')
        ])
        # Tag every aggregated row with the file it came from.
        .with_columns(source_file=pl.lit(myfile.name))
    )

pl.concat(lazydf)
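Since each element of lazydf is still a lazyframe, nothing is read until the combined plan is collected, and only the per-file aggregates, not the raw rows, end up in the result. A minimal usage sketch (result is just an illustrative name):

result = pl.concat(lazydf).collect()
print(result)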
This doesn't capture the first-row aspect. For that you'd need to move away from the groupby/agg model and use window functions, so that each column gets its own grouping, like this:
from pathlib import Path

import polars as pl

lazydf = []
basepath = Path('data/')

for myfile in basepath.iterdir():
    if "data-16828" not in myfile.name or myfile.suffix != '.parquet':
        continue
    lazydf.append(
        pl.scan_parquet(myfile)
        .select(
            'type_id',
            'location_id',
            # Window functions: min/max of n within each (type_id, location_id) group.
            n_min=pl.col('n').min().over(['type_id', 'location_id']),
            n_max=pl.col('n').max().over(['type_id', 'location_id']),
            # First date in the file, broadcast to every row.
            date=pl.col('date').first(),
        )
        # Keep one row per group.
        .unique(subset=['type_id', 'location_id', 'n_min', 'n_max', 'date'])
    )

pl.concat(lazydf)
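From here, the timeseries the question asks for can be assembled by sorting the concatenated frame on the per-file date and collecting. A minimal sketch, assuming the per-file aggregates fit in memory (timeseries is just an illustrative name):

timeseries = (
    pl.concat(lazydf)
    .sort('date')  # one date per input file, taken from its first row
    .collect()
)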