Unable to alter column name for a Hudi table in AWS

Problem

I'm unable to alter the column name of a Hudi table.
spark.sql("ALTER TABLE customer_db.customer RENAME COLUMN subid TO subidentifier") is unable to change the column name.
I get the following error when running the statement above:
RENAME COLUMN is only supported with v2 tables


To Reproduce

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.{GlueArgParser, Job}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._
import scala.collection.mutable

object ReportingJob {

  var spark: SparkSession = _
  var glueContext: GlueContext = _

  def main(inputParams: Array[String]): Unit = {

    val args: Map[String, String] = GlueArgParser.getResolvedOptions(inputParams, Seq("JOB_NAME").toArray)
    val sysArgs: mutable.Map[String, String] = scala.collection.mutable.Map(args.toSeq: _*)
   
    implicit val glueContext: GlueContext = init(sysArgs)
    implicit val spark: SparkSession = glueContext.getSparkSession

    import spark.implicits._
     
    val partitionColumnName: String = "id"
    val hudiTableName: String = "Customer"
    val preCombineKey: String = "id"
    val recordKey = "id"
    val basePath = "s3://aws-amazon-uk/customer/production/"

    // Sample data: id, subid (the column to be renamed later), subseq
    val df = Seq((123, "1", "seq1"), (124, "0", "seq2")).toDF("id", "subid", "subseq")
    
      val hudiCommonOptions: Map[String, String] = Map(
        "hoodie.table.name" -> hudiTableName,
        "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.ComplexKeyGenerator",
        "hoodie.datasource.write.precombine.field" -> preCombineKey,
        "hoodie.datasource.write.recordkey.field" -> recordKey,
        "hoodie.datasource.write.operation" -> "bulk_insert",
        //"hoodie.datasource.write.operation" -> "upsert",
        "hoodie.datasource.write.row.writer.enable" -> "true",
        "hoodie.datasource.write.reconcile.schema" -> "true",
        "hoodie.datasource.write.partitionpath.field" -> partitionColumnName,
        "hoodie.datasource.write.hive_style_partitioning" -> "true",
        // "hoodie.bulkinsert.shuffle.parallelism" -> "2000",
        //  "hoodie.upsert.shuffle.parallelism" -> "400",
        "hoodie.datasource.hive_sync.enable" -> "true",
        "hoodie.datasource.hive_sync.table" -> hudiTableName,
        "hoodie.datasource.hive_sync.database" -> "customer_db",
        "hoodie.datasource.hive_sync.partition_fields" -> partitionColumnName,
        "hoodie.datasource.hive_sync.partition_extractor_class" -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc" -> "false",
        "hoodie.combine.before.upsert" -> "true",
        "hoodie.avro.schema.external.transformation" -> "true",
        "hoodie.schema.on.read.enable" -> "true",
        "hoodie.datasource.write.schema.allow.auto.evolution.column.drop" -> "true",
        "hoodie.index.type" -> "BLOOM",
        "spark.hadoop.parquet.avro.write-old-list-structure" -> "false",
        DataSourceWriteOptions.TABLE_TYPE.key() -> "COPY_ON_WRITE"
      )


 
    // Write the dataframe as a Hudi COPY_ON_WRITE table and sync it to the Glue Data Catalog
    df.write.format("org.apache.hudi")
      .options(hudiCommonOptions)
      .mode(SaveMode.Overwrite)
      .save(basePath + hudiTableName)

    // Fails with "RENAME COLUMN is only supported with v2 tables"
    spark.sql("ALTER TABLE customer_db.customer RENAME COLUMN subid TO subidentifier")
    commit()
  }

  def commit(): Unit = {
    Job.commit()
  }


  def init(sysArgs: mutable.Map[String, String]): GlueContext = {

    val conf = new SparkConf()

    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED")
    conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED")
    conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED")
    conf.set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED")
    conf.set("spark.sql.avro.datetimeRebaseModeInRead", "CORRECTED")
    val sparkContext = new SparkContext(conf)
    glueContext = new GlueContext(sparkContext)
    Job.init(sysArgs("JOB_NAME"), glueContext, sysArgs.asJava)
    glueContext

  }
}

Steps to reproduce the behavior:

  1. I'm using an AWS Glue job to run the above code.
  2. In the Dependent JARs path, add:
    hudi-spark3-bundle_2.12-0.12.1
    calcite-core-1.16.0
    libfb303-0.9.3
  3. Run the above code.

Expected behavior

spark.sql("ALTER TABLE customer_db.customer RENAME COLUMN subid TO subidentifier") should be able to rename a column name. Could you suggest any other way to rename the Hudi column name.

A clear and concise description of what you expected to happen.
Change Column name of a hudi table

Environment Description

  • Hudi version : 0.12.1

  • Spark version : 3.3

  • Glue version : 4

  • Jars used:
    hudi-spark3-bundle_2.12-0.12.1
    calcite-core-1.16.0
    libfb303-0.9.3

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no


Stacktrace

Exception in User Class: org.apache.spark.sql.AnalysisException : RENAME COLUMN is only supported with v2 tables.
at org.apache.spark.sql.errors.QueryCompilationErrors$.operationOnlySupportedWithV2TableError(QueryCompilationErrors.scala:506) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:94) ~[spark-sql_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:49) ~[spark-sql_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:30) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:30) ~[spark-catalyst_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:49) ~[spark-sql_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:43) ~[spark-sql_2.12-3.3.0-amzn-1.jar:3.3.0-amzn-1]

Answer 1

Score: 1

I see you didn't set spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog in your Spark conf. This is needed to use the v2 relation and benefit from the schema evolution feature.
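A minimal sketch of how this could be added to the job's existing init() method; the HoodieSparkSessionExtension line is an extra assumption on my part (commonly paired with the catalog for Hudi SQL support) and is not part of the answer:

import org.apache.spark.SparkConf

val conf = new SparkConf()
// Register the Hudi catalog so Spark resolves Hudi tables as v2 relations,
// which is what ALTER TABLE ... RENAME COLUMN requires
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
// Assumption, not stated in the answer: enable Hudi's SQL extensions alongside the catalog
conf.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

With the catalog registered, the original statement should then resolve against a v2 table: spark.sql("ALTER TABLE customer_db.customer RENAME COLUMN subid TO subidentifier").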

Answer 2

Score: 0
So a few things:

  • As you are using Glue 4.0, you don't really need to add any external Hudi JARs; it supports Hudi version 0.12.1.
  • Also, most importantly, to enable Hudi you need to add the Glue job parameter --datalake-formats with the value hudi.
  • You need to set spark.serializer=org.apache.spark.serializer.KryoSerializer and spark.sql.hive.convertMetastoreParquet=false. These parameters help Spark handle Hudi tables correctly; they can be set in the SparkConf when you initialize the SparkSession, or added as the --conf job parameter with the value spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false (see the sketch below).

Also, you can get all these details from the Glue documentation.
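A minimal sketch of the second and third points, assuming the settings go into the job's existing init() and the --datalake-formats job parameter is set separately on the Glue job itself:

import org.apache.spark.SparkConf

// Glue job parameter, set on the job rather than in code: --datalake-formats = hudi
val conf = new SparkConf()
// Kryo serialization, as recommended for Hudi
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Stop Spark from using its built-in Parquet reader for metastore tables,
// so Hudi tables are read through Hudi's own relation
conf.set("spark.sql.hive.convertMetastoreParquet", "false")

Alternatively, the same two settings can be passed through the --conf job parameter with the value spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false, as the answer describes.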
