How to make Spark Streaming commit offsets in each batch when limiting the Kafka batch size?

Question
To limit the batch size when using Spark Streaming, I referenced this answer.
There are about 50 million records sitting in Kafka (about to be consumed). The topic has 3 partitions:

TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG       CONSUMER-ID  HOST  CLIENT-ID
zhihu_comment  0          10906153        28668062        17761909  -            -     -
zhihu_comment  1          10972464        30271728        19299264  -            -     -
zhihu_comment  2          10906395        28662007        17755612  -            -     -
My consumer app:
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public final class SparkConsumer {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        String brokers = "device1:9092,device2:9092,device3:9092";
        String groupId = "spark";
        String topics = "zhihu_comment";

        // Create context with a 10-second batch interval
        SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
        sparkConf.set("spark.streaming.backpressure.enabled", "true");
        sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        kafkaParams.put("enable.auto.commit", true);
        kafkaParams.put("max.poll.records", "500");

        // Create direct Kafka stream with brokers and topics
        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams));

        // Get the lines and print the record count of each batch
        JavaDStream<String> lines = messages.map(ConsumerRecord::value);
        lines.count().print();

        jssc.start();
        jssc.awaitTermination();
    }
}
I have limited the consumption rate of Spark Streaming: in my case I set maxRatePerPartition to 10000, which works out to 300,000 records consumed per batch.
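That figure follows directly from the settings above; as a back-of-the-envelope check, here is a hypothetical helper (BatchCapCheck is not part of the asker's app) that spells out the arithmetic:

public final class BatchCapCheck {
    public static void main(String[] args) {
        long maxRatePerPartition = 10_000L; // spark.streaming.kafka.maxRatePerPartition, records per second
        int partitions = 3;                 // zhihu_comment has 3 partitions
        long batchIntervalSeconds = 10L;    // Durations.seconds(10)
        // 10000 records/s per partition * 3 partitions * 10 s = 300000 records per batch
        long maxRecordsPerBatch = maxRatePerPartition * partitions * batchIntervalSeconds;
        System.out.println("Per-batch cap: " + maxRecordsPerBatch); // prints "Per-batch cap: 300000"
    }
}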
The problem is that although Spark Streaming processes records within the specified limit, the current offset reported by Kafka is not the offset that Spark Streaming is actually working on. Kafka's current offset suddenly jumps to the latest offset, so the lag collapses:

TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST            CLIENT-ID
zhihu_comment  0          28700537        28700676        139  consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a  /192.168.0.102  consumer-1
zhihu_comment  1          30305102        30305224        122  consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a  /192.168.0.102  consumer-1
zhihu_comment  2          28695033        28695146        113  consumer-1-ddcb0abd-e206-470d-925a-63ca4dc1d62a  /192.168.0.102  consumer-1
It appears that Spark Streaming does not commit the offsets of each batch; it commits the latest offsets once, right when it starts consuming!
Is there any way to make Spark Streaming commit the offsets in each batch?
Spark Streaming log, showing the number of records consumed in each batch:
20/05/04 22:28:13 INFO scheduler.DAGScheduler: Job 15 finished: print at SparkConsumer.java:65, took 0.012606 s
-------------------------------------------
Time: 1588602490000 ms
-------------------------------------------
300000
20/05/04 22:28:13 INFO scheduler.JobScheduler: Finished job streaming job 1588602490000 ms.0 from job set of time 1588602490000 ms
Answer 1
Score: 2
You need to disable automatic offset commits:
kafkaParams.put("enable.auto.commit", false);
and instead commit the offsets yourself after processing each batch:
messages.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

    // do some transformations and an action on the rdd here, typically like:
    rdd.foreachPartition(it -> {
        it.forEachRemaining(row -> ...);
    });

    // commit the offsets of this batch's messages
    ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
});
as described in the Spark + Kafka Integration Guide.
You could also use commitSync for synchronous commits.
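For reference, here is a minimal sketch of how the asker's app could be restructured along these lines (the class name SparkConsumerWithManualCommit and the per-batch println are illustrative, not part of the original code; it assumes the same spark-streaming-kafka-0-10 integration used above):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public final class SparkConsumerWithManualCommit {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
        sparkConf.set("spark.streaming.backpressure.enabled", "true");
        sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
        sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));

        Set<String> topicsSet = new HashSet<>(Arrays.asList("zhihu_comment"));
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "device1:9092,device2:9092,device3:9092");
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark");
        kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Disable the Kafka client's automatic commits; offsets are committed explicitly below.
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

        messages.foreachRDD(rdd -> {
            // The RDD produced by the direct stream carries the offset ranges of this batch.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // Process the batch; here we just count the records, like the original app does.
            long count = rdd.count();
            System.out.println("Processed " + count + " records in this batch");

            // Ask the stream to commit exactly the offsets of the batch that was just processed.
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

With this in place, the consumer group's CURRENT-OFFSET should advance batch by batch instead of jumping straight to the log-end offset when the application starts.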