Can callbacks be implemented with the new @task decorator in Airflow?

Question

Airflow documents task callbacks for the success and failure of a task. It gives the following example using EmptyOperator:

    import datetime

    import pendulum

    from airflow import DAG
    from airflow.operators.empty import EmptyOperator


    def task_failure_alert(context):
        print(f"Task has failed, task_instance_key_str: {context['task_instance_key_str']}")


    def dag_success_alert(context):
        print(f"DAG has succeeded, run_id: {context['run_id']}")


    with DAG(
        dag_id="example_callback",
        schedule=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        dagrun_timeout=datetime.timedelta(minutes=60),
        catchup=False,
        on_success_callback=None,
        on_failure_callback=task_failure_alert,
        tags=["example"],
    ):
        task1 = EmptyOperator(task_id="task1")
        task2 = EmptyOperator(task_id="task2")
        task3 = EmptyOperator(task_id="task3", on_success_callback=[dag_success_alert])
        task1 >> task2 >> task3

My question: can I use these callbacks with the TaskFlow API in Airflow 2.0?

Something like:

    @task(task_id="some_task", on_success_callback="this_func")
    def this_func(context):
        print("the function succeeded")

Answer 1

Score: 2

Yes, you can. Define the callback function this_func separately and pass the function itself to on_success_callback, not its name as a string. Also, the task function and the callback cannot share the same name.

    from airflow.decorators import task


    def this_func(context):
        ...


    @task(on_success_callback=this_func)
    def some_task():
        ...
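
To see why both points matter without needing an Airflow install, here is a plain-Python sketch. Note that fake_task is a hypothetical stand-in decorator written only for this illustration; it is not Airflow's implementation, and Airflow may report these mistakes differently. It only shows the general Python behaviour: a string is not callable, and a decorator's arguments are evaluated before the decorated function's name exists.

```python
def fake_task(on_success_callback=None):
    """Hypothetical stand-in for a decorator factory like @task (not Airflow code)."""
    def wrap(func):
        def run(context):
            result = func()
            if on_success_callback is not None:
                # A callback has to be a function object; the string
                # "this_func" is just text and cannot be called.
                if not callable(on_success_callback):
                    raise TypeError(
                        "callback must be callable, got "
                        + type(on_success_callback).__name__
                    )
                on_success_callback(context)
            return result
        return run
    return wrap


calls = []  # records every callback invocation so the effect is visible


def this_func(context):
    calls.append(context["run_id"])


@fake_task(on_success_callback=this_func)  # function object: works
def some_task():
    return "done"


@fake_task(on_success_callback="this_func")  # string: fails when invoked
def broken_task():
    return "done"


# Decorator arguments are evaluated before the function below is bound to
# its name, so a task cannot pass itself as its own callback.
try:
    @fake_task(on_success_callback=same_name)
    def same_name():
        return "done"
    name_clash_error = None
except NameError as exc:
    name_clash_error = exc
```

Calling some_task({"run_id": "manual_1"}) runs the task body and then the callback, while broken_task raises TypeError at the point where the callback would be invoked.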

huangapple
  • Posted on July 17, 2023 at 23:10:24
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76705834.html