Set Airflow data interval to cover the previous month
Question
How can I set the Airflow data interval so that it starts at the beginning of the previous month and ends at the end of the previous month?
Suppose I have an ETL process that runs at 8:00 a.m. on the 7th day of each month. I want to transform the new data that was inserted between the start and the end of the previous month.
So:
- on March 7th I want to process the data that arrived between Feb 1 and Feb 28 (or 29),
- on April 7th I want to process the data that arrived between Mar 1 and Mar 31,
- etc.
Unfortunately, I haven't found a way to "shift" the data interval so that it starts on the first day and ends on the last day of the previous month.
Currently, if I execute the DAG, I get the following period:
- for March 7th: 2023-02-07 8:00 a.m. - 2023-03-07 8:00 a.m.
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.timetables.interval import CronDataIntervalTimetable

##############


def do_the_etl(start_date_str: str, end_date_str: str) -> None:
    """
    In reality this executes a lengthy ETL process;
    here it just prints a message.
    """
    start_date = datetime.fromisoformat(start_date_str)
    end_date = datetime.fromisoformat(end_date_str)
    print(f"[Fake ETL] Querying for the period {start_date}-{end_date}")


####

# Jinja templates that Airflow renders to the run's data interval bounds
ETL_DATE_START = "{{ data_interval_start }}"
ETL_DATE_END = "{{ data_interval_end }}"

##############

with DAG(
    dag_id="etl_test",
    start_date=datetime(2023, 2, 4),
    schedule_interval=CronDataIntervalTimetable(cron="0 8 7 * *", timezone="Etc/UTC"),
    catchup=False,
) as dag:
    run_etl = PythonOperator(
        task_id="etl",
        python_callable=do_the_etl,
        op_kwargs={
            "start_date_str": ETL_DATE_START,
            "end_date_str": ETL_DATE_END,
        },
    )

    run_etl
Answer 1
Score: 1
You can use relativedelta to modify the date.
# interval_start
{{ data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=1) }}
# interval_end
{{ data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=31) }}
months=-1 subtracts one month from the date.
day=1 sets the day of the month to 1. Note that the argument is day, not days.
day=31 sets the day to the last day of the month, regardless of how many days the month has.
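
One thing to watch with the templates above: relativedelta(months=-1, day=1) changes only the month and the day, so with an 08:00 schedule the rendered values keep the 08:00 time of day (for example 2023-02-01 08:00 and 2023-02-28 08:00 for the March 7th run). If the ETL should cover the whole calendar month, snapping to midnight and using a half-open window may be easier to reason about. The following is a minimal sketch of that idea using only the standard library rather than relativedelta; the helper name previous_month_window is made up for illustration and is not part of Airflow or dateutil.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone


def previous_month_window(interval_end: datetime) -> tuple[datetime, datetime]:
    """Return (start, end) of the calendar month before interval_end.

    start is 00:00 on the first day of the previous month; end is 00:00 on
    the first day of the current month, so the window is half-open: [start, end).
    """
    # First instant of the month that contains interval_end.
    current_month_start = interval_end.replace(
        day=1, hour=0, minute=0, second=0, microsecond=0
    )
    # Step back one day to land in the previous month, then snap to its first day.
    previous_month_start = (current_month_start - timedelta(days=1)).replace(day=1)
    return previous_month_start, current_month_start


if __name__ == "__main__":
    # Hypothetical run: the end of the data interval for the March 7th run.
    run = datetime(2023, 3, 7, 8, 0, tzinfo=timezone.utc)
    start, end = previous_month_window(run)
    print(f"{start.isoformat()} -> {end.isoformat()}")
    # prints: 2023-02-01T00:00:00+00:00 -> 2023-03-01T00:00:00+00:00
```

Inside the DAG from the question, one option would be to keep passing "{{ data_interval_end }}" to the task, parse it with datetime.fromisoformat as do_the_etl already does, and call a helper like this one. A query of the form start <= inserted_at < end (inserted_at being an assumed column name) would then include everything from the last day of the month without needing to know whether that month has 28, 29, 30 or 31 days.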