Kafka Consumer - How can I replay/get the same message from the Kafka broker in case my service is down or an exception occurs?
Question
My understanding as of now: when I set auto commit to false and the ack mode to manual immediate, and an exception occurs, I do not ack to the broker. So when I restart my consumer process with the same consumer group id, I should read the message again, since I never acked it.
@KafkaListener(topics = "testtopic", groupId = "testgroupID")
public void listenGroupFoo(String message, Acknowledgment acknowledgment,
        @Header(KafkaHeaders.OFFSET) long offset,
        @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
        @Payload String payload,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
    // Perform some processing, e.g. insert into a DB or call some
    // downstream process; once that is done, I would like to inform the
    // Kafka broker that I'm finished by calling acknowledgment.acknowledge()
    // for each individual message, one by one.
}
The ConsumerFactory config is:
enable.auto.commit = false
auto.offset.reset = earliest
and the ConcurrentKafkaListenerContainerFactory config is:
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
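For context, a fuller configuration consistent with the properties above might look like the sketch below. The bean layout, bootstrap address, and String deserializers are illustrative assumptions, not taken from the post:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical configuration class; names and values are illustrative.
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "testgroupID");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);      // no auto commit
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // start from earliest if no committed offset
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // The listener must call acknowledgment.acknowledge() itself; the
        // offset is committed immediately when acknowledge() is called.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```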
Answer 1
Score: 1
That is exactly what will happen, unless you ack the next record in the partition; in that case the failed record will be skipped; Kafka only keeps a "last" committed offset.
If you throw an exception for the failed record, by default, the framework will rewind the partition and redeliver the failed record up to 9 times; see Handling Exceptions.
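The retry behaviour described above can also be configured explicitly. A sketch assuming spring-kafka 2.8+, using `DefaultErrorHandler` with a `FixedBackOff`; the 1-second interval and the logging recoverer are assumptions, and 9 retries simply mirrors the framework default mentioned above:

```java
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Attach an error handler so a thrown exception rewinds the partition and
// redelivers the failed record with a fixed back-off between attempts.
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
        (record, ex) -> {
            // Invoked after retries are exhausted; log and skip the record.
            System.err.println("Giving up on " + record.topic() + "-"
                    + record.partition() + "@" + record.offset() + ": " + ex.getMessage());
        },
        new FixedBackOff(1000L, 9L)); // 1s between attempts, 9 retries

factory.setCommonErrorHandler(errorHandler);
```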