PySpark 使用滚动窗口进行分组

huangapple go评论84阅读模式
英文:

PySpark group by with rolling window

问题

假设我有一个包含三列的表格:dtidvalue

df_tmp = spark.createDataFrame([('2023-01-01', 1001, 5),
                                ('2023-01-15', 1001, 3),
                                ('2023-02-10', 1001, 1),
                                ('2023-02-20', 1001, 2),
                                ('2023-01-02', 1002, 7),
                                ('2023-01-02', 1002, 6),
                                ('2023-01-03', 1002, 1)],
                               ["date", "id", "value"])
df.show()
# +----------+----+-----+
# |      date|  id|value|
# +----------+----+-----+
# |2023-01-01|1001|    5|
# |2023-01-15|1001|    3|
# |2023-02-10|1001|    1|
# |2023-02-20|1001|    2|
# |2023-01-02|1002|    7|
# |2023-01-02|1002|    6|
# |2023-01-03|1002|    1|
# +----------+----+-----+

我想要计算按id分组的每个date的30天滚动总和,并且还要计算id在过去30天内看到的不同日期数。结果应该如下所示:

+----------+----+-----+----------------+-------------------------+
|      date|  id|value|30_day_value_sum|days_seen_in_past_30_days|
+----------+----+-----+----------------+-------------------------+
|2023-01-01|1001|    5|               0|                        0|
|2023-01-15|1001|    3|               0|                        1|
|2023-02-10|1001|    1|               3|                        1|
|2023-02-20|1001|    2|               1|                        2|
|2023-01-02|1002|    7|               0|                        0|
|2023-01-02|1002|    6|               7|                        1|
|2023-01-03|1002|    1|              13|                        2|
+----------+----+-----+----------------+-------------------------+

我怀疑可以使用Window来完成,但具体细节不太清楚。

英文:

Suppose I have a table with three columns: dt, id and value.

df_tmp = spark.createDataFrame([('2023-01-01', 1001, 5),
                                ('2023-01-15', 1001, 3),
                                ('2023-02-10', 1001, 1),
                                ('2023-02-20', 1001, 2),
                                ('2023-01-02', 1002, 7),
                                ('2023-01-02', 1002, 6),
                                ('2023-01-03', 1002, 1)],
                               ["date", "id", "value"])
df.show()
# +----------+----+-----+
# |      date|  id|value|
# +----------+----+-----+
# |2023-01-01|1001|    5|
# |2023-01-15|1001|    3|
# |2023-02-10|1001|    1|
# |2023-02-20|1001|    2|
# |2023-01-02|1002|    7|
# |2023-01-02|1002|    6|
# |2023-01-03|1002|    1|
# +----------+----+-----+

I would like to compute the 30-day rolling sum of value grouped by id for every date, and additionally, a number of distinct dates that the id was seen. Something that would look like this:

+----------+----+-----+----------------+-------------------------+
|      date|  id|value|30_day_value_sum|days_seen_in_past_30_days|
+----------+----+-----+----------------+-------------------------+
|2023-01-01|1001|    5|               0|                        0|
|2023-01-15|1001|    3|               0|                        1|
|2023-02-10|1001|    1|               3|                        1|
|2023-02-20|1001|    2|               1|                        2|
|2023-01-02|1002|    7|               0|                        0|
|2023-01-02|1002|    6|               7|                        1|
|2023-01-03|1002|    1|              13|                        2|
+----------+----+-----+----------------+-------------------------+

I suspect one could do it using Window but am not clear about the explicit details.

答案1

得分: 1

我假设你的数据框最初有一个日期数据类型,所以我使用了稍微修改的输入如下:

from pyspark.sql import functions as F, Window as W
df_tmp = spark.createDataFrame(
    ['2023-01-01', 1001, 5),
    ['2023-01-15', 1001, 3),
    ['2023-02-10', 1001, 1),
    ['2023-02-20', 1001, 2),
    ['2023-01-01', 1002, 7),
    ['2023-01-02', 1002, 6),
    ['2023-01-03', 1002, 1)],
    ["date", "id", "value"]
).withColumn('date', F.col('date').cast('date'))

在这种情况下,以下窗口应该起作用:

w = W.partitionBy('id').orderBy(F.expr("unix_date(date)")).rangeBetween(-30, -1)
df = df_tmp.withColumn('30_day_value_sum', F.sum('value').over(w)) \
           .withColumn('days_seen_in_past_30_days', F.count('id').over(w))
df = df.fillna(0, subset=['30_day_value_sum'])

df.show()
# +----------+----+-----+----------------+-------------------------+
# |      date|  id|value|30_day_value_sum|days_seen_in_past_30_days|
# +----------+----+-----+----------------+-------------------------+
# |2023-01-01|1001|    5|               0|                        0|
# |2023-01-15|1001|    3|               5|                        1|
# |2023-02-10|1001|    1|               3|                        1|
# |2023-02-20|1001|    2|               1|                        1|
# |2023-01-01|1002|    7|               0|                        0|
# |2023-01-02|1002|    6|               7|                        1|
# |2023-01-03|1002|    1|              13|                        2|
# +----------+----+-----+----------------+-------------------------+

如果你的"date"列是字符串类型,那么你应该使用以下.orderBy子句:

.orderBy(F.expr("unix_date(to_date(date))")).rangeBetween(-30, -1)

更多选项和详情请参考这里

英文:

I assume that originally you have date data type in your dataframe, so I used a bit modified input than yours:

from pyspark.sql import functions as F, Window as W
df_tmp = spark.createDataFrame(
    [('2023-01-01', 1001, 5),
     ('2023-01-15', 1001, 3),
     ('2023-02-10', 1001, 1),
     ('2023-02-20', 1001, 2),
     ('2023-01-01', 1002, 7),
     ('2023-01-02', 1002, 6),
     ('2023-01-03', 1002, 1)],
    ["date", "id", "value"]
).withColumn('date', F.col('date').cast('date'))

In this case, the following window should work:

w = W.partitionBy('id').orderBy(F.expr("unix_date(date)")).rangeBetween(-30, -1)
df = df_tmp.withColumn('30_day_value_sum', F.sum('value').over(w)) \
           .withColumn('days_seen_in_past_30_days', F.count('id').over(w))
df = df.fillna(0, subset=['30_day_value_sum'])

df.show()
# +----------+----+-----+----------------+-------------------------+
# |      date|  id|value|30_day_value_sum|days_seen_in_past_30_days|
# +----------+----+-----+----------------+-------------------------+
# |2023-01-01|1001|    5|               0|                        0|
# |2023-01-15|1001|    3|               5|                        1|
# |2023-02-10|1001|    1|               3|                        1|
# |2023-02-20|1001|    2|               1|                        1|
# |2023-01-01|1002|    7|               0|                        0|
# |2023-01-02|1002|    6|               7|                        1|
# |2023-01-03|1002|    1|              13|                        2|
# +----------+----+-----+----------------+-------------------------+

If you have string type in the "date" column, then you should use the following .orderBy` clause:

.orderBy(F.expr("unix_date(to_date(date))")).rangeBetween(-30, -1)

More options and details are here.

huangapple
  • 本文由 发表于 2023年8月4日 04:55:42
  • 转载请务必保留本文链接:https://go.coder-hub.com/76831553.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定