英文:
can callbacks be implemented with the new @task decorator in airflow
问题
Airflow提供了任务成功和失败的任务回调示例。它提供了一个使用EmptyOperator的示例,如下所示:
import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
def task_failure_alert(context):
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
def dag_success_alert(context):
print(f"DAG has succeeded, run_id: {context['run_id']}")
with DAG(
dag_id="example_callback",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
dagrun_timeout=datetime.timedelta(minutes=60),
catchup=False,
on_success_callback=None,
on_failure_callback=task_failure_alert,
tags=["example"],
):
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
task1 >> task2 >> task3
我的回答是:是的,你可以在Airflow 2.0的任务流API(TaskFlow API)中使用这些回调,类似于你提供的示例:
@task(task_id="some_task", on_success_callback="this_func")
def this_func(context):
print('the function succeeded')
请注意,在TaskFlow API中,你可以使用@task
装饰器来定义任务,并通过on_success_callback
参数指定成功时的回调函数,就像你的示例中所做的那样。
英文:
Airflow provides examples of task callbacks for success and failures of a task. It gives an example with an EmptyOperator as such:
import datetime
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator
def task_failure_alert(context):
print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")
def dag_success_alert(context):
print(f"DAG has succeeded, run_id: {context['run_id']}")
with DAG(
dag_id="example_callback",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
dagrun_timeout=datetime.timedelta(minutes=60),
catchup=False,
on_success_callback=None,
on_failure_callback=task_failure_alert,
tags=["example"],
):
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
task1 >> task2 >> task3
My question can I use these callbacks with taskflow API on the new airflow 2.0 .
Something like:
@task(task_id="some_task",on_success_callback="this_func")
def this_func(context):
print 'the function succeeed'
答案1
得分: 2
可以的,你需要定义函数“this_func”并调用它(不作为字符串)。同时,你不能使用相同的名称调用任务函数和回调函数。
def this_func(context):
...
@task(on_success_callback=this_func)
def some_task():
...
英文:
Yes you can, you need to define the function "this_func" and call it (no as string). also you can not call the task function and the callback with the same name.
def this_func(context):
...
@task(on_success_callback=this_func)
def some_task():
...
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论