新的 @task 装饰器在 Airflow 中是否可以用来实现回调?

huangapple go评论69阅读模式
英文:

can callbacks be implemented with the new @task decorator in airflow

问题

Airflow提供了任务成功和失败的任务回调示例。它提供了一个使用EmptyOperator的示例,如下所示:

import datetime
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):

    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

我的回答是:是的,你可以在Airflow 2.0的任务流API(TaskFlow API)中使用这些回调,类似于你提供的示例:

@task(task_id="some_task", on_success_callback="this_func")
def this_func(context):
    print('the function succeeded')

请注意,在TaskFlow API中,你可以使用@task装饰器来定义任务,并通过on_success_callback参数指定成功时的回调函数,就像你的示例中所做的那样。

英文:

Airflow provides examples of task callbacks for success and failures of a task. It gives an example with an EmptyOperator as such:

import datetime
import pendulum

from airflow import DAG
from airflow.operators.empty import EmptyOperator


def task_failure_alert(context):
    print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


def dag_success_alert(context):
    print(f"DAG has succeeded, run_id: {context['run_id']}")


with DAG(
    dag_id="example_callback",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    dagrun_timeout=datetime.timedelta(minutes=60),
    catchup=False,
    on_success_callback=None,
    on_failure_callback=task_failure_alert,
    tags=["example"],
):

    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
    task1 >> task2 >> task3

My question can I use these callbacks with taskflow API on the new airflow 2.0 .

Something like:

@task(task_id="some_task",on_success_callback="this_func")
def this_func(context):
    print 'the function succeeed'

答案1

得分: 2

可以的,你需要定义函数“this_func”并调用它(不作为字符串)。同时,你不能使用相同的名称调用任务函数和回调函数。

def this_func(context):
    ...

@task(on_success_callback=this_func)
def some_task():
    ...
英文:

Yes you can, you need to define the function "this_func" and call it (no as string). also you can not call the task function and the callback with the same name.

def this_func(context):
    ...

@task(on_success_callback=this_func)
def some_task():
    ...

huangapple
  • 本文由 发表于 2023年7月17日 23:10:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/76705834.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定