Create Dynamic Airflow tasks with separate dependencies
Question
I want to generate multiple Airflow sensors/operators in a loop, but I need to be able to access them individually, because they have different dependencies. For example, task1 depends on operator1, operator2, and operator3, while task2 depends on operator4, operator1, and operator3, and so on. Some of the dependencies overlap, but most of them are different.
Take a look at the sample graph I made.
I tried generating them in a loop and assigning the dependencies at the end of my DAG file, but Airflow reported an import error saying it could not find the variable I referenced at the end of the file.
I tried something along the lines of this:
for table_script_id in [1, 2, 3, 4]:
    table_sensor = PythonSensor(
        task_id=f"table_sensor_{table_script_id}",
        python_callable=hello_world,
    )

# at the end of the DAG; got an import error this way
table_sensor_1 >> something_else1
table_sensor_2 >> [something_else2, something_else3]
But it didn't work as I had hoped: as I understand it, every sensor is assigned to the same table_sensor variable, so each one overwrites the previous one, and names like table_sensor_1 are never defined.
Does anyone have an idea for a good way to do this? Surely someone has done something like this before, but after two days of Googling I could not find any examples where dynamically generated tasks get different dependencies.
Answer 1
Score: 0
I don't know exactly what you are trying to do, but you can store the tasks in a dict keyed by task_id and look them up when you set the dependencies, something like this:
task_map = {}
for table_script_id in [1, 2, 3, 4]:
    table_sensor = PythonSensor(
        task_id=f"table_sensor_{table_script_id}",
        python_callable=hello_world,
    )
    # keep a reference to every sensor instead of overwriting one variable
    task_map[f"table_sensor_{table_script_id}"] = table_sensor

# at the end of the DAG: look each sensor up by its task_id
# (indexing raises a clear KeyError if the id is misspelled)
task_map["table_sensor_1"] >> something_else1
task_map["table_sensor_2"] >> [something_else2, something_else3]
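If the dependencies differ for most tasks, it may be easier to write them down once as data and wire everything up in a second loop. The sketch below is only an illustration of that idea. To keep it runnable without an Airflow installation, it uses a small hypothetical stand-in class, Task, that records what `>>` connects. The names Task, downstream_tasks and dependencies are assumptions made for the example, not Airflow APIs. In a real DAG you would build PythonSensor (or any other operator) objects in the same dicts, and `>>` would work the same way.

```python
class Task:
    """Hypothetical stand-in for an Airflow operator (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []  # task_ids that run after this task

    def __rshift__(self, other):
        # Mimic `a >> b` and `a >> [b, c]`.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            self.downstream.append(target.task_id)
        return other


# Dynamically generated upstream tasks, keyed by task_id so each stays reachable.
sensors = {
    f"table_sensor_{i}": Task(f"table_sensor_{i}") for i in [1, 2, 3, 4]
}

# Downstream tasks, also keyed by task_id.
downstream_tasks = {name: Task(name) for name in ["task1", "task2"]}

# Each downstream task lists the upstream task_ids it depends on,
# following the task1/task2 example from the question.
dependencies = {
    "task1": ["table_sensor_1", "table_sensor_2", "table_sensor_3"],
    "task2": ["table_sensor_4", "table_sensor_1", "table_sensor_3"],
}

# Apply the dependencies in one place.
for downstream_id, upstream_ids in dependencies.items():
    for upstream_id in upstream_ids:
        sensors[upstream_id] >> downstream_tasks[downstream_id]
```

With this layout, adding or changing a dependency means editing only the dependencies mapping, and a misspelled task_id fails with a KeyError when the DAG file is parsed.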