Airflow 动态任务组范围创建

huangapple go评论79阅读模式
英文:

airflow dynamic task group range creation

问题

我正在尝试根据用户提供的输入动态创建Airflow任务组。与下面的示例类似,但这里我们希望根据用户提供的输入(无需硬编码)创建多个任务组。

sub_groups = []
for s_id in range(1, 3):
    @task_group(group_id=f"sub_group{s_id}")
    def tg2():
        st1 = EmptyOperator(task_id="task1")
        st2 = EmptyOperator(task_id="task2")
        st1 >> st2
    sub_groups.append(tg2())

t1 >> sub_groups >> t2
groups.append(tg1())
groups[0] >> groups[1]

如何从用户输入或DAG参数中获取此属性?

我们参考了上面的链接,但它具有静态数量的组,这对我们的用例不起作用。

期望任务组的数量应该由用户提供。

英文:

I am trying to create airflow task group dynamically based on user input provided.
As like example given below, but here we want number of task groups created based on user input provided (without hard coding).

`

    sub_groups = []
    for s_id in range(1,3):
        @task_group(group_id=f"sub_group{s_id}")
        def tg2():
            st1 = EmptyOperator(task_id="task1")
            st2 = EmptyOperator(task_id="task2")
            st1 >> st2
        sub_groups.append(tg2())
    
    t1 >> sub_groups >> t2
    groups.append(tg1())
    groups[0] >> groups[1]

How to bring this attribute from user input or dag params ?

<https://docs.astronomer.io/learn/task-groups#generate-task-groups-dynamically-at-runtime>

we referred the above link, but it has static number of groups. which is not working for our use case.

Expectation is number of task group should come user.

答案1

得分: 2

当您触发DAG时,在配置中添加参数number_of_groups,还要在DAG定义中设置render_template_as_native_obj=True

@task
def get_number_of_groups(dag_run=None):
    return list(range(0, dag_run.conf["number_of_groups"]))

@task_group(group_id="group1")
def tg1(my_num):
    @task
    def print_num(num):
        return num

@task
def add_42(num):
    return num + 42

print_num(my_num) >> add_42(my_num)

tg1_object = tg1.expand(my_num=get_number_of_groups())
英文:

Follow @RNHTTR answer

When you trigger the dag in the confuguration add param number_of_groups

also set render_template_as_native_obj=Truein the dag definition

@task
def get_number_of_groups(dag_run=None):
   return list(range(0,dag_run.conf[&quot;number_of_groups&quot;]))

@task_group(group_id=&quot;group1&quot;)
def tg1(my_num):
    @task
    def print_num(num):
        return num

@task
def add_42(num):
    return num + 42

print_num(my_num) &gt;&gt; add_42(my_num)

tg1_object = tg1.expand(my_num= get_no_groups())

答案2

得分: 1

你需要使用动态任务映射。你可以将动态任务映射用于单个任务或任务组

编辑:你分享的Astronomer文档中的示例不必是静态的。而是调用:

tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])

你可以传递不同任务的结果:

import random

...

@task
random_length_list():
    l = []
    for i in range(0, random.randint(0, 7)):
        l.append(i)
    return l

@task_group(group_id="group1")
def tg1(my_num):
    @task
    def print_num(num):
        return num

    @task
    def add_42(num):
        return num + 42

    print_num(my_num) >> add_42(my_num)

tg1_object = tg1.expand(my_num=random_length_list())
英文:

You'll want to use dynamic task mapping. You can use dynamic task mapping for single tasks or for task groups.

EDIT: The example in the Astronomer documentation you shared doesn't have to be static. Instead of calling

tg1_object = tg1.expand(my_num=[19, 23, 42, 8, 7, 108])

You could pass the result of a different task:

import random

...

@task
random_length_list():
    l = []
    for i in range(0, random.randint(0, 7)):
        l.append(i)
    return l

@task_group(group_id=&quot;group1&quot;)
def tg1(my_num):
    @task
    def print_num(num):
        return num

    @task
    def add_42(num):
        return num + 42

    print_num(my_num) &gt;&gt; add_42(my_num)

tg1_object = tg1.expand(my_num=random_length_list())


</details>



huangapple
  • 本文由 发表于 2023年8月10日 20:32:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/76875769.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定