Airflow - pass parameters between dynamic tasks

Question

I am trying to get my head around dynamic task mapping. I need to create a pretty simple workflow but I am unable to figure out how to pass arguments between different tasks.

What I have:

def get_files():
    return [['23', 'abcd'], ['49', 'xyz']]

def create_instance(index, some_param, **kwargs):
    # do stuff
    return '<instance-id>'

def terminate_instance(instance_id):
    # terminate instance using instance_id
    pass

...

with dag:
    run_get_files = PythonOperator(
        task_id='get-files',
        python_callable=get_files,
    )

    run_create_ec2_instance = PythonOperator.partial(
        task_id='create-instance',
        python_callable=create_instance,
    ).expand(
        op_args=run_get_files.output
    )

    start_instance = EC2StartInstanceOperator.partial(
        task_id='start-instance',
        region_name='eu-central-1',
    ).expand(
        instance_id=run_create_ec2_instance.output
    )

    run_terminate_instance = PythonOperator.partial(
        task_id='terminate-instance',
        python_callable=terminate_instance,
    ).expand(
        op_args=run_create_ec2_instance.output  # error
    )

This works fine UNTIL run_terminate_instance. Somehow Airflow does not pass the arguments correctly there: the task receives the arguments from all instances at once (as one list). How can I do this correctly, please?

Answer 1

Score: 1

Try defining the set of tasks that you want to run for each key as a TaskGroup, then expand that TaskGroup. Mapping over task groups was added in Airflow 2.5.
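
A rough sketch of what this could look like, assuming Airflow 2.5 or later and the TaskFlow decorators. The DAG id `per_file_instances`, the group name `handle_file`, and the placeholder function bodies are made up for illustration, and the EC2 start step from the question is left out to keep it short. Airflow is imported inside `build_dag()` only so the plain helper functions can be tried without an Airflow installation.

```python
from datetime import datetime


def create_instance(index, some_param):
    # Placeholder: the real function would launch an instance and return its id.
    return f"i-{index}-{some_param}"


def terminate_instance(instance_id):
    # Placeholder: the real function would terminate the instance.
    return f"terminated {instance_id}"


def build_dag():
    # Imported lazily so the helpers above can be exercised without Airflow.
    from airflow.decorators import dag, task, task_group

    @dag(
        dag_id="per_file_instances",  # hypothetical name
        start_date=datetime(2023, 1, 1),
        schedule=None,
        catchup=False,
    )
    def per_file_instances():
        @task
        def get_files():
            return [["23", "abcd"], ["49", "xyz"]]

        @task_group
        def handle_file(file_args):
            # Everything in this group runs once per element of get_files().
            @task
            def create(args):
                return create_instance(*args)

            @task
            def terminate(instance_id):
                return terminate_instance(instance_id)

            terminate(create(file_args))

        # One mapped copy of the whole group per file entry.
        handle_file.expand(file_args=get_files())

    return per_file_instances()
```

With this layout each `terminate` task only ever sees the id produced by the `create` task in the same mapped group, so nothing has to be re-split downstream.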

Answer 2

Score: 0

Wrapping your args with XComArg should do the trick:

from airflow import XComArg

run_create_ec2_instance = PythonOperator.partial(
    task_id='create-instance',
    python_callable=create_instance,
).expand(
    op_args=run_get_files.output
)
run_terminate_instance = PythonOperator.partial(
    task_id='terminate-instance',
    python_callable=terminate_instance,
).expand(
    op_args=XComArg(run_create_ec2_instance)
)
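
If you want to stay with PythonOperator, another option that may help is to reshape each upstream value before expanding. This is only a sketch, assuming Airflow 2.4 or later (where `XComArg.map()` is available). The helper names `wrap_as_op_args`, `call_like_python_operator` and `build_terminate_task` are made up for illustration. PythonOperator calls its callable as `python_callable(*op_args, **op_kwargs)`, so every mapped `op_args` value has to be a list of positional arguments rather than a bare instance id.

```python
def terminate_instance(instance_id):
    # Placeholder: the real function would terminate the instance.
    return f"terminated {instance_id}"


def wrap_as_op_args(instance_id):
    # Turn one upstream return value into a one-element positional-argument list.
    return [instance_id]


def call_like_python_operator(python_callable, op_args):
    # Plain-Python stand-in for how PythonOperator applies op_args.
    return python_callable(*op_args)


def build_terminate_task(run_create_ec2_instance):
    # Must be called inside a DAG context (for example within "with dag:").
    # Imported lazily so the helpers above can be tried without Airflow.
    from airflow.operators.python import PythonOperator

    return PythonOperator.partial(
        task_id="terminate-instance",
        python_callable=terminate_instance,
    ).expand(
        # .map() transforms each element before it is handed to a mapped task.
        op_args=run_create_ec2_instance.output.map(wrap_as_op_args),
    )
```

The same effect can be had by returning `['<instance-id>']` from `create_instance`, but then the EC2 start task would receive a list instead of a string, which is why the reshaping is done only on the terminate side here.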

Note: replacing the PythonOperator with a function that uses the task decorator would be easier (and is recommended), since you can format the output yourself.
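
For what it's worth, a minimal sketch of that decorator-based variant, assuming Airflow 2.4 or later. The DAG id `flat_mapped_instances` and the placeholder bodies are hypothetical, and the EC2 start step is omitted. Airflow is imported inside `build_dag()` so the plain functions can be run on their own.

```python
from datetime import datetime


def create_instance(args):
    # Each mapped call receives one [index, some_param] pair.
    index, some_param = args
    # Placeholder: the real function would launch an instance and return its id.
    return f"i-{index}-{some_param}"


def terminate_instance(instance_id):
    # Placeholder: the real function would terminate the instance.
    return f"terminated {instance_id}"


def build_dag():
    # Imported lazily so the helpers above can be exercised without Airflow.
    from airflow.decorators import dag, task

    @dag(
        dag_id="flat_mapped_instances",  # hypothetical name
        start_date=datetime(2023, 1, 1),
        schedule=None,
        catchup=False,
    )
    def flat_mapped_instances():
        @task
        def get_files():
            return [["23", "abcd"], ["49", "xyz"]]

        @task
        def create(args):
            return create_instance(args)

        @task
        def terminate(instance_id):
            return terminate_instance(instance_id)

        # One create task per file entry, then one terminate task per created id.
        instance_ids = create.expand(args=get_files())
        terminate.expand(instance_id=instance_ids)

    return flat_mapped_instances()
```

Because the arguments are passed by keyword, each mapped `terminate` call gets a single instance id and no `op_args` unpacking is involved.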

huangapple
  • Posted on April 10, 2023, 22:27:11
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75977963.html