Spark with kafka: NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)
Question
I am trying to run a Java Kafka consumer for Spark, and no matter what I do I get the exception below. In the exception I see (ConsumerStrategy.scala:85). Why does it say Scala here? Does this mean that it uses Scala methods instead of Java? Are any of my libraries conflicting?
My POM:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>
My code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("kafkaTest");
        // sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
        JavaStreamingContext streamingContext = new JavaStreamingContext(
                sparkConf, Durations.seconds(1));

        // Kafka consumer configuration
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "kafka.kafka:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark_group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("partition.assignment.strategy", "range");

        System.out.println("Hello1");
        Collection<String> topics = Arrays.asList("spark");
        System.out.println("Hello2");

        // Subscribe to the topic and create the direct stream
        ConsumerStrategy<String, String> cons = ConsumerStrategies.Subscribe(topics, kafkaParams);
        JavaInputDStream<ConsumerRecord<String, String>> messages =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        cons);

        messages.foreachRDD(rdd -> {
            System.out.printf("Mssg received %s%n", rdd);
        });

        // Start the streaming computation
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}
I ran it with:
spark-submit --jars spark-streaming-kafka-0-10_2.11-2.3.0.jar --class Main spark-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar
(I also tried without --jars spark-streaming-kafka-0-10_2.11-2.3.0.jar, and with version 2.4.5 of that library.)
and got this exception:
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:85)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:259)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I tried export SPARK_KAFKA_VERSION=0.10 and also tried adding kafka-clients 0.10.2.1, but I still get the same result.
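For reference, pinning the kafka-clients version mentioned above in the POM would look roughly like this (a sketch of the attempted workaround, not a confirmed fix):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.1</version>
</dependency>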
Answer 1
Score: 1
The problem was that there is another Kafka library on the Spark classpath:
/opt/hadoop/share/hadoop/tools/lib/kafka-clients-0.8.2.1.jar
To override it I used the Maven Shade plugin; nothing else worked for me. See this link for details:
https://medium.com/@minyodev/relocating-classes-using-apache-maven-shade-plugin-6957a1a8666d
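A minimal sketch of that relocation approach in the POM, assuming the Maven Shade plugin is what builds the fat jar; the shadedPattern package name here is an illustrative choice, not taken from the question:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <relocations>
                            <!-- Move the bundled Kafka client classes to a private package
                                 so the old kafka-clients-0.8.2.1.jar on the cluster cannot
                                 shadow them at runtime. -->
                            <relocation>
                                <pattern>org.apache.kafka</pattern>
                                <shadedPattern>shaded.org.apache.kafka</shadedPattern>
                            </relocation>
                        </relocations>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Relocation rewrites both the packaged kafka-clients classes and every reference to them inside the fat jar, so the driver and executors load the shaded copy instead of whichever kafka-clients version ships with the cluster.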