Airflow trigger rule, all but one has succeeded
Question
I'm new to Airflow. Is there an Airflow trigger rule (or a custom rule) that triggers a task once all but one of its upstream tasks have succeeded, while the remaining one is still running?
What about a trigger rule that fires once a certain proportion of the upstream tasks have succeeded, say 80% (rounded to the nearest integer, of course)?
Answer 1
Score: 1
What if you did something like this...
- Start with a set of tasks T_i and the downstream task D.
- For each task in T, create a dependent downstream PythonOperator task that runs once that task succeeds and checks the states of all tasks in T. If at least 80% of them have succeeded, the check succeeds. Otherwise it fails.
- Mark D as downstream of all the PythonOperator tasks and set trigger_rule='one_success'. D then runs as soon as any one of the PythonOperator checks succeeds, which means 80% of the tasks have succeeded.
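The decision each check makes can be sketched in plain Python, independent of Airflow. This is only an illustration: `threshold_met` and its `fraction` parameter are hypothetical names, and the state strings are assumed to match the ones Airflow reports for task instances ("success", "running", "failed").

```python
def threshold_met(states, fraction=0.8):
    """Return True once at least `fraction` of the states are "success"."""
    # Round the required count to the nearest integer, as the question asks.
    required = round(len(states) * fraction)
    return states.count("success") >= required


# Four of five upstream tasks done: 5 * 0.8 rounds to 4, so the check passes.
print(threshold_met(["success"] * 4 + ["running"]))
# Three of five is below the required four, so the check does not pass.
print(threshold_met(["success"] * 3 + ["running"] * 2))
```

Checks that run early see too few successes and fail. The first check to run after the threshold is reached is the one that succeeds.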
Rough idea in code (taking inspiration from this answer)
Suppose you have a list of tasks and the downstream task:
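from airflow.exceptions import AirflowFailException
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator

# Upstream tasks T_i, assumed to be defined earlier in the same DAG.
tasks = [
    task_1,
    task_2,
    ...  # remaining tasks elided
    task_n,
]
downstream_task = EmptyOperator(task_id="downstream_task")

def succeed_if_80pct_succeeds(task_ids, **context):
    # Look up the current state of every task in T for this DAG run
    # (DagRun.get_task_instance as available in Airflow 2.x).
    states = [context["dag_run"].get_task_instance(tid).state for tid in task_ids]
    num_success = states.count("success")
    if num_success < round(len(states) * 0.8):
        # Returning False would still mark the task as successful, so raise instead.
        raise AirflowFailException(f"only {num_success} of {len(states)} tasks have succeeded")

success_checks = []
for t in tasks:
    # One check per upstream task; it runs as soon as that task succeeds.
    p = PythonOperator(
        task_id=f"check_after_{t.task_id}",
        python_callable=succeed_if_80pct_succeeds,
        # Pass task ids (plain strings) rather than operator objects.
        op_kwargs={"task_ids": [x.task_id for x in tasks]},
    )
    t >> p
    success_checks.append(p)

# Runs as soon as any single check succeeds.
continue_after_80pct_check = EmptyOperator(
    task_id="continue_after_80pct_check",
    trigger_rule="one_success",
)
success_checks >> continue_after_80pct_check >> downstream_task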
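The same pattern could, in principle, cover the "all but one" case from the question by swapping the 80% threshold for a count. The sketch below is a hypothetical helper, not part of Airflow: `all_but_n_succeeded` and its parameter `n` are made-up names, and the function only shows the comparison that would replace the threshold test inside the check function.

```python
def all_but_n_succeeded(states, n=1):
    """Return True once at most `n` of the tasks have not yet succeeded."""
    return states.count("success") >= len(states) - n


# Two of three succeeded and one is still running: the condition holds.
print(all_but_n_succeeded(["success", "success", "running"]))
# Only one of three succeeded: the condition does not hold yet.
print(all_but_n_succeeded(["success", "running", "running"]))
```

Note that this counts successes only, so a failed task is treated the same as one that is still running.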