如何根据TriggerDagRunOperator中python_callable的结果来为trigger_dag_id赋值

huangapple go评论71阅读模式
英文:

How to assign value to trigger_dag_id based on the result of python_callable in TriggerDagRunOperator

问题

这是我的代码。我想根据 workflow_name 触发不同的 DAG，我的问题是：trigger_dag_id 中需要填入什么值？是只留一个空字符串，还是填其他内容？

def trigger_check(context, dag_run_obj):
    """Attach the workflow name to the run payload and pick a target DAG id.

    Pulls the ``workflow_name`` XCom value through ``context['ti']``, stores
    it in ``dag_run_obj.payload`` and, for the names ``'normal'``,
    ``'reload'`` and ``'XXX'``, sets ``dag_run_obj.trigger_dag_id`` to the
    matching constant. Any other name leaves ``trigger_dag_id`` untouched.

    Returns the same ``dag_run_obj`` that was passed in.
    """
    requested = context['ti'].xcom_pull(key='workflow_name')
    dag_run_obj.payload = {"workflow_name": requested}

    # Each constant is looked up only when its own branch is taken.
    if requested == 'normal':
        dag_run_obj.trigger_dag_id = JOB_NAME
        return dag_run_obj
    if requested == 'reload':
        dag_run_obj.trigger_dag_id = JOB_NAME_RELOAD
        return dag_run_obj
    if requested == 'XXX':
        dag_run_obj.trigger_dag_id = XXXX
    return dag_run_obj


# Operator that runs trigger_check as its python_callable. The empty
# trigger_dag_id is the placeholder the question is asking about.
trigger_check_task = TriggerDagRunOperator(
    dag=dag,
    task_id='trigger_check_task',
    trigger_dag_id="",
    python_callable=trigger_check,
)
英文:

This is my code. I want to trigger different DAGs based on workflow_name. My question is: what value do I need to put in trigger_dag_id? Should I just leave it as an empty string, or use something else?

def trigger_check(context, dag_run_obj):
    """Attach the workflow name to the run payload and pick a target DAG id.

    Pulls the ``workflow_name`` XCom value through ``context['ti']``, stores
    it in ``dag_run_obj.payload`` and, for the names ``'normal'``,
    ``'reload'`` and ``'XXX'``, sets ``dag_run_obj.trigger_dag_id`` to the
    matching constant. Any other name leaves ``trigger_dag_id`` untouched.

    Returns the same ``dag_run_obj`` that was passed in.
    """
    workflow_name_var = context['ti'].xcom_pull(key='workflow_name')

    dag_run_obj.payload = {
        "workflow_name": workflow_name_var
    }

    if workflow_name_var == 'normal':
        dag_run_obj.trigger_dag_id = JOB_NAME
    elif workflow_name_var == 'reload':
        dag_run_obj.trigger_dag_id = JOB_NAME_RELOAD
    elif workflow_name_var == 'XXX':
        dag_run_obj.trigger_dag_id = XXXX

    return dag_run_obj


# Operator that runs trigger_check as its python_callable. The empty
# trigger_dag_id is the placeholder the question is asking about.
# This statement must start at column 0: a partially indented assignment
# after the function body is an IndentationError.
trigger_check_task = TriggerDagRunOperator(
    task_id='trigger_check_task',
    python_callable=trigger_check,
    trigger_dag_id="",
    dag=dag
)

答案1

得分: 0

I would solve it with a branch_task and a separate TriggerDagRunOperator task for each workflow:

from datetime import datetime
from random import choice

from airflow import DAG
from airflow.decorators import branch_task, task
from airflow.models import TaskInstance
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="test_dag",
    schedule_interval=None,
    default_args={"start_date": datetime(2022, 1, 1)},
    render_template_as_native_obj=True,
    tags=["test"],
) as dag:
    # Reuse the module docstring as the DAG's documentation.
    dag.doc_md = __doc__

    # Workflow name -> task_id of the TriggerDagRunOperator that handles it.
    workflow_to_task = dict(normal="task1", reload="task2", XXX="task3")

    @task
    def get_workflow_name():
        # Pick one of the known workflow names at random.
        known_names = list(workflow_to_task)
        return choice(known_names)

    @branch_task()
    def trigger_check():
        # Read the name returned by get_workflow_name from XCom and return
        # the task_id mapped to it.
        context = get_current_context()
        task_instance: TaskInstance = context["ti"]
        chosen = task_instance.xcom_pull(task_ids="get_workflow_name")
        print(chosen)
        return workflow_to_task[chosen]

    # One trigger task per workflow, each with a fixed trigger_dag_id.
    task1 = TriggerDagRunOperator(task_id="task1", trigger_dag_id="normal")
    task2 = TriggerDagRunOperator(task_id="task2", trigger_dag_id="reload")
    task3 = TriggerDagRunOperator(task_id="task3", trigger_dag_id="XXX")

    # Wiring: name lookup -> branch decision -> candidate trigger tasks.
    name_task = get_workflow_name()
    branch = trigger_check()
    name_task >> branch >> [task1, task2, task3]

如何根据TriggerDagRunOperator中python_callable的结果来为trigger_dag_id赋值

英文:

I would solve it with a branch_task and a separate TriggerDagRunOperator task for each workflow:

from datetime import datetime
from random import choice

from airflow import DAG
from airflow.decorators import branch_task, task
from airflow.models import TaskInstance
from airflow.operators.python import get_current_context
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="test_dag",
    schedule_interval=None,
    default_args={"start_date": datetime(2022, 1, 1)},
    render_template_as_native_obj=True,
    tags=["test"],
) as dag:
    # Reuse the module docstring as the DAG's documentation.
    dag.doc_md = __doc__

    # Workflow name -> task_id of the TriggerDagRunOperator that handles it.
    workflow_to_task = dict(normal="task1", reload="task2", XXX="task3")

    @task
    def get_workflow_name():
        # Pick one of the known workflow names at random.
        known_names = list(workflow_to_task)
        return choice(known_names)

    @branch_task()
    def trigger_check():
        # Read the name returned by get_workflow_name from XCom and return
        # the task_id mapped to it.
        context = get_current_context()
        task_instance: TaskInstance = context["ti"]
        chosen = task_instance.xcom_pull(task_ids="get_workflow_name")
        print(chosen)
        return workflow_to_task[chosen]

    # One trigger task per workflow, each with a fixed trigger_dag_id.
    task1 = TriggerDagRunOperator(task_id="task1", trigger_dag_id="normal")
    task2 = TriggerDagRunOperator(task_id="task2", trigger_dag_id="reload")
    task3 = TriggerDagRunOperator(task_id="task3", trigger_dag_id="XXX")

    # Wiring: name lookup -> branch decision -> candidate trigger tasks.
    name_task = get_workflow_name()
    branch = trigger_check()
    name_task >> branch >> [task1, task2, task3]

如何根据TriggerDagRunOperator中python_callable的结果来为trigger_dag_id赋值

huangapple
  • 本文由 发表于 2023年6月6日 07:47:26
  • 转载请务必保留本文链接:https://go.coder-hub.com/76410626.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定