Airflow - pass parameters between dynamic tasks
Question
I am trying to get my head around dynamic task mapping. I need to create a pretty simple workflow but I am unable to figure out how to pass arguments between different tasks.
What I have:
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.ec2 import EC2StartInstanceOperator

def get_files():
    return [['23', 'abcd'], ['49', 'xyz']]

def create_instance(index, some_param, **kwargs):
    # do stuff
    return '<instance-id>'

def terminate_instance(instance_id):
    # terminate instance using instance_id
    pass

...

with dag:
    run_get_files = PythonOperator(
        task_id='get-files',
        python_callable=get_files,
    )

    run_create_ec2_instance = PythonOperator.partial(
        task_id='create-instance',
        python_callable=create_instance,
    ).expand(
        op_args=run_get_files.output
    )

    start_instance = EC2StartInstanceOperator.partial(
        task_id='start-instance',
        region_name='eu-central-1',
    ).expand(
        instance_id=run_create_ec2_instance.output
    )

    run_terminate_instance = PythonOperator.partial(
        task_id='terminate-instance',
        python_callable=terminate_instance,
    ).expand(
        op_args=run_create_ec2_instance.output  # error
    )
This works fine until run_terminate_instance: Airflow does not pass the arguments correctly and instead takes the arguments from all instances at once (as one list). How do I do this correctly?
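One likely cause can be reproduced without Airflow. PythonOperator calls its callable roughly as python_callable(*op_args, **op_kwargs), so each mapped element passed as op_args must be a list of positional arguments. The elements returned by get_files() are already lists, but create_instance returns a bare string, which would be unpacked character by character. The sketch below is a plain-Python model under that assumption, not Airflow code; call_like_python_operator and the fake instance ids are hypothetical.

```python
from typing import Any, Callable, List, Sequence


def call_like_python_operator(fn: Callable[..., Any], op_args: Sequence[Any]) -> Any:
    # Hypothetical model of PythonOperator's call: fn(*op_args),
    # so op_args must be a list/tuple of positional arguments.
    return fn(*op_args)


def create_instance(index: str, some_param: str, **kwargs: Any) -> str:
    # Hypothetical stand-in: returns a fake instance id instead of calling AWS.
    return f"i-{index}"


def terminate_instance(instance_id: str) -> str:
    # Hypothetical stand-in: reports which id it would terminate.
    return f"terminated {instance_id}"


files = [['23', 'abcd'], ['49', 'xyz']]  # what get_files() returns

# Each element is already a list, so it unpacks cleanly into (index, some_param).
instance_ids: List[str] = [call_like_python_operator(create_instance, args) for args in files]
print(instance_ids)  # ['i-23', 'i-49']

# A bare string as op_args is unpacked per character:
# terminate_instance('i', '-', '2', '3') raises TypeError.
try:
    call_like_python_operator(terminate_instance, instance_ids[0])
except TypeError as exc:
    print("bare string fails:", exc)

# Wrapping each id in a one-element list gives exactly one positional argument.
wrapped = [[instance_id] for instance_id in instance_ids]
print([call_like_python_operator(terminate_instance, args) for args in wrapped])
# ['terminated i-23', 'terminated i-49']
```

If this is the cause, one option on Airflow 2.4+ is to reshape the upstream output with XComArg.map() rather than changing create_instance (which would also change what start_instance receives), e.g. op_args=run_create_ec2_instance.output.map(wrap_in_list) with a hypothetical helper def wrap_in_list(instance_id): return [instance_id]. Check this against your Airflow version.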
Answer 1
Score: 1
Try defining the set of tasks you want to run for each key as a TaskGroup, then expand that TaskGroup. Mapping over task groups was added in Airflow 2.5.
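Conceptually, expanding a task group repeats the whole chain once per input, so each terminate step receives the id created in its own copy of the group rather than the full list. The plain-Python model below only illustrates that pairing; it is not Airflow code, and expand_task_group and instance_lifecycle are hypothetical names. In a real DAG the body would be a function decorated with @task_group, expanded with something like lifecycle.expand(file_args=...).

```python
from typing import Any, Callable, Dict, List


def expand_task_group(inputs: List[Any], group_body: Callable[[Any], Dict[str, str]]) -> List[Dict[str, str]]:
    # Hypothetical model of a mapped TaskGroup: the whole body runs once per
    # input element, and each copy only sees values produced inside that copy.
    return [group_body(item) for item in inputs]


def instance_lifecycle(file_args: List[str]) -> Dict[str, str]:
    # Hypothetical per-file chain: create -> start -> terminate.
    index, _some_param = file_args
    instance_id = f"i-{index}"  # stand-in for create_instance's return value
    return {
        "created": instance_id,
        "started": instance_id,
        "terminated": instance_id,
    }


files = [['23', 'abcd'], ['49', 'xyz']]
for run in expand_task_group(files, instance_lifecycle):
    print(run)
# {'created': 'i-23', 'started': 'i-23', 'terminated': 'i-23'}
# {'created': 'i-49', 'started': 'i-49', 'terminated': 'i-49'}
```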
Answer 2
Score: 0
Wrapping your args with XComArg should do the trick:
from airflow import XComArg
from airflow.operators.python import PythonOperator

run_create_ec2_instance = PythonOperator.partial(
    task_id='create-instance',
    python_callable=create_instance,
).expand(
    op_args=run_get_files.output
)

run_terminate_instance = PythonOperator.partial(
    task_id='terminate-instance',
    python_callable=terminate_instance,
).expand(
    op_args=XComArg(run_create_ec2_instance)
)
Note: replacing the PythonOperator with a function decorated with @task would be easier (and is recommended), as you can format the output yourself.
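With the TaskFlow API, mapped values are passed as keyword arguments (for example terminate_instance.expand(instance_id=ids) on a @task-decorated function), so a plain string return value needs no wrapping. The sketch below models that in plain Python, not Airflow code; expand_model is a hypothetical helper. It mirrors the documented behaviour that mapping over several keyword arguments at once produces their cross product.

```python
import itertools
from typing import Any, Callable, List


def expand_model(fn: Callable[..., Any], **mapped: List[Any]) -> List[Any]:
    # Hypothetical model of @task's .expand(**kwargs): one call per element of
    # the cross product of all mapped keyword arguments, each passed by name.
    names = list(mapped)
    return [fn(**dict(zip(names, combo))) for combo in itertools.product(*mapped.values())]


def terminate_instance(instance_id: str) -> str:
    # Hypothetical stand-in for the real termination call.
    return f"terminated {instance_id}"


instance_ids = ["i-23", "i-49"]  # e.g. the collected return values of create_instance
print(expand_model(terminate_instance, instance_id=instance_ids))
# ['terminated i-23', 'terminated i-49']
```

Because each value arrives under its parameter name, the function's return shape no longer has to match op_args; keep in mind that mapping two keyword arguments multiplies the number of task instances.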