Copy (in delta format) of an append-only incremental table that is in JDBC (SQL)

Question

My ultimate goal is to have a copy (in delta format) of an append-only incremental table that is in JDBC (SQL).

I have a batch process reading from the incremental, append-only JDBC (SQL) table with spark.read (since .readStream is not supported for JDBC sources). Every day, the most recent day of data is saved as a delta table. Note that this is not an append-only delta table - rather, it is overwritten every day with the most recent day of data.
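
For illustration, a minimal sketch of such a daily batch step (the JDBC connection options, source table, and event_date column below are placeholders, not the actual job):

# Daily batch: pull the most recent day from the JDBC source and
# overwrite the intermediate delta table with it.
from pyspark.sql import functions as F

daily_df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:sqlserver://<host>:1433;databaseName=<db>")  # placeholder
    .option("dbtable", "dbo.incremental_source")                      # placeholder
    .option("user", "<user>")
    .option("password", "<password>")
    .load()
    .filter(F.col("event_date") >= F.current_date())  # "most recent day" - placeholder logic
)

daily_df.write.format("delta").mode("overwrite").saveAsTable("daily_batch")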

What I think I need next is spark.readStream out of the delta table, with the .option("skipChangeCommits", "true"). The Databricks Documentation here outlines exactly this.

I have selected the Preview channel in the pipeline settings.

The code for the final streaming table in my DLT pipeline is:

@table(
    name='streaming table',
)
def create_df():
    return spark.readStream.option("skipChangeCommits", "true").table("daily_batch")

However, the error here is NameError: name 'table' is not defined,None,Map(),Map(),List(),List(),Map())

In case it is a typo in the documentation, I have also tried with dlt.table, and the error is:

pyspark.errors.exceptions.AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view "daily_batch" cannot be found. Verify the spelling and correctness of the schema and catalog.

Answer 1

Score: 1

A few things:

  • instead of @table, use @dlt.table (unless you have done from dlt import table)
  • instead of daily_batch as the table name, use a fully-qualified name - three levels (catalog.schema.table) or two levels (schema.table), depending on whether or not you use Unity Catalog - as shown in the sketch after this list
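
For example, a minimal sketch of the corrected DLT code with those two changes (my_catalog.my_schema and streaming_table below are placeholders to adjust to your environment):

import dlt

@dlt.table(name="streaming_table")
def create_df():
    # fully-qualified source name: catalog.schema.table (or schema.table without Unity Catalog)
    return (
        spark.readStream
        .option("skipChangeCommits", "true")
        .table("my_catalog.my_schema.daily_batch")
    )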

But are you really sure that it will help you? The docs say:

> skipChangeCommits disregards file changing operations entirely. Data files that are rewritten in the source table due to data changing operation such as UPDATE, MERGE INTO, DELETE, and OVERWRITE are ignored entirely. In order to reflect changes in upstream source tables, you must implement separate logic to propagate these changes.

Right now you may use a slightly different approach (not tested, but it should work) - do not overwrite the data, but truncate & then append.
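
A rough sketch of that pattern in the daily batch job, assuming the intermediate table is called daily_batch and latest_day_df holds the newest day of data (untested, as noted above):

# Instead of overwriting the whole table every day:
spark.sql("TRUNCATE TABLE daily_batch")   # remove yesterday's snapshot

(latest_day_df.write                      # then add today's data in an append-only commit
    .format("delta")
    .mode("append")
    .saveAsTable("daily_batch"))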

Answer 2

Score: 0

My solution to this was to simply use a regular job, not DLT. This simplifies things a lot, as there is no need for an intermediate table. It was just:

[JDBC incremental SQL] --Databricks job--> [delta table written in append-only mode].

I saved the watermark as metadata in the delta table with:

.option("userMetadata", watermark.strftime("%Y-%m-%dT%H:%M:%S"))

And read it with each subsequent batch run:

import delta.tables
from pyspark.sql import functions as F

dt = delta.tables.DeltaTable.forPath(spark, path)
watermark = dt.history().select(F.col("userMetadata")).first()[0]  # history() is newest-first

I used spark.read.format("sqlserver") to query from the JDBC server.
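
Putting those pieces together, a rough sketch of such a job might look like this (the path, the sqlserver connection options, dbo.source_table, and the modified_at watermark column are all placeholder assumptions, not from the original answer):

import delta.tables
from pyspark.sql import functions as F

path = "/mnt/lake/append_only_copy"  # placeholder location of the delta copy

# 1. Read the watermark stored as commit metadata by the previous run
dt = delta.tables.DeltaTable.forPath(spark, path)
watermark = dt.history().select(F.col("userMetadata")).first()[0]

# 2. Pull only the rows newer than the watermark from SQL Server
new_rows = (
    spark.read.format("sqlserver")
    .option("host", "<host>")               # placeholder connection options
    .option("database", "<database>")
    .option("dbtable", "dbo.source_table")  # placeholder source table
    .option("user", "<user>")
    .option("password", "<password>")
    .load()
    .filter(F.col("modified_at") > F.to_timestamp(F.lit(watermark)))
)

# 3. Append them and record the new watermark (assumes at least one new row)
new_watermark = new_rows.agg(F.max("modified_at")).first()[0]
(new_rows.write
    .format("delta")
    .mode("append")
    .option("userMetadata", new_watermark.strftime("%Y-%m-%dT%H:%M:%S"))
    .save(path))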
