Airflow PostgresOperator: report the number of inserts/updates/deletes

Question

I'm exploring replacing our home-built SQL file orchestration framework with Apache Airflow.

We currently have extensive logging of execution time, history, and the number of records INSERTED/UPDATED/DELETED. The first two are covered by Airflow's standard logging, but I could not find a way to log the resulting counts of the operations.

What would be the way to log these, preferably per SQL file? And how can I make them visible in a nice graph?

My simple example DAG looks like this:

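import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2023, 2, 2),
    schedule_interval=None,
    catchup=False,
) as dag:

    # Run both SQL files in order against the configured Postgres connection
    proc_r = PostgresOperator(
        task_id="proc_r",
        postgres_conn_id="postgres_dbad2a",
        sql=["001-test.sql", "002-test.sql"],
    )

    proc_r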

Answer 1

Score: 3

First, PostgresOperator is deprecated. You should use SQLExecuteQueryOperator (see source code).

I raised a PR to address this, which is expected to be released in the next version of apache-airflow-providers-common-sql.

For apache-airflow-providers-common-sql>1.3.4:

SQLExecuteQueryOperator(
    ...,
    show_return_value_in_logs=True
)
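
The flag logs whatever the statement returns, and a plain INSERT, UPDATE or DELETE does not return a result set, so the SQL itself has to produce the count. One possible way, sketched here under the assumption that the target is PostgreSQL (which allows data-modifying statements with RETURNING inside a WITH clause), is to wrap each statement so that it returns count(*). The helper name `wrap_with_count` and the `staging` table are hypothetical and not part of Airflow.

```python
def wrap_with_count(dml: str, label: str = "affected_rows") -> str:
    """Wrap one INSERT/UPDATE/DELETE so that the statement returns its row count.

    Assumption: PostgreSQL, where a data-modifying statement with RETURNING
    can be used as a CTE. The RETURNING check below is a naive substring test.
    """
    body = dml.strip().rstrip(";").strip()
    verb = body.split(None, 1)[0].upper() if body else ""
    if verb not in {"INSERT", "UPDATE", "DELETE"}:
        raise ValueError(f"expected INSERT, UPDATE or DELETE, got: {verb or 'empty statement'}")
    if "RETURNING" not in body.upper():
        # One constant per affected row is enough for counting
        body += " RETURNING 1"
    return f"WITH affected AS ({body}) SELECT count(*) AS {label} FROM affected;"


if __name__ == "__main__":
    print(wrap_with_count("DELETE FROM staging WHERE loaded_at < now() - interval '7 days';"))
```

The same WITH ... SELECT count(*) form could also be written by hand in each .sql file, which keeps the counting logic next to the statement it measures.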

For apache-airflow-providers-common-sql<=1.3.4:

The operator does not support printing the result to the log; it can only push the result value to XCom. You can handle this by writing a custom operator. (Note: this requires overriding a private method, which is risky, so use it with judgment.)

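from __future__ import annotations
from typing import Any, Sequence

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

class MySQLExecuteQueryOperator(SQLExecuteQueryOperator):
    # Overrides a private method of the parent operator; its signature may change between provider versions
    def _process_output(self, results: list[Any], descriptions: list[Sequence[Sequence] | None]) -> list[Any]:
        # Log the raw query results, then return them unchanged so XCom still receives them
        self.log.info("result is: %s", results)
        return results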
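
Inside an override like this one, the raw `results` list could be condensed into one count per SQL file before logging. The following is only a sketch: it assumes each statement returns exactly one row with one column (for example from a SELECT count(*)), and `summarize_counts` and its `labels` argument are hypothetical names, not part of Airflow.

```python
from __future__ import annotations

import logging
from typing import Any, Sequence

log = logging.getLogger(__name__)


def summarize_counts(results: Sequence[Any], labels: Sequence[str]) -> dict[str, int]:
    """Map each statement label to the single count value it returned."""
    counts: dict[str, int] = {}
    for label, rows in zip(labels, results):
        # Assumed shape: one row with one column, e.g. [(3,)].
        # Statements with no result (None or empty) or another shape are skipped.
        if rows and len(rows) == 1 and len(rows[0]) == 1:
            counts[label] = int(rows[0][0])
            log.info("%s: %d rows affected", label, counts[label])
    return counts


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    summarize_counts([[(3,)], [(0,)]], ["001-test.sql", "002-test.sql"])
```

Calling such a helper from `_process_output` would keep the override small, which matters because the overridden method is private and may change.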

Running:

MySQLExecuteQueryOperator(
    task_id="some_sql",
    conn_id="postgres_default",
    sql="SELECT 4*5",
)

Will give:

[Screenshot of the task log showing the logged result; image not preserved.]
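
Because the operator pushes its result to XCom, a downstream task could also read the counts from there instead of from the log. The sketch below assumes the SQL task returns only count values and that the XCom value is a nested list or tuple of integers; the exact shape depends on the provider version and operator settings. `total_rows_from_xcom` is a hypothetical helper, and `ti` stands for the Airflow task instance available in the task context.

```python
from typing import Any


def total_rows_from_xcom(ti: Any, task_id: str) -> int:
    """Sum every integer found in the XCom value pushed by `task_id`."""
    # xcom_pull without a key reads the task's return value
    value = ti.xcom_pull(task_ids=task_id)

    def walk(node: Any) -> int:
        if isinstance(node, bool):
            return 0  # bool is a subclass of int, so exclude it explicitly
        if isinstance(node, int):
            return node
        if isinstance(node, (list, tuple)):
            return sum(walk(child) for child in node)
        return 0  # None, strings and other types are ignored

    return walk(value)
```

In a Python task this could be called as `total_rows_from_xcom(context["ti"], "proc_r")`, with the returned total logged or stored wherever the existing reporting expects it.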

huangapple
  • Published on 2023-03-07 19:02:22
  • When reposting, please keep the link to this article: https://go.coder-hub.com/75661144.html