How to drop a column from a Delta Table stored in ADLS as Parquet files?
Question
I have a ton of Parquet data stored in ADLS with Delta Lake as an abstraction over it. However, I've run into an issue where some columns have incorrect datatypes because they were inferred with Spark's inferSchema, and since Delta Lake throws errors on mismatched schemas, new data that contains the correct datatype fails to write. I'd like to remove the old columns from the schema so that new data can write successfully (and the columns will get re-added to the schema with the correct datatypes). However, I'm not sure how to go about this.
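To make the failure concrete, here is roughly the kind of append that blows up (the column name, values, and DataFrame below are just placeholders, not my real schema):
import spark.implicits._

// The existing Delta schema has "ABC" inferred as a string from the old files,
// but the new batch carries it as an integer, so Delta rejects the append
// with a schema-mismatch error.
val newBatch = Seq((1, 123)).toDF("id", "ABC")
newBatch.write
  .format("delta")
  .mode("append")
  .save("abfss://CONTAINER@STORAGEACCOUNT.dfs.core.windows.net/path/to/data/folder/")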
I'm interacting with my data via Spark Scala in an Azure Synapse notebook, but all the docs I find describe reading and writing the data as a table, which doesn't make sense for my scenario since the data is split across tons of Parquet files in a standard yyyy/mm/dd folder structure. Docs such as https://delta.io/blog/2022-08-29-delta-lake-drop-column/ mention using Spark SQL to interact with the table, but again I don't understand where that table comes from, since my data isn't stored in a single file (and even if I read all the data into a single DataFrame and then registered it as a table, how would I write it back?).
Here are some of the things I tried:
import io.delta.tables.DeltaTable

val deltaTable = DeltaTable.forPath("abfss://CONTAINER@STORAGEACCOUNT.dfs.core.windows.net/path/to/data/folder/")
deltaTable.toDF.printSchema()
This shows me the current schema, and from there I identified the columns I want to remove, e.g. ABC. But I'm not sure how to actually remove them: the docs for DeltaTable show a delete function that takes a boolean expression, but that sounds like it will delete rows matching the expression rather than remove a column (and even if I got that working, I'm not sure how I would write the result back to ADLS).
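For clarity, the row-level delete I mean looks like this (reusing deltaTable from above, with the ABC column just for illustration); it removes matching rows but leaves the schema untouched:
// Deletes every row where ABC is null; the ABC column itself stays in the schema.
deltaTable.delete("ABC IS NULL")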
I also tried:
import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.hadoop.fs.Path
val path = new Path("abfss://CONTAINER@STORAGEACCOUNT.dfs.core.windows.net/path/to/data/folder/")
val testTable = DeltaTableV2(spark, path)
AlterTableDropColumnsDeltaCommand(testTable, Seq(Seq("ABC"))).run(spark)
However, that gives this error:
org.apache.spark.sql.delta.DeltaAnalysisException: DROP COLUMN is not supported for your Delta table.
Please upgrade your Delta table to reader version 2 and writer version 5
and change the column mapping mode to 'name' mapping. You can use the following command:
ALTER TABLE <table_name> SET TBLPROPERTIES (
'delta.columnMapping.mode' = 'name',
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5')
But I'm not sure how to run this SQL against this DeltaTable, and I haven't been able to find relevant documentation. Can anyone point me in the right direction?
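For what it's worth, the only way I can think of to address a path-based table from SQL is Delta's delta.`<path>` identifier, so something like the following, though I'm not sure whether this is the right approach in Synapse:
spark.sql("""
  ALTER TABLE delta.`abfss://CONTAINER@STORAGEACCOUNT.dfs.core.windows.net/path/to/data/folder/`
  SET TBLPROPERTIES (
    'delta.columnMapping.mode' = 'name',
    'delta.minReaderVersion' = '2',
    'delta.minWriterVersion' = '5'
  )
""")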
Answer 1
Score: 0
My coworker figured out a solution using another API within Spark:
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.commands.{AlterTableDropColumnsDeltaCommand, AlterTableSetPropertiesDeltaCommand}

val tablePath = "abfss://container@storage.dfs.core.windows.net/path/to/data"

val deltaTableV2 = DeltaTableV2(spark, new Path(tablePath))

// First upgrade the table protocol and enable column mapping by name,
// which is exactly what the DeltaAnalysisException asked for.
AlterTableSetPropertiesDeltaCommand(
  deltaTableV2,
  Map(
    "delta.columnMapping.mode" -> "name",
    "delta.minReaderVersion" -> "2",
    "delta.minWriterVersion" -> "5"
  )
).run(spark)

// With the protocol upgraded, DROP COLUMN is allowed, so drop the mistyped column.
AlterTableDropColumnsDeltaCommand(
  deltaTableV2,
  Seq(Seq("columnName"))
).run(spark)

// Verify the column is gone.
spark.read.format("delta").load(tablePath).printSchema()
I'm not sure why this works while other approaches, such as
spark.sql("ALTER TABLE delta.`abfss://container@storage.dfs.core.windows.net/path/to/data` DROP COLUMN columnName")
kept giving me a null path error, but at least we have a solution now. Hopefully this helps someone else too.
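As a follow-up, once the column has been dropped, appending the new batch with schema evolution enabled should re-add it with the correct datatype. This is only a sketch with a placeholder DataFrame (newDf), not something we've verified end to end:
// newDf is a placeholder for the incoming batch that has the corrected datatype.
newDf.write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "true")  // allow the dropped column to come back with its new type
  .save(tablePath)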