Databricks DLT pipeline with for loop reports error "AnalysisException: Cannot redefine dataset"


Question

I have the following code, which works fine for a single table. But when I try to use a for loop to process all the tables in my database, I get the error "AnalysisException: Cannot redefine dataset 'source_ds',Map(),Map(),List(),List(),Map())".

I need to pass the table name to source_ds so that CDC can be processed based on the key and sequence columns. Any help or suggestions would be appreciated.

import dlt
import json
import time
from pyspark.sql.functions import *
from pyspark.sql.types import *

landing_db_name = "landing_db"
raw_db_name = "raw_db"

def generate_silver_tables(target_table, source_table, keys_col_list):

    @dlt.table
    def source_ds():
        return spark.table(f"{raw_db_name}.{source_table}")

    ### Create the target table definition
    dlt.create_target_table(
        name=target_table,
        comment=f"Clean, merged {target_table}",
        # partition_cols=["topic"],
        table_properties={
            "quality": "silver",
            "pipelines.autoOptimize.managed": "true"
        }
    )

    ## Do the merge
    dlt.apply_changes(
        target=target_table,
        source="source_ds",
        keys=keys_col_list,
        apply_as_deletes=expr("operation = 'DELETE'"),
        sequence_by=col("ts_ms"),
        ignore_null_updates=False,
        except_column_list=["operation", "timestamp_ms"],
        stored_as_scd_type="1"
    )
    return

# THIS WORKS FINE
#---------------
# raw_dbname = "raw_db"
# raw_tbl_name = 'raw_table'
# processed_tbl_name = raw_tbl_name.replace("raw", "processed")
# generate_silver_tables(processed_tbl_name, raw_tbl_name)


table_list = spark.sql(f"show tables in {landing_db_name}").collect()
for row in table_list:
    landing_tbl_name = row.tableName
    # Read one row to discover the JSON-encoded key columns for this table
    s2 = spark.sql(f"select key from {landing_db_name}.{landing_tbl_name} limit 1")
    keys_col_list = list(json.loads(s2.collect()[0][0]).keys())
    raw_tbl_name = landing_tbl_name.replace("landing", "raw")
    processed_tbl_name = landing_tbl_name.replace("landing", "processed")
    generate_silver_tables(processed_tbl_name, raw_tbl_name, keys_col_list)
#     time.sleep(10)

Answer 1

Score: 1

You need to give each table a unique name by providing the name attribute in the @dlt.table decorator for the source table, and then use the same name as the source in apply_changes. Otherwise the dataset name is taken from the function name, and the pipeline fails because that function name is reused on every loop iteration. Like this:

def generate_silver_tables(target_table, source_table, keys_col_list):

    # Name the source dataset after the table itself, so each call
    # registers a unique dataset instead of reusing "source_ds"
    @dlt.table(
        name=source_table
    )
    def source_ds():
        return spark.table(f"{raw_db_name}.{source_table}")

    ### Create the target table definition
    dlt.create_target_table(
        name=target_table,
        comment=f"Clean, merged {target_table}",
        # partition_cols=["topic"],
        table_properties={
            "quality": "silver",
            "pipelines.autoOptimize.managed": "true"
        }
    )

    ## Do the merge, reading from the uniquely named source dataset
    dlt.apply_changes(
        target=target_table,
        source=source_table,
        keys=keys_col_list,
        apply_as_deletes=expr("operation = 'DELETE'"),
        sequence_by=col("ts_ms"),
        ignore_null_updates=False,
        except_column_list=["operation", "timestamp_ms"],
        stored_as_scd_type="1"
    )
    return

See the DLT Cookbook for a full example.
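To make the fix concrete, here is a minimal usage sketch (the table names and key column below are hypothetical, not from the question). Because source_table differs on every iteration, each @dlt.table registration gets a unique dataset name, and the "Cannot redefine dataset" error goes away:

# A minimal sketch: each call registers its source dataset under a unique
# name (the value of source_table), so no dataset is redefined.
for tbl in ["raw_orders", "raw_customers"]:  # hypothetical raw tables in raw_db
    generate_silver_tables(
        target_table=tbl.replace("raw", "processed"),  # e.g. processed_orders
        source_table=tbl,                              # also used as the DLT dataset name
        keys_col_list=["id"],                          # hypothetical key column
    )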

Posted by huangapple on 2023-02-14. Source: https://go.coder-hub.com/75448242.html