Run a DAG on all days of a month except 1 day

Question

I am running a DAG that imports data from a source. We receive incremental data daily and a full dataset on one specific day of the month. So we need two DAGs: a monthly one for the full load, and a daily one that runs every day except the day the full data arrives.

The monthly DAG is easy to schedule on the specific day. The daily DAG can simply be scheduled to run every day, but it will then always fail on the full-data day, because no incremental data arrives that day.

To avoid these errors, I want to schedule the daily DAG so that it runs on every day of the month except the day we get the full data.

Answer 1

Score: 1

Why 2 DAGs?

You can use a single DAG whose first task is a branch operator that decides whether the run should be incremental or full and proceeds accordingly. BranchPythonOperator, or its TaskFlow form, the @task.branch decorator, does this. Simply write a function that extracts the day of the month from the DAG run's logical date.

Let's assume that on the 1st of each month the DAG follows the full-data path, and on all other days it follows the incremental path. The DAG code would be:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 2, 1)
}

# schedule=None means the DAG only runs when triggered manually;
# set a daily schedule (e.g. "@daily") to run it every day.
with DAG('my_dag', schedule=None, default_args=default_args):

    @task.branch(task_id="branch_task")
    def branch_func(**kwargs):
        # Airflow passes the run's logical date in the task context.
        logical_date = kwargs["logical_date"]
        day_of_month = logical_date.day
        print(f"logical_date is: {logical_date} day of month is: {day_of_month}")
        # Return the task_id of the branch to follow; the other branch is skipped.
        if day_of_month == 1:
            return "full"
        return "incremental"

    # Placeholders for the real full-load and incremental-load tasks.
    full = EmptyOperator(task_id="full")
    inc = EmptyOperator(task_id="incremental")

    branch_func() >> [full, inc]
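
The branch decision is plain date logic, so it can be pulled out of the DAG and checked without an Airflow installation. Below is a minimal sketch of that idea. The helper name `choose_branch` and the `full_load_day` parameter are hypothetical and not part of the answer above; the returned strings are assumed to match the `task_id` values used in the DAG.

```python
from datetime import datetime


def choose_branch(logical_date: datetime, full_load_day: int = 1) -> str:
    """Return the task_id to follow for a run with the given logical date.

    full_load_day is a hypothetical parameter: the day of the month on
    which the full dataset arrives (the answer above assumes the 1st).
    """
    if not 1 <= full_load_day <= 31:
        raise ValueError("full_load_day must be between 1 and 31")
    # Full load on the chosen day of the month, incremental otherwise.
    if logical_date.day == full_load_day:
        return "full"
    return "incremental"
```

Inside the DAG, the body of `branch_func` could then reduce to `return choose_branch(kwargs["logical_date"])`. Keep in mind that, depending on the Airflow version and timetable in use, the logical date may mark the start of the data interval and not the wall-clock time of the run, so it is worth checking which day the branch actually sees.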

If you still prefer two separate DAGs, you can use Timetables to customize the schedule as you see fit.
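
For the two-DAG route, the heart of any custom schedule is the date arithmetic: find the next day that is not the full-load day. The sketch below shows only that arithmetic in plain Python. It is not an Airflow Timetable class, and `next_incremental_run` and `skip_day` are hypothetical names chosen for illustration.

```python
from datetime import date, timedelta


def next_incremental_run(after: date, skip_day: int = 1) -> date:
    """Return the first date strictly after `after` whose day of month is not skip_day."""
    candidate = after + timedelta(days=1)
    # Two consecutive dates never share a day of month, so one extra step is enough.
    if candidate.day == skip_day:
        candidate += timedelta(days=1)
    return candidate
```

If the full load always arrives on the 1st, a cron expression with a day-of-month range, such as `0 0 2-31 * *`, may be a simpler way to express "every day except the 1st" as the daily DAG's schedule; how the resulting runs map to logical dates is worth verifying against your Airflow version.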

Posted by huangapple on 2023-05-25 04:12:31
When reposting, please keep the link to this article: https://go.coder-hub.com/76327091.html