Spark with kafka: NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)


Question

I'm trying to run a Java Kafka consumer for Spark, and no matter what I do I get the exception below. In the exception I see (ConsumerStrategy.scala:85). Why does it say Scala here? Does this mean it uses Scala methods instead of Java ones? Are any of my libraries conflicting?

My pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.4.5</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.13</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>

My code:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.ConsumerStrategy;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("kafkaTest");
        // sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");

        // One-second micro-batches.
        JavaStreamingContext streamingContext = new JavaStreamingContext(
                sparkConf, Durations.seconds(1));

        // Consumer configuration passed through to the Kafka client.
        Map<String, Object> kafkaParams = new HashMap<String, Object>();
        kafkaParams.put("bootstrap.servers", "kafka.kafka:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "spark_group1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        kafkaParams.put("partition.assignment.strategy", "range");
        System.out.println("Hello1");

        Collection<String> topics = Arrays.asList("spark");
        System.out.println("Hello2");
        ConsumerStrategy<String, String> cons = ConsumerStrategies.Subscribe(topics, kafkaParams);

        JavaInputDStream<ConsumerRecord<String, String>> messages =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        cons);

        messages.foreachRDD(rdd -> {
            // printf uses %s placeholders, not {}.
            System.out.printf("Message received: %s%n", rdd);
        });

        // Not shown in the original snippet; implied by the "streaming-start" thread in the trace.
        streamingContext.start();
        streamingContext.awaitTermination();
    }
}

I ran it with:

spark-submit --jars spark-streaming-kafka-0-10_2.11-2.3.0.jar --class Main spark-kafka-1.0-SNAPSHOT-jar-with-dependencies.jar

(I also tried it without --jars spark-streaming-kafka-0-10_2.11-2.3.0.jar, and with version 2.4.5 of that library.)

I get this exception:

Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:85)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:73)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:259)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I tried export SPARK_KAFKA_VERSION=0.10, and I also tried adding kafka-clients 0.10.2.1 as a dependency.

I still get the same result.

Answer 1

Score: 1

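The problem was that there is another Kafka library on the Spark classpath:
/opt/hadoop/share/hadoop/tools/lib/kafka-clients-0.8.2.1.jar
That older client has no KafkaConsumer.subscribe(Collection) method, so the call made by spark-streaming-kafka-0-10 fails at runtime with NoSuchMethodError.
To override it, I used the Maven Shade plugin to relocate the Kafka classes. Nothing else worked for me. See this link for details:
https://medium.com/@minyodev/relocating-classes-using-apache-maven-shade-plugin-6957a1a8666d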
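
To see which jar actually supplies KafkaConsumer at runtime, a small reflection check can help. This is only a sketch: the class name ClasspathCheck and its two helper methods are hypothetical, not part of Spark or Kafka. It looks up the class by name, so it compiles without Kafka on the build path.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.Collection;

// Hypothetical diagnostic helper; not part of Spark or Kafka.
public class ClasspathCheck {

    /** Returns where a class was loaded from, or a note if the JVM does not expose it. */
    public static String locationOf(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap or unknown)" : String.valueOf(src.getLocation());
    }

    /** Returns true if the class has a public method with this name and parameter types. */
    public static boolean hasMethod(Class<?> cls, String name, Class<?>... params) {
        try {
            Method m = cls.getMethod(name, params);
            return m != null;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String className = args.length > 0
                ? args[0]
                : "org.apache.kafka.clients.consumer.KafkaConsumer";
        try {
            Class<?> cls = Class.forName(className);
            System.out.println(className + " loaded from: " + locationOf(cls));
            // The method named in the NoSuchMethodError: subscribe(Collection).
            System.out.println("has subscribe(Collection): "
                    + hasMethod(cls, "subscribe", Collection.class));
        } catch (ClassNotFoundException e) {
            System.out.println(className + " is not on the classpath");
        }
    }
}
```

For the result to be meaningful it would need to run under the same classpath as the failing job, for example packaged into the same jar and launched through spark-submit with --class ClasspathCheck. If the printed location is the kafka-clients-0.8.2.1.jar path rather than your own jar, the older client is winning.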

Posted by huangapple on April 8, 2020 at 22:18:47
Source: https://go.coder-hub.com/61102933.html