创建具有独立依赖关系的动态Airflow任务。

huangapple go评论70阅读模式
英文:

Create Dynamic Airflow tasks with separate dependencies

问题

我想在循环中生成多个Airflow传感器/操作器,但我希望能够逐个访问它们,因为它们具有不同的依赖关系。例如,task1 依赖于 operator1operator2operator3;而 task2 依赖于 operator4operator1operator3,依此类推。有些差异会重叠,但大多数是不同的。

看一下我制作的示例图。
创建具有独立依赖关系的动态Airflow任务。

我尝试在循环中生成它们,并在我的DAG末尾分配依赖关系,但是Airflow报告了一个导入错误,说它无法看到我在文件末尾引用的变量。

我尝试了类似以下的方法

for table_script_id in [1,2,3,4]:
    table_sensor = PythonSensor(
        task_id=f"table_sensor_{table_script_id}",
        python_callable=hello_world,
    )
    
    

# 在DAG末尾;这种方式报告了一个导入错误
table_sensor_1 >> something_else1
table_sensor_2 >> [something_else2, something_else3]

但它没有按照我希望的方式进行,因为所有传感器都生成到 table_sensor 变量中,并相互覆盖,就我所知。

有没有人有一个 最佳 方法来做到这一点的想法?有人做过类似的事情吗?我确定有!但在两天的Google搜索之后,我找不到任何示例,其中动态生成的任务具有不同的依赖关系...

英文:

I want to generate multiple Airflow sensors/operators in a loop, but I want to be able to access them one-by-one, as they have different dependencies. For example, task1 has a dependency for operator1, operator2, and operator3; while task2 has a dependency for operator4, operator1, and operator3, etc. Some of the differences overlap, but most of them are different.

Take a look at the sample graph I made.
创建具有独立依赖关系的动态Airflow任务。

I tried generating them in a loop, and assign the dependencies at the end of my DAG, but I got an import error from Airflow, saying it can't see the variable I referenced at the end of my file.

I tried something along the lines of this

for table_script_id in [1,2,3,4]:
    table_sensor = PythonSensor(
        task_id=f"table_sensor_{table_script_id}",
        python_callable=hello_world,
    )
    
    

# at the end of the DAG; got an import error this way
table_sensor_1 >> something_else1
table_sensor_2 >> [something_else2, something_else3]

But it didn't go as I hoped so, as all the sensors get generated into the table_sensor variable, and overwrite each other as I understand.

Does anyone have an idea for an optimal way to do this? Has anyone done something like this? I'm sure! But after two days of Googling, I could not find any examples where dynamically generated tasks get different dependencies...

答案1

得分: 0

不确定您想要做什么,但您可以将任务存储在某个字典中,并像下面这样应用依赖关系:

task_map = {}
for table_script_id in [1, 2, 3, 4]:
    table_sensor = PythonSensor(
        task_id=f"table_sensor_{table_script_id}",
        python_callable=hello_world,
    )
    task_map[f"table_sensor_{table_script_id}"] = table_sensor

# 在DAG的末尾;以这种方式获得导入错误
task_map.get("table_sensor_1") >> something_else1
task_map.get("table_sensor_2") >> [something_else2, something_else3]
英文:

I don't know exactly what you are trying to do but you can store tasks in some dict and apply dependency something like below

    task_map = {}
    for table_script_id in [1, 2, 3, 4]:
        table_sensor = PythonSensor(
            task_id=f"table_sensor_{table_script_id}",
            python_callable=hello_world,
        )
        task_map[f"table_sensor_{table_script_id}"] = table_sensor

    # at the end of the DAG; got an import error this way
    task_map.get("table_sensor_1") >> something_else1
    task_map.get("table_sensor_2") >> [something_else2, something_else3]

huangapple
  • 本文由 发表于 2023年6月29日 21:47:11
  • 转载请务必保留本文链接:https://go.coder-hub.com/76581650.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定