Kafka Consumer – How can I do Replay/Get the same message from kafka broker in case my service is down/some exception occurred


My understanding as of now: when I set auto commit to false and the ack mode to MANUAL_IMMEDIATE, and an exception occurs, I do not ack the record to the broker. So when I restart my consumer process with the same consumer group id, I should read the message again, since I never acked it.

  @KafkaListener(topics = "testtopic", groupId = "testgroupID")
  public void listenGroupFoo(String message, Acknowledgment acknowledgment,
          @Header(KafkaHeaders.OFFSET) long offset,
          @Header(value = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
          @Payload String payload,
          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
          @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
          @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
  ) {
      // Perform some processing, e.g. insert into the DB or
      // call some downstream process; once that processing is done,
      // inform the Kafka broker that I'm done with processing by calling
      // acknowledgment.acknowledge() for each individual message, one by one.
  }
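Stripped down to the essentials, the intended flow is: ack only after processing succeeds, and let any exception propagate so the offset stays uncommitted and the record is redelivered. A minimal sketch, assuming Spring for Apache Kafka on the classpath; the class name and `saveToDatabase` helper are hypothetical placeholders:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;

public class OrderListener { // hypothetical listener class

    @KafkaListener(topics = "testtopic", groupId = "testgroupID")
    public void listen(@Payload String payload, Acknowledgment acknowledgment) {
        saveToDatabase(payload);      // hypothetical downstream call; may throw
        // Reached only on success: commit this record's offset immediately.
        // If saveToDatabase() throws, nothing is acked, the offset stays
        // uncommitted, and the record is redelivered (after the error
        // handler's retries, or on restart of the consumer).
        acknowledgment.acknowledge();
    }

    private void saveToDatabase(String payload) {
        // placeholder for the real downstream work
    }
}
```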

and the ConsumerFactory config is:

  enable.auto.commit = false
  auto.offset.reset = earliest

and the ConcurrentKafkaListenerContainerFactory config is:

  factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
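The two configuration fragments above typically live in one configuration class. A minimal sketch, assuming Spring for Apache Kafka with String keys and values; the bean layout and the bootstrap server address are illustrative placeholders:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // No offset is committed until the listener calls acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```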

Answer 1

Score: 1

That is exactly what will happen, unless you ack the next record in the partition; in that case the failed record will be skipped. Kafka only keeps one "last" committed offset per partition.

If you throw an exception for the failed record, by default, the framework will rewind the partition and redeliver the failed record up to 9 times; see Handling Exceptions.
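That redelivery behaviour can be tuned on the container factory. A hedged sketch, assuming Spring for Apache Kafka 2.8+ (where `DefaultErrorHandler` replaced `SeekToCurrentErrorHandler`); the recovery lambda is an illustrative placeholder, and `FixedBackOff(1000L, 9L)` means 1 second between retries and 9 retries, matching the default of 10 total delivery attempts:

```java
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Inside the method that builds the ConcurrentKafkaListenerContainerFactory:
factory.setCommonErrorHandler(new DefaultErrorHandler(
        (record, exception) -> {
            // Called only after all retries are exhausted; e.g. log the
            // record, or publish it to a dead-letter topic.
            System.err.println("Giving up on " + record.topic() + "-"
                    + record.partition() + "@" + record.offset()
                    + ": " + exception.getMessage());
        },
        new FixedBackOff(1000L, 9L)));
```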

huangapple
  • Published 2023-05-25 07:31:02
  • Please retain this link when reposting: https://go.coder-hub.com/76327993.html