How to drop a column from a Delta Table stored in ADLS as Parquet files?


Question

I have a ton of parquet data stored in ADLS with Delta Lake as an abstraction over it. However, I've run into an issue where some columns have incorrect datatypes due to using Spark's inferSchema, and since Delta Lake throws errors on mismatched schemas, new data that contains the correct datatype fails to write. I'd like to remove the old columns from the schema so that new data can write successfully (and the column will get re-added to the schema with the correct datatype). However, I'm not sure how to go about this.

I'm interacting with my data via Spark Scala in an Azure Synapse notebook, but all the docs I find mention reading/writing the data as a table, which doesn't make sense for my scenario, as the data is split over tons of parquet files in a standard yyyy/mm/dd folder structure. Other docs, such as https://delta.io/blog/2022-08-29-delta-lake-drop-column/, mention using Spark SQL to interact with the table, but again I don't understand where this table comes from, since my data isn't stored in a single file (even if I read all the data into a single DataFrame and then register it as a table, how would I write it back?).
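For context, a Delta table stored at a path is defined by the _delta_log folder inside that directory rather than by a catalog entry, so it can be read, written back, and addressed in SQL purely by path. A minimal sketch, using the placeholder path from this question:

// The _delta_log folder at the root of the data directory is what makes this a table;
// no named/registered table is required.
val path = "abfss://CONTAINER@STORAGEACCOUNT.dfs.core.windows.net/path/to/data/folder/"

// Read the whole table (all underlying parquet files) as a single DataFrame.
val df = spark.read.format("delta").load(path)

// Write back to the same location; "overwrite" commits a new Delta version of the table.
df.write.format("delta").mode("overwrite").save(path)

// Spark SQL can also address the table directly by path using backticks.
spark.sql(s"DESCRIBE DETAIL delta.`$path`").show()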

Here are some of the things I tried:

import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath("abfss://CONTAINER@STORAGEACCOUNT.dfs.core.windows.net/path/to/data/folder/")

deltaTable.toDF.printSchema()

This shows me the current schema, and from there I identified the columns I want to remove, e.g. ABC. But I'm not sure how to remove them: the docs for DeltaTable show a delete function that takes a boolean expression, but that sounds like it will delete rows that match the expression, not remove a column (and even if I got that working, I'm not sure how to write the result back to ADLS).
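That reading of the docs is correct: delete only removes rows. A minimal sketch with a hypothetical filter value, for contrast with dropping a column:

// Removes the rows matching the condition and commits a new table version;
// the schema (including the ABC column) is left unchanged.
deltaTable.delete("ABC = 'someBadValue'")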

I also tried:

import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.hadoop.fs.Path

val path = new Path("abfss://CONTAINER@STORAGEACCOUNT.dfs.core.windows.net/path/to/data/folder/")

val testTable = DeltaTableV2(spark, path)

AlterTableDropColumnsDeltaCommand(testTable, Seq(Seq("ABC"))).run(spark)

However, that gives this error:

org.apache.spark.sql.delta.DeltaAnalysisException: DROP COLUMN is not supported for your Delta table.
Please upgrade your Delta table to reader version 2 and writer version 5
and change the column mapping mode to 'name' mapping. You can use the following command:
ALTER TABLE <table_name> SET TBLPROPERTIES (
   'delta.columnMapping.mode' = 'name',
   'delta.minReaderVersion' = '2',
   'delta.minWriterVersion' = '5')

But I'm not sure how to run this SQL against this DeltaTable, and I haven't been able to find relevant documentation. Can anyone point me in the right direction?
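For a path-based table, the <table_name> placeholder in that error message can be expressed with the delta.`<path>` syntax. A sketch using the placeholder path from above (note that in the answer below, this SQL route failed in Synapse with a null path error, and the programmatic commands were used instead):

// Run the protocol/property upgrade suggested by the error, addressing the table by path.
val tablePath = "abfss://CONTAINER@STORAGEACCOUNT.dfs.core.windows.net/path/to/data/folder/"

spark.sql(s"""
  ALTER TABLE delta.`$tablePath` SET TBLPROPERTIES (
    'delta.columnMapping.mode' = 'name',
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5')
""")

// With column mapping enabled, DROP COLUMN becomes available.
spark.sql(s"ALTER TABLE delta.`$tablePath` DROP COLUMN ABC")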


Answer 1

Score: 0

My coworker figured out a solution using another API within Spark:

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableDropColumnsDeltaCommand, AlterTableSetPropertiesDeltaCommand}

val tablePath = "abfss://container@storage.dfs.core.windows.net/path/to/data"
val deltaTableV2 = DeltaTableV2(spark, new Path(tablePath))

// First upgrade the table protocol and enable name-based column mapping,
// which DROP COLUMN requires (this is what the error message was asking for).
AlterTableSetPropertiesDeltaCommand(
  deltaTableV2,
  Map(
    "delta.columnMapping.mode" -> "name",
    "delta.minReaderVersion" -> "2",
    "delta.minWriterVersion" -> "5"
  )
).run(spark)

// Now drop the offending column from the table schema.
AlterTableDropColumnsDeltaCommand(
  deltaTableV2,
  Seq(Seq("columnName"))
).run(spark)

// Verify the column is gone.
spark.read.format("delta").load(tablePath).printSchema()

I'm not sure why this works while other methods like

spark.sql("ALTER TABLE delta.`abfss://container@storage.dfs.core.windows.net/path/to/data` DROP COLUMN columnName")

kept giving me a null path error, but at least we have a solution now. Hopefully this helps someone else too.
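Once the column has been dropped, the original goal (having the column come back with the correct datatype) needs one more step: the next write that contains the column must be allowed to evolve the schema. A sketch, assuming a hypothetical DataFrame newDf holding the incoming data:

// Append new data whose column now has the correct datatype; mergeSchema lets the
// write add the dropped column back to the table schema. `newDf` is hypothetical.
newDf.write
  .format("delta")
  .option("mergeSchema", "true")
  .mode("append")
  .save(tablePath)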

