Kafka Consumer – How can I do Replay/Get the same message from kafka broker in case my service is down/some exception occurred

huangapple go评论83阅读模式
英文:

Kafka Consumer - How can I do Replay/Get the same message from kafka broker in case my service is down/some exception occurred

问题

从我现在的理解来看,当我将“auto commit”设置为false并将“ack mode”设置为“manual immediate”时,如果发生任何异常,我将不会向代理程序确认。因此,当我使用相同的“consumergroupid”重新启动我的消费者进程时,我需要重新读取消息,因为我没有确认。

 @KafkaListener(topics = "testtopic", groupId = "testgroupID")
    public void listenGroupFoo(String message, Acknowledgment acknowledgment,
                               @Header(KafkaHeaders.OFFSET) int offsets,
                               @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required =false) Integer key,
                               @Payload String payload,
                               @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                               @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
                               ) {
                    
                     //执行一些操作,比如插入到数据库或
                     //调用某些下游进程,一旦这些进程完成
                     //我想通过实现 acknowledgment.acknowledge(); 来通知Kafka代理我已经处理完毕
    }

ConsumerFactory 配置是:

enable.auto.commit = false
auto.offset.reset = earliest

ConcurrentKafkaListenerContainerFactory 的配置是:

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
英文:

what's my understanding is as of now when I set auto commit to false and ack mode to manual immediate then any exception occurs. I'm not going to ack to broker. So when I restart my consumer process with same consumergroupid I need to read the message again since I didn't ack.

 @KafkaListener(topics = "testtopic", groupId = "testgroupID")
    public void listenGroupFoo(String message, Acknowledgment acknowledgment,
                               @Header(KafkaHeaders.OFFSET) int offsets,
                               @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required =false) Integer key,
                               @Payload String payload,
                               @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                               @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                               @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
                               ) {
                    
                     //performing some process like insert to DB or 
                     calling some downstream process, once this processes is done 
                     I would like to inform kafka broker that I'm done with processing 
                     by implementing  acknowledgment.acknowledge(); for each individual message
                     one by one
    }

and ConsumerFactory config is

enable.auto.commit = false
auto.offset.reset = earliest

and ConcurrentKafkaListenerContainerFactory config is

factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

答案1

得分: 1

这正是将会发生的情况,除非你确认分区中的下一条记录;在这种情况下,失败的记录将被跳过;Kafka只保留一个“最后”提交的偏移量。

如果对于失败的记录抛出异常,默认情况下,框架将倒回分区并重新传递失败的记录,最多9次;请参阅处理异常

英文:

That is exactly what will happen, unless you ack the next record in the partition; in that case the failed record will be skipped; Kafka only keeps a "last" committed offset.

If you throw an exception for the failed record, by default, the framework will rewind the partition and redeliver the failed record up to 9 times; see Handling Exceptions.

huangapple
  • 本文由 发表于 2023年5月25日 07:31:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/76327993.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定