Spark with kafka: NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)

I am trying to run a Java Kafka consumer for Spark, and no matter what I do I get the exception below. The stack trace points at (ConsumerStrategy.scala:85). Why does it say Scala here? Does that mean it is using Scala methods instead of Java ones? Are any of my libraries conflicting?

My pom:

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.4.5</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.11</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

My code:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.ConsumerStrategy;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    public class Main {
        public static void main(String[] args) throws InterruptedException {
            SparkConf sparkConf = new SparkConf();
            sparkConf.setAppName("kafkaTest");
            // sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
            JavaStreamingContext streamingContext = new JavaStreamingContext(
                    sparkConf, Durations.seconds(1));

            Map<String, Object> kafkaParams = new HashMap<String, Object>();
            kafkaParams.put("bootstrap.servers", "kafka.kafka:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "spark_group1");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", false);
            kafkaParams.put("partition.assignment.strategy", "range");

            Collection<String> topics = Arrays.asList("spark");
            ConsumerStrategy<String, String> cons =
                    ConsumerStrategies.Subscribe(topics, kafkaParams);
            JavaInputDStream<ConsumerRecord<String, String>> messages =
                    KafkaUtils.createDirectStream(
                            streamingContext,
                            LocationStrategies.PreferConsistent(),
                            cons);
            messages.foreachRDD(rdd -> {
                // printf uses %s, not the slf4j-style {} placeholder
                System.out.printf("Mssg received %s%n", rdd);
            });

            streamingContext.start();
            streamingContext.awaitTermination();
        }
    }

I ran it with:

    spark-submit --jars spark-streaming-kafka-0-10_2.11-2.3.0.jar --class Main spark-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar

(I also tried running it without the --jars flag, and with version 2.4.5 of spark-streaming-kafka-0-10_2.11.)

and got this exception:

    Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:85)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
        at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:259)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
        at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
        at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
        at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
        at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
        at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
        at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
        at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I tried export SPARK_KAFKA_VERSION=0.10 and also tried adding kafka-clients 0.10.2.1. I still get the same result.
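
One way to narrow down a conflict like this is to print where the JVM actually loaded KafkaConsumer from. A minimal diagnostic sketch (the class name WhichKafkaJar is just for illustration; the same println can equally be dropped into Main before the stream starts):

    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class WhichKafkaJar {
        public static void main(String[] args) {
            // Prints the jar the JVM loaded KafkaConsumer from; if it is not
            // the kafka-clients jar you bundled, another copy on the
            // classpath is shadowing yours.
            System.out.println(KafkaConsumer.class
                    .getProtectionDomain()
                    .getCodeSource()
                    .getLocation());
        }
    }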

Answer 1 (score: 1)

The problem was that there is another Kafka library on the Spark cluster: /opt/hadoop/share/hadoop/tools/lib/kafka-clients-0.8.2.1.jar. To override it I used the Maven Shade plugin; nothing else worked for me. See this link for details:
https://medium.com/@minyodev/relocating-classes-using-apache-maven-shade-plugin-6957a1a8666d
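
For reference, a minimal sketch of the relocation configuration that article describes (the plugin version and the shaded.* prefix are illustrative assumptions; adjust both to your build):

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <!-- illustrative version; use a current release -->
                <version>3.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <relocations>
                                <!-- Move the bundled Kafka classes into a private
                                     package so the cluster's old kafka-clients jar
                                     cannot shadow them at runtime -->
                                <relocation>
                                    <pattern>org.apache.kafka</pattern>
                                    <shadedPattern>shaded.org.apache.kafka</shadedPattern>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

After shading, references to org.apache.kafka.* inside the uber-jar are rewritten to shaded.org.apache.kafka.*, so the KafkaConsumer that runs is the one packaged with the application rather than the 0.8.2.1 copy on the cluster.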
