如何使用Java应用程序将Parquet数据集转换为Delta。

huangapple go评论58阅读模式
英文:

How to use Java app to convert parquet dataset to delta

问题

I will provide the translation for the code parts you provided. Here is the translation:

我正在尝试使用Java将Parquet文件转换为Delta使用Java 11和Spark以及Maven依赖项Scala在尝试执行时我遇到了异常

SparkSession spark = SparkSession.builder().
appName("Solo-spark").master("local[1]").getOrCreate();

这里我们创建了一个Spark会话。

现在你可以使用以下代码将其转换为Delta:

DeltaTable
    .convertToDelta(spark, "parquet.`/Users/hokage/Downloads/python-paraquet`");

我遇到了以下错误:

Exception in thread "main" java.lang.NoClassDefFoundError: scala/$less$colon$less
	at org.soloworld.App.main(App.java:24)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	... 1 more

这是我的依赖项:

<dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-core_2.13</artifactId>
      <version>2.3.0</version>
</dependency>
<dependency>
      <groupId>io.delta</groupId>
      <artifactId>delta-iceberg_2.13</artifactId>
      <version>2.3.0</version>
</dependency>
<dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.12.17</version>
</dependency>

请注意,这只是代码的翻译部分,不包括问题或其他内容。

英文:

im trying to convert parquet file to delta using java , using java 11 and spark and scale as maven dependencies while trying to exceute im getting Exception

SparkSession spark = SparkSession.builder().
appName(&quot;Solo-spark&quot;).master(&quot;local[1]&quot;).getOrCreate();`

Here we have get a spark session.

Now you have covert to delta using

DeltaTable
    .convertToDelta(spark,&quot;parquet.`/Users/hokage/Downloads/python-paraquet`&quot;);

and im getting the following error

Exception in thread &quot;main&quot; java.lang.NoClassDefFoundError: scala/$less$colon$less
	at org.soloworld.App.main(App.java:24)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
	at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
	at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	... 1 more

and these are my dependencies:

&lt;dependency&gt;
      &lt;groupId&gt;io.delta&lt;/groupId&gt;
      &lt;artifactId&gt;delta-core_2.13&lt;/artifactId&gt;
      &lt;version&gt;2.3.0&lt;/version&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
      &lt;groupId&gt;io.delta&lt;/groupId&gt;
      &lt;artifactId&gt;delta-iceberg_2.13&lt;/artifactId&gt;
      &lt;version&gt;2.3.0&lt;/version&gt;
    &lt;/dependency&gt;
    &lt;dependency&gt;
      &lt;groupId&gt;org.scala-lang&lt;/groupId&gt;
      &lt;artifactId&gt;scala-library&lt;/artifactId&gt;
      &lt;version&gt;2.12.17&lt;/version&gt;
&lt;/dependency&gt;

答案1

得分: 1

你混淆了不同的Scala版本 - 你正在使用为Scala 2.13编译的库(带有_2.13后缀),但你使用的是Scala 2.12(2.12.17版本)。你还需要检查Apache Spark分发版本是针对哪个Scala版本编译的。

P.S. 你也不需要delta-iceberg依赖来从Parquet转换为Delta格式。

英文:

You're mixing up different Scala versions - you're using libraries compiled for Scala 2.13 (the _2.13 suffix) with the Scala 2.12 (the 2.12.17) version. You also need to check for which Scala version the Apache Spark distribution is compiled.

P.S. You also don't need delta-iceberg dependency to convert from Parquet to Delta format.

答案2

得分: 0

创建 Parquet 数据集

您的路径是 /Users/hokage/Downloads/python-paraquet,但如果您不介意的话,让我们使用 /tmp/parquet-dataset 😉

让我们打开 spark-shell 并编写一些 Scala 代码。这只是用来创建一个 Parquet 表(数据集)。

val parquetTableDir = "/tmp/parquet-dataset"
spark
  .range(5)
  .write
  .format("parquet")
  .save(parquetTableDir)
$ ls -l /tmp/parquet-dataset
total 48
-rw-r--r--@ 1 jacek  wheel    0 May 17 09:32 _SUCCESS
-rw-r--r--@ 1 jacek  wheel  297 May 17 09:32 part-00000-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  472 May 17 09:32 part-00002-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  472 May 17 09:32 part-00004-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  471 May 17 09:32 part-00007-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  472 May 17 09:32 part-00009-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  472 May 17 09:32 part-00011-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet

Java 项目(Maven)

我使用 IntelliJ IDEA 创建了一个带有 Maven 的 Java 项目。我已经有一段时间没有这样做了! 😊

pom.xml

请注意所有依赖项及其版本。它们必须匹配,重要的事项包括:

  1. Scala 的版本(Delta Lake 和 Apache Spark 依赖项)
  2. Delta Lake 2.3.0 支持 Spark 3.3
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>delta-convert</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-core_2.13</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.8</version>
        </dependency>
    </dependencies>
</project>

Java 应用程序

package so;

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;

public class DeltaConvert {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("Solo-spark")
                .master("local[*]") // 使用星号代替
                // 以下配置对于 Delta Lake 在 Spark 上运行是必需的
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                .getOrCreate();

        DeltaTable
                .convertToDelta(spark,"parquet.`/tmp/parquet-dataset`");

    }
}

展示时间 🎉

运行 Java 应用程序,您应该在控制台(标准输出)中看到大量 INFO 消息。在这些消息中,您应该找到以下内容:

23/05/17 09:41:19 INFO SparkContext: Running Spark version 3.3.2
...
23/05/17 09:41:22 INFO DelegatingLogStore: LogStore `LogStoreAdapter(io.delta.storage.HDFSLogStore)` is used for scheme `file`
23/05/17 09:41:22 INFO DeltaLog: Creating initial snapshot without metadata, because the directory is empty
23/05/17 09:41:24 INFO InitialSnapshot: [tableId=b071541b-9864-4a06-a8d5-d45690ab1a26] Created snapshot InitialSnapshot(path=file:/tmp/parquet-dataset/_delta_log, version=-1, metadata=Metadata(2876805c-dde3-4d69-96a2-d2c3ad3c81e7,null,null,Format(parquet,Map()),null,List(),Map(),Some(1684309284165)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,-1,List(),List(),None,-1), checksumOpt=None)
23/05/17 09:41:24 INFO DeltaLog: No delta log found for the Delta table at file:/tmp/parquet-dataset/_delta_log
23/05/17 09:41:24 INFO InitialSnapshot: [tableId=2876805c-dde3-4d69-96a2-d2c3ad3c81e7] Created snapshot InitialSnapshot(path=file

<details>
<summary>英文:</summary>

# Create Parquet Dataset

Yours was `/Users/hokage/Downloads/python-paraquet`, but let&#39;s use `/tmp/parquet-dataset` if you don&#39;t mind &#128521;

Let&#39;s open `spark-shell` and write some Scala. This is only to create a parquet table (dataset).

```scala
val parquetTableDir = &quot;/tmp/parquet-dataset&quot;
spark
  .range(5)
  .write
  .format(&quot;parquet&quot;)
  .save(parquetTableDir)
$ ls -l /tmp/parquet-dataset
total 48
-rw-r--r--@ 1 jacek  wheel    0 May 17 09:32 _SUCCESS
-rw-r--r--@ 1 jacek  wheel  297 May 17 09:32 part-00000-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  472 May 17 09:32 part-00002-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  472 May 17 09:32 part-00004-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  471 May 17 09:32 part-00007-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  472 May 17 09:32 part-00009-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet
-rw-r--r--@ 1 jacek  wheel  472 May 17 09:32 part-00011-f01e6a39-220a-401b-bfd0-2f9bd1712981-c000.snappy.parquet

Java Project (Maven)

I used IntelliJ IDEA to create a Java project with Maven. It's been a while since I've done it! 😊

pom.xml

Mind all the dependencies and their versions. They have to match and important things are:

  1. The version of Scala (Delta Lake and Apache Spark deps)
  2. Delta Lake 2.3.0 supports Spark 3.3
&lt;?xml version=&quot;1.0&quot; encoding=&quot;UTF-8&quot;?&gt;
&lt;project xmlns=&quot;http://maven.apache.org/POM/4.0.0&quot;
         xmlns:xsi=&quot;http://www.w3.org/2001/XMLSchema-instance&quot;
         xsi:schemaLocation=&quot;http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd&quot;&gt;
    &lt;modelVersion&gt;4.0.0&lt;/modelVersion&gt;

    &lt;groupId&gt;org.example&lt;/groupId&gt;
    &lt;artifactId&gt;delta-convert&lt;/artifactId&gt;
    &lt;version&gt;1.0-SNAPSHOT&lt;/version&gt;

    &lt;properties&gt;
        &lt;maven.compiler.source&gt;17&lt;/maven.compiler.source&gt;
        &lt;maven.compiler.target&gt;17&lt;/maven.compiler.target&gt;
        &lt;project.build.sourceEncoding&gt;UTF-8&lt;/project.build.sourceEncoding&gt;
    &lt;/properties&gt;

    &lt;dependencies&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;io.delta&lt;/groupId&gt;
            &lt;artifactId&gt;delta-core_2.13&lt;/artifactId&gt;
            &lt;version&gt;2.3.0&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.apache.spark&lt;/groupId&gt;
            &lt;artifactId&gt;spark-sql_2.13&lt;/artifactId&gt;
            &lt;version&gt;3.3.2&lt;/version&gt;
        &lt;/dependency&gt;
        &lt;dependency&gt;
            &lt;groupId&gt;org.scala-lang&lt;/groupId&gt;
            &lt;artifactId&gt;scala-library&lt;/artifactId&gt;
            &lt;version&gt;2.13.8&lt;/version&gt;
        &lt;/dependency&gt;
    &lt;/dependencies&gt;
&lt;/project&gt;

Java Application

package so;

import io.delta.tables.DeltaTable;
import org.apache.spark.sql.SparkSession;

public class DeltaConvert {

    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName(&quot;Solo-spark&quot;)
                .master(&quot;local[*]&quot;) // Use the star instead
                // the following configs are required for Delta Lake to run on Spark
                .config(&quot;spark.sql.extensions&quot;, &quot;io.delta.sql.DeltaSparkSessionExtension&quot;)
                .config(&quot;spark.sql.catalog.spark_catalog&quot;, &quot;org.apache.spark.sql.delta.catalog.DeltaCatalog&quot;)
                .getOrCreate();

        DeltaTable
                .convertToDelta(spark,&quot;parquet.`/tmp/parquet-dataset`&quot;);

    }
}

Show Time 🎉

Execute the Java app and you should see tons of INFO messages in the console (standard output). Among the lines you should find the following:

23/05/17 09:41:19 INFO SparkContext: Running Spark version 3.3.2
...
23/05/17 09:41:22 INFO DelegatingLogStore: LogStore `LogStoreAdapter(io.delta.storage.HDFSLogStore)` is used for scheme `file`
23/05/17 09:41:22 INFO DeltaLog: Creating initial snapshot without metadata, because the directory is empty
23/05/17 09:41:24 INFO InitialSnapshot: [tableId=b071541b-9864-4a06-a8d5-d45690ab1a26] Created snapshot InitialSnapshot(path=file:/tmp/parquet-dataset/_delta_log, version=-1, metadata=Metadata(2876805c-dde3-4d69-96a2-d2c3ad3c81e7,null,null,Format(parquet,Map()),null,List(),Map(),Some(1684309284165)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,-1,List(),List(),None,-1), checksumOpt=None)
23/05/17 09:41:24 INFO DeltaLog: No delta log found for the Delta table at file:/tmp/parquet-dataset/_delta_log
23/05/17 09:41:24 INFO InitialSnapshot: [tableId=2876805c-dde3-4d69-96a2-d2c3ad3c81e7] Created snapshot InitialSnapshot(path=file:/tmp/parquet-dataset/_delta_log, version=-1, metadata=Metadata(29ac2dc9-a37b-47cd-9aec-45090056afcc,null,null,Format(parquet,Map()),null,List(),Map(),Some(1684309284204)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,-1,List(),List(),None,-1), checksumOpt=None)
...
23/05/17 09:41:26 INFO DeltaFileOperations: Listing file:/tmp/parquet-dataset
23/05/17 09:41:26 INFO DelegatingLogStore: LogStore `LogStoreAdapter(io.delta.storage.HDFSLogStore)` is used for scheme `file`
...
23/05/17 09:41:41 INFO FileScanRDD: Reading File path: file:///tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet, range: 0-13899, partition values: [empty row]
...
23/05/17 09:41:41 INFO Snapshot: [tableId=11d76ddd-3035-47b5-9dfe-f7e14de9ef71] Created snapshot Snapshot(path=file:/tmp/parquet-dataset/_delta_log, version=0, metadata=Metadata(11d76ddd-3035-47b5-9dfe-f7e14de9ef71,null,null,Format(parquet,Map()),{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;name&quot;:&quot;id&quot;,&quot;type&quot;:&quot;long&quot;,&quot;nullable&quot;:true,&quot;metadata&quot;:{}}]},List(),Map(),Some(1684309287681)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,0,ArraySeq(),ArraySeq(DeprecatedRawLocalFileStatus{path=file:/tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet; isDirectory=false; length=13899; replication=1; blocksize=33554432; modification_time=1684309298044; access_time=1684309297927; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}),Some(0),1684309291045), checksumOpt=None)
23/05/17 09:41:41 INFO DeltaLog: Updated snapshot to Snapshot(path=file:/tmp/parquet-dataset/_delta_log, version=0, metadata=Metadata(11d76ddd-3035-47b5-9dfe-f7e14de9ef71,null,null,Format(parquet,Map()),{&quot;type&quot;:&quot;struct&quot;,&quot;fields&quot;:[{&quot;name&quot;:&quot;id&quot;,&quot;type&quot;:&quot;long&quot;,&quot;nullable&quot;:true,&quot;metadata&quot;:{}}]},List(),Map(),Some(1684309287681)), logSegment=LogSegment(file:/tmp/parquet-dataset/_delta_log,0,ArraySeq(),ArraySeq(DeprecatedRawLocalFileStatus{path=file:/tmp/parquet-dataset/_delta_log/00000000000000000000.checkpoint.parquet; isDirectory=false; length=13899; replication=1; blocksize=33554432; modification_time=1684309298044; access_time=1684309297927; owner=; group=; permission=rw-rw-rw-; isSymlink=false; hasAcl=false; isEncrypted=false; isErasureCoded=false}),Some(0),1684309291045), checksumOpt=None)
...

PySpark

Use pyspark (with Delta Lake "installed") to access the delta table.

$ ./bin/pyspark \
  --packages io.delta:delta-core_2.12:2.3.0 \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  &#39;_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.3.2
      /_/

Using Python version 3.10.11 (main, May 10 2023 19:07:22)
Spark context Web UI available at http://192.168.68.101:4040
Spark context available as &#39;sc&#39; (master = local[*], app id = local-1684309786375).
SparkSession available as &#39;spark&#39;.
&gt;&gt;&gt; spark.read.format(&quot;delta&quot;).load(&quot;/tmp/parquet-dataset&quot;).show(truncate = False)
+---+
|id |
+---+
|0  |
|3  |
|4  |
|1  |
|2  |
+---+

huangapple
  • 本文由 发表于 2023年5月14日 04:34:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/76244764.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定