Concurrency issue in Databricks Delta Lake

Question

I have an audit table in Databricks Delta Lake with four fields: id, task_name, start_time, and end_time. The purpose of this table is to capture the start and end times of each job. However, I am currently running into concurrency issues when executing five notebooks in parallel, which results in conflicts during inserts and updates. To address the update concurrency problem, I have partitioned the audit table on the task_name field, but I have not tested it yet. I am now having difficulty with concurrent row inserts. I am looking for concurrency-safe logic to generate the id values without relying on the Delta table's identity property, since that causes problems in the Delta table.
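For illustration, the table definition is roughly equivalent to the following sketch (the table name audit_log and the column types are shorthand here, not the exact definition):

create table if not exists audit_log (
  id bigint,              -- the field that needs to be populated safely under concurrency
  task_name string,       -- job name, also the candidate partition column
  start_time timestamp,
  end_time timestamp
);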
I would greatly appreciate any suggestions you can provide.

Answer 1

Score: 1

Even if you do the partitioning, you still need a condition on the specific partition value, not only source.partition = dest.partition - it should be source.partition = dest.partition AND dest.partition = 'job_name'. This is demonstrated in the Delta Lake documentation. But this will generate many partitions with small files, which will hurt performance when you access your data.
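As a rough sketch of what such a merge could look like (the audit_log table, the updates source, and the 'job_name' literal are placeholders, not names from the question):

-- Sketch: pin the target partition with a literal value in the MERGE condition
MERGE INTO audit_log AS dest
USING updates AS src
ON  src.task_name = dest.task_name
AND dest.task_name = 'job_name'   -- restrict the merge to a single, known partition
AND src.id = dest.id
WHEN MATCHED THEN
  UPDATE SET dest.end_time = src.end_time
WHEN NOT MATCHED THEN
  INSERT (id, task_name, start_time, end_time)
  VALUES (src.id, src.task_name, src.start_time, src.end_time);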

But you can avoid conflicts in the Delta table if you switch to an append-only solution, where you append starts & stops as individual rows and then put a view on top of that table to find the latest status. Something like this:

  • Create and fill the table with sample data:
create table if not exists process_statuses (
  run_id long,
  process string,
  status string,
  timestamp timestamp
);

insert into process_statuses(run_id, process, status, timestamp) values(1, 'test1', 'started', current_timestamp());
insert into process_statuses(run_id, process, status, timestamp) values(2, 'test2', 'started', current_timestamp());
insert into process_statuses(run_id, process, status, timestamp) values(1, 'test1', 'finished', current_timestamp());
  • Create a view to get the latest status of all jobs or of a specific job:
create or replace view latest_process_status as (
  with cte as (
     select *, 
         (row_number() OVER (PARTITION BY run_id ORDER BY timestamp desc)) AS rn 
       from process_statuses)
  select * except(rn) from cte where rn = 1
)
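
With that view in place you can query the latest status of every job, or filter for one specific job, for example:

-- latest status of every job
select * from latest_process_status;

-- latest status of one specific job (value taken from the sample data above)
select * from latest_process_status where process = 'test1';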
