Performance and Data Integrity Issues with Hudi for Long-Term Data Retention


Problem

Our project requires that we perform full loads daily, retaining these versions for future queries. After implementing Hudi to maintain 6 years of data with the following setup:

"hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
"hoodie.cleaner.hours.retained": "52560", # 24 小时 * 365 天 * 6 年

We observed, after about 30 runs, a compromise in data integrity. During reads, the versions of the data mix up and produce duplicate records, causing a series of significant issues in our data lake (S3), since these tables are consumed by other scripts.
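As a minimal check of this symptom (a sketch, assuming spark is an active SparkSession and basePath points at the affected table), a snapshot read can be grouped by record key and partition to surface the duplicated rows:

from pyspark.sql import functions as F

# Snapshot read of the Hudi table (latest committed view).
df = spark.read.format("hudi").load(basePath)

# Each record key should appear at most once per LOAD_DATE partition;
# any row returned here indicates that multiple versions are being exposed.
df.groupBy("SK", "LOAD_DATE") \
  .count() \
  .filter(F.col("count") > 1) \
  .show(truncate=False)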

To address these problems, we adjusted the maximum and minimum number of commits, applying the following configuration, as referenced in issue #7600:

"hoodie.keep.max.commits": "2300", # (365 天 * 6 年) + delta
"hoodie.keep.min.commits": "2200", # (365 天 * 6 年) + delta2

However, this solution becomes excessively costly over time. We simulated running the script multiple times, partitioning by day, and both the diff and the write cost grew significantly even for a small table over one year of data. Over that year, the average runtime of the script went from 00m:25s to 02m:30s. Since we need to keep 6 years of history, this processing time is likely to grow even further.
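To quantify how much active timeline metadata each write has to scan, one option is to count the instant files under the table's .hoodie folder. This is only a sketch: it assumes basePath has the form s3://<bucket>/<prefix> and uses boto3; the bucket and prefix below are placeholders, and insert_overwrite_table runs typically appear as .replacecommit instants:

import boto3

base_path = "s3://my-bucket/lake/example_hudi"  # placeholder; use the real table location
bucket, _, prefix = base_path.replace("s3://", "").partition("/")

s3 = boto3.client("s3")
paginator = s3.get_paginator("list_objects_v2")

# Count completed instants in the active timeline (.commit / .replacecommit files).
timeline_files = 0
for page in paginator.paginate(Bucket=bucket, Prefix=f"{prefix}/.hoodie/"):
    for obj in page.get("Contents", []):
        if obj["Key"].endswith((".commit", ".replacecommit")):
            timeline_files += 1

print(f"Active timeline instants: {timeline_files}")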

Reproduction

Follow the steps below to reproduce the behavior:

  1. Create the example dataframe:
from pyspark.sql import Row

data = [
    Row(SK=-6698625589789238999, DSC='A', COD=1), 
    Row(SK=8420071140774656230,  DSC='B', COD=2), 
    Row(SK=-8344648708406692296, DSC='C', COD=4), 
    Row(SK=504019808641096632,   DSC='D', COD=5), 
    Row(SK=-233500712460350175,  DSC='E', COD=6), 
    Row(SK=2786828215451145335,  DSC='F', COD=7), 
    Row(SK=-8285521376477742517, DSC='G', COD=8), 
    Row(SK=-2852032610340310743, DSC='H', COD=9), 
    Row(SK=-188596373586653926,  DSC='I', COD=10), 
    Row(SK=890099540967675307,   DSC='J', COD=11), 
    Row(SK=72738756111436295,    DSC='K', COD=12), 
    Row(SK=6122947679528380961,  DSC='L', COD=13), 
    Row(SK=-3715488255824917081, DSC='M', COD=14), 
    Row(SK=7553013721279796958,  DSC='N', COD=15)
]
dataframe = spark.createDataFrame(data)
  2. Use the following Hudi configuration:
hudi_options = {
    "hoodie.table.name": "example_hudi",
    "hoodie.datasource.write.recordkey.field": "SK",
    "hoodie.datasource.write.table.name": "example_hudi",
    "hoodie.datasource.write.operation": "insert_overwrite_table",
    "hoodie.datasource.write.partitionpath.field": "LOAD_DATE",
    "hoodie.datasource.hive_sync.database": "default",
    "hoodie.datasource.hive_sync.table": "example_hudi",
    "hoodie.datasource.hive_sync.partition_fields": "LOAD_DATE",
    "hoodie.cleaner.policy": "KEEP_LATEST_BY_HOURS",
    "hoodie.cleaner.hours.retained": "52560",
    "hoodie.keep.max.commits": "2300", 
    "hoodie.keep.min.commits":"2200",  
    "hoodie.datasource.write.precombine.field":"",
    "hoodie.datasource.hive_sync.partition_extractor_class":"org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.enable":"true",
    "hoodie.datasource.hive_sync.use_jdbc":"false",
    "hoodie.datasource.hive_sync.mode":"hms",
}
  3. Now, write the date range:
from datetime import datetime, timedelta
from pyspark.sql.functions import lit, to_date

# basePath is assumed to point at the table's storage location (e.g., an S3 path).
date = datetime.strptime('2023-06-02', '%Y-%m-%d')  # Initial date (yyyy-mm-dd)
final_date = datetime.strptime('2023-11-01', '%Y-%m-%d')  # Final date (yyyy-mm-dd)
while date <= final_date:
    dataframe = dataframe.withColumn("LOAD_DATE", to_date(lit(date.strftime('%Y-%m-%d'))))
    dataframe.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(basePath)
    date += timedelta(days=1)
  4. Then, compare the time consumed by each load and note the progressive growth (see the timing sketch below). If the increase continues at this rate, the runtime becomes unmanageable, since we have tables much larger than this example.
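One simple way to capture that growth, reusing the loop from step 3 (spark, dataframe, hudi_options and basePath as defined above), is to time each daily write:

import time
from datetime import datetime, timedelta
from pyspark.sql.functions import lit, to_date

date = datetime.strptime('2023-06-02', '%Y-%m-%d')
final_date = datetime.strptime('2023-11-01', '%Y-%m-%d')
while date <= final_date:
    day = date.strftime('%Y-%m-%d')
    start = time.time()
    dataframe.withColumn("LOAD_DATE", to_date(lit(day))) \
        .write.format("hudi") \
        .options(**hudi_options) \
        .mode("append") \
        .save(basePath)
    # Expect this duration to creep upward as the commit timeline grows.
    print(f"{day}: write took {time.time() - start:0.1f}s")
    date += timedelta(days=1)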

Expected behavior

We expected that:

  • No duplicate files would emerge after the 30 commits were completed.
  • Execution time would not increase significantly over time.
  • Metadata would follow the behavior determined by the hoodie.cleaner.policy KEEP_LATEST_BY_HOURS setting.

Environment

  • Hudi Version: 0.12.2
  • Spark Version: 3.3.1
  • Hive Version: 3.1.3
  • 存储:S3 (EMRFS)
  • 平台:AWS EMR

Answer 1

Score: 0

Based on the discussion in this GitHub issue, the performance trade-off is expected given this particular Hudi configuration and usage pattern.

When we modify the minimum and maximum commit values, Hudi has to load the corresponding number of commits to perform the index lookup, a process essential for handling updates. With a small minimum and maximum commit count, Hudi only loads the most recent 30 commits for the index lookup, so if an update targets a record ingested more than 30 commits ago, duplicate entries can emerge.

However, as we raise the minimum and maximum commit counts, Hudi is forced to load more commits for the index lookup, which increases execution time because of the shuffle this requires. This is the trade-off to weigh: larger commit counts improve data integrity, but they also extend execution times.
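To relate this explanation back to the observed duplicates, a quick diagnostic (a sketch under the same assumptions as the reproduction snippets, i.e. spark and basePath are available) is to group a snapshot read by partition and count the distinct commits and file groups it exposes; partitions contributing rows from more than one file group are the ones mixing versions:

from pyspark.sql import functions as F

df = spark.read.format("hudi").load(basePath)

# _hoodie_commit_time and _hoodie_file_name are Hudi meta columns that identify
# which commit and file group each row was read from.
df.groupBy("LOAD_DATE") \
  .agg(F.countDistinct("_hoodie_commit_time").alias("commits_exposed"),
       F.countDistinct("_hoodie_file_name").alias("file_groups"),
       F.count("*").alias("rows")) \
  .orderBy("LOAD_DATE") \
  .show(truncate=False)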

英文:

Based on the discussion in this GitHub issue, the performance trade-off is anticipated due to the specific Hudi configuration and usage pattern.

When we modify the minimum and maximum commit values, Hudi has to load the equivalent number of commits to perform index lookups, a process essential for handling updates. With a smaller minimum and maximum commit count, Hudi only loaded the most recent 30 commits for index lookup. Consequently, if an update operation targeted a record ingested over 30 commits ago, duplicate entries might emerge.

However, as we raise the minimum and maximum commit counts, Hudi is forced to load more commits for index lookup, triggering an increase in execution time due to the necessary shuffling operation. This scenario demonstrates a trade-off we need to consider: while larger commit counts boost data integrity, they also extend execution times.
