How to set the state of a past TaskInstance to success and continue the pipeline inside an Airflow DAG?

Question

I'm trying to build a parameterized DAG that can be triggered with a dag_id/task_id pair. The goal of this DAG is to set the state of the last executed instance of that task to "success" and to continue the pipeline from that point.

Example pipeline:

[![DAG pipeline example][1]][1]

[1]: https://i.stack.imgur.com/HZc7q.png

In my DAG, I want to be able to set "run_that" to success and have "run_them" run automatically as a result of the state change.

Here is what I have so far:

```py
import airflow
from airflow.models import DagRun, TaskInstance, DagBag
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.state import State
from airflow.operators.python_operator import PythonOperator
import pendulum
from wrapper import ScriptLauncher, handleErrorSlack, handleErrorMail
from datetime import timedelta, datetime

default_args = {
    'owner': 'tozzi',
    'depends_on_past': False,
    'start_date': pendulum.datetime(2022, 12, 19, tz='Europe/Paris'),
    'retries': 0,
    'retry_delay': timedelta(minutes=5),
    'xcom_push': True,
    'catchup': False,
    'params': {
        'dag_id': 'my_dag',
        'task_id': 'run_that',
    }
}


def last_exec(dag_id, task_id, session):
    # Fetch every TaskInstance of the task, then keep the most recent one.
    task_instances = (
        session.query(TaskInstance)
        .filter(TaskInstance.dag_id == dag_id, TaskInstance.task_id == task_id)
        .all()
    )
    task_instances.sort(key=lambda x: x.execution_date, reverse=True)
    if task_instances:
        return task_instances[0]
    return None


def set_last_task_success(**kwargs):
    dag_id = kwargs['dag_id']
    task_id = kwargs['task_id']
    session = airflow.settings.Session()
    task_instance = last_exec(dag_id, task_id, session)
    if (task_instance is not None):
        task_instance.state = 'success'
        # task_instance = TaskInstance(task_id=task_id, execution_date=last_task_instance.execution_date)
        # The next call is the one that raises the AttributeError described below.
        task_instance.run(session=session, ignore_ti_state=True, ignore_task_deps=True)
        session.commit()
    session.close()


doc_md = f"""## Set the given task_id to success of the given dag_id"""

# launched remotely
launcher = ScriptLauncher(default_args, "@once", 'set_task_to_success', ['airflow'], doc_md)
dag = launcher.dag

set_to_success = PythonOperator(
    task_id='set_to_success',
    provide_context=True,
    python_callable=set_last_task_success,
    dag=dag,
    op_kwargs={
        'dag_id': '{{ params.dag_id }}',
        'task_id': '{{ params.task_id }}',
    }
)
```

The task_instance.run(...) call fails with this error: "AttributeError: 'TaskInstance' object has no attribute 'task'", although the state change itself works correctly. What should I change so that the "run_them" task is rerun when I change the state of the "run_that" task?

Answer 1

Score: 2

As of Airflow 2.5.0, you can update the state of any existing task instance via the Airflow REST API.
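As a rough illustration of what such a call might look like, the sketch below builds a PATCH request for a single task instance using only the Python standard library. The endpoint path and the `dry_run` / `new_state` body fields reflect the stable REST API as I understand it for 2.5.0; the base URL, the credentials, and the use of basic authentication are assumptions that depend on how your deployment is configured, and the helper names are hypothetical.

```python
import base64
import json
from urllib.parse import quote
from urllib.request import Request, urlopen


def build_patch_request(base_url, dag_id, run_id, task_id, user, password,
                        new_state="success"):
    """Build (but do not send) a PATCH request that sets one task instance's state."""
    # Run ids such as "manual__2023-02-06T12:00:00+00:00" contain ':' and '+',
    # so every path segment is percent-encoded.
    url = (
        f"{base_url.rstrip('/')}/api/v1/dags/{quote(dag_id, safe='')}"
        f"/dagRuns/{quote(run_id, safe='')}"
        f"/taskInstances/{quote(task_id, safe='')}"
    )
    # dry_run is set explicitly so the change is actually applied.
    body = json.dumps({"dry_run": False, "new_state": new_state}).encode("utf-8")
    # Assumption: the API is configured with the basic-auth backend.
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Basic {token}",
    }
    return Request(url, data=body, method="PATCH", headers=headers)


def send(request, timeout=10):
    """Send the request and return the decoded JSON response."""
    with urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    # Placeholder values; replace them with your own before calling send(req).
    req = build_patch_request(
        "http://localhost:8080", "my_dag",
        "manual__2023-02-06T12:00:00+00:00", "run_that", "admin", "admin",
    )
    print(req.get_method(), req.full_url)
```

Keeping request construction separate from sending makes it easy to inspect the URL and payload before anything touches the metadata database.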

Answer 2

Score: 1

Building on @TJaniF's answer, I wrote a small reusable failure-handling task factory, on_failure_send_force_success_mail. It sends an email containing a link to a custom API, which in turn issues the PATCH task instance request when it receives a GET. It works as expected:

    import requests
    from airflow.operators.python import PythonOperator

    # `dag` is assumed to be defined elsewhere in the same file.


    def on_failure_send_force_success_mail(task_id):
        # Returns a callable that emails a "force success" link for `task_id`.
        def send_mail(**context):
            result = requests.post('mailing_api_url', json={
                "from": "foobar@mail.com",
                "to": "tozzi@mail.com",
                "subject": "Airflow task failed",
                "html": f'Airflow task test failed: <a href="custom_api_url/script/{context["task_instance"].dag_id}/dag_runs/{context["dag_run"].run_id}/task_instances/{task_id}?new_state=success">click here</a>',
            })
        return send_mail


    def run_this_func_failed(**context):
        raise Exception('FAILED')


    def run_this_func(**context):
        print('COMPLETED')


    run_that = PythonOperator(
        task_id='run_that',
        provide_context=True,
        python_callable=run_this_func_failed,
        dag=dag,
    )

    # Runs only once run_that has succeeded.
    run_this = PythonOperator(
        task_id='run_this',
        provide_context=True,
        python_callable=run_this_func,
        trigger_rule="all_success",
        dag=dag,
    )

    # Runs only when run_that has failed, and sends the email.
    on_failure = PythonOperator(
        task_id='on_failure',
        provide_context=True,
        python_callable=on_failure_send_force_success_mail('run_that'),
        trigger_rule="all_failed",
        dag=dag,
    )

    run_that >> on_failure
    run_that >> run_this
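One detail worth considering with the emailed link: Airflow run ids usually contain characters such as `:` and `+`, which some web frameworks decode differently when they appear unescaped in a URL. The sketch below shows one way the link could be built with percent-encoding. The route layout mirrors the custom API from the answer above; the function name and base URL are hypothetical.

```python
from html import escape
from urllib.parse import quote


def build_force_success_link(base_url, dag_id, run_id, task_id):
    """Return an HTML anchor pointing at the custom 'force success' route."""
    # Percent-encode each path segment so ':' and '+' in the run id survive.
    path = "/".join([
        "script",
        quote(dag_id, safe=""),
        "dag_runs",
        quote(run_id, safe=""),
        "task_instances",
        quote(task_id, safe=""),
    ])
    url = f"{base_url.rstrip('/')}/{path}?new_state=success"
    # Escape the URL for safe use inside an HTML attribute.
    return f'<a href="{escape(url, quote=True)}">click here</a>'


if __name__ == "__main__":
    print(build_force_success_link(
        "https://example.invalid", "my_dag",
        "manual__2023-02-06T12:00:00+00:00", "run_that",
    ))
```

Inside `send_mail`, the arguments would come from `context["task_instance"].dag_id` and `context["dag_run"].run_id`, as in the answer's code.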

huangapple
  • Posted on February 6, 2023, 21:18:16
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75361826.html