How to use a Java app to convert a Parquet dataset to Delta

Question


I'm trying to convert a Parquet file to Delta using Java (Java 11, with Spark and Scala as Maven dependencies). While trying to execute it I get an exception.

```java
SparkSession spark = SparkSession.builder()
    .appName("Solo-spark").master("local[1]").getOrCreate();
```

Here we create a Spark session.

Now you can convert it to Delta using:

```java
DeltaTable
    .convertToDelta(spark, "parquet.`/Users/hokage/Downloads/python-paraquet`");
```

I'm getting the following error:

```
Exception in thread "main" java.lang.NoClassDefFoundError: scala/$less$colon$less
	at org.soloworld.App.main(App.java:24)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	... 1 more
```
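The odd `scala/$less$colon$less` in the trace is the JVM-mangled name of the Scala type `<:<`, a top-level class in the Scala 2.13 standard library, which is one reason a 2.12 `scala-library` can't supply it. The JVM forbids operator characters in class names, so the Scala compiler encodes each one as a `$word` token. A small sketch of that mapping (the `encode` helper is mine, for illustration only):

```java
import java.util.Map;

public class ScalaNameMangling {
    // JVM class names cannot contain operator characters, so the Scala
    // compiler encodes them; '<' becomes "$less" and ':' becomes "$colon".
    static final Map<Character, String> ENCODING = Map.of(
            '<', "$less",
            ':', "$colon",
            '>', "$greater");

    static String encode(String scalaName) {
        StringBuilder sb = new StringBuilder();
        for (char c : scalaName.toCharArray()) {
            sb.append(ENCODING.getOrDefault(c, String.valueOf(c)));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The missing class from the stack trace is the mangled form of <:<
        System.out.println(encode("<:<")); // prints "$less$colon$less"
    }
}
```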

These are my dependencies:

```xml
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.13</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-iceberg_2.13</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.17</version>
</dependency>
```


Answer 1

Score: 1

You're mixing up different Scala versions: you're using libraries compiled for Scala 2.13 (the `_2.13` suffix) together with Scala 2.12 (the 2.12.17 `scala-library`). You also need to check which Scala version your Apache Spark distribution is compiled for.

P.S. You also don't need the delta-iceberg dependency to convert from Parquet to Delta format.
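As a rule of thumb, every `_2.x` suffix on an artifactId must equal the binary version (the first two segments) of the `scala-library` version. A quick sketch of that consistency check, run against the question's dependencies (class and method names are mine, for illustration):

```java
import java.util.List;

public class ScalaSuffixCheck {
    // "2.12.17" -> "2.12" (the Scala binary version is major.minor)
    static String binaryVersion(String fullVersion) {
        int first = fullVersion.indexOf('.');
        int second = fullVersion.indexOf('.', first + 1);
        return second < 0 ? fullVersion : fullVersion.substring(0, second);
    }

    // "delta-core_2.13" -> "2.13"; null when the artifact is not Scala-versioned
    static String scalaSuffix(String artifactId) {
        int idx = artifactId.lastIndexOf('_');
        return idx < 0 ? null : artifactId.substring(idx + 1);
    }

    public static void main(String[] args) {
        String scalaLibrary = "2.12.17"; // from the question's pom.xml
        List<String> artifacts = List.of("delta-core_2.13", "delta-iceberg_2.13");
        String binary = binaryVersion(scalaLibrary);
        for (String artifactId : artifacts) {
            String suffix = scalaSuffix(artifactId);
            if (suffix != null && !suffix.equals(binary)) {
                // Both artifacts trip this check: they expect Scala 2.13
                System.out.println(artifactId + " expects Scala " + suffix
                        + " but scala-library is " + binary);
            }
        }
    }
}
```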


Answer 2

Score: 0

Create a Parquet Dataset

Yours was `/Users/hokage/Downloads/python-paraquet`, but let's use `/tmp/parquet-dataset` if you don't mind 😉

Let's open `spark-shell` and write some Scala. This is only to create a Parquet table (dataset).

```scala
val parquetTableDir = "/tmp/parquet-dataset"

spark
  .range(5)
  .write
  .format("parquet")
  .save(parquetTableDir)
```

```
$ ls -l /tmp/parquet-dataset
total 48
-rw-r--r--@ 1 jacek wheel   0 May 17 09:32 _SUCCESS
-rw-r--r--@ 1 jacek wheel 297 May 17 09:32 part-00000-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 472 May 17 09:32 part-00002-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 472 May 17 09:32 part-00004-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 471 May 17 09:32 part-00007-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 472 May 17 09:32 part-00009-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 472 May 17 09:32 part-00011-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
```

Java Project (Maven)

I used IntelliJ IDEA to create a Java project with Maven. It's been a while since I've done that! 😊

pom.xml

Mind all the dependencies and their versions. They have to match; the important points are:

  1. The version of Scala (Delta Lake and Apache Spark dependencies)
  2. Delta Lake 2.3.0 supports Spark 3.3
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>delta-convert</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-core_2.13</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.8</version>
        </dependency>
    </dependencies>
</project>
```

Java Application

```java
package so;

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;

public class DeltaConvert {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("Solo-spark")
            .master("local[*]") // use the star instead
            // the following configs are required for Delta Lake to run on Spark
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .getOrCreate();
        DeltaTable
            .convertToDelta(spark, "parquet.`/tmp/parquet-dataset`");
    }
}
```

Show Time 🎉

Execute the Java app and you should see tons of INFO messages in the console (standard output). Among them you should find the following:

```
23/05/17 09:41:19 INFO SparkContext: Running Spark version 3.3.2
...
23/05/17 09:41:22 INFO DelegatingLogStore: LogStore `LogStoreAdapter(io.delta.storage.HDFSLogStore)` is used for scheme `file`
23/05/17 09:41:22 INFO DeltaLog: Creating initial snapshot without metadata, because the directory is empty
23/05/17 09:41:24 INFO InitialSnapshot: [tableId=b071541b-9864-4a06-a8d5-d45690ab1a26] Created snapshot InitialSnapshot(path=file:/tmp/parquet-dataset/_delta_log, version=-1, metadata=Metadata(2876805c-dde3-4d69-96a2-d2c3ad3c81e7,null,null,Format(parquet,Map()),null,List(),Map(),Some(1684309284165)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,-1,List(),List(),None,-1), checksumOpt=None)
23/05/17 09:41:24 INFO DeltaLog: No delta log found for the Delta table at file:/tmp/parquet-dataset/_delta_log
23/05/17 09:41:24 INFO InitialSnapshot: [tableId=2876805c-dde3-4d69-96a2-d2c3ad3c81e7] Created snapshot InitialSnapshot(path=file:/tmp/parquet-dataset/_delta_log, version=-1, metadata=Metadata(29ac2dc9-a37b-47cd-9aec-45090056afcc,null,null,Format(parquet,Map()),null,List(),Map(),Some(1684309284204)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,-1,List(),List(),None,-1), checksumOpt=None)
...
23/05/17 09:41:26 INFO DeltaFileOperations: Listing file:/tmp/parquet-dataset
23/05/17 09:41:26 INFO DelegatingLogStore: LogStore `LogStoreAdapter(io.delta.storage.HDFSLogStore)` is used for scheme `file`
...
23/05/17 09:41:41 INFO FileScanRDD: Reading File path: file:///tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet, range: 0-13899, partition values: [empty row]
...
23/05/17 09:41:41 INFO Snapshot: [tableId=11d76ddd-3035-47b5-9dfe-f7e14de9ef71] Created snapshot Snapshot(path=file:/tmp/parquet-dataset/_delta_log, version=0, metadata=Metadata(11d76ddd-3035-47b5-9dfe-f7e14de9ef71,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]},List(),Map(),Some(1684309287681)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,0,ArraySeq(),ArraySeq(DeprecatedRawLocalFileStatus{path=file:/tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet; isDirectory=false; length=13899; replication=1; blocksize=33554432; modification_time=1684309298044; access_time=1684309297927; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}),Some(0),1684309291045), checksumOpt=None)
23/05/17 09:41:41 INFO DeltaLog: Updated snapshot to Snapshot(path=file:/tmp/parquet-dataset/_delta_log, version=0, metadata=Metadata(11d76ddd-3035-47b5-9dfe-f7e14de9ef71,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]},List(),Map(),Some(1684309287681)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,0,ArraySeq(),ArraySeq(DeprecatedRawLocalFileStatus{path=file:/tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet; isDirectory=false; length=13899; replication=1; blocksize=33554432; modification_time=1684309298044; access_time=1684309297927; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}),Some(0),1684309291045), checksumOpt=None)
...
```
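A low-tech way to confirm the conversion without starting Spark: a converted directory gains a `_delta_log` subdirectory, as the INFO messages show. A minimal sketch (the helper name is mine, not a Delta Lake API):

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class DeltaDirCheck {
    // A plain Parquet directory becomes a Delta table once Delta writes
    // its transaction log into a _delta_log subdirectory.
    static boolean looksLikeDeltaTable(Path tableDir) {
        return Files.isDirectory(tableDir.resolve("_delta_log"));
    }

    public static void main(String[] args) {
        Path table = Path.of("/tmp/parquet-dataset");
        System.out.println(looksLikeDeltaTable(table)
                ? "Delta table (has _delta_log)"
                : "plain directory (no _delta_log)");
    }
}
```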

PySpark

Use pyspark (with Delta Lake "installed") to access the delta table.

```
$ ./bin/pyspark \
    --packages io.delta:delta-core_2.12:2.3.0 \
    --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
    --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Python version 3.10.11 (main, May 10 2023 19:07:22)
Spark context Web UI available at http://192.168.68.101:4040
Spark context available as 'sc' (master = local[*], app id = local-1684309786375).
SparkSession available as 'spark'.
```

```
>>> spark.read.format("delta").load("/tmp/parquet-dataset").show(truncate = False)
+---+
|id |
+---+
|0  |
|3  |
|4  |
|1  |
|2  |
+---+
```

huangapple
  • Posted on 2023-05-14 04:34:29
  • Please keep the original link when reposting: https://go.coder-hub.com/76244764.html