airflow PostgresOperator report number of inserts/updates/deletes

Question
I'm exploring replacing our home-built SQL file orchestration framework with Apache Airflow.
We currently have extensive logging of execution time, history, and the number of records INSERTED/UPDATED/DELETED. The first two are covered by Airflow's standard logging; however, I could not find a way to log the resulting counts of the operations.
What would be the way to log these, preferably from the SQL file? And how can I make them visible in a nice graph?
My simple example DAG looks like this:
import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2023, 2, 2),
    schedule_interval=None,
    catchup=False,
) as dag:
    proc_r = PostgresOperator(
        task_id='proc_r',
        postgres_conn_id='postgres_dbad2a',
        sql=['001-test.sql', '002-test.sql'],
    )
Answer 1
Score: 3
First, PostgresOperator is deprecated. You should use SQLExecuteQueryOperator (see source code).
I raised a PR to address this, which is expected to be released in the next version of apache-airflow-providers-common-sql.

For apache-airflow-providers-common-sql>1.3.4:
SQLExecuteQueryOperator(
    ...,
    show_return_value_in_logs=True,
)
For apache-airflow-providers-common-sql<=1.3.4:
The operator does not support printing to the log; it can only push the result value to XCom. You can handle it by writing a custom operator (note: this requires overriding a private function, which is risky, so use it with judgment):
from __future__ import annotations

from typing import Any, Sequence

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator


class MySQLExecuteQueryOperator(SQLExecuteQueryOperator):
    def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequence] | None]) -> list[Any]:
        self.log.info("result is: %s", results)
        return results
Running:

MySQLExecuteQueryOperator(
    task_id='some_sql',
    conn_id='postgres_default',
    sql="SELECT 4*5",
)
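If you need the actual affected-row counts (the INSERTED/UPDATED/DELETED numbers the question asks about) rather than query results, note that the DB-API cursor exposes them as rowcount after each DML statement; psycopg2, which the Postgres provider uses under the hood, implements this. A minimal runnable sketch of the idea, using stdlib sqlite3 in place of Postgres; `run_and_count` is a hypothetical helper, not an Airflow API:

```python
import sqlite3


def run_and_count(conn, statements):
    """Execute DML statements and return the affected-row count for each."""
    cur = conn.cursor()
    counts = []
    for stmt in statements:
        cur.execute(stmt)
        # DB-API 2.0: rowcount holds the number of rows affected by the
        # last INSERT/UPDATE/DELETE executed on this cursor.
        counts.append(cur.rowcount)
    conn.commit()
    return counts


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER, v TEXT)")
counts = run_and_count(conn, [
    "INSERT INTO t VALUES (1, 'a'), (2, 'b')",  # inserts 2 rows
    "UPDATE t SET v = 'x' WHERE id = 1",        # updates 1 row
    "DELETE FROM t WHERE id = 2",               # deletes 1 row
])
print(counts)
```

Inside an Airflow task you could write these counts to the task log or push them to XCom, which would give you the per-statement history to chart later.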