How to create a chain of dynamic tasks?
Question
I am trying to create a graph with a chain of dynamic tasks.
First of all, I started with the expand function. But the problem is that the program waits until all of the Add tasks have finished and only then starts the Mul tasks, whereas I need each Mul to run immediately after its own Add.
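That first attempt looked roughly like this (using the same read_conf, add_one and mul_two tasks as in the code below):

added = add_one.expand(x=read_conf())
# Default trigger rule all_success: every add_one instance must finish
# before any mul_two instance starts.
mul_two.expand(x=added)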
Then I ended up with this code, which can build the graph I want:
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from datetime import datetime
from time import sleep

with DAG(dag_id="simple_maping", schedule='* * * * *', start_date=datetime(2022, 12, 22)) as dag:

    @task
    def read_conf():
        conf = Variable.get('tables', deserialize_json=True)
        return conf

    @task
    def add_one(x: str):
        sleep(5)
        return x + '1'

    @task
    def mul_two(x: str):
        return x * 2

    # This loop raises: 'XComArg' object is not iterable
    for i in read_conf():
        mul_two(add_one(i))
But now there is an error: 'XComArg' object is not iterable. I can fix it by removing the @task decorator from the read_conf method, but I am not sure that is the best decision, because in my case the list of configuration names could contain more than 1000 elements. Without the decorator, the method has to read the configuration every time the scheduler parses the graph.
Maybe the load without the decorator will not be critical? Or is there a way to make the object iterable? What is the right way to do this?
Answer 1
Score: 2
EDIT: This solution has a bug in 2.5.0, which is fixed in 2.5.1 (not released yet).
Yes, when you chain dynamically mapped tasks, the latter task (mul_two) will by default wait until all mapped instances of the first task (add_one) are done, because the default trigger rule is all_success. While you can change the trigger rule, for example to one_done, this will not solve your issue: the second task decides how many mapped task instances to create only once, when it first starts running (with one_done it creates just one mapped task instance, so that is not helpful for your use case).
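For illustration only, the override would look something like this (a sketch using the tasks from the question; as explained above, it still does not give you one mul_two per add_one):

from airflow.utils.trigger_rule import TriggerRule

added = add_one.expand(x=read_conf())
# mul_two may start as soon as a single add_one instance is done, but it
# resolves its map length at that moment and creates only one instance.
mul_two.override(trigger_rule=TriggerRule.ONE_DONE).expand(x=added)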
The issue with the for-loop (and why Airflow won't let you iterate over an XComArg) is that for-loops are resolved when the DAG code is parsed, which happens outside of runtime, when Airflow does not yet know how many results read_conf() will return. If the configurations change only rarely, then a for-loop like that, iterating over a list read at parse time, is an option, but at scale it can cause performance problems (sketched below).
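For reference, the decorator-less variant the question describes would look roughly like this (assuming add_one and mul_two are defined with @task as in the question); note that the read happens at parse time:

from airflow.models import Variable

# Plain function, no @task decorator: called while the DAG file is parsed,
# so the scheduler reads the Variable on every parse of this file.
def read_conf():
    return Variable.get('tables', deserialize_json=True)

with DAG(dag_id="simple_maping", schedule='* * * * *', start_date=datetime(2022, 12, 22)) as dag:
    # read_conf() returns a plain list here, so iterating it is fine
    for i in read_conf():
        mul_two(add_one(i))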
The best solution, in my opinion, is to use dynamic task group mapping, which was added in Airflow 2.5.0.
All mapped task groups will run in parallel, one for every input from read_conf(), so for each add_one its mul_two will run immediately. The code for this is below.
One note: you will not be able to see the mapped task groups in the Airflow UI or access their logs just yet; the feature is still quite new, and the UI extension should come in 2.5.1. That is why I added a task downstream of the mapped task groups that prints out the list of results of the mul_two tasks, so you can check that it is working.
from airflow import DAG
from airflow.decorators import task, task_group
from datetime import datetime
from time import sleep

with DAG(
    dag_id="simple_mapping",
    schedule=None,
    start_date=datetime(2022, 12, 22),
    catchup=False
) as dag:

    @task
    def read_conf():
        return [10, 20, 30]

    @task_group
    def calculations(x):

        @task
        def add_one(x: int):
            sleep(x)
            return x + 1

        @task()
        def mul_two(x: int):
            return x * 2

        mul_two(add_one(x))

    @task
    def pull_xcom(**context):
        # Pulls the return values of every mapped mul_two instance.
        pulled_xcom = context["ti"].xcom_pull(
            task_ids=['calculations.mul_two'],
            key="return_value"
        )
        print(pulled_xcom)

    calculations.expand(x=read_conf()) >> pull_xcom()
Hope this helps!
PS: You might want to set catchup=False unless you want to backfill a few weeks of tasks.