AWS Airflow trigger with configuration JSON
Question
I'm trying to make a dynamic trigger with Airflow: when triggering a DAG, I want to change several parameters before the script runs. Right now I'm stuck on how to pass the parameters through the configuration JSON UI provided by AWS Airflow (MWAA).
My Airflow script is below:
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def print_hello():
    message = 'hello world'
    print(message)


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 7),
    'retries': 1,
}

dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
)

print_hello_task = PythonOperator(
    task_id='print_hello_task',
    python_callable=print_hello,
    dag=dag,
)

print_hello_task
```
In this example, I pass the parameter in the configuration JSON.
How can I pass the parameter "message" from the configuration JSON to the script?
Answer 1
Score: 2
Another option is to use the Jinja template `{{ dag_run.conf }}`, as mentioned in the UI.
In this case it is very important to add `render_template_as_native_obj=True` to the DAG definition, because you want the conf to be treated as a dict; without it, the template is rendered as a string.
The conf is then passed to the function as keyword arguments (`**kwargs`):
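```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Trimmed version of the default_args from the question.
default_args = {"owner": "airflow", "start_date": datetime(2023, 8, 7)}

with DAG(
    dag_id="hello_world_dag",
    schedule_interval=None,
    default_args=default_args,
    # Render templates as native Python objects, so "{{ dag_run.conf }}" becomes a dict.
    render_template_as_native_obj=True,
    catchup=False,
) as dag:

    def print_hello(**kwargs):
        # Trigger-config keys (along with Airflow's context variables) arrive as kwargs.
        print(kwargs.get("message"))

    print_hello_task = PythonOperator(
        task_id="print_hello_task",
        python_callable=print_hello,
        op_kwargs="{{ dag_run.conf }}",
    )
```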
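To see why `render_template_as_native_obj=True` matters, here is a simplified, stdlib-only model; it is not Airflow's real rendering code. It assumes that default Jinja rendering turns the value of `{{ dag_run.conf }}` into its `str()` text, while native rendering hands back the dict itself. `render_string`, `render_native`, `get_message`, `call_with_op_kwargs` and the sample `conf` are hypothetical names used only for illustration.

```python
# Simplified model of the two rendering modes; NOT Airflow's actual implementation.


def render_string(value):
    # Assumption: default Jinja rendering converts the expression's value to text.
    return str(value)


def render_native(value):
    # Assumption: with render_template_as_native_obj=True the Python object is kept.
    return value


def get_message(**kwargs):
    # Mirrors print_hello from the answer, but returns the value so it can be checked.
    return kwargs.get("message")


def call_with_op_kwargs(callable_, op_kwargs):
    # ** unpacking only works on a mapping; a str raises TypeError.
    return callable_(**op_kwargs)


# Hypothetical trigger config, as it might be entered in the configuration JSON.
conf = {"message": "hello from the trigger form"}

print(call_with_op_kwargs(get_message, render_native(conf)))  # hello from the trigger form

try:
    call_with_op_kwargs(get_message, render_string(conf))
except TypeError as exc:
    # The rendered string looks like a dict but is only text.
    print(f"string rendering fails: {exc}")
```

The string form `"{'message': ...}"` reads like a dict but cannot be unpacked into keyword arguments, which is why the flag is needed when the whole conf is passed through `op_kwargs`.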
Answer 2
Score: 1
Here's an example DAG that uses Airflow's TaskFlow API to print some properties of a provided DAG run config object, `{"hello": "world"}` (see the [Context](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html#context) section for more on accessing a task instance's context):
```python
from pendulum import datetime

from airflow.decorators import dag, task


@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
    },
    tags=["example"],
)
def read_config():
    @task
    def read_conf(dag_run=None):
        # Airflow injects the current DagRun because the parameter is named dag_run.
        print(type(dag_run.conf))
        print(f"DAG Run config: {dag_run.conf}")
        # .get avoids a KeyError on runs whose config lacks "hello" (e.g. scheduled runs).
        print(f"Hello {dag_run.conf.get('hello')}")

    read_conf()


read_config()
```
Task instance logs:
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - <class 'dict'>
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - DAG Run config: {'hello': 'world'}
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - Hello world
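The `dag_run=None` parameter in `read_conf` works because TaskFlow tasks receive context variables by declaring them as keyword arguments. Below is a simplified, stdlib-only model of that name matching, assuming a callable gets either the context keys it names or everything if it accepts `**kwargs`; `FakeDagRun`, `call_with_context` and the sample contexts are hypothetical and are not Airflow's implementation. It also assumes a run without a trigger config (such as a scheduled `@daily` run) has an empty conf, which is why the model's `read_conf` uses `conf.get(...)`.

```python
import inspect
from dataclasses import dataclass, field


@dataclass
class FakeDagRun:
    # Hypothetical stand-in for Airflow's DagRun; only the conf attribute is modelled.
    conf: dict = field(default_factory=dict)


def call_with_context(func, context):
    """Pass only the context entries whose names match the function's parameters,
    or the whole context if the function accepts **kwargs (simplified model)."""
    params = inspect.signature(func).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return func(**context)
    return func(**{name: value for name, value in context.items() if name in params})


def read_conf(dag_run=None):
    # Same shape as the task above, returning the greeting instead of printing it.
    return f"Hello {dag_run.conf.get('hello')}"


# Hypothetical contexts: one manually triggered with config, one scheduled without.
triggered = {"dag_run": FakeDagRun(conf={"hello": "world"}), "ds": "2023-08-08"}
scheduled = {"dag_run": FakeDagRun(), "ds": "2023-08-09"}

print(call_with_context(read_conf, triggered))  # Hello world
print(call_with_context(read_conf, scheduled))  # Hello None
```

Only `dag_run` reaches `read_conf`; the unrelated `ds` entry is filtered out because the function does not name it.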