PySpark / Snowpark calculate running sum between two given dates
Question
Using this sample table:
id | sales | sales_date |
---|---|---|
1 | 10 | 2020-04-30 |
1 | 6 | 2020-10-31 |
1 | 9 | 2020-09-30 |
1 | 2 | 2021-04-30 |
2 | 8 | 2020-08-31 |
2 | 7 | 2020-07-31 |
2 | 3 | 2021-06-30 |
2 | 2 | 2021-05-31 |
I would like to calculate the total sum of sales within a range between two dates.
I assume it would be a window function, partitioned by id and ordered by sales_date, but I don't know how to get the sum of the sales between two given dates.
```python
win = Window.partitionBy('id').orderBy('sales_date')
df.withColumn('running_sum', sum('sales').over(win).rangeBetween(start_date, end_date))  # ??
# rangeBetween of start_date and start_date + 1 year
```
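For reference, `rangeBetween` interprets its bounds as offsets on the value of the ordering column rather than as literal dates, so ordering by a day count lets a one-year frame be written as 365. A rough sketch of that mechanism only; note it sums forward from every row, not from each id's own start date:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Order by the date expressed as a number of days so the frame bounds
# (0, 365) mean "from the current row's date to 365 days later".
days = F.datediff(F.col('sales_date'), F.lit('1970-01-01'))
win = Window.partitionBy('id').orderBy(days).rangeBetween(0, 365)

df = df.withColumn('running_sum', F.sum('sales').over(win))
```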
For example, ID 1 has a start date of `2020-04-30`, and I want to get the sum from `2020-04-30` to `2021-04-30`. ID 2 has a start date of `2020-08-31`, and the end date would be `2021-08-31`.
Referring to this, it seems quite close to what I want, but my problem is that each ID can have a different start date and end date for the window sum:
https://stackoverflow.com/questions/45806194/pyspark-rolling-average-using-timeseries-data
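One way to cope with per-id bounds without `rangeBetween` is to derive each id's start date and an end date one year later, then sum conditionally so that rows outside the bounds contribute nothing. A sketch only, assuming the start date is each id's earliest `sales_date` (the answer below derives it the same way) and the end date is exactly one year after it:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

by_id = Window.partitionBy('id')

df = (df
      # Assumed: the window starts at each id's earliest sales_date ...
      .withColumn('start_date', F.min('sales_date').over(by_id))
      # ... and ends exactly one year later.
      .withColumn('end_date', F.add_months('start_date', 12))
      # Total of the sales that fall inside [start_date, end_date],
      # repeated on every row of that id.
      .withColumn('sales_in_window',
                  F.sum(F.when(F.col('sales_date').between(F.col('start_date'),
                                                           F.col('end_date')),
                               F.col('sales'))).over(by_id)))
```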
Answer 1
Score: 1
I think you can do it like this:
- Use the window function to get the `start_date` of each `order_id`
- Filter out any rows where the gap between the sales date and the start date is more than 12 months (this could be made more precise, down to the day)
- Group by `id` and sum the `sales`

Note: I have changed the input so that the last record falls out of range and gets filtered.
```python
from datetime import datetime

from pyspark.sql.window import Window
from pyspark.sql.functions import first, col, months_between, sum, max

df = spark.createDataFrame([
    (1, 10, datetime.strptime("2020-04-30", '%Y-%m-%d')),
    (1, 6,  datetime.strptime("2020-10-31", '%Y-%m-%d')),
    (1, 9,  datetime.strptime("2020-09-30", '%Y-%m-%d')),
    (1, 2,  datetime.strptime("2021-04-30", '%Y-%m-%d')),
    (2, 7,  datetime.strptime("2020-07-31", '%Y-%m-%d')),
    (2, 8,  datetime.strptime("2020-08-31", '%Y-%m-%d')),
    (2, 3,  datetime.strptime("2021-06-30", '%Y-%m-%d')),
    (2, 2,  datetime.strptime("2021-08-30", '%Y-%m-%d'))
], ['id', 'sales', 'sales_date'])

win = Window.partitionBy('id').orderBy('sales_date')

# The earliest sales_date per id becomes that id's window start,
# then anything more than 12 months after it is dropped.
df = df.withColumn('order_start_date', first(col('sales_date')).over(win)) \
       .filter(months_between(col('sales_date'), col('order_start_date')) <= 12)
df.show()
```
```
+---+-----+-------------------+-------------------+
| id|sales|         sales_date|   order_start_date|
+---+-----+-------------------+-------------------+
|  1|   10|2020-04-30 00:00:00|2020-04-30 00:00:00|
|  1|    9|2020-09-30 00:00:00|2020-04-30 00:00:00|
|  1|    6|2020-10-31 00:00:00|2020-04-30 00:00:00|
|  1|    2|2021-04-30 00:00:00|2020-04-30 00:00:00|
|  2|    7|2020-07-31 00:00:00|2020-07-31 00:00:00|
|  2|    8|2020-08-31 00:00:00|2020-07-31 00:00:00|
|  2|    3|2021-06-30 00:00:00|2020-07-31 00:00:00|
+---+-----+-------------------+-------------------+
```

The 2021-08-30 record was removed as out of range.
```python
df = df.groupBy('id').agg(
    sum('sales').alias('total_sales'),
    max('sales_date').alias('latest_sales_date')
)
df.show()
```
```
+---+-----------+-------------------+
| id|total_sales|  latest_sales_date|
+---+-----------+-------------------+
|  1|         27|2021-04-30 00:00:00|
|  2|         18|2021-06-30 00:00:00|
+---+-----------+-------------------+
```
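One more note, not part of the answer above: in the question, ID 2's window starts on 2020-08-31 even though its earliest sale is 2020-07-31, so the start date may not always be the minimum `sales_date`. If each id's bounds come from somewhere else, the same filter-then-aggregate pattern works with a join instead of a window. A sketch, starting again from the raw sales rows and using a hypothetical `date_ranges` DataFrame that holds the bounds (values taken from the question's example):

```python
from pyspark.sql import functions as F

# Hypothetical per-id window bounds, not part of the original answer.
date_ranges = spark.createDataFrame(
    [(1, '2020-04-30', '2021-04-30'),
     (2, '2020-08-31', '2021-08-31')],
    ['id', 'start_date', 'end_date'])

totals = (df.join(date_ranges, 'id')
            .filter(F.col('sales_date').between(F.to_date('start_date'),
                                                F.to_date('end_date')))
            .groupBy('id')
            .agg(F.sum('sales').alias('total_sales')))
totals.show()
```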