如何在Airflow中的每个任务上运行Python代码?

huangapple go评论57阅读模式
英文:

How do I run python code on each task in Airflow

问题

我有一个DAG上的任务序列

例如

task1 >> task2 >> task3

我需要传达任务的功能,而不重复代码。例如记录日志,想要在每个任务中执行log.info(task_id)。
或者一些外部操作,比如每个任务的HTTP调用。

我对Airflow还不太熟悉,可能会错过一些概念,但我应该如何做到这一点?
自定义操作器?
每个任务执行的拦截器。

我想避免像这样的情况

[task1, logging] >> [task2, logging] >> [task3, logging]

另一个例子
如果我尝试以通用方式处理每个任务存储的xcom参数,我应该如何做呢?我想要将每个任务的xcom处理逻辑放在这个单独的操作器/任务/任何其他处理方式中,日志记录或其他功能也是如此。

触发我的操作器/任务/其他每个任务,我猜这是一种方法,但我是否需要像这样每次都在每个任务上添加它?

[task1, process_xcom] >> [task2, process_xcom] >> [task3, process_xcom]

在Java中类似的概念可能是在服务上使用Aspect。

英文:

I have a sequence of Tasks on my DAG

eg.

task1 >> task2 >> task3

What I need is to put across the tasks functionality without repeating code.
For example logging, would like to execute log.info(task_id) per task
Or some external operation, like http call per task

I am new to airflow and probably missing all the concepts, but how I would do something like this?
a custom operator?
An interceptor for each task execution.

I would like to avoid something like this

[task1,logging] >> [task2,logging] >> [task3, logging]

Another example
If I try to process the xcom params stored per task with a common way how would I do it? Instead of putting the process logic on every task I would like to have the xcom per task on this seperate operator/task/whatever will be to process, same goes for logging or other functionalities

Triggering my operator/task/whatever per task i guess would be one way to do this, but again do I need to add it every time like this on each task?

[task1,process_xcom] >> [task2,process_xcom] >> [task3, process_xcom]

> A similar concept in java could be the usage of Aspects on Services

答案1

得分: 1

  1. 你可以在DAG中设置"default_args"的"on_execute_callback",这样每个任务在启动之前都会调用这个函数,或者在任务结束时调用"on_success_callback"或"on_failure_callback"。

    这里的优势是所有任务都会调用这个函数,你只需要在DAG中定义它一次。

    def log(context):
        print(context["ti"].task_id)
    
    with DAG(
       dag_id="test_dag",
       schedule_interval=None,
       default_args={
          "start_date": datetime(2022, 1, 1),
          "on_execute_callback": log,
          "catchup": False,
       },
    ) as dag:
    
  2. 另一种方法是,在每个任务上可以调用"pre_execute"和/或"post_execute",例如,如果你想对xcom执行某些操作。

    在这个示例中,我将所有JSON键作为单独的键推送到xcom。

    def push_json_xcom(context, result):
        ti = context["ti"]
        data = result if isinstance(result, dict) else json.loads(result)
        for key, value in data.items():
            ti.xcom_push(key, value)
    
    PythonOperator(task_id="test_id", post_execute=push_json_xcom, python_callable=python_func)
    
英文:

Few option that I think can help in this case :

  1. you can set in the dag default_args "on_execute_callback" that each task before it starts will call this function or "on_success_callback"/"on_failure_callback" when it ends.

the advantage here is that all tasks will call this function and you need to defined it once per dag.

    def log(context):
        print(context["ti"].task_id)

    with DAG(
       dag_id="test_dag",
       schedule_interval=None,
       default_args={
          "start_date": datetime(2022, 1, 1),
          "on_execute_callback": log,
          "catchup": False,
    },
  ) as dag:

2.another way is, on each task you can call pre_execute and/or post_execute, for example if you want to do something with the xcom.

in this example I push all json keys to xcom as separate key

def push_json_xcom(context, result):
    ti = context["ti"]
    data = result if isinstance(result, dict) else json.loads(result)
    for key, value in data.items():
        ti.xcom_push(key, value)

PythonOperator(task_id="test_id", post_execute=push_json_xcom, python_callable=python_func)

huangapple
  • 本文由 发表于 2023年5月25日 20:30:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/76332295.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定