PySpark group by with rolling window
Question
Suppose I have a table with three columns: date, id and value.
df_tmp = spark.createDataFrame(
    [('2023-01-01', 1001, 5),
     ('2023-01-15', 1001, 3),
     ('2023-02-10', 1001, 1),
     ('2023-02-20', 1001, 2),
     ('2023-01-02', 1002, 7),
     ('2023-01-02', 1002, 6),
     ('2023-01-03', 1002, 1)],
    ["date", "id", "value"])
df_tmp.show()
# +----------+----+-----+
# | date| id|value|
# +----------+----+-----+
# |2023-01-01|1001| 5|
# |2023-01-15|1001| 3|
# |2023-02-10|1001| 1|
# |2023-02-20|1001| 2|
# |2023-01-02|1002| 7|
# |2023-01-02|1002| 6|
# |2023-01-03|1002| 1|
# +----------+----+-----+
I would like to compute the 30-day rolling sum of value grouped by id for every date, and additionally, the number of distinct dates on which the id was seen in the past 30 days. Something that would look like this:
+----------+----+-----+----------------+-------------------------+
| date| id|value|30_day_value_sum|days_seen_in_past_30_days|
+----------+----+-----+----------------+-------------------------+
|2023-01-01|1001| 5| 0| 0|
|2023-01-15|1001| 3| 0| 1|
|2023-02-10|1001| 1| 3| 1|
|2023-02-20|1001| 2| 1| 2|
|2023-01-02|1002| 7| 0| 0|
|2023-01-02|1002| 6| 7| 1|
|2023-01-03|1002| 1| 13| 2|
+----------+----+-----+----------------+-------------------------+
I suspect one could do it using Window, but I am not clear on the exact details.
Answer 1
Score: 1
I assume that your dataframe originally has a date data type, so I used a slightly modified input:
from pyspark.sql import functions as F, Window as W

df_tmp = spark.createDataFrame(
    [('2023-01-01', 1001, 5),
     ('2023-01-15', 1001, 3),
     ('2023-02-10', 1001, 1),
     ('2023-02-20', 1001, 2),
     ('2023-01-01', 1002, 7),
     ('2023-01-02', 1002, 6),
     ('2023-01-03', 1002, 1)],
    ["date", "id", "value"]
).withColumn('date', F.col('date').cast('date'))
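To confirm the cast took effect before building the window, a quick schema check (not part of the original answer) can be run:

df_tmp.printSchema()
# root
#  |-- date: date (nullable = true)
#  |-- id: long (nullable = true)
#  |-- value: long (nullable = true)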
In this case, the following window should work:
# unix_date(date) converts the date to a day number (days since 1970-01-01),
# so rangeBetween(-30, -1) covers the previous 30 calendar days and excludes
# rows that fall on the current row's date.
w = W.partitionBy('id').orderBy(F.expr("unix_date(date)")).rangeBetween(-30, -1)
df = df_tmp.withColumn('30_day_value_sum', F.sum('value').over(w)) \
           .withColumn('days_seen_in_past_30_days', F.count('id').over(w))
# F.sum returns null over an empty frame, so replace it with 0.
df = df.fillna(0, subset=['30_day_value_sum'])
df.show()
# +----------+----+-----+----------------+-------------------------+
# | date| id|value|30_day_value_sum|days_seen_in_past_30_days|
# +----------+----+-----+----------------+-------------------------+
# |2023-01-01|1001| 5| 0| 0|
# |2023-01-15|1001| 3| 5| 1|
# |2023-02-10|1001| 1| 3| 1|
# |2023-02-20|1001| 2| 1| 1|
# |2023-01-01|1002| 7| 0| 0|
# |2023-01-02|1002| 6| 7| 1|
# |2023-01-03|1002| 1| 13| 2|
# +----------+----+-----+----------------+-------------------------+
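Note that F.count('id') counts rows in the frame rather than distinct dates; it matches the output here only because this modified input has at most one row per id and date. If the same id can appear several times on one date, as in the original sample data, a minimal sketch (not from the original answer) of one way to count distinct dates is to take the size of a collect_set over the same window:

# Sketch: collect the unique dates in the trailing 30-day frame and count them;
# collect_set never returns null here, so no fillna is needed for this column.
df = df.withColumn('days_seen_in_past_30_days',
                   F.size(F.collect_set('date').over(w)))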
If the "date" column is of string type, then you should use the following .orderBy clause instead:
.orderBy(F.expr("unix_date(to_date(date))")).rangeBetween(-30, -1)
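For completeness, a minimal sketch of the full window definition for a string-typed "date" column, assuming the strings use the default yyyy-MM-dd format that to_date parses without an explicit pattern (unix_date requires Spark 3.1 or later):

# Parse the string to a date first, then order by its day number as before.
w = W.partitionBy('id') \
     .orderBy(F.expr("unix_date(to_date(date))")) \
     .rangeBetween(-30, -1)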
More options and details are here.