Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

huangapple go评论60阅读模式
英文:

Azure Synapse, Data Duplication Issue in PySpark When Reading and Writing to Delta Lake

问题

我正在开发一个PySpark数据管道,该管道从Azure Data Lake Storage Gen2的“bronze”层读取Salesforce用户数据,对数据进行转换,然后将其写入Delta Lake的“silver”层。我面临的问题是,尽管我在PySpark中使用了dropDuplicates(["Id"])函数来根据“Id”列删除任何重复的记录,但我仍然在我的“silver”层中遇到重复的“Id”记录。即使反复运行管道,这个重复问题仍然存在,并且对依赖于这些数据的下游分析和流程造成了重大问题。

为了处理可能存在的重复记录问题,我在我的PySpark脚本中包含了dropDuplicates(["Id"])函数。此函数在联合操作之后使用,该操作将现有的“silver”数据与新的“bronze”数据组合在一起。我期望这个函数会确保在将组合数据集写回Delta Lake的“silver”层之前,会删除任何重复的记录(基于“Id”列)。

以下是相关代码的摘录:

# 将bronze和silver数据组合并根据“Id”列进行去重
deduplicated_df = bronze_df.union(silver_df).dropDuplicates(["Id"])

# 将去重后的数据写入silver层
deduplicated_df.write.format("delta").mode("overwrite").save(prod_user_silver_path)

运行管道3次后的示例数据:

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

我期望的结果是一个没有重复“Id”记录的“silver”层数据集。然而,实际情况是,尽管使用了dropDuplicates(["Id"]),但我在“silver”层中看到了重复的“Id”条目。这种我期望和实际结果之间的不一致是我试图解决的核心问题。

有何想法?

英文:

I'm working on a PySpark data pipeline that reads Salesforce User data from an Azure Data Lake Storage Gen2 "bronze" layer, transforms the data, and then writes it to a "silver" layer in Delta Lake. The problem I'm facing is that, despite using the dropDuplicates(["Id"]) function in PySpark to remove any duplicate records based on the "Id" column, I'm still encountering duplicate "Id" records in my "silver" layer. This duplication issue persists even after repeated runs of the pipeline, and it's causing significant problems in downstream analyses and processes that rely on this data.

To handle the potential issue of duplicate records, I included the dropDuplicates(["Id"]) function in my PySpark script. This function is used after the union operation, which combines the existing "silver" data with the new "bronze" data. My expectation was that this function would ensure that any duplicate records (based on the "Id" column) would be removed from the combined dataset before it is written back to the "silver" layer in Delta Lake.

Here is a snippet of the relevant code:

# Combine the bronze and silver data and deduplicate based on the "Id" column
deduplicated_df = bronze_df.union(silver_df).dropDuplicates(["Id"])

# Write the deduplicated data to the silver layer
deduplicated_df.write.format("delta").mode("overwrite").save(prod_user_silver_path)

Sample data after running pipelines 3x:

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

The result I was expecting was a "silver" layer dataset with no duplicate "Id" records. However, what I'm actually seeing is duplicate "Id" entries in the "silver" layer despite using dropDuplicates(["Id"]). This discrepancy between my expected and actual outcomes is the core issue I'm trying to resolve.

Any thoughts?

答案1

得分: 0

我已在我的环境中复制了您的代码,它成功运行。

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

去重后。

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

对于您仍然在最终表中获得重复项的情况,以下是可能的原因。

  1. 数据类型不匹配

打印两个数据框的模式。

bronze_df.printSchema()
silver_df.printSchema()

如果这些列的数据类型不匹配,请使用以下代码。

from pyspark.sql.functions import col

bronze_df = bronze_df.withColumn("Id", col("Id").cast("string"))
silver_df = silver_df.withColumn("Id", col("Id").cast("string"))

去重后的数据框 = bronze_df.union(silver_df).dropDuplicates(["Id"])
  1. 数据分区
    即使在整个数据集中去重,重复的记录可能仍存在于单个分区中。

您可以使用以下代码解决此问题。

去重后的数据框 = bronze_df.union(silver_df).repartition("Id").dropDuplicates(["Id"])
  1. 数据中的空格

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

代码:

sp_df.withColumn("Id",trim(col("Id"))).dropDuplicates(["Id"]).show()

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

如果按照以上方式仍未获得预期结果,请尝试以下方法。

bronze_df.union(silver_df).groupBy(col("Id")).agg(first(col("name")))

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

英文:

I have reproduced your code in my environment, it worked successfully.

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

After dropping duplicates.

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

For you it still getting duplicates in end table, below are the possible causes.

  1. Data type mismatch.

Print schema of both dataframe.

bronze_df.printSchema()
silver_df.printSchema()

If there is mismatch between these columns datatype use below code.

from pyspark.sql.functions import col

bronze_df = bronze_df.withColumn("Id", col("Id").cast("string"))
silver_df = silver_df.withColumn("Id", col("Id").cast("string"))

deduplicated_df = bronze_df.union(silver_df).dropDuplicates(["Id"])

2.Data partitioning
Duplicate records might exist in single partition even though you deduplicate overall dataset.

you can use below code for above issue.

deduplicated_df = bronze_df.union(silver_df).repartition("Id").dropDuplicates(["Id"])
  1. Spaces in the data.

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

code:

sp_df.withColumn("Id",trim(col("Id"))).dropDuplicates(["Id"]).show()

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

If you still not getting expected results by following above ways.
Try this.

bronze_df.union(silver_df).groupBy(col("Id")).agg(first(col("name")))

Azure Synapse,在 PySpark 读写 Delta Lake 时存在数据重复问题。

答案2

得分: 0

感谢您的回复和提供的建议。我感谢您的帮助。虽然我在按照您的建议后在数据框中获得了正确的输出,但在在PowerBi中使用数据时仍然遇到了重复项。然而,最近我成功找到了解决此问题的方法。

为了解决问题,我修改了我的方法,将数据从Delta写回单个Parquet文件,覆盖了任何现有数据。然后,我将此Parquet文件存储在我的“gold”层中。这个额外的步骤确保了重复项被消除,并且我的“gold”层中的数据是干净的,没有任何重复的“Id”记录。

再次感谢您的帮助。

英文:

@jayashankarGS

Thank you for your response and the suggestions you provided. I appreciate your help. While I did get the correct output in the dataframe after following your suggestions, I still encountered duplicates when using the data in PowerBi. However, I was able to find a solution to this issue recently.

To resolve the problem, I modified my approach by writing the data from the Delta back to a single Parquet file, overwriting any existing data. I then stored this Parquet file in my "gold" layer. This additional step ensured that the duplicates were eliminated, and the data in my "gold" layer was clean and free of any duplicate "Id" records.

Thank you once again for your assistance.

huangapple
  • 本文由 发表于 2023年7月3日 19:22:57
  • 转载请务必保留本文链接:https://go.coder-hub.com/76604261.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定