How to make Spark Streaming commit in each batch when limiting the Kafka batch size?


Question

To limit the batch size when using Spark Streaming, I referenced this answer.

There are about 50 million records backed up in Kafka (waiting to be consumed). The topic has 3 partitions:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
zhihu_comment   0          10906153        28668062        17761909        -               -               -
zhihu_comment   1          10972464        30271728        19299264        -               -               -
zhihu_comment   2          10906395        28662007        17755612        -               -               -

My consumer app:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public final class SparkConsumer {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) throws Exception {
    String brokers = "device1:9092,device2:9092,device3:9092";
    String groupId = "spark";
    String topics = "zhihu_comment";

    // Create the streaming context with a 10-second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
    sparkConf.set("spark.streaming.backpressure.enabled", "true");
    sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    kafkaParams.put("enable.auto.commit", true);
    kafkaParams.put("max.poll.records", "500");

    // Create a direct Kafka stream with the brokers and topic
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
            jssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

    // Get the message values and print the record count per batch
    JavaDStream<String> lines = messages.map(ConsumerRecord::value);
    lines.count().print();

    jssc.start();
    jssc.awaitTermination();
  }
}

I have limited the consumption rate of Spark Streaming: in my case maxRatePerPartition is set to 10000, which works out to 10,000 records/s per partition × 3 partitions × a 10 s batch interval = 300,000 records per batch.

The problem is that, although Spark Streaming handles the records within that limit, the current offsets shown by Kafka are not the offsets Spark Streaming is actually processing. Kafka's current offset jumps straight to the latest (log-end) offset:

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
zhihu_comment   0          28700537        28700676        139             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   1          30305102        30305224        122             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1
zhihu_comment   2          28695033        28695146        113             consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a /192.168.0.102  consumer-1

It seems that Spark Streaming does not commit the offsets at the end of each batch; instead it commits up to the latest offsets as soon as it starts consuming!

Is there any way to make Spark Streaming commit the offsets in each batch?

Spark Streaming log, showing the number of records consumed in each batch:

20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000

20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms

Answer 1

Score: 2

You need to disable auto-commit:

kafkaParams.put("enable.auto.commit", false);

and instead commit the offsets yourself once each batch has been processed:

messages.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // do some transformations and an action on the rdd here, typically something like:
  rdd.foreachPartition(it -> {
    it.forEachRemaining(row -> ...);
  });

  // commit the offsets of this batch back to Kafka
  ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});

as described in the Spark + Kafka Integration Guide.

You can also use commitSync for synchronous commits.
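
For reference, here is a minimal sketch of the asker's count-per-batch job rewritten with manual commits, following the pattern above. The broker list, group id and topic are taken from the question; the class name SparkConsumerWithManualCommit and the omission of the backpressure settings are my own simplifications, so treat this as an illustration rather than a drop-in implementation.

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public final class SparkConsumerWithManualCommit {
  public static void main(String[] args) throws Exception {
    SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

    Set<String> topics = new HashSet<>(Arrays.asList("zhihu_comment"));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "device1:9092,device2:9092,device3:9092");
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark");
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Let Spark decide when offsets are committed, not the consumer's auto-commit timer.
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

    messages.foreachRDD(rdd -> {
      // Capture the offset ranges of exactly this batch before doing any work.
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

      // The asker's per-batch work: count the records and print the result.
      System.out.println("records in this batch: " + rdd.count());

      // Hand the offsets back to the stream; they are committed asynchronously by the driver.
      ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
    });

    jssc.start();
    jssc.awaitTermination();
  }
}

With this structure, kafka-consumer-groups should show CURRENT-OFFSET advancing batch by batch (roughly 300,000 records at a time in the asker's setup) instead of jumping straight to the log-end offset.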

