Failed to load : com/amazon/deequ/checks/Check
Question
I'm building a Spark application that loads two JSON files, compares them, and prints the differences. I'm also trying to validate these files with Amazon's deequ library, but I'm getting the following exception:

    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    20/08/07 11:56:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Error: Failed to load com.deeq.CompareDataFrames: com/amazon/deequ/checks/Check
    log4j:WARN No appenders could be found for logger (org.apache.spark.util.ShutdownHookManager).
    log4j:WARN Please

The error occurs when I submit the job to Spark:

    ./spark-submit --class com.deeq.CompareDataFrames --master \
        spark://saif-VirtualBox:7077 ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar

I'm using Ubuntu to host Spark. Everything worked without issues before I added deequ to run some validations, so I wonder whether I'm missing something in the deployment process. This error doesn't seem to be a well-known one online.
Code:

```java
package com.deeq;

import com.amazon.deequ.VerificationResult;
import com.amazon.deequ.VerificationSuite;
import com.amazon.deequ.checks.Check;
import com.amazon.deequ.checks.CheckLevel;
import com.amazon.deequ.checks.CheckStatus;
import com.amazon.deequ.constraints.Constraint;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Option;
import scala.Tuple2;
import scala.collection.mutable.ArraySeq;
import scala.collection.mutable.Seq;

public class CompareDataFrames {
    public static void main(String[] args) {
        SparkSession session = SparkSession.builder().appName("CompareDataFrames").getOrCreate();
        session.sparkContext().setLogLevel("ALL");

        StructType schema = DataTypes.createStructType(new StructField[]{
                DataTypes.createStructField("CUST_ID", DataTypes.StringType, true),
                DataTypes.createStructField("RECORD_LOCATOR_ID", DataTypes.StringType, true),
                DataTypes.createStructField("EVNT_ID", DataTypes.StringType, true)
        });

        Dataset<Row> first = session.read().option("multiline", "true").schema(schema)
                .json("/home/saif/Downloads/FILE_DEV1.json");
        System.out.println("======= DataSet 1 =======");
        first.printSchema();
        first.show(false);

        Dataset<Row> second = session.read().option("multiline", "true").schema(schema)
                .json("/home/saif/Downloads/FILE_DEV2.json");
        System.out.println("======= DataSet 2 =======");
        second.printSchema();
        second.show(false);

        // This will show all the rows which are present in the first dataset
        // but not present in the second dataset. But the comparison is at row
        // level and not at column level.
        System.out.println("======= Except =======");
        first.except(second).show();

        // Key both datasets by RECORD_LOCATOR_ID (column index 1) and keep the
        // rows of the first dataset whose key does not appear in the second.
        StructType one = first.schema();
        JavaPairRDD<String, Row> pair1 = first.toJavaRDD().mapToPair((PairFunction<Row, String, Row>)
                row -> new Tuple2<>(row.getString(1), row));
        JavaPairRDD<String, Row> pair2 = second.toJavaRDD().mapToPair((PairFunction<Row, String, Row>)
                row -> new Tuple2<>(row.getString(1), row));
        System.out.println("======= Pair1 & Pair2 were created =======");
        JavaPairRDD<String, Row> subs = pair1.subtractByKey(pair2);
        JavaRDD<Row> rdd = subs.values();
        Dataset<Row> diff = session.createDataFrame(rdd, one);
        System.out.println("======= Diff Show =======");
        diff.show();

        // deequ validation: require EVNT_ID to be complete (non-null) in the first dataset.
        Seq<Constraint> cons = new ArraySeq<>(0);
        VerificationResult vr = new VerificationSuite().onData(first)
                .addCheck(new Check(CheckLevel.Error(), "unit test", cons)
                        .isComplete("EVNT_ID", Option.empty())
                )
                .run();

        Seq<Check> checkSeq = new ArraySeq<>(0);
        if (vr.status() != CheckStatus.Success()) {
            Dataset<Row> vrr = vr.checkResultsAsDataFrame(session, vr, checkSeq);
            vrr.show(false);
        }
    }
}
```
**Maven:**

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-catalyst_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.amazon.deequ</groupId>
        <artifactId>deequ</artifactId>
        <version>1.0.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.13.3</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang.modules</groupId>
        <artifactId>scala-java8-compat_2.13</artifactId>
        <version>0.9.1</version>
    </dependency>
</dependencies>
```
Answer 1
Score: 1
<details>
<summary>Original (English)</summary>

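The error means the deequ classes are not on the classpath when `spark-submit` loads `com.deeq.CompareDataFrames`. Unless the build uses a plugin that bundles dependencies, `mvn package` produces a jar containing only your own classes, so deequ must either be passed to `spark-submit` separately or packaged into the jar. Any of the following approaches should resolve the problem.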
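One way to confirm this diagnosis is to ask the JVM directly whether the class can be loaded. The helper below is a hypothetical sketch (`ClasspathCheck` is not part of Spark or deequ) that uses only `Class.forName`. Packaged as its own main class and submitted with the same options as the real job, it should report whether `com.amazon.deequ.checks.Check` is visible. Keeping it in a separate class matters, because a class that references deequ types directly may fail to load before any check runs, which is what the error above suggests.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical diagnostic helper (not part of Spark or deequ): reports whether
// each named class can be found on the current classpath.
final class ClasspathCheck {

    private ClasspathCheck() {
    }

    /** Returns true if the class can be located, without running its static initializers. */
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. a missing superclass
            return false;
        }
    }

    public static void main(String[] args) {
        // Default to the deequ classes used in the question; arguments override them.
        List<String> names = args.length > 0
                ? Arrays.asList(args)
                : Arrays.asList("com.amazon.deequ.checks.Check",
                                "com.amazon.deequ.VerificationSuite");
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "NOT found"));
        }
    }
}
```

Passing class names as program arguments replaces the two deequ classes that are checked by default.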
**Approach 1.**
Submit with the `--jars` option. Download the deequ jar from Maven (https://mvnrepository.com/artifact/com.amazon.deequ/deequ/1.0.4) to `~/Downloads/deequ-1.0.4.jar`, then run:

    ./spark-submit --class com.deeq.CompareDataFrames --master \
        spark://saif-VirtualBox:7077 --jars ~/Downloads/deequ-1.0.4.jar \
        ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar

**Approach 2.**
Submit with the `--packages` option, which lets `spark-submit` resolve deequ from its Maven coordinates:

    ./spark-submit --class com.deeq.CompareDataFrames --master \
        spark://saif-VirtualBox:7077 --packages com.amazon.deequ:deequ:1.0.4 \
        ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar

**Notes:**
1. The `--repositories` option is required only if a custom repository has to be referenced. By default, Maven Central is used when `--repositories` is not provided.
2. When the `--packages` option is specified, `spark-submit` looks for the packages and their dependencies in the `~/.ivy2/cache`, `~/.ivy2/jars`, and `~/.m2/repository` directories. If they are not found there, they are downloaded from Maven Central using Ivy and stored under the `~/.ivy2` directory.
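To see whether deequ is already cached in the local Maven repository (and therefore where a jar for `--jars` could be taken from), its location can be computed from the coordinates. This is a minimal sketch assuming the standard Maven repository layout, in which the group ID's dots become directories, followed by the artifact ID, the version, and `artifactId-version.jar`. `MavenPaths` is a hypothetical name, and the sketch does not model the `~/.ivy2` cache layout.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper: maps "groupId:artifactId:version" coordinates to the
// jar path used by the standard Maven repository layout.
final class MavenPaths {

    private MavenPaths() {
    }

    static Path localJar(Path repositoryRoot, String coordinates) {
        String[] parts = coordinates.split(":");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                    "expected groupId:artifactId:version, got " + coordinates);
        }
        String groupId = parts[0];
        String artifactId = parts[1];
        String version = parts[2];

        // com.amazon.deequ -> com/amazon/deequ
        Path dir = repositoryRoot;
        for (String segment : groupId.split("\\.")) {
            dir = dir.resolve(segment);
        }
        return dir.resolve(artifactId)
                  .resolve(version)
                  .resolve(artifactId + "-" + version + ".jar");
    }

    public static void main(String[] args) {
        Path repo = Paths.get(System.getProperty("user.home"), ".m2", "repository");
        Path jar = localJar(repo, "com.amazon.deequ:deequ:1.0.4");
        System.out.println(jar + (jar.toFile().isFile() ? " (present)" : " (missing)"));
    }
}
```

For `com.amazon.deequ:deequ:1.0.4` this resolves to `~/.m2/repository/com/amazon/deequ/deequ/1.0.4/deequ-1.0.4.jar`.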
**Edit 1:**
**Approach 3:**
If approaches 1 and 2 do not work, use the `maven-shade-plugin` to build an uber jar (a single jar that also bundles your dependencies, including deequ) and submit that instead.

Use the `pom.xml` below to build the uber jar with `maven-shade-plugin`: add it to your project, rebuild your jar, and deploy it with `spark-submit`:

    spark-submit --class com.deeq.CompareDataFrames --master \
        spark://saif-VirtualBox:7077 ~/Downloads/deeq-trial-1.0-SNAPSHOT.jar

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.deeq</groupId>
    <!-- artifactId + version produce target/deeq-trial-1.0-SNAPSHOT.jar, the file name used above -->
    <artifactId>deeq-trial</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Spark-3.0 Spark Application</name>
    <url>https://maven.apache.org</url>

    <repositories>
        <repository>
            <id>codelds</id>
            <url>https://code.lds.org/nexus/content/groups/main-repo</url>
        </repository>
        <repository>
            <id>central</id>
            <name>Maven Repository Switchboard</name>
            <layout>default</layout>
            <url>https://repo1.maven.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.12.8</scala.version>
        <java.version>1.8</java.version>
        <CodeCacheSize>512m</CodeCacheSize>
        <es.version>2.4.6</es.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-catalyst_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazon.deequ</groupId>
            <artifactId>deequ</artifactId>
            <version>1.0.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.13.3</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <!-- the Scala suffix should match Spark's Scala version (2.12) -->
            <artifactId>scala-java8-compat_2.12</artifactId>
            <version>0.9.1</version>
        </dependency>
    </dependencies>

    <build>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>eclipse-add-source</id>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>attach-scaladocs</id>
                        <phase>verify</phase>
                        <goals>
                            <goal>doc-jar</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <recompileMode>incremental</recompileMode>
                    <useZincServer>true</useZincServer>
                    <args>
                        <arg>-unchecked</arg>
                        <arg>-deprecation</arg>
                        <arg>-feature</arg>
                    </args>
                    <jvmArgs>
                        <jvmArg>-Xms1024m</jvmArg>
                        <jvmArg>-Xmx1024m</jvmArg>
                        <jvmArg>-XX:ReservedCodeCacheSize=${CodeCacheSize}</jvmArg>
                    </jvmArgs>
                    <javacArgs>
                        <javacArg>-source</javacArg>
                        <javacArg>${java.version}</javacArg>
                        <javacArg>-target</javacArg>
                        <javacArg>${java.version}</javacArg>
                        <javacArg>-Xlint:all,-serial,-path</javacArg>
                    </javacArgs>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.xerial.snappy</exclude>
                                    <exclude>org.scala-lang.modules</exclude>
                                    <exclude>org.scala-lang</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- strip dependency signature files so the merged jar is not rejected -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <relocations>
                                <relocation>
                                    <pattern>com.google.common</pattern>
                                    <shadedPattern>shaded.com.google.common</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```
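
After rebuilding, you can check that the shaded jar actually contains the deequ classes and that the `META-INF` filter removed dependency signature files (leftover `*.SF`, `*.DSA`, or `*.RSA` entries from signed dependencies can make the JVM reject the merged jar with a signature error). The sketch below uses only `java.util.zip`; `JarInspector` is a hypothetical name, and its default path assumes the build produces `target/deeq-trial-1.0-SNAPSHOT.jar`.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.zip.ZipEntry;
import java.util.zip.ZipFile;

// Hypothetical helper: inspects a built jar (a zip archive) for a class entry
// and for leftover signature files under META-INF/.
final class JarInspector {

    private JarInspector() {
    }

    /** True if the jar contains the entry, e.g. "com/amazon/deequ/checks/Check.class". */
    static boolean containsEntry(Path jar, String entryName) {
        try (ZipFile zip = new ZipFile(jar.toFile())) {
            return zip.getEntry(entryName) != null;
        } catch (IOException e) {
            return false; // missing or unreadable jar
        }
    }

    /** Lists *.SF, *.DSA and *.RSA entries under META-INF/; empty if none or on error. */
    static List<String> signatureFiles(Path jar) {
        List<String> found = new ArrayList<>();
        try (ZipFile zip = new ZipFile(jar.toFile())) {
            Enumeration<? extends ZipEntry> entries = zip.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.startsWith("META-INF/")
                        && (name.endsWith(".SF") || name.endsWith(".DSA") || name.endsWith(".RSA"))) {
                    found.add(name);
                }
            }
        } catch (IOException e) {
            // treat a missing or unreadable jar as having no signature files
        }
        return found;
    }

    public static void main(String[] args) {
        Path jar = Paths.get(args.length > 0 ? args[0] : "target/deeq-trial-1.0-SNAPSHOT.jar");
        System.out.println("deequ Check present: "
                + containsEntry(jar, "com/amazon/deequ/checks/Check.class"));
        System.out.println("signature files: " + signatureFiles(jar));
    }
}
```

For a correctly shaded jar this should report that the deequ `Check` class is present and list no signature files.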
</details>