`airflow.exceptions.AirflowException: 'branch_task_ids' must contain only valid task_ids`

Question

I have a DAG that contains one custom task, one `@task.branch` task decorator, and one TaskGroup. Inside the TaskGroup there are multiple tasks that need to be triggered sequentially, depending on the outcome of the `@task.branch` task.

```python
PROCESS_BATCH_data_FILE = "batch_upload"
SINGLE_data_FILE_FIRST_OPERATOR = "validate_data_schema_task"
ENSURE_INTEGRITY_TASK = "provide_data_integrity_task"
PROCESS_SINGLE_data_FILE = "process_single_data_file_task"

default_args = {
    "retries": 0,
    "retry_delay": timedelta(seconds=30),
    "trigger_rule": "none_failed",
}
default_args = update_default_args(default_args)

flow_name = "data_ingestion"

with DAG(
    flow_name,
    default_args=default_args,
    start_date=airflow.utils.dates.days_ago(0),
    schedule=None,
    dagrun_timeout=timedelta(minutes=180),
) as dag:
    update_status_running_op = UpdateStatusOperator(
        task_id="update_status_running_task",
    )

    @task.branch(task_id='check_payload_type')
    def is_batch(**context):
        # data = context["dag_run"].conf["execution_context"].get("data")
        if isinstance(data, dict):
            subdag = "validate_data_schema_task"
        elif isinstance(data, list):
            subdag = PROCESS_BATCH_data_FILE
        return subdag

    with TaskGroup(group_id='group1') as my_task_group:
        validate_schema_operator = ValidatedataSchemaOperator(task_id=SINGLE_data_FILE_FIRST_OPERATOR)
        ensure_integrity_op = EnsuredataIntegrityOperator(task_id=ENSURE_INTEGRITY_TASK)
        process_single_data_file = ProcessdataOperatorR3(task_id=PROCESS_SINGLE_data_FILE)
        validate_schema_operator >> ensure_integrity_op >> process_single_data_file

    update_status_finished_op = UpdateStatusOperator(
        task_id="update_status_finished_task",
        dag=dag,
        trigger_rule="all_done",
    )

    batch_upload = DummyOperator(
        task_id=PROCESS_BATCH_data_FILE
    )
    for batch in range(0, BATCH_NUMBER):
        batch_upload >> ProcessdataOperatorR3(
            task_id=f"process_data_task_{batch + 1}",
            previous_task_id=f"provide_data_integrity_task_{batch + 1}",
            batch_number=batch + 1,
            trigger_rule="none_failed_or_skipped"
        ) >> update_status_finished_op

    branch_task = is_batch()
    update_status_running_op >> branch_task
    branch_task >> batch_upload
    branch_task >> my_task_group >> update_status_finished_op
```

When I trigger the DAG above, I get the following error:

```
airflow.exceptions.AirflowException: 'branch_task_ids' must contain only valid task_ids. Invalid tasks found: {'validate_data_schema_task'}.
```

I don't understand why I get this error, because 'validate_data_schema_task' is defined at the top of the file. I have also tried hard-coding 'validate_data_schema_task' as the task_id, but that gives me the same error.

Answer 1

Score: 1


Tasks within a TaskGroup by default have the TaskGroup's `group_id` prepended to their `task_id`. This means that because your `ValidatedataSchemaOperator` task lives in the TaskGroup `group1`, its `task_id` is actually `group1.validate_data_schema_task`. The `group_id` is prepended to ensure that task_ids are unique within a DAG.
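
As a quick sanity check, you can inspect which task_ids Airflow actually registered for the DAG. The snippet below is a minimal sketch that assumes the DAG module from the question is importable and the DAG object is named `dag`:

```python
# Minimal sketch (assumes the question's DAG module is importable and the
# DAG object is named `dag`), e.g. run from a Python shell or a small test.
# With the default TaskGroup prefixing you should see
# "group1.validate_data_schema_task" listed, not "validate_data_schema_task".
print(dag.task_ids)
```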

If you'd rather keep the plain task_ids, you can disable prepending the `group_id` to the underlying tasks by setting `prefix_group_id=False` on the TaskGroup.
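
For example, a minimal sketch of the TaskGroup from the question with the prefix disabled (everything else left unchanged) could look like this:

```python
# Sketch: with prefix_group_id=False the tasks keep their plain task_ids,
# so the branch callable can keep returning "validate_data_schema_task".
with TaskGroup(group_id='group1', prefix_group_id=False) as my_task_group:
    validate_schema_operator = ValidatedataSchemaOperator(task_id=SINGLE_data_FILE_FIRST_OPERATOR)
    ensure_integrity_op = EnsuredataIntegrityOperator(task_id=ENSURE_INTEGRITY_TASK)
    process_single_data_file = ProcessdataOperatorR3(task_id=PROCESS_SINGLE_data_FILE)
    validate_schema_operator >> ensure_integrity_op >> process_single_data_file
```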

There are some other finer-grained details on TaskGroups in the Airflow docs.

Answer 2

Score: 1


When referring to a task nested in a TaskGroup, you need to specify its task_id as "group_id.task_id".

This should work:

```python
@task.branch(task_id='check_payload_type')
def is_batch(**context):
    # data = context["dag_run"].conf["execution_context"].get("data")
    if isinstance(data, dict):
        subdag = "group1.validate_data_schema_task"
    elif isinstance(data, list):
        subdag = "group1." + PROCESS_BATCH_data_FILE
    return subdag
```
