英文:
How to handler errors/exceptions while using Spring Kafka framework?
问题
我无法找到如何在Spring Kafka消费者中进行自定义错误处理。
我的要求是:
- 对于任何反序列化错误,只需将错误和消息写入数据库。
- 对于
@KafkaListener
方法执行中的任何错误,重试3次,然后将错误和消息写入数据库。
根据Spring文档,我发现:
对于第一点,我需要使用ErrorHandlingDeserializer
,然后它会调用@KafkaListener
的错误处理程序。
对于第二点,框架提供了SeekToCurrentErrorHandler
来处理消息的重试。
我不明白在哪里可以添加代码,除了启用配置的重试之外,还要将异常/消息写入数据库。
英文:
I am not able to find how to do custom error handling in for a spring kafka consumer.
My requirement is:
- For any deserialization errors, just write the error and message to database.
- For any errors in execution under
@KafkaListener
method, retry 3 times and then write the error and message to database.
From the spring docs, I found that,
For 1, I will have to use ErrorHandlingDeserializer
and it will then call the @KafkaListener error handler.
For 2, framework provides SeekToCurrentErrorHandler
which handles message retries.
I am not understanding where can I add the code to write the exception/message to database in addition to enable configured retries.
答案1
得分: 1
将一个recoverer添加到SeekToCurrentErrorHandler
new SeekToCurrentErrorHandler((rec, ex) -> {
Throwable cause = ex.getCause();
if (cause instanceof DeserializationException) {
...
}
else {
...
}
}, new FixedBackOff(2000L, 2L));
默认情况下,不会对反序列化异常进行重试;而对于大多数其他异常,在调用恢复程序之前会进行重试。
英文:
Add a recoverer to the SeekToCurrentErrorHandler
new SeekToCurrentErrorHandler((rec, ex) -> {
Throwable cause = ex.getCause();
if (cause instanceof DeserializationException) {
...
}
else {
...
}, new FixedBackOff(2000L, 2L));
By default, deserialization exceptions are not retried; most others are retried before calling the recoverer.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论