How to chain first and last from subchains to chains properly in Airflow?

Question

Consider the following:

sod = (
  DummyOperator(task_id="sod")
  >> DummyOperator(task_id="sod_do_this")
  >> DummyOperator(task_id="sod_last")
)

(
  BranchPythonOperator(
    task_id="should_run_sod",
    python_callable=lambda: "sod", # no_sod
  )
  >> [sod, DummyOperator(task_id="no_sod")]
  >> DummyOperator(task_id="end")
)

The idea is to "connect" the subchain "sod" in the middle of a larger chain. However, __rshift__ returns the last operator in the chain, so sod ends up being DummyOperator(task_id="sod_last") and the wiring gets mixed up: should_run_sod is connected to sod_last, not to sod.
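To make the return-value behaviour concrete, here is a minimal sketch that uses a toy `Task` class instead of real Airflow operators. The class and its attributes are hypothetical and only imitate the relevant rule: `>>` records the edge and then evaluates to its right-hand operand.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # Record the edge(s), then return the right-hand operand.
        # Returning `other` is what allows a >> b >> c to keep chaining.
        targets = other if isinstance(other, list) else [other]
        self.downstream.extend(targets)
        return other

    def __rrshift__(self, other):
        # Handles `[a, b] >> task`, because a plain list has no __rshift__.
        for task in other:
            task.downstream.append(self)
        return self


# The whole expression evaluates to the LAST task in the chain.
sod = Task("sod") >> Task("sod_do_this") >> Task("sod_last")

branch = Task("should_run_sod")
branch >> [sod, Task("no_sod")] >> Task("end")

chain_result = sod.task_id
branch_targets = [t.task_id for t in branch.downstream]
print(chain_result)    # sod_last
print(branch_targets)  # ['sod_last', 'no_sod']
```

Because the name `sod` is bound to the tail of the chain, the branch ends up pointing at `sod_last`, which is the mix-up described above.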

Can I write this in some simple way, other than assigning variables everywhere? I want the same result as the following, which requires separate variables for the first and last tasks and quickly becomes convoluted:

sod = DummyOperator(task_id="sod")
sod_do_this = DummyOperator(task_id="sod_do_this")
sod_last = DummyOperator(task_id="sod_last")
sod >> sod_do_this >> sod_last 

should_run_sod = BranchPythonOperator(
  task_id="should_run_sod",
  python_callable=lambda: "sod", # no_sod
)
no_sod = DummyOperator(task_id="no_sod")  # named so it can be linked to end below
should_run_sod >> [sod, no_sod]

end = DummyOperator(task_id="end")
sod_last >> end
no_sod >> end

Answer 1

Score: 1

You can use a TaskGroup to hold your subchain.

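from airflow.utils.task_group import TaskGroup

with TaskGroup(group_id='sod') as sod:
    # Parentheses are required so the chain can span several lines.
    (
        DummyOperator(task_id="sod")
        >> DummyOperator(task_id="sod_do_this")
        >> DummyOperator(task_id="sod_last")
    )

(
  BranchPythonOperator(
    task_id="should_run_sod",
    # Tasks inside a TaskGroup get the group_id as a prefix by default.
    python_callable=lambda: "sod.sod", # no_sod
  )
  >> [sod, DummyOperator(task_id="no_sod")]
  >> DummyOperator(task_id="end")
)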
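Why does a group fix the wiring? A rough mental model, sketched below with toy classes rather than Airflow's own code, is that a group connects incoming edges to its root tasks and outgoing edges from its leaf tasks (Airflow's TaskGroup has `get_roots()` and `get_leaves()` for this purpose). The `Node`, `Task`, and `Group` classes here are hypothetical and written only for illustration.

```python
class Node:
    def __rshift__(self, other):
        # Link every exit point of the left side to every entry point of the right.
        for src in self.leaves():
            for dst in other.roots():
                src.downstream.append(dst)
                dst.upstream.append(src)
        return other


class Task(Node):
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream, self.downstream = [], []

    def roots(self):
        return [self]

    def leaves(self):
        return [self]


class Group(Node):
    def __init__(self, group_id, tasks):
        self.tasks = tasks
        for task in tasks:
            # Mimic the default ID prefixing applied to tasks inside a group.
            task.task_id = f"{group_id}.{task.task_id}"

    def roots(self):
        # Tasks with no upstream task inside the group.
        return [t for t in self.tasks if not any(u in self.tasks for u in t.upstream)]

    def leaves(self):
        # Tasks with no downstream task inside the group.
        return [t for t in self.tasks if not any(d in self.tasks for d in t.downstream)]


first, middle, last = Task("sod"), Task("sod_do_this"), Task("sod_last")
first >> middle >> last
sod = Group("sod", [first, middle, last])

branch, end = Task("should_run_sod"), Task("end")
branch >> sod >> end

branch_targets = [t.task_id for t in branch.downstream]
end_sources = [t.task_id for t in end.upstream]
print(branch_targets)  # ['sod.sod']
print(end_sources)     # ['sod.sod_last']
```

The sketch also shows the ID prefixing: once the task lives in the group, its full ID is `sod.sod`, and that is the ID a branch callable has to return to select it.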

Answer 2

Score: 1

There are two ways to achieve this: 1) the chain() function, or 2) a TaskGroup. The latter is cleaner.

Approach 1

from airflow.models.baseoperator import chain

with DAG(...):
    should_run_sod = BranchPythonOperator(
        task_id="should_run_sod",
        python_callable=lambda: "sod",  # no_sod
    )

    sod = [
        DummyOperator(task_id="sod"),
        DummyOperator(task_id="sod_do_this"),
        DummyOperator(task_id="sod_last"),
    ]

    should_run_sod >> [sod[0], DummyOperator(task_id="no_sod")]
    chain(*sod, DummyOperator(task_id="end"))
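For readers unfamiliar with chain(), the core idea is pairwise linking of consecutive arguments. The following is a simplified sketch of that idea only, not Airflow's implementation (the real function also accepts lists of tasks, among other things). `chain_sketch` and the toy `Task` class are hypothetical names.

```python
class Task:
    """Toy stand-in for an Airflow operator (hypothetical, illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other


def chain_sketch(*tasks):
    """Link each task to the next one, like a >> b >> c (simplified)."""
    for upstream, downstream in zip(tasks, tasks[1:]):
        upstream >> downstream


sod = [Task("sod"), Task("sod_do_this"), Task("sod_last")]
end = Task("end")
chain_sketch(*sod, end)

edges = [(t.task_id, d.task_id) for t in sod for d in t.downstream]
print(edges)
# [('sod', 'sod_do_this'), ('sod_do_this', 'sod_last'), ('sod_last', 'end')]
```

Keeping the subchain in a list is what makes both ends addressable: `sod[0]` is the first task and `sod[-1]` the last, with no extra variables per task.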

Approach 2

with DAG(...):
    should_run_sod = BranchPythonOperator(
        task_id="should_run_sod",
        python_callable=lambda: "sod.sod",  # or "no_sod"; tasks in a TaskGroup get the group_id prefix by default
    )

    with TaskGroup(group_id="sod") as sod_group:
        sod = EmptyOperator(task_id="sod")
        sod_do_this = EmptyOperator(task_id="sod_do_this")
        sod_last = EmptyOperator(task_id="sod_last")

        sod >> sod_do_this >> sod_last

    should_run_sod >> [sod_group, EmptyOperator(task_id="no_sod")]
    sod_group >> EmptyOperator(task_id="end")

huangapple
  • Posted on June 5, 2023, 20:13:28
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76406296.html