Groupby and cut on a Lazy DataFrame in Polars
Question
import numpy as np
import polars as pl

def cut(_df):
    _c = _df['x'].cut(bins).with_columns([pl.col('x').cast(pl.Int64)])
    final = _df.join(_c, left_on='x', right_on='x')
    return final

groups = ["A"] * 500 + ["B"] * 500
bins = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
x = np.arange(0, 1000)
np.random.shuffle(x)
df = pl.DataFrame({"x": x, "group": groups})

with pl.StringCache():
    res = df.groupby("group").apply(cut)
df
Out[4]:
shape: (1_000, 2)
┌─────┬───────┐
│ x ┆ group │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═══════╡
│ 105 ┆ A │
│ 166 ┆ A │
│ 291 ┆ A │
│ 183 ┆ A │
│ … ┆ … │
│ 949 ┆ B │
│ 891 ┆ B │
│ 831 ┆ B │
│ 535 ┆ B │
└─────┴───────┘
res
Out[5]:
shape: (1_000, 4)
┌─────┬───────┬─────────────┬─────────────────┐
│ x ┆ group ┆ break_point ┆ category │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ f64 ┆ cat │
╞═════╪═══════╪═════════════╪═════════════════╡
│ 2 ┆ B ┆ 100.0 ┆ (0.0, 100.0] │
│ 3 ┆ B ┆ 100.0 ┆ (0.0, 100.0] │
│ 4 ┆ B ┆ 100.0 ┆ (0.0, 100.0] │
│ 6 ┆ B ┆ 100.0 ┆ (0.0, 100.0] │
│ … ┆ … ┆ … ┆ … │
│ 991 ┆ A ┆ 1000.0 ┆ (900.0, 1000.0] │
│ 993 ┆ A ┆ 1000.0 ┆ (900.0, 1000.0] │
│ 996 ┆ A ┆ 1000.0 ┆ (900.0, 1000.0] │
│ 997 ┆ A ┆ 1000.0 ┆ (900.0, 1000.0] │
└─────┴───────┴─────────────┴─────────────────┘
Is there a way to do the above with a Polars lazy DataFrame without using `apply` or `map`?
My end goal is to scan a large CSV, transform it, and sink it using `sink_parquet`.
I get the following error when I use `map` or `apply` to `cut` the lazy DataFrame:
PanicException: sink_parquet not yet supported in standard engine.
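The lazy attempt looks roughly like this minimal sketch (`input.csv` stands in for the real file, and `cut` is the function defined above):

import polars as pl

lf = pl.scan_csv("input.csv")                      # hypothetical large CSV with columns x, group
res = lf.groupby("group").apply(cut, schema=None)  # Python UDF forces the non-streaming engine
res.sink_parquet("sink.parquet")                   # raises the PanicException above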
Answer 1
Score: 1
I got it to work using a streamable UDF.
df = pl.from_repr("""
┌─────┬───────┐
│ x ┆ group │
│ --- ┆ --- │
│ i64 ┆ str │
╞═════╪═══════╡
│ 8 ┆ A │
│ 1 ┆ A │
│ 7 ┆ A │
│ 4 ┆ A │
│ 0 ┆ A │
│ 2 ┆ B │
│ 5 ┆ B │
│ 9 ┆ B │
│ 6 ┆ B │
│ 3 ┆ B │
└─────┴───────┘
""").lazy()
bins = [0, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000]
schema = pl.Struct([
    pl.Field(name="", dtype=pl.Float64),  # cut() returns the binned x values in an unnamed field
    pl.Field(name="break_point", dtype=pl.Float64),
    pl.Field(name="category", dtype=pl.Categorical),
    pl.Field(name="group", dtype=pl.Utf8)
])
return_dtype = pl.List(schema)
(df.map(
    # Per-batch UDF: within each group, bin "x" with cut(), attach the group
    # label (taken from the batch's first row) and pack everything into a
    # single struct column so the output matches the declared schema.
    function=lambda df:
        df.select(
            pl.col("x").apply(
                lambda col: col.cut(bins, maintain_order=True)
                .with_columns(df.limit(1).select("group"))
                .to_struct(""),
                return_dtype=return_dtype,
            ).over("group")
        ),
    schema={"x": schema},
    streamable=True,  # marks the UDF as streamable so sink_parquet can use the streaming engine
)
.unnest("x")          # expand the struct into separate columns
.rename({"": "x"})    # the unnamed struct field holds the original x values
.sink_parquet("sink.parquet"))