Reading highest version of delta lake table
Question
I have batch jobs orchestrated by Airflow that run Spark transformations on selected Delta tables. Currently I'm processing the whole table, and I want to switch to a change data capture flow utilizing the Delta change data feed (CDF); I'm processing creates, updates, and deletes.
My idea was to:
- Read the latest version of the Delta table
- Process the data
- Save that latest version to a bucket so it can be used in future runs
So in upcoming runs I will first read the latest version of the table, then read the version recorded by the previous run from storage, and use CDF to query only the changes that were introduced since the previous run.
To make the read process atomic, I'm using the following code snippet:
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, pathToTable)
# Read the latest table version so it can be saved to GCP later
latest_table_version = deltaTable.history(1).collect()[0]['version']
previous_table_version = my_read_previous_table_version_from_gcp_fcn()
# endingVersion is pinned to the latest version in case any changes are introduced between these lines
df = (
    spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", previous_table_version)
    .option("endingVersion", latest_table_version)
    .load(pathToTable)
)
output_df = df.transform(my_transform_fcn)
save_output_fcn(output_df)
# Persist the version that was just processed so the next run can start from it
overwrite_previous_table_version_on_gcp(value=latest_table_version)
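The two GCP helper functions referenced above are not shown here; a minimal sketch of what they might look like, assuming the version marker is kept as a small text object in a GCS bucket (the bucket and object names below are placeholders):

from google.cloud import storage

VERSION_BLOB = "state/last_processed_version.txt"  # placeholder object name

def my_read_previous_table_version_from_gcp_fcn():
    # Read the version number written by the previous run
    blob = storage.Client().bucket("my-bucket").blob(VERSION_BLOB)
    return int(blob.download_as_text())

def overwrite_previous_table_version_on_gcp(value):
    # Overwrite the marker with the version that was just processed
    blob = storage.Client().bucket("my-bucket").blob(VERSION_BLOB)
    blob.upload_from_string(str(value))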
It works, but it feels a little hacky. Am I using the correct approach?
Answer 1
Score: 1
Yes, it's really not the best approach. It may depend on the logic of the transformations you're doing, but you're essentially reinventing functionality that Spark Structured Streaming provides out of the box - it will track the latest processed version(s) and process only what is new.
Delta Lake has native support for Structured Streaming, so you can greatly simplify the code; you just need to make sure that you keep a checkpoint somewhere so that Spark can remember which versions were already processed.
df = (
    spark.readStream.format("delta")
    .load(pathToTable)
)
output_df = df.transform(my_transform_fcn)
stream = (
    output_df.writeStream.format("delta")
    .option("checkpointLocation", "path-to-checkpoint")  # lets Spark remember which versions were processed
    .trigger(availableNow=True)
    .start("outputPath")
)
stream.awaitTermination()
.trigger(availableNow=True) will start the stream, process all new data, and then stop; otherwise the stream runs continuously.
By default it will reprocess all data from the beginning, but you can specify the startingVersion option on the readStream if you want to skip data that you have already processed.
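Since the question also needs updates and deletes, note that the change data feed itself can be consumed as a stream as well; a minimal sketch, assuming CDF is enabled on the table (the starting version below is a placeholder and only matters for the very first run, before a checkpoint exists):

df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", 0)  # placeholder; once a checkpoint exists, progress is taken from it
    .load(pathToTable)
)
# Each row carries _change_type, _commit_version and _commit_timestamp metadata columns
output_df = df.transform(my_transform_fcn)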