airflow.exceptions.AirflowException: 'branch_task_ids' must contain only valid task_ids
Question
I have a DAG that contains one custom task, one @task.branch task decorator, and one TaskGroup. Inside the TaskGroup there are multiple tasks that need to be triggered sequentially, depending on the outcome of the @task.branch task.
PROCESS_BATCH_data_FILE = "batch_upload"
SINGLE_data_FILE_FIRST_OPERATOR = "validate_data_schema_task"
ENSURE_INTEGRITY_TASK = "provide_data_integrity_task"
PROCESS_SINGLE_data_FILE = "process_single_data_file_task"
default_args = {       
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    "trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)
flow_name = "data_ingestion"
with DAG(
    flow_name,
    default_args=default_args,
    start_date= airflow.utils.dates.days_ago(0),
    schedule=None,
    dagrun_timeout=timedelta(minutes=180)
) as dag:
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
    )
    @task.branch(task_id='check_payload_type')
    def is_batch(**context):
        # data = context["dag_run"].conf["execution_context"].get("data")
        if isinstance(data, dict):
            subdag = "validate_data_schema_task"
        elif isinstance(data, list):
            subdag = PROCESS_BATCH_data_FILE
        return subdag
    with TaskGroup(group_id='group1') as my_task_group:
        validate_schema_operator = ValidatedataSchemaOperator(task_id=SINGLE_data_FILE_FIRST_OPERATOR)
        ensure_integrity_op = EnsuredataIntegrityOperator(task_id=ENSURE_INTEGRITY_TASK)
        process_single_data_file = ProcessdataOperatorR3(task_id=PROCESS_SINGLE_data_FILE)
        validate_schema_operator >> ensure_integrity_op >> process_single_data_file 
    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        dag=dag,
        trigger_rule="all_done",
    )
    batch_upload = DummyOperator(
        task_id=PROCESS_BATCH_data_FILE
    )
    for batch in range(0, BATCH_NUMBER):
        batch_upload >> ProcessdataOperatorR3(
            task_id=f"process_data_task_{batch + 1}",
            previous_task_id=f"provide_data_integrity_task_{batch + 1}",
            batch_number=batch + 1,
            trigger_rule="none_failed_or_skipped"
        ) >> update_status_finished_op
branch_task = is_batch()
update_status_running_op >> branch_task
branch_task >> batch_upload
branch_task >> my_task_group >> update_status_finished_op
When I trigger this DAG I get the following error:
airflow.exceptions.AirflowException: 'branch_task_ids' must contain only valid task_ids. Invalid tasks found: {'validate_data_schema_task'}.
I don't understand why I get the error, because 'validate_data_schema_task' is defined at the top of the file. I have tried to hard-code 'validate_data_schema_task' as the task_id, but that gives me the same error.
Answer 1
Score: 1
Tasks within a TaskGroup by default have the TaskGroup's group_id prepended to their task_id. Since your ValidatedataSchemaOperator task is in the TaskGroup "group1", that task's task_id is actually "group1.validate_data_schema_task". The group_id is prepended primarily to ensure that task_ids stay unique within a DAG.
You can disable prepending the group_id to the underlying tasks by setting prefix_group_id=False on the TaskGroup if you'd like.
There are some finer-grained details on TaskGroups in the Airflow docs.
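For illustration, here is a minimal sketch of that alternative, reusing the operator classes and constants from the question (they stand in for whatever your real operators are). With the prefix disabled, the branch callable can keep returning the bare task_id:

    from airflow.utils.task_group import TaskGroup

    # prefix_group_id=False keeps the bare task_ids, e.g. "validate_data_schema_task",
    # instead of the prefixed "group1.validate_data_schema_task".
    with TaskGroup(group_id='group1', prefix_group_id=False) as my_task_group:
        validate_schema_operator = ValidatedataSchemaOperator(task_id=SINGLE_data_FILE_FIRST_OPERATOR)
        ensure_integrity_op = EnsuredataIntegrityOperator(task_id=ENSURE_INTEGRITY_TASK)
        process_single_data_file = ProcessdataOperatorR3(task_id=PROCESS_SINGLE_data_FILE)
        validate_schema_operator >> ensure_integrity_op >> process_single_data_file

Keep in mind that without the prefix, every task_id in the DAG has to be unique on its own, which is exactly what the prefix was protecting against.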
Answer 2
Score: 1
When referring to a task nested in a task group, you need to specify its task_id as "group_id.task_id".
This should work:
    @task.branch(task_id='check_payload_type')
    def is_batch(**context):
        # data = context["dag_run"].conf["execution_context"].get("data")
        if isinstance(data, dict):
            subdag = "group1.validate_data_schema_task"
        elif isinstance(data, list):
            subdag = "group1." + PROCESS_BATCH_data_FILE
        return subdag
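A small variation on the same idea (the GROUP_ID constant below is hypothetical, not part of the original DAG): build the prefixed id from the constants already defined at the top of the question's file, so the group name isn't hard-coded in several places.

    from airflow.decorators import task

    GROUP_ID = 'group1'  # must match the TaskGroup's group_id

    @task.branch(task_id='check_payload_type')
    def is_batch(**context):
        data = context["dag_run"].conf["execution_context"].get("data")
        if isinstance(data, dict):
            # single-file path: the target task lives inside the group, so prefix it
            return f"{GROUP_ID}.{SINGLE_data_FILE_FIRST_OPERATOR}"
        elif isinstance(data, list):
            # batch path: batch_upload is defined outside the group, so no prefix
            return PROCESS_BATCH_data_FILE
        # returning None skips all downstream tasks for unexpected payload types
        return None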
    