Airflow触发规则,除一个外都已成功。

huangapple go评论73阅读模式
英文:

Airflow trigger rule, all but one has succeeded

问题

Airflow中是否有触发规则(或自定义规则),当除一个上游任务仍在运行以外,所有其他上游任务都已成功时,任务将触发?

还有关于当上游任务成功完成的比例达到80%时的触发规则,是否可行(当然要取最接近的整数)?

英文:

I'm new to Airflow. Is there an Airflow trigger rule (or a custom rule) that if all but one upstream tasks have succeeded (the remaining one is still running), the task will trigger?

How about trigger rule for when a certain proportion of the upstream tasks have succeeded, say 80% (to the closest integer of course)?

答案1

得分: 1

以下是翻译好的部分:

  1. 以一组任务 T_i 开始,并进行下游任务 D。
  2. 对于集合中的每个任务成功完成,创建一个下游依赖任务 PythonOperator,检查任务集合 T 中的状态。如果至少有 80% 的任务成功完成,则返回成功,否则失败。
  3. 将 D 标记为所有 PythonOperator 任务的下游。设置 trigger_rule='one_success'。因此,如果任何 PythonOperator 成功(知道 80% 的任务已成功),则 D 成功。

粗略代码示意(灵感来自于这个回答):

假设你有一组任务和下游任务:

tasks = [
    task_1
    task_2
    ...
    task_n
]

downstream_task = EmptyOperator()
def succeed_if_80pct_succeeds(tasks, **context):
    states = [context["dag_run"].get_task_instance(t.task_id).state for t in tasks]
    num_success = states.count("success")
    if num_success >= round(len(states) * 0.8):
         return True
    return False

success_checks = []

for t in tasks:
    p = PythonOperator(
         python_callable=succeed_if_80pct_succeeds,
         provide_context=True,
    )
    t >> p
    success_checks.append(p)

continue_after_80pct_check = EmptyOperator(trigger_rule='one_success')

success_checks >> continue_after_80pct_check >> downstream_task

希望这对你有所帮助。

英文:

What if you did something like this...

  1. Start with a set of tasks T_i, and the downstream task D.
  2. For each task in the set of tasks succeeds, create a downstream dependent task PythonOperator that checks the states across T. If at least 80% of the tasks succeed, return a success. Otherwise fail.
  3. Mark D as downstream of all the PythonOperator tasks. Set the trigger_rule='one_success'. Therefore if any of the PythonOperators succeed (knowing that 80% of the tasks have succeeded) then D succeeds.

Rough idea in code (taking inspiration from this answer)

Suppose you have a list of tasks and the downstream task:


tasks = [
    task_1
    task_2
    ...
    task_n
]


downstream_task = EmptyOperator()

def succeed_if_80pct_succeeds(tasks, **context):
    states = [context["dag_run"].get_task_instance(t.task_id).state for t in tasks]
    num_success = states.count("success")
    if num_success >= round(len(states) * 0.8):
         return True
    return False

success_checks = []

for t in tasks:
    p = PythonOperator(
         python_callable=succeed_if_80pct_succeeds,
         provide_context=True,
    )
    t >> p
    success_checks.append(p)

continue_after_80pct_check = EmptyOperator(trigger_rule='one_success')

success_checks >> continue_after_80pct_check >> downstream_task

huangapple
  • 本文由 发表于 2023年7月6日 20:15:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/76628731.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定