如何在KafkaListener中更改DELIVERY_ATTEMPT失败之前的尝试次数?

huangapple go评论156阅读模式
英文:

How to change amount of DELIVERY_ATTEMPT before fail in KafkaListener?

问题

让我们假设我有以下的监听器:

  1. @KafkaListener(topics = "${some_topic}", autoStartup = "true")
  2. public void listenForMessage(String message) {
  3. log.warn("Accepted message");
  4. if (true) {
  5. throw new RuntimeException("some error");
  6. }
  7. }

我尝试发送消息到 ${some_topic} 主题并看到以下输出:

  1. 2023-06-15 10:00:03.311 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  2. 2023-06-15 10:00:03.381 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  3. ...
  4. (省略部分)
  5. ...
  6. 2023-06-15 10:00:03.509 ERROR 33580 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Records discarded: my-topic-0@33
  7. org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.****.listenForMessage(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: some error; nested exception is java.lang.RuntimeException: Ololo
  8. ...
  9. (省略部分)
  10. ...

我们看到监听器尝试接受消息10次,在第10次失败后会打印错误(并发送确认,据我理解)。我想尝试调整这种行为。

我尝试实现以下错误处理器以将重试次数减少到3次:

  1. @Bean
  2. public KafkaListenerErrorHandler eh() {
  3. return (msg, ex) -> {
  4. if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 3) {
  5. return "FAILED";
  6. }
  7. throw ex;
  8. };
  9. }

并在监听器中引用它:

  1. @KafkaListener(topics = "${some_topic}", autoStartup = "true", errorHandler = "eh")
  2. public void listenForMessage(String message) {
  3. log.warn("Accepted message");
  4. if (true) {
  5. throw new RuntimeException("some error");
  6. }
  7. }

但是 msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) 抛出异常。所以行为仍然是相同的:尝试10次接受消息,然后我看到异常:

  1. ...
  2. Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "org.springframework.messaging.MessageHeaders.get(Object, java.lang.Class)" is null
  3. ...

据我了解,可以标记一些异常为可重试,而另一些不行。你能否提供一个配置示例?

英文:

Lets say I have the following listener:

  1. @KafkaListener(topics = "${some_topic}", autoStartup = "true")
  2. public void listenForMessage(String message) {
  3. log.warn("Accepted message");
  4. if (true) {
  5. throw new RuntimeException("some error");
  6. }
  7. }

I try to send message to the topic ${some_topic} and see following:

  1. 2023-06-15 10:00:03.311 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  2. 2023-06-15 10:00:03.381 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  3. 2023-06-15 10:00:03.399 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  4. 2023-06-15 10:00:03.414 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  5. 2023-06-15 10:00:03.430 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  6. 2023-06-15 10:00:03.445 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  7. 2023-06-15 10:00:03.460 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  8. 2023-06-15 10:00:03.477 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  9. 2023-06-15 10:00:03.492 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  10. 2023-06-15 10:00:03.509 WARN 33580 --- [ntainer#0-0-C-1] r.v.u.e.s.MyServiceImpl : Accepted message
  11. 2023-06-15 10:00:03.509 ERROR 33580 --- [ntainer#0-0-C-1] o.s.k.l.FallbackBatchErrorHandler : Records discarded: my-topic-0@33
  12. org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.****.listenForMessage(java.lang.String)' threw exception; nested exception is java.lang.RuntimeException: some error; nested exception is java.lang.RuntimeException: Ololo
  13. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2871) ~[spring-kafka-2.9.7.jar:2.9.7]
  14. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2407) ~[spring-kafka-2.9.7.jar:2.9.7]
  15. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchOnMessageWithRecordsOrList(KafkaMessageListenerContainer.java:2377) ~[spring-kafka-2.9.7.jar:2.9.7]
  16. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$invokeBatchErrorHandler$46(KafkaMessageListenerContainer.java:2419) ~[spring-kafka-2.9.7.jar:2.9.7]
  17. at org.springframework.kafka.listener.ErrorHandlingUtils.retryBatch(ErrorHandlingUtils.java:185) ~[spring-kafka-2.9.7.jar:2.9.7]
  18. at org.springframework.kafka.listener.FallbackBatchErrorHandler.handle(FallbackBatchErrorHandler.java:131) ~[spring-kafka-2.9.7.jar:2.9.7]
  19. at org.springframework.kafka.listener.ErrorHandlerAdapter.handleBatch(ErrorHandlerAdapter.java:160) ~[spring-kafka-2.9.7.jar:2.9.7]
  20. at org.springframework.kafka.listener.FailedBatchProcessor.fallback(FailedBatchProcessor.java:191) ~[spring-kafka-2.9.7.jar:2.9.7]
  21. at org.springframework.kafka.listener.FailedBatchProcessor.handle(FailedBatchProcessor.java:159) ~[spring-kafka-2.9.7.jar:2.9.7]
  22. at org.springframework.kafka.listener.FailedBatchProcessor.doHandle(FailedBatchProcessor.java:150) ~[spring-kafka-2.9.7.jar:2.9.7]
  23. at org.springframework.kafka.listener.DefaultErrorHandler.handleBatch(DefaultErrorHandler.java:182) ~[spring-kafka-2.9.7.jar:2.9.7]
  24. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchErrorHandler(KafkaMessageListenerContainer.java:2417) ~[spring-kafka-2.9.7.jar:2.9.7]
  25. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchListener(KafkaMessageListenerContainer.java:2234) ~[spring-kafka-2.9.7.jar:2.9.7]
  26. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeBatchListener(KafkaMessageListenerContainer.java:2095) ~[spring-kafka-2.9.7.jar:2.9.7]
  27. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2074) ~[spring-kafka-2.9.7.jar:2.9.7]
  28. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1429) ~[spring-kafka-2.9.7.jar:2.9.7]
  29. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1393) ~[spring-kafka-2.9.7.jar:2.9.7]
  30. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1290) ~[spring-kafka-2.9.7.jar:2.9.7]
  31. at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) ~[?:?]
  32. at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264) ~[?:?]
  33. at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:?]
  34. at java.lang.Thread.run(Thread.java:833) ~[?:?]
  35. Suppressed: org.springframework.kafka.listener.ListenerExecutionFailedException: Restored Stack Trace
  36. at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:363) ~[spring-kafka-2.9.7.jar:2.9.7]
  37. at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.9.7.jar:2.9.7]
  38. at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.9.7.jar:2.9.7]
  39. at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.9.7.jar:2.9.7]
  40. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2387) ~[spring-kafka-2.9.7.jar:2.9.7]
  41. Caused by: java.lang.RuntimeException: some error
  42. at ***(MyServiceImpl.java:59) ~[classes/:?]
  43. at ***.listenForMessage(MyServiceImpl.java:39) ~[classes/:?]
  44. at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
  45. at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[?:?]
  46. at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
  47. at java.lang.reflect.Method.invoke(Method.java:568) ~[?:?]
  48. at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:169) ~[spring-messaging-5.3.26.jar:5.3.26]
  49. at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:119) ~[spring-messaging-5.3.26.jar:5.3.26]
  50. at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:56) ~[spring-kafka-2.9.7.jar:2.9.7]
  51. at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:339) ~[spring-kafka-2.9.7.jar:2.9.7]
  52. at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.invoke(BatchMessagingMessageListenerAdapter.java:180) ~[spring-kafka-2.9.7.jar:2.9.7]
  53. at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:172) ~[spring-kafka-2.9.7.jar:2.9.7]
  54. at org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter.onMessage(BatchMessagingMessageListenerAdapter.java:61) ~[spring-kafka-2.9.7.jar:2.9.7]
  55. at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeBatchOnMessage(KafkaMessageListenerContainer.java:2387) ~[spring-kafka-2.9.7.jar:2.9.7]

We see that listener tries to accept message 10 times and after fail #10 it prints error (and sends ack as far as I understand). I wanted to play with that behaviout

I tried to implement following errorHandler to decrease retry amount to 3:

  1. @Bean
  2. public KafkaListenerErrorHandler eh() {
  3. return (msg, ex) -> {
  4. if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 3) {
  5. return "FAILED";
  6. }
  7. throw ex;
  8. };
  9. }

and reference to it

  1. @KafkaListener(topics = "${some_topic}", autoStartup = "true", errorHandler = "eh")
  2. public void listenForMessage(String message) {
  3. log.warn("Accepted message");
  4. if (true) {
  5. throw new RuntimeException("some error");
  6. }
  7. }

But msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) throws exception. So the behaviour is the same: 10 attempts to accept and then I see exception:

  1. ...
  2. Caused by: java.lang.NullPointerException: Cannot invoke "java.lang.Integer.intValue()" because the return value of "org.springframework.messaging.MessageHeaders.get(Object, java.lang.Class)" is null

As far I understand there is a way to mark some exceptions retriable and some - not. Could you please provide example of configuration ?

答案1

得分: 1

请查看文档链接:https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling

以及

https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh

使用不同的FixedBackOff添加DefaultErrorHandler(默认为9次重试,重试之间没有延迟)。

  1. DefaultErrorHandler errorHandler =
  2. new DefaultErrorHandler((record, exception) -> {
  3. // 在3次失败后恢复,没有延迟 - 例如,发送到死信主题
  4. }, new FixedBackOff(0L, 2L));

你需要启用传递尝试次数头,它不是默认启用的。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header

英文:

See the documentation: https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling

and

https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh

Add a DefaultErrorHandler with a different FixedBackOff (the default is 9 retries with no back off between attempts).

  1. DefaultErrorHandler errorHandler =
  2. new DefaultErrorHandler((record, exception) -> {
  3. // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
  4. }, new FixedBackOff(0L, 2L));

You have to enable the delivery attempts header, it is not enabled by default.

https://docs.spring.io/spring-kafka/docs/current/reference/html/#delivery-header

答案2

得分: 0

总结评论并添加示例:

DELIVERY_ATTEMPT 头部不起作用,因为您需要启用它 - 将容器属性 deliveryAttemptHeader 设置为 true(在此处查看文档:docs.spring.io/spring-kafka/reference/html/#delivery-header)。您需要创建一个 KafkaListenerContainerFactory bean:

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConfig {
  4. @Bean
  5. KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
  6. kafkaListenerContainerFactory() {
  7. ConcurrentKafkaListenerContainerFactory<String, String> factory =
  8. new ConcurrentKafkaListenerContainerFactory<>();
  9. factory.setConsumerFactory(consumerFactory());
  10. factory.getContainerProperties().setDeliveryAttemptHeader(true);
  11. return factory;
  12. }

在这里查看其他 bean 的示例(例如 consumerFactory()):https://docs.spring.io/spring-kafka/reference/html/#record-listener。然后在监听器中,您可以这样做:

  1. @KafkaListener(topics = "${some_topic}", autoStartup = "true", containerFactory = "kafkaListenerContainerFactory")

当然,还要检查 Gary Russell 的回答。

英文:

Summing up the comments + adding the example

The DELIVERY_ATTEMPT header doesn't work because you need to enable it - set the container property deliveryAttemptHeader to true (check here docs.spring.io/spring-kafka/reference/html/#delivery-header). You need to create a KafkaListenerContainerFactory bean:

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConfig {
  4. @Bean
  5. KafkaListenerContainerFactory&lt;ConcurrentMessageListenerContainer&lt;Integer, String&gt;&gt;
  6. kafkaListenerContainerFactory() {
  7. ConcurrentKafkaListenerContainerFactory&lt;String, String&gt; factory =
  8. new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
  9. factory.setConsumerFactory(consumerFactory());
  10. factory.getContainerProperties().setDeliveryAttemptHeader(true);
  11. return factory;
  12. }

Check an example of the rest of the beans (e.g. consumerFactory()) here: https://docs.spring.io/spring-kafka/reference/html/#record-listener. And then in the listener, you do:

  1. @KafkaListener(topics = &quot;${some_topic}&quot;, autoStartup = &quot;true&quot;, containerFactory = &quot;kafkaListenerContainerFactory&quot;)

And of course, also check the answer by Gary Russell

huangapple
  • 本文由 发表于 2023年6月15日 15:50:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/76480241.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定