Groupby and cut on a Lazy DataFrame in Polars


Question


import numpy as np
import polars as pl

def cut(_df):
    # Series.cut returns a DataFrame with the binned values (as Float64),
    # the break_point and the category; cast x back to Int64 so it can be
    # joined against the original integer column.
    _c = _df['x'].cut(bins).with_columns([pl.col('x').cast(pl.Int64)])
    final = _df.join(_c, left_on='x', right_on='x')
    return final

groups = ["A"]*500 + ["B"]*500
bins = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
x = np.arange(0, 1000)
np.random.shuffle(x)
df = pl.DataFrame({"x": x, "group": groups})
with pl.StringCache():
    res = df.groupby("group").apply(cut)

df
Out[4]: 
shape: (1_000, 2)
┌─────┬───────┐
│ x   ┆ group │
│ --- ┆ ---   │
│ i64 ┆ str   │
╞═════╪═══════╡
│ 105 ┆ A     │
│ 166 ┆ A     │
│ 291 ┆ A     │
│ 183 ┆ A     │
│ …   ┆ …     │
│ 949 ┆ B     │
│ 891 ┆ B     │
│ 831 ┆ B     │
│ 535 ┆ B     │
└─────┴───────┘
res
Out[5]: 
shape: (1_000, 4)
┌─────┬───────┬─────────────┬─────────────────┐
│ x   ┆ group ┆ break_point ┆ category        │
│ --- ┆ ---   ┆ ---         ┆ ---             │
│ i64 ┆ str   ┆ f64         ┆ cat             │
╞═════╪═══════╪═════════════╪═════════════════╡
│ 2   ┆ B     ┆ 100.0       ┆ (0.0, 100.0]    │
│ 3   ┆ B     ┆ 100.0       ┆ (0.0, 100.0]    │
│ 4   ┆ B     ┆ 100.0       ┆ (0.0, 100.0]    │
│ 6   ┆ B     ┆ 100.0       ┆ (0.0, 100.0]    │
│ …   ┆ …     ┆ …           ┆ …               │
│ 991 ┆ A     ┆ 1000.0      ┆ (900.0, 1000.0] │
│ 993 ┆ A     ┆ 1000.0      ┆ (900.0, 1000.0] │
│ 996 ┆ A     ┆ 1000.0      ┆ (900.0, 1000.0] │
│ 997 ┆ A     ┆ 1000.0      ┆ (900.0, 1000.0] │
└─────┴───────┴─────────────┴─────────────────┘

Is there a way to do the above with a Polars lazy DataFrame without using `apply` or `map`?
My end goal is to scan a large CSV, transform it, and sink it with `sink_parquet`.
I get the following error when I use `map` or `apply` to `cut` the lazy DataFrame:

PanicException: sink_parquet not yet supported in standard engine.
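
For context, the failing lazy pipeline looks roughly like this; it is only a sketch of the approach described above, and `large.csv` / `out.parquet` are placeholder paths:

lf = pl.scan_csv("large.csv")

# Wrapping the eager per-group cut() in a Python UDF is what forces the query
# off the streaming engine, so the sink below panics.
(
    lf.map(lambda df: df.groupby("group").apply(cut))
      .sink_parquet("out.parquet")
)
# PanicException: sink_parquet not yet supported in standard engine.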


Answer 1

Score: 1

I got it to work using a streamable UDF.

df = pl.from_repr("""
┌─────┬───────┐
│ x   ┆ group │
│ --- ┆ ---   │
│ i64 ┆ str   │
╞═════╪═══════╡
│ 8   ┆ A     │
│ 1   ┆ A     │
│ 7   ┆ A     │
│ 4   ┆ A     │
│ 0   ┆ A     │
│ 2   ┆ B     │
│ 5   ┆ B     │
│ 9   ┆ B     │
│ 6   ┆ B     │
│ 3   ┆ B     │
└─────┴───────┘
""").lazy()

bins = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]

# Schema of the struct produced per row: the binned value (unnamed field),
# the break point, the category and the group label carried along.
schema = pl.Struct([
    pl.Field(name="", dtype=pl.Float64),
    pl.Field(name="break_point", dtype=pl.Float64),
    pl.Field(name="category", dtype=pl.Categorical),
    pl.Field(name="group", dtype=pl.Utf8)
])
return_dtype = pl.List(schema)

(df.map(
    # Per batch: run cut() on x within each group, attach the group label
    # and pack everything into a single struct column named "x".
    function=lambda df:
        df.select(
            pl.col("x").apply(
                lambda col: col.cut(bins, maintain_order=True)
                               .with_columns(df.limit(1).select("group"))
                               .to_struct(""),
                return_dtype=return_dtype
            ).over("group")),
    schema={"x": schema},
    streamable=True)          # keep the UDF eligible for the streaming engine
 .unnest("x")                 # expand the struct back into separate columns
 .rename({"": "x"})
 .sink_parquet("sink.parquet"))
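
As an aside: since `bins` is a fixed global list, the binning does not actually depend on the group. On newer Polars releases that expose `cut` as a native expression, the whole pipeline can presumably stay lazy without any Python UDF. A minimal sketch, assuming a version where `Expr.cut(breaks)` is available and plays well with the streaming engine; the file names are placeholders:

import polars as pl

bins = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]

(
    pl.scan_csv("large.csv")                              # placeholder input path
      .with_columns(
          pl.col("x").cut(breaks=bins).alias("category")  # bin x into interval categories
      )
      .sink_parquet("binned.parquet")                     # placeholder output path
)

If the break points themselves are also needed in the output, it is worth checking whether your Polars version supports an `include_breaks`-style option on `cut`.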
