AWS Airflow trigger with configuration JSON

Question

I'm trying to build a dynamic trigger with Airflow: when I trigger a DAG, I want to change several parameters before the script runs. Right now I'm stuck on how to pass those parameters through the configuration JSON UI provided by AWS Airflow.

The Airflow script is as follows:

```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def print_hello():
    message = 'hello world'
    print(message)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 8, 7),
    'retries': 1,
}

dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
)

print_hello_task = PythonOperator(
    task_id='print_hello_task',
    python_callable=print_hello,
    dag=dag,
)

print_hello_task
```

In this example, I pass the parameter in the configuration JSON:

[Screenshot: configuration JSON in the AWS Airflow trigger form]

How can I pass the parameter "message" from the configuration JSON to the script?

Answer 1

Score: 2

Another option is to use the Jinja template `{{ dag_run.conf }}`, as mentioned in the UI.

In this case it is very important to add `render_template_as_native_obj=True` to the DAG definition, because you want the conf to be treated as a dict rather than rendered as a string.

The conf is then passed to the function as keyword arguments:

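```python
from airflow import DAG
from airflow.operators.python import PythonOperator

# default_args is the same dict as defined in the question
with DAG(
    dag_id="hello_world_dag",
    schedule_interval=None,
    default_args=default_args,
    # Render templates as native Python objects, so "{{ dag_run.conf }}"
    # becomes a dict rather than its string representation
    render_template_as_native_obj=True,
    catchup=False,
) as dag:

    def print_hello(**kwargs):
        # Keys from the configuration JSON arrive as keyword arguments
        print(kwargs.get("message"))

    print_hello_task = PythonOperator(
        task_id='print_hello_task',
        python_callable=print_hello,
        # The whole dag_run.conf dict is passed as the callable's kwargs
        op_kwargs="{{ dag_run.conf }}",
    )
```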
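Why the flag matters can be shown in plain Python. This is a minimal model, not Airflow code: it assumes that without `render_template_as_native_obj=True` the template `{{ dag_run.conf }}` renders to the dict's text form (Jinja stringifies expression output by default), while with the flag the dict itself reaches `op_kwargs`. The conf contents are made up, and in a real task run the failure may surface elsewhere and with a different message.

```python
def print_hello(**kwargs):
    # Same callable as in the answer: reads "message" from keyword arguments
    message = kwargs.get("message")
    print(message)
    return message


# Hypothetical trigger configuration, as typed into the configuration JSON form
conf = {"message": "hello from the trigger"}

# Simplified model of the two rendering modes (no Airflow or Jinja involved):
# - native rendering hands the dict itself to op_kwargs
# - default string rendering hands over its text form, i.e. str(conf)
rendered_native = conf
rendered_as_string = str(conf)

print_hello(**rendered_native)  # prints: hello from the trigger

try:
    print_hello(**rendered_as_string)
except TypeError as exc:
    # A str is not a mapping, so it cannot be unpacked into keyword arguments
    print(f"TypeError: {exc}")
```

Only a mapping can be unpacked with `**`, which is why the conf must stay a dict all the way to the callable.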

Answer 2

Score: 1

Here's an example DAG that uses Airflow's TaskFlow API to print some properties of a DAG run config object `{"hello": "world"}` supplied at trigger time (see the Context section of the TaskFlow documentation, https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/taskflow.html#context, for more on accessing a task instance's context):

```python
from pendulum import datetime

from airflow.decorators import dag, task


@dag(
    schedule="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    default_args={
        "retries": 2,
    },
    tags=["example"],
)
def read_config():
    @task
    def read_conf(dag_run=None):
        print(type(dag_run.conf))
        print(f"DAG Run config: {dag_run.conf}")
        print(f"Hello {dag_run.conf['hello']}")

    read_conf()


read_config()
```

Task instance logs:

```
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - <class 'dict'>
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - DAG Run config: {'hello': 'world'}
[2023-08-08, 22:35:48 EDT] {logging_mixin.py:150} INFO - Hello world
```
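The `dag_run=None` parameter in this answer is filled from the task context. As far as I know, the classic `PythonOperator` from the question does the same in Airflow 2 for callables whose parameter names match context keys, so the question's `print_hello` could read the conf directly, without `op_kwargs` or `render_template_as_native_obj`. The sketch below also guards against runs that carry no configuration (for example the scheduled `@daily` runs of the TaskFlow DAG above, where `dag_run.conf['hello']` would raise `KeyError` on a typically empty conf) by merging the conf over defaults. `DEFAULTS` and the `SimpleNamespace` stand-ins are illustrative assumptions, not Airflow APIs.

```python
from types import SimpleNamespace

# Hypothetical defaults for the parameters the trigger may override
DEFAULTS = {"message": "hello world"}


def print_hello(dag_run=None, **context):
    # Airflow fills dag_run from the task context by parameter name;
    # any other context entries land in **context and are ignored here.
    conf = (dag_run.conf if dag_run is not None else None) or {}
    # Values from the configuration JSON override the defaults key by key
    params = {**DEFAULTS, **conf}
    print(params["message"])
    return params["message"]


# Usage outside Airflow, with stand-in objects that only carry a .conf dict
triggered = SimpleNamespace(conf={"message": "hello from the trigger"})
scheduled = SimpleNamespace(conf={})

print_hello(dag_run=triggered)  # prints: hello from the trigger
print_hello(dag_run=scheduled)  # prints: hello world
```

Merging over a defaults dict keeps every parameter optional, so the same DAG can be triggered with a partial configuration JSON or none at all.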

huangapple
  • This article was posted on August 9, 2023, 09:35:25
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76864037-2.html