Reading the highest version of a Delta Lake table

Question


I have batch jobs orchestrated by Airflow that run Spark transformations on selected Delta tables. Currently I'm processing the whole table, and I want to switch to a change data capture flow utilizing the Delta Change Data Feed (CDF); I'm processing creates/updates/deletes.

My idea was to:

  1. Read the latest version of the Delta table
  2. Process the data
  3. Save that latest version to a bucket to use in future runs

So in upcoming runs I will first read the table's latest version, then read the previous run's version from storage, and use CDF to query only the changes that were introduced since the previous run.

To make the read process atomic, I'm using the following code snippet:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, pathToTable)

# Reading the latest version to be able to save it on GCP later
latest_table_version = deltaTable.history(1).collect()[0]['version']

previous_table_version = my_read_previous_table_version_from_gcp_fcn()

# Setting endingVersion to the latest version in case any changes are introduced between those lines
df = (
   spark.read.format("delta")
   .option("readChangeFeed", "true")
   .option("startingVersion", previous_table_version)
   .option("endingVersion", latest_table_version)
   .load(pathToTable)
)

output_df = df.transform(my_transform_fcn)
save_output_fcn(output_df)

# Persist the version that was just processed so the next run starts from it
overwrite_previous_table_version_on_gcp(value=latest_table_version)
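
For reference, a minimal sketch of what the two GCS helper functions might look like, assuming the google-cloud-storage client library; the bucket name and blob path below are hypothetical placeholders, not part of the original setup:

from google.cloud import storage

STATE_BLOB = "state/last_processed_delta_version.txt"  # hypothetical path

def my_read_previous_table_version_from_gcp_fcn(bucket_name="my-state-bucket"):
    # Download the stored version number; fall back to 0 on the first run
    blob = storage.Client().bucket(bucket_name).blob(STATE_BLOB)
    return int(blob.download_as_text()) if blob.exists() else 0

def overwrite_previous_table_version_on_gcp(value, bucket_name="my-state-bucket"):
    # Overwrite the blob with the version that was just processed
    storage.Client().bucket(bucket_name).blob(STATE_BLOB).upload_from_string(str(value))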

It works, but it feels a little bit hackish. Am I using the correct approach?

Answer 1

Score: 1


Yes, it's really not the best approach. It may depend on the logic of the transformations you're doing, but you're re-inventing functionality that Spark Structured Streaming provides out of the box - it will track the latest processed version(s) and reprocess what is new.

Delta Lake has native support for Structured Streaming, so you can greatly simplify the code; you just need to make sure that you keep a checkpoint somewhere, so Spark will be able to remember which versions were already processed.

df = (
   spark.readStream.format("delta")
   .load(pathToTable)
)

output_df = df.transform(my_transform_fcn)

# The checkpoint is where Spark records which table versions it has processed
stream = (
   output_df.writeStream.format("delta")
   .option("checkpointLocation", "path-to-checkpoint")
   .trigger(availableNow=True)
   .start("outputPath")
)
stream.awaitTermination()

.trigger(availableNow=True) will start the stream, process all new data, and then stop it. Otherwise the stream will run continuously.

By default the stream will reprocess all data from the beginning, but you can specify the startingVersion option in readStream if you want to skip data that you have already processed.
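
For example, a minimal sketch of pinning the stream's starting point; the version number 42 is purely illustrative:

df = (
   spark.readStream.format("delta")
   # Start from a specific table version instead of reprocessing everything;
   # "latest" is also accepted to pick up only changes arriving after the stream starts
   .option("startingVersion", "42")
   .load(pathToTable)
)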
