How to properly chain the first and last tasks of a subchain into a larger chain in Airflow?
Question
Consider the following:
sod = (
    DummyOperator(task_id="sod")
    >> DummyOperator(task_id="sod_do_this")
    >> DummyOperator(task_id="sod_last")
)
(
    BranchPythonOperator(
        task_id="should_run_sod",
        python_callable=lambda: "sod",  # no_sod
    )
    >> [sod, DummyOperator(task_id="no_sod")]
    >> DummyOperator(task_id="end")
)
The idea is to "connect" the subchain sod in between. However, __rshift__ (>>) returns its right-hand operand, so a chained expression evaluates to the last operator in the chain. As a result, sod = DummyOperator(task_id="sod_last") and the dependencies get mixed up: should_run_sod is connected to sod_last, not to sod.
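To see why, here is a toy model in plain Python. The Task class is hypothetical and is not Airflow's implementation; it only mimics the one behaviour described above, namely that a >> b wires a to b and then returns b.

```python
class Task:
    """Toy stand-in for an Airflow operator; hypothetical, not Airflow's implementation."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids wired directly after this task

    def __rshift__(self, other):
        # a >> b: wire self to other (a single task or a list of tasks) ...
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target.task_id)
        # ... and return the right-hand side, as Airflow's >> does.
        return other


# Evaluated left to right, so the whole expression returns the last task.
sod = Task("sod") >> Task("sod_do_this") >> Task("sod_last")
print(sod.task_id)  # sod_last

branch = Task("should_run_sod")
branch >> [sod, Task("no_sod")]
print(branch.downstream)  # ['sod_last', 'no_sod']
```

The branch ends up pointing at sod_last, which is exactly the mix-up described in the question.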
Can I write this in a simpler way, without assigning variables everywhere? I want the same result as the following, but that requires variables for the first and last tasks, which gets more convoluted:
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

sod = DummyOperator(task_id="sod")
sod_do_this = DummyOperator(task_id="sod_do_this")
sod_last = DummyOperator(task_id="sod_last")
sod >> sod_do_this >> sod_last
should_run_sod = BranchPythonOperator(
    task_id="should_run_sod",
    python_callable=lambda: "sod",  # no_sod
)
no_sod = DummyOperator(task_id="no_sod")  # must be a variable so it can be joined to end
should_run_sod >> [sod, no_sod]
end = DummyOperator(task_id="end")
sod_last >> end
no_sod >> end
Answer 1
Score: 1
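A TaskGroup can be used to hold your subchain. When you set a dependency on the group itself, Airflow connects upstream tasks to the tasks that start the group, and the tasks that end the group to whatever comes downstream.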
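from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id="sod") as sod:
    # Parentheses are required so the >> continuation lines parse.
    (
        DummyOperator(task_id="sod")
        >> DummyOperator(task_id="sod_do_this")
        >> DummyOperator(task_id="sod_last")
    )
(
    BranchPythonOperator(
        task_id="should_run_sod",
        # Tasks created inside a TaskGroup get the group id as a prefix,
        # so the first task's full id is "sod.sod"; the other branch is "no_sod".
        python_callable=lambda: "sod.sod",
    )
    >> [sod, DummyOperator(task_id="no_sod")]
    >> DummyOperator(task_id="end")
)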
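Why does a group work where the plain variable did not? The toy model below is plain Python with hypothetical Task and Group classes (not Airflow's source). It sketches the idea: a group remembers where it starts and ends, so an arrow into the group lands on its first task and an arrow out of it leaves from its last task. In Airflow the corresponding notions are a TaskGroup's roots and leaves; for a straight chain like sod, those are simply the first and last tasks.

```python
def as_list(x):
    return x if isinstance(x, list) else [x]


class Wireable:
    """Toy model: a >> b links every leaf of a to every root of b (hypothetical)."""

    def __rshift__(self, other):
        for target in as_list(other):
            for leaf in self.leaves:
                for root in target.roots:
                    leaf.downstream.append(root.task_id)
        return other

    def __rrshift__(self, other):
        # [a, b] >> self: a list has no >>, so Python falls back to this method.
        for source in as_list(other):
            source >> self
        return self


class Task(Wireable):
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    @property
    def roots(self):
        return [self]

    @property
    def leaves(self):
        return [self]


class Group(Wireable):
    """Toy group: chains its tasks; its root is the first task, its leaf the last."""

    def __init__(self, *tasks):
        for up, down in zip(tasks, tasks[1:]):
            up >> down
        self.tasks = tasks

    @property
    def roots(self):
        return [self.tasks[0]]

    @property
    def leaves(self):
        return [self.tasks[-1]]


sod = Group(Task("sod"), Task("sod_do_this"), Task("sod_last"))
branch, no_sod, end = Task("should_run_sod"), Task("no_sod"), Task("end")
branch >> [sod, no_sod] >> end

print(branch.downstream)         # ['sod', 'no_sod']
print(sod.tasks[-1].downstream)  # ['end']
print(no_sod.downstream)         # ['end']
```

The one-line expression now wires should_run_sod to the first task of the subchain and the last task of the subchain to end, which is the shape the question asks for.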
Answer 2
Score: 1
There are two ways you could achieve this: 1) using the chain() function, or 2) using a TaskGroup. The latter is cleaner.
Approach 1
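from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

with DAG(...):
    should_run_sod = BranchPythonOperator(
        task_id="should_run_sod",
        python_callable=lambda: "sod",  # no_sod
    )
    sod = [
        DummyOperator(task_id="sod"),
        DummyOperator(task_id="sod_do_this"),
        DummyOperator(task_id="sod_last"),
    ]
    no_sod = DummyOperator(task_id="no_sod")
    end = DummyOperator(task_id="end")
    should_run_sod >> [sod[0], no_sod]
    chain(*sod, end)  # sod -> sod_do_this -> sod_last -> end
    no_sod >> end     # without this, no_sod is never joined to end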
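If you prefer the chain() approach but want to avoid indexing into a list, one option is a small helper that chains a sequence and hands back its first and last tasks. The sketch below is plain Python with a toy Task class and a toy chain that only handles single tasks (Airflow's real chain() also accepts lists); the helper name subchain is hypothetical.

```python
class Task:
    """Toy stand-in for an operator; hypothetical, not Airflow's code."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other.task_id)
        return other


def chain(*tasks):
    """Toy chain(): link each task to the next (single tasks only)."""
    for up, down in zip(tasks, tasks[1:]):
        up >> down


def subchain(*tasks):
    """Hypothetical helper: chain the tasks and return (first, last)."""
    chain(*tasks)
    return tasks[0], tasks[-1]


branch, no_sod, end = Task("should_run_sod"), Task("no_sod"), Task("end")
sod_first, sod_last = subchain(Task("sod"), Task("sod_do_this"), Task("sod_last"))

for target in (sod_first, no_sod):  # the branch fans out to either path
    branch >> target
for source in (sod_last, no_sod):   # both paths join at end
    source >> end

print(branch.downstream)     # ['sod', 'no_sod']
print(sod_first.downstream)  # ['sod_do_this']
print(sod_last.downstream)   # ['end']
```

With real operators, the same helper could call airflow.models.baseoperator.chain instead of the toy one, after which should_run_sod >> [sod_first, no_sod] and [sod_last, no_sod] >> end express the branch and the join.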
Approach 2
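from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(...):
    should_run_sod = BranchPythonOperator(
        task_id="should_run_sod",
        # Inside the "sod" group the first task's full id is "sod.sod"; the other branch is "no_sod".
        python_callable=lambda: "sod.sod",
    )
    with TaskGroup(group_id="sod") as sod_group:
        sod = EmptyOperator(task_id="sod")
        sod_do_this = EmptyOperator(task_id="sod_do_this")
        sod_last = EmptyOperator(task_id="sod_last")
        sod >> sod_do_this >> sod_last
    no_sod = EmptyOperator(task_id="no_sod")
    end = EmptyOperator(task_id="end")
    # Both branches (the whole group and no_sod) join at end.
    should_run_sod >> [sod_group, no_sod] >> end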
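One more thing worth checking in all of these versions: end joins a branch that is always partly skipped. Under Airflow's default trigger rule, all_success, a skipped upstream task causes the join to be skipped as well, so end would never run; a common fix is to give end trigger_rule="none_failed_min_one_success" (available in recent Airflow 2 releases). The plain-Python sketch below is a simplified model of just those two rules, assuming every upstream task has already finished; it is not Airflow's scheduler logic.

```python
FAILED = {"failed", "upstream_failed"}


def join_state(trigger_rule, upstream_states):
    """Simplified model: state a join task gets once all upstream tasks are done."""
    if any(s in FAILED for s in upstream_states):
        return "upstream_failed"
    if trigger_rule == "all_success":
        # Any skipped parent makes the whole join skipped.
        return "skipped" if "skipped" in upstream_states else "runs"
    if trigger_rule == "none_failed_min_one_success":
        # Runs as long as at least one parent succeeded.
        return "runs" if "success" in upstream_states else "skipped"
    raise ValueError(f"trigger rule not modelled here: {trigger_rule}")


# The branch picked sod: sod_last succeeded, no_sod was skipped.
states = ["success", "skipped"]
print(join_state("all_success", states))                  # skipped
print(join_state("none_failed_min_one_success", states))  # runs
```

In the DAG itself that would be, for example, EmptyOperator(task_id="end", trigger_rule="none_failed_min_one_success").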