AWS Airflow trigger with configuration JSON


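Question

I'm trying to create a dynamic trigger with Airflow: when I trigger the DAG, I want to change several parameters before the script runs. Right now I'm stuck on how to pass those parameters through the configuration JSON UI that AWS Airflow provides.

Here is the Airflow script: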

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def print_hello():
    message = 'hello world'
    print(message)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 7),
    'retries': 1,
}

dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
)

print_hello_task = PythonOperator(
    task_id='print_hello_task',
    python_callable=print_hello,
    dag=dag,
)

print_hello_task

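In this example, I put the parameter into the configuration JSON when triggering the DAG.

How can I pass the "message" parameter from the configuration JSON to the script?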
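For reference, the text typed into the trigger dialog's "Configuration JSON" box must be a JSON object; Airflow parses it into a Python dict and stores it on the DAG run as `dag_run.conf`. The plain-Python sketch below models only that step, without Airflow. The `conf_text` payload and the `read_message` helper are hypothetical names for illustration, and the `'hello world'` fallback simply mirrors the hard-coded value in the script above.

```python
import json

# Hypothetical payload, as it might be typed into the "Configuration JSON" box.
conf_text = '{"message": "hello from the trigger UI"}'


def read_message(conf, default="hello world"):
    """Return conf["message"], falling back to the script's original hard-coded text."""
    # Runs started without a config have an empty conf, so avoid a KeyError.
    return (conf or {}).get("message", default)


# The JSON object becomes a dict, which is what dag_run.conf holds at run time.
conf = json.loads(conf_text)
assert isinstance(conf, dict)

print(read_message(conf))  # hello from the trigger UI
print(read_message({}))    # hello world
```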

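Answer 1

Score: 2

Another option is to use the Jinja template {{ dag_run.conf }}, as mentioned on the trigger page of the UI.

In this case it is essential to add render_template_as_native_obj=True to the DAG definition: by default Jinja renders templates to strings, and here you want the conf to be treated as a dict.

The conf is then passed to the function as keyword arguments (kwargs), so the "message" key can be read with kwargs.get("message").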

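from airflow import DAG
from airflow.operators.python import PythonOperator

# default_args is the same dict as in the question's script
with DAG(
    dag_id="hello_world_dag",
    schedule_interval=None,
    default_args=default_args,
    # Render templates as native Python objects, so "{{ dag_run.conf }}" becomes a dict, not a string
    render_template_as_native_obj=True,
    catchup=False,
) as dag:

    def print_hello(**kwargs):
        # Each key of the configuration JSON arrives as a keyword argument
        print(kwargs.get("message"))

    print_hello_task = PythonOperator(
        task_id='print_hello_task',
        python_callable=print_hello,
        # op_kwargs is a templated field, rendered at run time from the DAG run's conf
        op_kwargs="{{ dag_run.conf }}",
    )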
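Why `render_template_as_native_obj=True` matters can be illustrated without Airflow. With default rendering, `{{ dag_run.conf }}` produces the dict's text form, such as `"{'message': ...}"`, and a string cannot be unpacked into keyword arguments. The sketch below uses a hypothetical `call_with_op_kwargs` helper as a stand-in for the operator; it is a simplified model, not Airflow's `PythonOperator` code, which (in Airflow 2) also merges task-context entries such as `dag_run` and `ds` into the same kwargs.

```python
from collections.abc import Mapping


def print_hello(**kwargs):
    # Same callable as in the answer: keys from dag_run.conf arrive as kwargs.
    message = kwargs.get("message")
    print(message)
    return message


def call_with_op_kwargs(python_callable, op_kwargs):
    """Hypothetical stand-in for how an operator forwards op_kwargs to its callable."""
    if not isinstance(op_kwargs, Mapping):
        raise TypeError(f"op_kwargs must be a mapping, got {type(op_kwargs).__name__}")
    return python_callable(**op_kwargs)


conf = {"message": "hello from conf"}

# Native rendering hands the dict itself to op_kwargs.
native_result = call_with_op_kwargs(print_hello, conf)

# Default (string) rendering would hand over the dict's text form instead.
string_rendered = str(conf)
try:
    call_with_op_kwargs(print_hello, string_rendered)
except TypeError as exc:
    string_error = str(exc)
```

A run triggered with an empty conf simply yields `None` from `kwargs.get("message")`, so the callable does not fail when no configuration is supplied.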

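Answer 2

Score: 1

Here is an example DAG that uses Airflow's TaskFlow API to print some properties of a provided DAG run config object, {"hello": "world"} (see the context section for more on accessing a task instance's context):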

from pendulum import datetime

from airflow.decorators import (
    dag,
    task,
)


@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
    },
    tags=["example"],
)
def read_config():
    @task
    def read_conf(dag_run=None):
        print(type(dag_run.conf))
        print(f"DAG Run config: {dag_run.conf}")
        # .get() avoids a KeyError on scheduled @daily runs, whose conf is empty
        print(f"Hello {dag_run.conf.get('hello')}")

    read_conf()


read_config()

Task instance logs:

[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - <class 'dict'>
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - DAG Run config: {'hello': 'world'}
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - Hello world
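The `dag_run=None` parameter works because a TaskFlow task receives the context values whose names match the function's parameters. The sketch below is a much-simplified, plain-Python model of that name-based injection using `inspect.signature`; `FakeDagRun` and `inject_context` are hypothetical names, not Airflow APIs. It also shows why `.get()` is safer than `['hello']`: a scheduled run carries an empty conf.

```python
import inspect
from dataclasses import dataclass, field


@dataclass
class FakeDagRun:
    """Hypothetical stand-in for an Airflow DagRun; only .conf is modelled."""
    conf: dict = field(default_factory=dict)


def inject_context(func, context):
    """Call func, passing only the context entries named in its signature."""
    params = inspect.signature(func).parameters
    return func(**{name: context[name] for name in params if name in context})


def read_conf(dag_run=None):
    # .get() avoids a KeyError when the run was started without a config.
    return f"Hello {dag_run.conf.get('hello')}"


manual = inject_context(read_conf, {"dag_run": FakeDagRun({"hello": "world"}), "ds": "2023-08-08"})
scheduled = inject_context(read_conf, {"dag_run": FakeDagRun(), "ds": "2023-08-09"})
print(manual)     # Hello world
print(scheduled)  # Hello None
```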

huangapple
  • Posted on August 9, 2023 at 09:35:25
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76864037.html