Kafka offset increments even without reaching Ack
Question
I have a consumer which consumes a message, does some heavy job and then acks.
@KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory = "ContainerFactory")
public void consumeMessage(@Payload Payload message, @Headers MessageHeaders headers, Acknowledgment ack) {
    try {
        // Heavy job
        ack.acknowledge();
    } catch (Exception e) {
        log("Error in Kafka Consumer");
    }
}
Now, if there is an exception, it should go to the catch block and the acknowledgement should not happen; and if the acknowledgement does not happen, the message should go back to the queue and be processed again. But that is not what happens: the offset advances and the next message is picked up.
I understand that the consumer has a poll size that lets it pick up more than one message at a time. But even if one message is not acked, it should reprocess it instead of ignoring it and updating the offset.
Here is the Kafka consumer config:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 20000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
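The question's config disables auto-commit but does not show the container factory. In spring-kafka, `Acknowledgment.acknowledge()` only controls offset commits when the container's ack mode is set to manual; otherwise the container commits offsets on its own after each poll, regardless of the listener's ack calls. A minimal sketch of such a factory follows; the bean name matches the question, but the `consumerProps()` helper and generic types are assumptions for illustration.

```java
// Sketch of a listener container factory with manual acks (spring-kafka).
// consumerProps() is assumed to return the props map shown above.
@Bean(name = "ContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, Payload> containerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, Payload> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
    // Without MANUAL or MANUAL_IMMEDIATE, the container itself commits offsets
    // (AckMode.BATCH by default) and Acknowledgment.acknowledge() has no effect.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
```

Note that even with manual acks, a skipped `acknowledge()` only prevents the *commit*; as the answer below explains, it does not rewind the consumer's in-memory position.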
Answer 1
Score: 2
This is the expected behaviour of the underlying KafkaConsumer.
Under the covers, the KafkaConsumer uses the poll API, which is described in the JavaDocs as:
> "On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions."
This means it does not check the last committed offset but rather the last consumed offset, and then fetches data sequentially. Only when your job restarts will it continue reading from the last committed offset for that consumer group, or, if you use a new consumer group, from wherever the auto.offset.reset configuration points.
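The distinction between the consumed position and the committed offset can be observed directly with the plain kafka-clients API. This sketch is illustrative only (topic name and props are assumptions) and needs a running broker:

```java
// Illustrative: the position the consumer fetches from next is tracked
// in memory and is independent of the last committed offset.
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition("my-topic", 0);
consumer.assign(Collections.singletonList(tp));
consumer.poll(Duration.ofMillis(100));  // advances the in-memory position

long nextFetch = consumer.position(tp);  // last consumed offset + 1
OffsetAndMetadata committed =
        consumer.committed(Collections.singleton(tp)).get(tp);
// With enable.auto.commit=false and no manual commit, `committed` lags
// behind (or is null), yet the next poll still starts at `nextFetch`.
```

This is why skipping `acknowledge()` alone does not cause redelivery within a running consumer: the position has already moved past the failed record.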
To solve your problem, I see the following solutions that you can apply in the catch block:
- Instead of just logging "Error in Kafka Consumer", make your job shut down. Fix the code and restart your application.
- Use the offset number (that caused the exception) to re-position your consumer to the same offset again using the seek API. Details on the seek method can be found here.
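The second option can be sketched inside the listener itself: spring-kafka can inject the `Consumer` alongside the record, so the catch block can seek back to the failed offset. Parameter shapes and the `log()` helper are illustrative, not from the question:

```java
// Sketch: rewind to the failed record so the next poll re-delivers it.
@KafkaListener(topics = "${kafka.topic}", groupId = "group", containerFactory = "ContainerFactory")
public void consumeMessage(ConsumerRecord<String, Payload> record,
                           Acknowledgment ack,
                           Consumer<?, ?> consumer) {
    try {
        // Heavy job on record.value()
        ack.acknowledge();
    } catch (Exception e) {
        log("Error in Kafka Consumer");
        // Re-position this partition at the failed offset. Beware: with no
        // retry limit, a "poison" message will be retried forever.
        consumer.seek(new TopicPartition(record.topic(), record.partition()),
                      record.offset());
    }
}
```

Alternatively, letting the exception propagate out of the listener and configuring the container's error handler (e.g. spring-kafka's seek-to-current error handling, which performs this seek with a bounded number of retries) achieves the same effect without hand-written seek logic.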