How do I run python code on each task in Airflow
I have a sequence of tasks in my DAG, e.g.:

`task1 >> task2 >> task3`

What I need is to apply functionality across the tasks without repeating code. For example logging: I would like to execute `log.info(task_id)` per task. Or some external operation, like an HTTP call per task.

I am new to Airflow and am probably missing some concepts, but how would I do something like this? A custom operator? An interceptor for each task execution?

I would like to avoid something like this:

`[task1, logging] >> [task2, logging] >> [task3, logging]`

Another example: if I want to process the XCom params stored per task in a common way, how would I do it? Instead of putting the processing logic in every task, I would like the per-task XCom handling to live in a separate operator/task/whatever it turns out to be; the same goes for logging and other functionality.

Triggering my operator/task/whatever per task would be one way to do this, I guess, but do I then need to add it every time, on each task, like this?

`[task1, process_xcom] >> [task2, process_xcom] >> [task3, process_xcom]`

> A similar concept in Java would be the use of Aspects on services.
Answer 1
Score: 1
A few options that I think can help in this case:

1. You can set `on_execute_callback` in the DAG's `default_args`, so every task calls that function before it starts; `on_success_callback`/`on_failure_callback` work the same way when a task ends.
The advantage here is that all tasks will call this function and you only need to define it once per DAG. (A fuller sketch applying this to the question's `task1 >> task2 >> task3` chain follows after the two options.)
```python
from datetime import datetime

from airflow import DAG


def log(context):
    print(context["ti"].task_id)


with DAG(
    dag_id="test_dag",
    schedule_interval=None,
    catchup=False,  # catchup is a DAG argument, not a task-level default_arg
    default_args={
        "start_date": datetime(2022, 1, 1),
        "on_execute_callback": log,
    },
) as dag:
    ...  # every task defined here inherits on_execute_callback
```
2. Another way is to pass `pre_execute` and/or `post_execute` on each task, for example if you want to do something with the XCom.
In this example I push all keys of a JSON result to XCom as separate keys (and, as noted after the list, this hook can likely be set DAG-wide too):
```python
import json

from airflow.operators.python import PythonOperator

def push_json_xcom(context, result):
    # push each key of the task's JSON result as its own XCom entry
    ti = context["ti"]
    data = result if isinstance(result, dict) else json.loads(result)
    for key, value in data.items():
        ti.xcom_push(key, value)

# python_func is the task's own callable, defined elsewhere
PythonOperator(task_id="test_id", post_execute=push_json_xcom, python_callable=python_func)
```
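To connect option 1 back to the question's chain, here is a minimal sketch of a complete DAG (the task names and the `do_work` callable are my own illustration, not from the original answer). Every operator instantiated inside the `with DAG(...)` block inherits `default_args`, so `log` fires once per task and the chain stays free of `[task, logging]` pairs:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def log(context):
    print(context["ti"].task_id)


def do_work():
    pass  # placeholder for each task's real logic


with DAG(
    dag_id="test_dag_chain",
    schedule_interval=None,
    catchup=False,
    default_args={
        "start_date": datetime(2022, 1, 1),
        "on_execute_callback": log,  # inherited by every task below
    },
) as dag:
    task1 = PythonOperator(task_id="task1", python_callable=do_work)
    task2 = PythonOperator(task_id="task2", python_callable=do_work)
    task3 = PythonOperator(task_id="task3", python_callable=do_work)

    task1 >> task2 >> task3
```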
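As for option 2: since `pre_execute`/`post_execute` are ordinary `BaseOperator` constructor arguments, they should also be accepted through `default_args`, the same way the callbacks above were, which would apply the XCom processing to every task without repeating it per task. I haven't verified this on every Airflow version, so treat it as a sketch under that assumption:

```python
from datetime import datetime

from airflow import DAG

# reuses push_json_xcom from the snippet above
with DAG(
    dag_id="test_dag_xcom",
    schedule_interval=None,
    catchup=False,
    default_args={
        "start_date": datetime(2022, 1, 1),
        # assumption: passed to each operator's constructor via default_args
        "post_execute": push_json_xcom,
    },
) as dag:
    ...
```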