Is there a more efficient way to filter previous month's (or X previous months') data using Pyspark?
Question
I am new to Pyspark, so I apologize if this is a basic question. I have looked at a few StackOverflow posts (like this one), but I am still not sure whether I'm following the best (most efficient) way of doing things here.
I have a dataframe and a string parameter/variable representing the dataset date. Using the dataset date, I would like to filter data from the dataframe for the last 'X' months (for simplicity, let's just say I'd like to fetch just the last month's data). What I have tried is below.
customer_panel_s3_location = f"s3://my-bucket/region_id={region_id}/marketplace_id={marketplace_id}/"
customer_panel_table = spark.read.parquet(customer_panel_s3_location)
customer_panel_table.show(20, False)
# +-----------+---------+--------------+------------+---------+---------+-------------+-----------+
# |customer_id|region_id|marketplace_id|snapshot_day|45d_yield|plan_type|payment_offer|target_date|
# +-----------+---------+--------------+------------+---------+---------+-------------+-----------+
# |493595720  |1        |1             |2021-12-31  |0        |ANNUAL   |PAID         |2023-02-11 |
# |277499930  |1        |1             |2022-01-04  |0        |ANNUAL   |PAID         |2023-02-11 |
# |790961616  |1        |1             |2021-12-28  |0        |ANNUAL   |PAID         |2023-02-11 |
# |2111203    |1        |1             |2022-12-14  |0        |MONTHLY  |TRIAL        |2023-02-11 |
# |4673190    |1        |1             |2022-11-30  |1        |MONTHLY  |TRIAL        |2023-02-11 |
# |2117974    |1        |1             |2022-12-15  |1        |ANNUAL   |PAID         |2023-02-11 |
# +-----------+---------+--------------+------------+---------+---------+-------------+-----------+
from pyspark.sql import functions as F
from pyspark.sql.functions import col

dataset_date = '2023-03-16'
month_offset_for_snapshot_day = -1  # assumed: -1 selects just the previous month
df_customer_panel_table = (
    spark.read.parquet(customer_panel_s3_location)
    .withColumn("dataset_date", F.lit(dataset_date))  # lit() is required; a bare string raises a TypeError
    .filter(F.date_format(col("snapshot_day"), "yyyyMM")
            == F.date_format(F.add_months(F.to_date(col("dataset_date"), "yyyy-MM-dd"),
                                          month_offset_for_snapshot_day), "yyyyMM"))
)
Is there a cleaner (more Pyspark-y) way of doing this? It feels clunky to convert snapshot_day (a date-type column) to a yyyyMM string on both sides of the comparison, which may be introducing unnecessary conversion overhead.
Thank you in advance for your answers!
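For reference, a minimal sketch of a string-free variant of the same filter, not necessarily the most efficient approach: F.trunc truncates both dates to the first day of their month, so the comparison stays in the date domain; the -1 offset is an assumption meaning "just the previous month".

from pyspark.sql import functions as F

# Compare month-truncated dates instead of formatting both sides as yyyyMM strings.
target_month = F.trunc(F.add_months(F.to_date(F.lit(dataset_date)), -1), "month")  # -1: assumed "last month"
df_last_month = (
    spark.read.parquet(customer_panel_s3_location)
    .filter(F.trunc("snapshot_day", "month") == target_month)
)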
Answer 1
Score: 1
Your DataFrame
+-----------+---------+--------------+------------+---------+---------+-------------+-----------+
|customer_id|region_id|marketplace_id|snapshot_day|45d_yield|plan_type|payment_offer|target_date|
+-----------+---------+--------------+------------+---------+---------+-------------+-----------+
| 493595720| 1| 1| 2021-12-31| 0| ANNUAL| PAID| 2023-02-11|
| 277499930| 1| 1| 2022-01-04| 0| ANNUAL| PAID| 2023-02-11|
| 790961616| 1| 1| 2021-12-28| 0| ANNUAL| PAID| 2023-02-11|
| 2111203| 1| 1| 2022-12-14| 0| MONTHLY| TRIAL| 2023-02-11|
| 4673190| 1| 1| 2022-11-30| 1| MONTHLY| TRIAL| 2023-02-11|
| 2117974| 1| 1| 2022-12-15| 1| ANNUAL| PAID| 2023-02-11|
+-----------+---------+--------------+------------+---------+---------+-------------+-----------+
Importing the necessary functions
from pyspark.sql.functions import lit, floor, months_between
- Adding a column with the given date
dataset_day = "2023-03-16"
customer_panel_df = customer_panel_df.withColumn("dataset_day", lit(dataset_day).cast("DATE"))
- Calculating the months between the snapshot date and the dataset day using the months_between() function
customer_panel_df = customer_panel_df.withColumn("month_gap", floor(months_between("dataset_day", "snapshot_day")))
- Filtering the last 10 months' records (note that a snapshot_day after dataset_day yields a negative month_gap and would also pass this filter; add month_gap >= 0 if such rows can occur)
customer_panel_df.filter("month_gap <= 10").drop("dataset_day", "month_gap").show()
Output
+-----------+---------+--------------+------------+---------+---------+-------------+-----------+
|customer_id|region_id|marketplace_id|snapshot_day|45d_yield|plan_type|payment_offer|target_date|
+-----------+---------+--------------+------------+---------+---------+-------------+-----------+
| 2111203| 1| 1| 2022-12-14| 0| MONTHLY| TRIAL| 2023-02-11|
| 4673190| 1| 1| 2022-11-30| 1| MONTHLY| TRIAL| 2023-02-11|
| 2117974| 1| 1| 2022-12-15| 1| ANNUAL| PAID| 2023-02-11|
+-----------+---------+--------------+------------+---------+---------+-------------+-----------+
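A possible follow-up on efficiency, not from the original answer: because month_gap is computed per row, the filter above cannot be pushed down to the Parquet reader, whereas a plain range comparison of the raw snapshot_day column against a constant cutoff can be, which tends to matter on large datasets. The boundary differs slightly from the floor(months_between(...)) version. A sketch, with X as the assumed number of trailing months:

from pyspark.sql import functions as F

X = 10  # assumed: number of trailing months to keep
cutoff = F.add_months(F.to_date(F.lit(dataset_day)), -X)  # constant-folded to a literal date by the optimizer
df_recent = customer_panel_df.filter(F.col("snapshot_day") >= cutoff)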