Set Airflow data interval to cover the previous month

Question

How do I set the Airflow data interval so that it starts at the beginning of the previous month and ends at the end of the previous month?

Let's suppose that I have an ETL process that is executed at 8:00 a.m. on the 7th day of each month. I want to transform new data that were inserted between the start and end of the previous month.

So:

  • on March 7th I want to process the data that arrived between Feb 1 and Feb 28 (or 29),
  • on April 7th I want to process the data that arrived between Mar 1 and Mar 31,
  • etc.
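The target windows listed above can be expressed as a plain date calculation on the run date. The following is a minimal stdlib-only sketch, not an Airflow API: the helper name previous_month_window is hypothetical, and it returns a half-open interval [first day of the previous month 00:00, first day of the run's own month 00:00), so the whole last day is covered whether the month has 28, 29, 30 or 31 days.

```python
from __future__ import annotations

from datetime import datetime


def previous_month_window(run_date: datetime) -> tuple[datetime, datetime]:
    """Return [start, end) covering the calendar month before run_date.

    Hypothetical helper for illustration; the end bound is exclusive,
    i.e. midnight on the first day of run_date's own month.
    """
    # Midnight on the first day of the run's month: the exclusive upper bound.
    end = run_date.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
    # Step back one month, wrapping January back to December of the prior year.
    if end.month == 1:
        start = end.replace(year=end.year - 1, month=12)
    else:
        start = end.replace(month=end.month - 1)
    return start, end


for run in (datetime(2023, 3, 7, 8), datetime(2023, 4, 7, 8), datetime(2024, 1, 7, 8)):
    start, end = previous_month_window(run)
    print(f"{run:%Y-%m-%d}: {start:%Y-%m-%d %H:%M} <= t < {end:%Y-%m-%d %H:%M}")
```

With an exclusive upper bound the ETL query filters on start <= t < end, which avoids computing the last day of the month explicitly; a January run wraps to December of the previous year.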

Unfortunately, I haven't found a way to "shift" the data interval so that it starts on the first day and ends on the last day of the previous month.
Currently, if I execute the DAG, I get the following period:

  • for March 7th: 2023-02-07 8:00 a.m. - 2023-03-03 8:00 a.m.
```python
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.timetables.interval import CronDataIntervalTimetable

##############


def do_the_etl(start_date_str: str, end_date_str: str) -> None:
    """
    In reality this executes a lengthy ETL process;
    here it just prints a message.
    """
    start_date = datetime.fromisoformat(start_date_str)
    end_date = datetime.fromisoformat(end_date_str)
    print(f"[Fake ETL] Querying for the period {start_date}-{end_date}")


####
# Jinja templates; op_kwargs is a templated field, so Airflow renders them at run time.
ETL_DATE_START = "{{ data_interval_start }}"
ETL_DATE_END = "{{ data_interval_end }}"

##############
with DAG(
    dag_id="etl_test",
    start_date=datetime(2023, 2, 4),
    # Runs at 08:00 UTC on the 7th day of every month.
    schedule_interval=CronDataIntervalTimetable(cron="0 8 7 * *", timezone="Etc/UTC"),
    catchup=False,
) as dag:
    run_etl = PythonOperator(
        task_id="etl",
        python_callable=do_the_etl,
        op_kwargs={
            "start_date_str": ETL_DATE_START,
            "end_date_str": ETL_DATE_END,
        },
    )
    run_etl
```

Answer 1

Score: 1

You can use relativedelta to modify the date.

```
# interval_start
{{ data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=1) }}
# interval_end
{{ data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=31) }}
```

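months=-1 subtracts one month from the date.

day=1 sets the day of the month to 1. Note that it is day (an absolute value), not days (a relative offset).

day=31 sets the day of the month to 31, which relativedelta clamps to the last day of the resulting month, so the result is that month's last day regardless of how many days it has.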
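To see what the two expressions evaluate to, the sketch below mirrors relativedelta's arithmetic for the specific case of months=-1 combined with an absolute day and no other arguments: the month is shifted first, then the day is clamped to the length of the resulting month, and the time of day is left unchanged. It uses only the standard library; the function name shift_back_one_month is hypothetical and not part of dateutil or Airflow (in a DAG you would use relativedelta itself). It assumes data_interval_end is 2023-03-07 08:00 UTC, i.e. the scheduled March run of the question's DAG.

```python
import calendar
from datetime import datetime, timezone


def shift_back_one_month(dt: datetime, day: int) -> datetime:
    """Mirror dt + relativedelta(months=-1, day=day) using only the stdlib.

    Hypothetical stand-in for illustration only.
    """
    year, month = (dt.year - 1, 12) if dt.month == 1 else (dt.year, dt.month - 1)
    # Clamp the requested day to the number of days in the target month,
    # which is why day=31 lands on Feb 28 in 2023.
    last_day = calendar.monthrange(year, month)[1]
    # Replace year, month and day together; hour/minute/second are untouched.
    return dt.replace(year=year, month=month, day=min(day, last_day))


data_interval_end = datetime(2023, 3, 7, 8, 0, tzinfo=timezone.utc)
print(shift_back_one_month(data_interval_end, day=1))   # 2023-02-01 08:00:00+00:00
print(shift_back_one_month(data_interval_end, day=31))  # 2023-02-28 08:00:00+00:00
```

Both results keep the 08:00 time of data_interval_end, so a filter from start to end would skip Feb 1 before 08:00 and Feb 28 after 08:00. If whole days are wanted, one option is to reset the time as well, e.g. relativedelta(months=-1, day=1, hour=0, minute=0, second=0, microsecond=0) for the start and relativedelta(day=1, hour=0, minute=0, second=0, microsecond=0) as an exclusive end.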

huangapple
  • Posted on March 3, 2023 at 18:14:27
  • If you repost this article, please keep its link: https://go.coder-hub.com/75625749.html