AWS Airflow配置JSON触发器

huangapple go评论88阅读模式
英文:

AWS Airflow trigger with configuration JSON

问题

我尝试使用Airflow创建动态触发器,也就是说,在触发Airflow时,我想在触发脚本之前更改多个参数。目前,我卡在如何使用AWS Airflow提供的配置JSON界面传递参数上。

Airflow脚本如下:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def print_hello():
    message = 'hello world'
    print(message)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 7),
    'retries': 1,
}

dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
)

print_hello_task = PythonOperator(
    task_id='print_hello_task',
    python_callable=print_hello,
    dag=dag,
)

print_hello_task

在这个示例中,我将参数传递到配置JSON中。

如何将参数“message”从配置JSON传递到脚本中?

英文:

i try to make dynamic trigger with airflow, means during trigger airflow i want to change multiple parameter before i trigger the scripts.. right now i stuck on how to pass the parameter using configuration json UI provide by aws airflow.

airflow scripts as below :

    from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def print_hello():
    message = 'hello world'
    print(message)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 7),
    'retries': 1,
}

dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
)

print_hello_task = PythonOperator(
    task_id='print_hello_task',
    python_callable=print_hello,
    dag=dag,
)

print_hello_task

this example i pass the parameter into configuration json.
AWS Airflow配置JSON触发器

how i can pass parameter "message" from configuration json to the scripts?

答案1

得分: 2

在此情况下,非常重要将 render_template_as_native_obj=True 添加到Dag定义中,因为您希望将此配置视为字典。

配置将作为kwargs传递给函数

with DAG(
    dag_id="hello_world_dag",
    schedule_interval=None,
    default_args=default_args,
    render_template_as_native_obj=True,
    catchup=False,
) as dag:

    def print_hello(**kwargs):
        print(kwargs.get("message"))

    print_hello_task = PythonOperator(
        task_id='print_hello_task',
        python_callable=print_hello,
        op_kwargs="{{ dag_run.conf }}",
    )

    print_hello_task
英文:

Another option in to use Jinja Template {{ dag_run.conf }} as mention in the ui

in this case very important to add render_template_as_native_obj=True to the Dag definition cause you want to consider this conf as dict.

the conf would be passed to the function as kwargs

with DAG(
    dag_id="hello_world_dag",
    schedule_interval=None,
    default_args=default_args,
    render_template_as_native_obj=True,
    catchup=False,) as dag:

    def print_hello(**kwargs):
        print(kwargs.get("message"))


    print_hello_task = PythonOperator(
        task_id='print_hello_task',
        python_callable=print_hello,
        op_kwargs="{{ dag_run.conf }}",
    )

    print_hello_task

答案2

得分: 1

以下是您要翻译的代码部分:

这是一个示例DAG它使用Airflow的TaskFlow API打印提供的DAG运行配置对象`{"hello": "world"}`的一些属性请参阅[上下文](https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html#context)部分以获取有关访问任务实例上下文的更多信息):

from pendulum import datetime

from airflow.decorators import (
    dag,
    task,
)

@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
    },
    tags=["example"],
)
def read_config():
    @task
    def read_conf(dag_run=None):
        print(type(dag_run.conf))
        print(f"DAG Run config: {dag_run.conf}")
        print(f"Hello {dag_run.conf['hello']}")

    read_conf()

read_config()

任务实例日志:

[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - <class 'dict'>
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - DAG Run config: {'hello': 'world'}
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - Hello world
英文:

Here's an example DAG that prints some properties of a provided dag run config object of {&quot;hello&quot;: &quot;world&quot;} using Airflow's TaskFlow API (see the context section for more info on accessing a task instance's context):

from pendulum import datetime

from airflow.decorators import (
    dag,
    task,
)


@dag(
    schedule=&quot;@daily&quot;,
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={
        &quot;retries&quot;: 2,
    },
    tags=[&quot;example&quot;],
)
def read_config():
    @task
    def read_conf(dag_run=None):
        print(type(dag_run.conf))
        print(f&quot;DAG Run config: {dag_run.conf}&quot;)
        print(f&quot;Hello {dag_run.conf[&#39;hello&#39;]}&quot;)

    read_conf()


read_config()

Task instance logs:

[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - &lt;class &#39;dict&#39;&gt;
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - DAG Run config: {&#39;hello&#39;: &#39;world&#39;}
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - Hello world

huangapple
  • 本文由 发表于 2023年8月9日 09:35:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/76864037-2.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定