如何根据TriggerDagRunOperator中python_callable的结果来为trigger_dag_id赋值

huangapple go评论98阅读模式
英文:

How to assign value to trigger_dag_id based on the result of python_callable in TriggerDagRunOperator

问题

这是我的代码,我想根据workflow_name触发不同的DAG,我的问题是在trigger_dag_id中需要放入什么值?只留空字符串还是其他内容?

  1. def trigger_check(context, dag_run_obj):
  2. workflow_name_var = context['ti'].xcom_pull(key='workflow_name')
  3. dag_run_obj.payload = {
  4. "workflow_name": workflow_name_var
  5. }
  6. if workflow_name_var == 'normal':
  7. dag_run_obj.trigger_dag_id = JOB_NAME
  8. elif workflow_name_var == 'reload':
  9. dag_run_obj.trigger_dag_id = JOB_NAME_RELOAD
  10. elif workflow_name_var == 'XXX':
  11. dag_run_obj.trigger_dag_id = XXXX
  12. return dag_run_obj
  13. trigger_check_task = TriggerDagRunOperator(
  14. task_id='trigger_check_task',
  15. python_callable=trigger_check,
  16. trigger_dag_id="",
  17. dag=dag
  18. )
英文:

This is my code, I want to trigger different dags based on the workflow_name, my question is what value do I need to put in trigger_dag_id? Just leave it an empty string or something else?

  1. def trigger_check(context, dag_run_obj):
  2. workflow_name_var = context['ti'].xcom_pull(key='workflow_name')
  3. dag_run_obj.payload = {
  4. "workflow_name": workflow_name_var
  5. }
  6. if workflow_name_var == 'normal':
  7. dag_run_obj.trigger_dag_id = JOB_NAME
  8. elif workflow_name_var == 'reload':
  9. dag_run_obj.trigger_dag_id = JOB_NAME_RELOAD
  10. elif workflow_name_var == 'XXX':
  11. dag_run_obj.trigger_dag_id = XXXX
  12. return dag_run_obj
  13. trigger_check_task = TriggerDagRunOperator(
  14. task_id='trigger_check_task',
  15. python_callable= trigger_check,
  16. trigger_dag_id="",
  17. dag=dag
  18. )

答案1

得分: 0

I would solve it with branch_task and task for each TriggerDagRunOperator

  1. from datetime import datetime
  2. from random import choice
  3. from airflow import DAG
  4. from airflow.decorators import branch_task, task
  5. from airflow.models import TaskInstance
  6. from airflow.operators.python import get_current_context
  7. from airflow.operators.trigger_dagrun import TriggerDagRunOperator
  8. with DAG(
  9. dag_id="test_dag",
  10. schedule_interval=None,
  11. default_args={
  12. "start_date": datetime(2022, 1, 1),
  13. },
  14. render_template_as_native_obj=True,
  15. tags=["test"],
  16. ) as dag:
  17. dag.doc_md = __doc__
  18. workflow_to_task = {
  19. "normal": "task1",
  20. "reload": "task2",
  21. "XXX": "task3",
  22. }
  23. @task
  24. def get_workflow_name():
  25. return choice(list(workflow_to_task.keys()))
  26. @branch_task()
  27. def trigger_check():
  28. ti: TaskInstance = get_current_context()["ti"]
  29. workflow_name = ti.xcom_pull(task_ids="get_workflow_name")
  30. print(workflow_name)
  31. return workflow_to_task[workflow_name]
  32. task1 = TriggerDagRunOperator(
  33. task_id="task1",
  34. trigger_dag_id="normal",
  35. )
  36. task2 = TriggerDagRunOperator(
  37. task_id="task2",
  38. trigger_dag_id="reload",
  39. )
  40. task3 = TriggerDagRunOperator(
  41. task_id="task3",
  42. trigger_dag_id="XXX",
  43. )
  44. (get_workflow_name() >> trigger_check() >> [task1, task2, task3])

如何根据TriggerDagRunOperator中python_callable的结果来为trigger_dag_id赋值

英文:

I would solve it with branch_task and task for each TriggerDagRunOperator

  1. from datetime import datetime
  2. from random import choice
  3. from airflow import DAG
  4. from airflow.decorators import branch_task, task
  5. from airflow.models import TaskInstance
  6. from airflow.operators.python import get_current_context
  7. from airflow.operators.trigger_dagrun import TriggerDagRunOperator
  8. with DAG(
  9. dag_id="test_dag",
  10. schedule_interval=None,
  11. default_args={
  12. "start_date": datetime(2022, 1, 1),
  13. },
  14. render_template_as_native_obj=True,
  15. tags=["test"],
  16. ) as dag:
  17. dag.doc_md = __doc__
  18. workflow_to_task = {
  19. "normal": "task1",
  20. "reload": "task2",
  21. "XXX": "task3",
  22. }
  23. @task
  24. def get_workflow_name():
  25. return choice(list(workflow_to_task.keys()))
  26. @branch_task()
  27. def trigger_check():
  28. ti: TaskInstance = get_current_context()["ti"]
  29. workflow_name = ti.xcom_pull(task_ids="get_workflow_name")
  30. print(workflow_name)
  31. return workflow_to_task[workflow_name]
  32. task1 = TriggerDagRunOperator(
  33. task_id="task1",
  34. trigger_dag_id="normal",
  35. )
  36. task2 = TriggerDagRunOperator(
  37. task_id="task2",
  38. trigger_dag_id="reload",
  39. )
  40. task3 = TriggerDagRunOperator(
  41. task_id="task3",
  42. trigger_dag_id="XXX",
  43. )
  44. (get_workflow_name() >> trigger_check() >> [task1, task2, task3])

如何根据TriggerDagRunOperator中python_callable的结果来为trigger_dag_id赋值

huangapple
  • 本文由 发表于 2023年6月6日 07:47:26
  • 转载请务必保留本文链接:https://go.coder-hub.com/76410626.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定