How to chain first and last from subchains to chains properly in Airflow?


Question


Consider the following:

```python
sod = (
    DummyOperator(task_id="sod")
    >> DummyOperator(task_id="sod_do_this")
    >> DummyOperator(task_id="sod_last")
)
(
    BranchPythonOperator(
        task_id="should_run_sod",
        python_callable=lambda: "sod",  # no_sod
    )
    >> [sod, DummyOperator(task_id="no_sod")]
    >> DummyOperator(task_id="end")
)
```

The idea is to "connect" the subchain `sod` in between. However, `__rshift__` returns its right-hand operand, so a chained `>>` expression evaluates to the last operator in the chain: `sod` ends up bound to `DummyOperator(task_id="sod_last")`, and the dependencies get mixed up. `should_run_sod` is connected to `sod_last`, not to `sod`.
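The behaviour described above can be reproduced without Airflow. The sketch below is a minimal pure-Python model; `Task` is a hypothetical stand-in, not Airflow's `BaseOperator`, and it mimics only two facts: `a >> b` returns `b`, and `[a, b] >> c` falls back to `c.__rrshift__` because Python lists do not implement `>>`.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator; models only `>>`."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids wired directly after this task

    def __rshift__(self, other):
        # Wire self -> other (a task or a list of tasks), then return `other`,
        # so `a >> b >> c` evaluates to `c`.
        targets = other if isinstance(other, list) else [other]
        for task in targets:
            self.downstream.append(task.task_id)
        return other

    def __rrshift__(self, other):
        # Called for `[a, b] >> self`, since lists do not implement `>>`.
        for task in other:
            task.downstream.append(self.task_id)
        return self


sod = Task("sod") >> Task("sod_do_this") >> Task("sod_last")
print(sod.task_id)  # sod_last

branch = Task("should_run_sod")
no_sod = Task("no_sod")
branch >> [sod, no_sod] >> Task("end")
print(branch.downstream)  # ['sod_last', 'no_sod']
```

The first `Task("sod")` is still wired to `sod_do_this`, but no name refers to it any more, so the branch can only reach `sod_last`.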

Is there a simple way to write this without assigning variables everywhere? I want the same result as in the following, but that requires separate variables for the first and last tasks, which gets convoluted:

```python
sod = DummyOperator(task_id="sod")
sod_do_this = DummyOperator(task_id="sod_do_this")
sod_last = DummyOperator(task_id="sod_last")
sod >> sod_do_this >> sod_last
should_run_sod = BranchPythonOperator(
    task_id="should_run_sod",
    python_callable=lambda: "sod",  # no_sod
)
no_sod = DummyOperator(task_id="no_sod")  # named, so it can be joined to `end`
should_run_sod >> [sod, no_sod]
end = DummyOperator(task_id="end")
sod_last >> end
no_sod >> end
```

Answer 1

Score: 1


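A TaskGroup can hold your subchain. When the group is used with `>>`, upstream tasks are connected to the group's first task(s) and the group's last task(s) are connected to downstream tasks: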

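```python
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

# Assumes this runs inside a `with DAG(...):` block.
with TaskGroup(group_id="sod") as sod:
    (
        DummyOperator(task_id="sod")
        >> DummyOperator(task_id="sod_do_this")
        >> DummyOperator(task_id="sod_last")
    )
(
    BranchPythonOperator(
        task_id="should_run_sod",
        # Task ids inside the group are prefixed with the group id
        python_callable=lambda: "sod.sod",  # no_sod
    )
    >> [sod, DummyOperator(task_id="no_sod")]
    # The default "all_success" rule would skip the join after a branch
    >> DummyOperator(task_id="end", trigger_rule="none_failed_min_one_success")
)
```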
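A rough pure-Python model of why the group fixes the wiring: when a TaskGroup appears on the right of `>>`, Airflow attaches the upstream task to the group's roots (tasks with no upstream task inside the group); when it appears on the left, the group's leaves are attached downstream. `Task`, `Group`, `link` and `connect` below are hypothetical names, not Airflow's API. The model also mimics the default group-id prefix, which is why a branch that should enter the group has to return the prefixed task id `sod.sod` rather than `sod`.

```python
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream, self.downstream = set(), set()


def link(a, b):
    """Record the edge a -> b on both tasks."""
    a.downstream.add(b)
    b.upstream.add(a)


class Group:
    """Hypothetical model of a TaskGroup holding a linear subchain."""

    def __init__(self, group_id, task_ids):
        # Mimic the default prefix_group_id=True: "sod" -> "sod.sod"
        self.tasks = [Task(f"{group_id}.{tid}") for tid in task_ids]
        for a, b in zip(self.tasks, self.tasks[1:]):
            link(a, b)

    def roots(self):
        members = set(self.tasks)
        return [t for t in self.tasks if not (t.upstream & members)]

    def leaves(self):
        members = set(self.tasks)
        return [t for t in self.tasks if not (t.downstream & members)]


def connect(up, down):
    """Model `up >> down`: a group contributes its leaves upstream, its roots downstream."""
    ups = up.leaves() if isinstance(up, Group) else [up]
    downs = down.roots() if isinstance(down, Group) else [down]
    for a in ups:
        for b in downs:
            link(a, b)


group = Group("sod", ["sod", "sod_do_this", "sod_last"])
branch, end = Task("should_run_sod"), Task("end")
connect(branch, group)
connect(group, end)
print([t.task_id for t in branch.downstream])  # ['sod.sod']
print([t.task_id for t in end.upstream])  # ['sod.sod_last']
```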


Answer 2

Score: 1


There are two ways to achieve this: 1) using the `chain()` function, or 2) using a TaskGroup. The latter is cleaner.

Approach 1

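```python
from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator

with DAG(...):
    should_run_sod = BranchPythonOperator(
        task_id="should_run_sod",
        python_callable=lambda: "sod",  # no_sod
    )
    sod = [
        DummyOperator(task_id="sod"),
        DummyOperator(task_id="sod_do_this"),
        DummyOperator(task_id="sod_last"),
    ]
    no_sod = DummyOperator(task_id="no_sod")
    # The default "all_success" rule would skip the join after a branch
    end = DummyOperator(task_id="end", trigger_rule="none_failed_min_one_success")
    should_run_sod >> [sod[0], no_sod]
    chain(*sod, end)  # sod >> sod_do_this >> sod_last >> end
    no_sod >> end
```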
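For single tasks, `chain(t1, t2, t3, ...)` wires each consecutive pair, so `chain(*sod, end)` amounts to `sod[0] >> sod[1] >> sod[2] >> end`, while `sod[0]` and `sod[-1]` stay available as the subchain's first and last tasks. The function below is a hypothetical pure-Python model (`chain_edges` is an illustrative name, not Airflow's implementation, which also accepts lists of tasks); it uses task ids as plain strings and only lists the resulting edges.

```python
def chain_edges(*tasks):
    """Hypothetical model of chain() for single tasks: consecutive pairs."""
    return list(zip(tasks, tasks[1:]))


sod = ["sod", "sod_do_this", "sod_last"]
edges = [("should_run_sod", sod[0]), ("should_run_sod", "no_sod")]
edges += chain_edges(*sod, "end")
# chain() knows nothing about the other branch; it must be joined separately
edges.append(("no_sod", "end"))

print(sod[0], sod[-1])  # sod sod_last
for upstream, downstream in edges:
    print(f"{upstream} >> {downstream}")
```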


Approach 2

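```python
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(...):
    should_run_sod = BranchPythonOperator(
        task_id="should_run_sod",
        # Task ids inside the group are prefixed with the group id
        python_callable=lambda: "sod.sod",  # no_sod
    )
    with TaskGroup(group_id="sod") as sod_group:
        sod = EmptyOperator(task_id="sod")
        sod_do_this = EmptyOperator(task_id="sod_do_this")
        sod_last = EmptyOperator(task_id="sod_last")
        sod >> sod_do_this >> sod_last
    no_sod = EmptyOperator(task_id="no_sod")
    # The default "all_success" rule would skip the join after a branch
    end = EmptyOperator(task_id="end", trigger_rule="none_failed_min_one_success")
    should_run_sod >> [sod_group, no_sod]
    [sod_group, no_sod] >> end
```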


  • Posted by huangapple on June 5, 2023, 20:13:28
  • Source: https://go.coder-hub.com/76406296.html