英文:
How to assign value to trigger_dag_id based on the result of python_callable in TriggerDagRunOperator
问题
这是我的代码,我想根据workflow_name触发不同的DAG,我的问题是在trigger_dag_id中需要放入什么值?只留空字符串还是其他内容?
def trigger_check(context, dag_run_obj):
workflow_name_var = context['ti'].xcom_pull(key='workflow_name')
dag_run_obj.payload = {
"workflow_name": workflow_name_var
}
if workflow_name_var == 'normal':
dag_run_obj.trigger_dag_id = JOB_NAME
elif workflow_name_var == 'reload':
dag_run_obj.trigger_dag_id = JOB_NAME_RELOAD
elif workflow_name_var == 'XXX':
dag_run_obj.trigger_dag_id = XXXX
return dag_run_obj
trigger_check_task = TriggerDagRunOperator(
task_id='trigger_check_task',
python_callable=trigger_check,
trigger_dag_id="",
dag=dag
)
英文:
This is my code, I want to trigger different dags based on the workflow_name, my question is what value do I need to put in trigger_dag_id? Just leave it an empty string or something else?
def trigger_check(context, dag_run_obj):
workflow_name_var = context['ti'].xcom_pull(key='workflow_name')
dag_run_obj.payload = {
"workflow_name": workflow_name_var
}
if workflow_name_var == 'normal':
dag_run_obj.trigger_dag_id = JOB_NAME
elif workflow_name_var == 'reload':
dag_run_obj.trigger_dag_id = JOB_NAME_RELOAD
elif workflow_name_var == 'XXX':
dag_run_obj.trigger_dag_id = XXXX
return dag_run_obj
trigger_check_task = TriggerDagRunOperator(
task_id='trigger_check_task',
python_callable= trigger_check,
trigger_dag_id="",
dag=dag
)
答案1
得分: 0
I would solve it with branch_task and task for each TriggerDagRunOperator
from datetime import datetime
from random import choice
from airflow import DAG
from airflow.decorators import branch_task, task
from airflow.models import TaskInstance
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
dag_id="test_dag",
schedule_interval=None,
default_args={
"start_date": datetime(2022, 1, 1),
},
render_template_as_native_obj=True,
tags=["test"],
) as dag:
dag.doc_md = __doc__
workflow_to_task = {
"normal": "task1",
"reload": "task2",
"XXX": "task3",
}
@task
def get_workflow_name():
return choice(list(workflow_to_task.keys()))
@branch_task()
def trigger_check():
ti: TaskInstance = get_current_context()["ti"]
workflow_name = ti.xcom_pull(task_ids="get_workflow_name")
print(workflow_name)
return workflow_to_task[workflow_name]
task1 = TriggerDagRunOperator(
task_id="task1",
trigger_dag_id="normal",
)
task2 = TriggerDagRunOperator(
task_id="task2",
trigger_dag_id="reload",
)
task3 = TriggerDagRunOperator(
task_id="task3",
trigger_dag_id="XXX",
)
(get_workflow_name() >> trigger_check() >> [task1, task2, task3])
英文:
I would solve it with branch_task and task for each TriggerDagRunOperator
from datetime import datetime
from random import choice
from airflow import DAG
from airflow.decorators import branch_task, task
from airflow.models import TaskInstance
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
dag_id="test_dag",
schedule_interval=None,
default_args={
"start_date": datetime(2022, 1, 1),
},
render_template_as_native_obj=True,
tags=["test"],
) as dag:
dag.doc_md = __doc__
workflow_to_task = {
"normal": "task1",
"reload": "task2",
"XXX": "task3",
}
@task
def get_workflow_name():
return choice(list(workflow_to_task.keys()))
@branch_task()
def trigger_check():
ti: TaskInstance = get_current_context()["ti"]
workflow_name = ti.xcom_pull(task_ids="get_workflow_name")
print(workflow_name)
return workflow_to_task[workflow_name]
task1 = TriggerDagRunOperator(
task_id="task1",
trigger_dag_id="normal",
)
task2 = TriggerDagRunOperator(
task_id="task2",
trigger_dag_id="reload",
)
task3 = TriggerDagRunOperator(
task_id="task3",
trigger_dag_id="XXX",
)
(get_workflow_name() >> trigger_check() >> [task1, task2, task3])
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论