PySpark / Snowpark calculate running sum between two given dates

Question


Using this sample table:

id  sales  sales_date
 1     10  2020-04-30
 1      6  2020-10-31
 1      9  2020-09-30
 1      2  2021-04-30
 2      8  2020-08-31
 2      7  2020-07-31
 2      3  2021-06-30
 2      2  2021-05-31

I would like to calculate the total sum of sales between two given dates.

I assume it would be a window function, partitioned by id and ordered by sales_date, but I don't know how to restrict the sum of sales to rows whose sales_date falls between two given dates.

win = Window.partitionBy('id').orderBy('sales_date')
df.withColumn('running_sum', sum('sales').over(win))  # .rangeBetween(start_date, end_date) ??

# i.e. a rangeBetween from start_date to start_date + 1 year

For example:

ID 1 has a start date of 2020-04-30, and I want to get the sum from 2020-04-30 to 2021-04-30.

ID 2 has a start date of 2020-08-31, and the end date would be 2021-08-31.

Referring to this, it seems quite close to what I want, but my problem is that each ID can have a different start date and end date for the window sum:

https://stackoverflow.com/questions/45806194/pyspark-rolling-average-using-timeseries-data
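
That answer defines the window frame over the order-by value cast to unix seconds, so a fixed range can be passed to rangeBetween as a number of seconds. A minimal sketch of that pattern (assuming a fixed 365-day forward window):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

days = lambda d: d * 86400  # convert days to seconds for rangeBetween

# Order by the date cast to epoch seconds so rangeBetween can take
# second offsets, as in the linked answer.
win = (Window.partitionBy('id')
       .orderBy(F.col('sales_date').cast('timestamp').cast('long'))
       .rangeBetween(0, days(365)))  # current row forward 365 days

df = df.withColumn('running_sum', F.sum('sales').over(win))

But this anchors the range at every row rather than at each id's first sales_date, which is the part I can't figure out.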

Answer 1

Score: 1


I think you can do it like this:

  • Use a window function to get the start_date of each id
  • Filter out any rows where the sales date is more than 12 months after the start date (this could be made day-precise; see the sketch after the final output below)
  • Group by id and sum the sales

Note: I have changed the input so that the last record (2021-08-30 for id 2) falls out of range and gets filtered.

from datetime import datetime

from pyspark.sql.window import Window
from pyspark.sql.functions import first, col, months_between, sum, max

df = spark.createDataFrame([
    (1, 10, datetime.strptime("2020-04-30", '%Y-%m-%d')),
    (1, 6, datetime.strptime("2020-10-31", '%Y-%m-%d')),
    (1, 9, datetime.strptime("2020-09-30", '%Y-%m-%d')),
    (1, 2, datetime.strptime("2021-04-30", '%Y-%m-%d')),
    (2, 7, datetime.strptime("2020-07-31", '%Y-%m-%d')),
    (2, 8, datetime.strptime("2020-08-31", '%Y-%m-%d')),
    (2, 3, datetime.strptime("2021-06-30", '%Y-%m-%d')),
    (2, 2, datetime.strptime("2021-08-30", '%Y-%m-%d'))
], ['id', 'sales', 'sales_date'])

# The first sales_date per id becomes that id's window start date;
# rows more than 12 months after it are dropped.
win = Window.partitionBy('id').orderBy('sales_date')
df = df.withColumn('order_start_date', first(col('sales_date')).over(win)) \
    .filter(months_between(col('sales_date'), col('order_start_date')) <= 12)

df.show()
+---+-----+-------------------+-------------------+
| id|sales|         sales_date|   order_start_date|
+---+-----+-------------------+-------------------+
|  1|   10|2020-04-30 00:00:00|2020-04-30 00:00:00|
|  1|    9|2020-09-30 00:00:00|2020-04-30 00:00:00|
|  1|    6|2020-10-31 00:00:00|2020-04-30 00:00:00|
|  1|    2|2021-04-30 00:00:00|2020-04-30 00:00:00|
|  2|    7|2020-07-31 00:00:00|2020-07-31 00:00:00|
|  2|    8|2020-08-31 00:00:00|2020-07-31 00:00:00|
|  2|    3|2021-06-30 00:00:00|2020-07-31 00:00:00|
+---+-----+-------------------+-------------------+

# 2021-08-30 was removed
df = df.groupBy('id').agg(
    sum('sales').alias('total_sales'),
    max('sales_date').alias('latest_sales_date')
)

df.show()
+---+-----------+-------------------+
| id|total_sales|  latest_sales_date|
+---+-----------+-------------------+
|  1|         27|2021-04-30 00:00:00|
|  2|         18|2021-06-30 00:00:00|
+---+-----------+-------------------+
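
If the cutoff needs to be exact to the day rather than the 12-month months_between check, one variant (a sketch in the same spirit, not part of the original answer) is to compare against add_months of the start date:

from pyspark.sql.window import Window
from pyspark.sql.functions import add_months, col, first

# Day-precise variant: keep rows up to exactly one year after each
# id's first sales_date by comparing against add_months(start, 12).
win = Window.partitionBy('id').orderBy('sales_date')
df = df.withColumn('order_start_date', first(col('sales_date')).over(win)) \
    .filter(col('sales_date') <= add_months(col('order_start_date'), 12))

Since the question also mentions Snowpark, the same idea translates roughly as follows (an untested sketch; it assumes snowflake.snowpark.functions exposes first_value and months_between, which recent versions do):

from snowflake.snowpark import Window
from snowflake.snowpark.functions import col, first_value, months_between, sum as sum_, max as max_

# Same approach in Snowpark: derive the window start per id, filter to
# 12 months, then aggregate per id.
win = Window.partition_by('id').order_by('sales_date')
df = df.with_column('order_start_date', first_value(col('sales_date')).over(win)) \
    .filter(months_between(col('sales_date'), col('order_start_date')) <= 12) \
    .group_by('id').agg(
        sum_('sales').alias('total_sales'),
        max_('sales_date').alias('latest_sales_date')
    )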
