将Airflow数据间隔设置为覆盖上个月。

huangapple go评论44阅读模式
英文:

Set Airflow data interval to cover the previous month

问题

如何将Airflow数据间隔设置为从上个月的开始到上个月的结束?

假设我有一个在每个月的第7天上午8:00执行的ETL流程。我想要转换在上个月的开始和结束之间插入的新数据。

所以:

  • 在3月7日,我想要处理2月1日到2月28日(或29日)之间到达的数据,
  • 在4月7日,我想要处理3月1日到3月31日之间到达的数据,
  • 以此类推。

不幸的是,我还没有找到将数据间隔“移动”以从上个月的第一天开始并以上个月的最后一天结束的方法。

目前,如果我执行DAG,将获得以下时间段:

  • 对于3月7日:2023-02-07 8:00 a.m. - 2023-03-03 8:00 a.m.
英文:

How to set the Airflow data interval to start at the start of the previous month and to end on the end of the previous month?

Let's suppose that I have an ETL process that is executed at 8:00 a.m. on the 7th day of each month. I want to transform new data that were inserted between the start and end of the previous month.

So:

  • on March 7th I want to process the data that arrived between Feb 1 and Feb 28 (or 29),
  • on April 7th I want to process the data that arrived between Mar 1 and Mar 31,
  • etc.

Unfortunately, I haven't found the way to "shift" the data interval so that it starts on the first and ends on the last day of the previous month.
Currently, if I execute the DAG, I will get following the period:

  • for March 7th: 2023-02-07 8:00 a.m. - 2023-03-03 8:00 a.m.
from __future__ import annotations
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.timetables.interval import CronDataIntervalTimetable

##############


def do_the_etl(start_date_str: str, end_date_str: str) -> None:
    """
    in reality this executes a lenghty etl process,
    here just print a message
    """

    start_date = datetime.fromisoformat(start_date_str)
    end_date = datetime.fromisoformat(end_date_str)

    print(f"[Fake ETL] Querying for the period {start_date}-{end_date}")


####

ETL_DATE_START = "{{ data_interval_start }}"
ETL_DATE_END = "{{ data_interval_end }}"

##############

with DAG(
    dag_id="etl_test",
    start_date=datetime(2023, 2, 4),
    schedule_interval=CronDataIntervalTimetable(cron="0 8 7 * *", timezone="Etc/UTC"),
    catchup=False,
) as dag:
    run_etl = PythonOperator(
        task_id="etl",
        python_callable=do_the_etl,
        op_kwargs={
            "start_date_str": ETL_DATE_START,
            "end_date_str": ETL_DATE_END,
        },
    )

    run_etl

答案1

得分: 1

你可以使用relativedelta来修改日期。

# interval_start
{{ data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=1) }}
# interval_end
{{ data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=31) }}

months=-1表示从日期中减去一个月。

day=1表示将日期的日设置为1。请注意,它不是days

day=31表示将日期的日设置为月份中的最后一天,不考虑该月的天数。

英文:

You can use relativedelta to modify the date.

# interval_start
{{ data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=1) }}
# interval_end
{{ data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=31) }}

months=-1 means subtracting one month from the date.

day=1 means setting the day of the date as 1. Note it's not days.

day=31 means setting the day of the date as the last day regardless of number of days in the month.

huangapple
  • 本文由 发表于 2023年3月3日 18:14:27
  • 转载请务必保留本文链接:https://go.coder-hub.com/75625749.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定