How to use a Java app to convert a Parquet dataset to Delta
Question
I'm trying to convert a Parquet dataset to Delta from a Java application, using Java 11 with Spark and Scala as Maven dependencies. When I run it, I get an exception.
First I create a Spark session:
SparkSession spark = SparkSession.builder()
    .appName("Solo-spark").master("local[1]").getOrCreate();
Then I convert the dataset to Delta:
DeltaTable
    .convertToDelta(spark, "parquet.`/Users/hokage/Downloads/python-paraquet`");
and I get the following error:
Exception in thread "main" java.lang.NoClassDefFoundError: scala/$less$colon$less
at org.soloworld.App.main(App.java:24)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 1 more
These are my dependencies:
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-core_2.13</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>io.delta</groupId>
    <artifactId>delta-iceberg_2.13</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.17</version>
</dependency>
Answer 1
Score: 1
You're mixing Scala versions: you're using libraries compiled for Scala 2.13 (the `_2.13` suffix) together with Scala 2.12 (`scala-library` version `2.12.17`). You also need to check which Scala version your Apache Spark distribution is compiled for.
P.S. You also don't need the `delta-iceberg` dependency to convert from Parquet to Delta format.
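The error message itself points at the mismatch: `scala.$less$colon$less` is the JVM name of `scala.<:<`, which is a top-level class in the Scala 2.13 standard library, whereas in 2.12 it is nested inside `scala.Predef`. A minimal diagnostic sketch along those lines follows. It uses only reflection, so it compiles without Scala on the classpath; the class and method names (`ScalaClasspathCheck`, `isLoadable`, `scalaVersion`) are hypothetical and not part of Spark or Delta Lake.

```java
import java.util.Optional;

final class ScalaClasspathCheck {

    /** Returns true if the named class can be found on the current classpath. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ScalaClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Returns the version of scala-library found on the classpath, if there is one. */
    static Optional<String> scalaVersion() {
        try {
            Object version = Class.forName("scala.util.Properties")
                    .getMethod("versionNumberString")
                    .invoke(null);
            return Optional.of(String.valueOf(version));
        } catch (ReflectiveOperationException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println("scala-library version: "
                + scalaVersion().orElse("not on classpath"));
        // The class the stack trace complains about; it exists only in the 2.13 library layout
        System.out.println("scala.<:< loadable: "
                + isLoadable("scala.$less$colon$less"));
    }
}
```

Run with the same classpath as the failing application, this should report a 2.12.x library and `false` for the second line, which would be consistent with the `NoClassDefFoundError` in the question.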
Answer 2
Score: 0
# Create Parquet Dataset
Yours was `/Users/hokage/Downloads/python-paraquet`, but let's use `/tmp/parquet-dataset` if you don't mind 😉
Let's open `spark-shell` and write some Scala. This is only to create a parquet table (dataset).
```scala
val parquetTableDir = "/tmp/parquet-dataset"
spark
  .range(5)
  .write
  .format("parquet")
  .save(parquetTableDir)
```
$ ls -l /tmp/parquet-dataset
total 48
-rw-r--r--@ 1 jacek wheel 0 May 17 09:32 _SUCCESS
-rw-r--r--@ 1 jacek wheel 297 May 17 09:32 part-00000-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 472 May 17 09:32 part-00002-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 472 May 17 09:32 part-00004-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 471 May 17 09:32 part-00007-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 472 May 17 09:32 part-00009-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek wheel 472 May 17 09:32 part-00011-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
# Java Project (Maven)
I used IntelliJ IDEA to create a Java project with Maven. It's been a while since I've done it! 😊
## pom.xml
Mind all the dependencies and their versions. They have to match. The important things are:
- The Scala version (it must be the same for the Delta Lake and Apache Spark dependencies)
- Delta Lake 2.3.0 supports Spark 3.3
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>delta-convert</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-core_2.13</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.8</version>
        </dependency>
    </dependencies>
</project>
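The matching rule can be checked mechanically: the `_2.13` suffix of each artifact has to agree with the major.minor part of the `scala-library` version. Below is a small sketch of such a check in plain Java. It is only an illustration, not a Maven feature; `ScalaSuffixCheck` and its methods are hypothetical names, and the suffix pattern assumes Scala 2 style suffixes such as `_2.12` or `_2.13`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ScalaSuffixCheck {

    // Matches a trailing Scala binary version such as "_2.13"
    private static final Pattern SUFFIX = Pattern.compile("_(\\d+\\.\\d+)$");

    /** Extracts the Scala binary version from an artifactId such as "delta-core_2.13". */
    static Optional<String> binaryVersionOf(String artifactId) {
        Matcher m = SUFFIX.matcher(artifactId);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** Lists the artifactIds whose suffix disagrees with the given scala-library version. */
    static List<String> mismatches(String scalaLibraryVersion, List<String> artifactIds) {
        String[] parts = scalaLibraryVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected a version like 2.13.8");
        }
        // "2.13.8" -> "2.13", the part that has to match the artifact suffix
        String expected = parts[0] + "." + parts[1];
        List<String> bad = new ArrayList<>();
        for (String id : artifactIds) {
            Optional<String> suffix = binaryVersionOf(id);
            if (suffix.isPresent() && !suffix.get().equals(expected)) {
                bad.add(id);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("delta-core_2.13", "spark-sql_2.13");
        System.out.println(mismatches("2.13.8", ids));  // versions from the pom above
        System.out.println(mismatches("2.12.17", ids)); // scala-library version from the question
    }
}
```

With the versions from the pom the first list is empty, while the `scala-library` version from the question flags both artifacts.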
## Java Application
package so;

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;

public class DeltaConvert {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Solo-spark")
                .master("local[*]") // use all available cores rather than local[1]
                // the following configs are required for Delta Lake to run on Spark
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();

        // Convert the Parquet dataset in place; the identifier is parquet.`<directory>`
        DeltaTable
                .convertToDelta(spark, "parquet.`/tmp/parquet-dataset`");
    }
}
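Note that the second argument to `convertToDelta` is a table identifier, i.e. the `parquet.` prefix followed by the backtick-quoted directory, rather than a bare path. If the directory comes from configuration or a command-line argument, a tiny helper can build that string. This is only a sketch: `ParquetIdentifier` is a hypothetical name, and the helper deliberately rejects paths containing backticks instead of guessing how they should be escaped.

```java
final class ParquetIdentifier {

    /** Builds the "parquet.`<path>`" identifier string for a directory path. */
    static String of(String path) {
        if (path == null || path.isEmpty()) {
            throw new IllegalArgumentException("path must not be empty");
        }
        if (path.indexOf('`') >= 0) {
            // Escaping is out of scope for this sketch, so fail loudly instead
            throw new IllegalArgumentException("backticks in the path are not handled here");
        }
        return "parquet.`" + path + "`";
    }

    public static void main(String[] args) {
        // Prints the identifier used in the application above
        System.out.println(of("/tmp/parquet-dataset"));
    }
}
```

The result would then be passed as the second argument, e.g. `DeltaTable.convertToDelta(spark, ParquetIdentifier.of(dir))`.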
# Show Time 🎉
Execute the Java app and you should see tons of INFO messages in the console (standard output). Among the lines you should find the following:
23/05/17 09:41:19 INFO SparkContext: Running Spark version 3.3.2
...
23/05/17 09:41:22 INFO DelegatingLogStore: LogStore `LogStoreAdapter(io.delta.storage.HDFSLogStore)` is used for scheme `file`
23/05/17 09:41:22 INFO DeltaLog: Creating initial snapshot without metadata, because the directory is empty
23/05/17 09:41:24 INFO InitialSnapshot: [tableId=b071541b-9864-4a06-a8d5-d45690ab1a26] Created snapshot InitialSnapshot(path=file:/tmp/parquet-dataset/_delta_log, version=-1, metadata=Metadata(2876805c-dde3-4d69-96a2-d2c3ad3c81e7,null,null,Format(parquet,Map()),null,List(),Map(),Some(1684309284165)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,-1,List(),List(),None,-1), checksumOpt=None)
23/05/17 09:41:24 INFO DeltaLog: No delta log found for the Delta table at file:/tmp/parquet-dataset/_delta_log
23/05/17 09:41:24 INFO InitialSnapshot: [tableId=2876805c-dde3-4d69-96a2-d2c3ad3c81e7] Created snapshot InitialSnapshot(path=file:/tmp/parquet-dataset/_delta_log, version=-1, metadata=Metadata(29ac2dc9-a37b-47cd-9aec-45090056afcc,null,null,Format(parquet,Map()),null,List(),Map(),Some(1684309284204)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,-1,List(),List(),None,-1), checksumOpt=None)
...
23/05/17 09:41:26 INFO DeltaFileOperations: Listing file:/tmp/parquet-dataset
23/05/17 09:41:26 INFO DelegatingLogStore: LogStore `LogStoreAdapter(io.delta.storage.HDFSLogStore)` is used for scheme `file`
...
23/05/17 09:41:41 INFO FileScanRDD: Reading File path: file:///tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet, range: 0-13899, partition values: [empty row]
...
23/05/17 09:41:41 INFO Snapshot: [tableId=11d76ddd-3035-47b5-9dfe-f7e14de9ef71] Created snapshot Snapshot(path=file:/tmp/parquet-dataset/_delta_log, version=0, metadata=Metadata(11d76ddd-3035-47b5-9dfe-f7e14de9ef71,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]},List(),Map(),Some(1684309287681)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,0,ArraySeq(),ArraySeq(DeprecatedRawLocalFileStatus{path=file:/tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet; isDirectory=false; length=13899; replication=1; blocksize=33554432; modification_time=1684309298044; access_time=1684309297927; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}),Some(0),1684309291045), checksumOpt=None)
23/05/17 09:41:41 INFO DeltaLog: Updated snapshot to Snapshot(path=file:/tmp/parquet-dataset/_delta_log, version=0, metadata=Metadata(11d76ddd-3035-47b5-9dfe-f7e14de9ef71,null,null,Format(parquet,Map()),{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]},List(),Map(),Some(1684309287681)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,0,ArraySeq(),ArraySeq(DeprecatedRawLocalFileStatus{path=file:/tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet; isDirectory=false; length=13899; replication=1; blocksize=33554432; modification_time=1684309298044; access_time=1684309297927; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}),Some(0),1684309291045), checksumOpt=None)
...
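The log lines above mention a `_delta_log` directory inside `/tmp/parquet-dataset`, so a quick way to confirm the conversion without starting Spark is to look for that directory on disk. The sketch below assumes, based on the Delta transaction log layout, that a converted table has at least one commit file named with a zero-padded 20-digit version and a `.json` extension; `DeltaLogCheck` and `hasDeltaLog` are hypothetical names, not part of the Delta Lake API.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

final class DeltaLogCheck {

    /** Returns true if dir contains a _delta_log directory with at least one JSON commit file. */
    static boolean hasDeltaLog(Path dir) {
        Path log = dir.resolve("_delta_log");
        if (!Files.isDirectory(log)) {
            return false;
        }
        try (Stream<Path> files = Files.list(log)) {
            // Assumed naming: 20-digit version number, e.g. 00000000000000000000.json
            return files.anyMatch(p -> p.getFileName().toString().matches("\\d{20}\\.json"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path dir = Path.of(args.length > 0 ? args[0] : "/tmp/parquet-dataset");
        System.out.println(dir + " has a Delta log: " + hasDeltaLog(dir));
    }
}
```

This only checks the directory layout; it does not validate the contents of the log.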
# PySpark
Use `pyspark` (with Delta Lake "installed") to access the Delta table.
$ ./bin/pyspark \
--packages io.delta:delta-core_2.12:2.3.0 \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/
Using Python version 3.10.11 (main, May 10 2023 19:07:22)
Spark context Web UI available at http://192.168.68.101:4040
Spark context available as 'sc' (master = local[*], app id = local-1684309786375).
SparkSession available as 'spark'.
>>> spark.read.format("delta").load("/tmp/parquet-dataset").show(truncate = False)
+---+
|id |
+---+
|0 |
|3 |
|4 |
|1 |
|2 |
+---+