Better way of error handling in Kafka Consumer

Question
I have a Spring Boot app configured with spring-kafka, where I want to handle all sorts of errors that can happen while listening to a topic. If any message is missed or cannot be consumed because of a deserialization or any other exception, there should be 2 retries, after which the message should be logged to an error file. There are two approaches that can be followed:

First approach (using SeekToCurrentErrorHandler with DeadLetterPublishingRecoverer):
@Autowired
KafkaTemplate<String, Object> template;

@Bean(name = "kafkaSourceProvider")
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource().getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
            (r, e) -> {
                if (e instanceof FooException) {
                    return new TopicPartition(r.topic() + ".DLT", r.partition());
                }
                // the resolver must return a destination for every record;
                // route everything else to the same DLT here
                return new TopicPartition(r.topic() + ".DLT", r.partition());
            });
    ErrorHandler errorHandler = new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
    factory.setErrorHandler(errorHandler);
    return factory;
}
But for this we require an additional topic (a new .DLT topic), and only then can we log the message to a file.
@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
    return new KafkaAdmin(configs);
}

@KafkaListener(topics = MY_TOPIC + ".DLT", groupId = MY_ID)
public void listenDlt(ConsumerRecord<String, SomeClassName> consumerRecord,
        @Header(KafkaHeaders.DLT_EXCEPTION_STACKTRACE) String exceptionStackTrace) {
    logger.error(exceptionStackTrace);
}
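Note that KafkaAdmin only provisions topics that are declared as NewTopic beans, so the .DLT topic itself still has to be declared somewhere. A minimal sketch; the partition and replica counts are assumptions (a DLT should normally have at least as many partitions as the source topic so records keep their original partition):

@Bean
public NewTopic dltTopic() {
    // hypothetical settings; align the partition count with MY_TOPIC
    return TopicBuilder.name(MY_TOPIC + ".DLT").partitions(1).replicas(1).build();
}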
Second approach (using a custom SeekToCurrentErrorHandler):
@Bean
public ConcurrentKafkaListenerContainerFactory<K, V> consumerFactory() {
    Map<String, Object> config = appProperties.getSource().getProperties();
    ConcurrentKafkaListenerContainerFactory<K, V> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    factory.setErrorHandler(new CustomSeekToCurrentErrorHandler());
    factory.setRetryTemplate(retryTemplate());
    return factory;
}

private RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(backOffPolicy());
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(2)); // a SimpleRetryPolicy with 2 attempts
    return retryTemplate;
}
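The backOffPolicy() helper is not shown in the original snippet. A minimal sketch, assuming a fixed delay between attempts:

private BackOffPolicy backOffPolicy() {
    FixedBackOffPolicy policy = new FixedBackOffPolicy();
    policy.setBackOffPeriod(1000L); // hypothetical 1 s delay between attempts
    return policy;
}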
public class CustomSeekToCurrentErrorHandler extends SeekToCurrentErrorHandler {

    private static final int MAX_RETRY_ATTEMPTS = 2;

    CustomSeekToCurrentErrorHandler() {
        super(MAX_RETRY_ATTEMPTS);
    }

    @Override
    public void handle(Exception exception, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        try {
            if (!records.isEmpty()) {
                log.warn("Exception: {} occurred with message: {}", exception, exception.getMessage());
                super.handle(exception, records, consumer, container);
            }
        } catch (SerializationException e) {
            log.warn("Exception: {} occurred with message: {}", e, e.getMessage());
        }
    }
}
Can anyone provide their suggestions on the standard way to implement this kind of feature? With the first approach, we see the overhead of creating a .DLT topic and an additional @KafkaListener; with the second approach, we can log the failing consumer record's exception directly.
Answer 1

Score: 5
With the first approach, it is not necessary to use a DeadLetterPublishingRecoverer; you can use any ConsumerRecordRecoverer that you want. In fact, the default recoverer simply logs the failed message.
/**
 * Construct an instance with the default recoverer which simply logs the record after
 * the backOff returns STOP for a topic/partition/offset.
 * @param backOff the {@link BackOff}.
 * @since 2.3
 */
public SeekToCurrentErrorHandler(BackOff backOff) {
    this(null, backOff);
}
And, in the FailedRecordTracker...
if (recoverer == null) {
    this.recoverer = (rec, thr) -> {
        ...
        logger.error(thr, "Backoff "
                + (failedRecord == null
                        ? "none"
                        : failedRecord.getBackOffExecution())
                + " exhausted for " + ListenerUtils.recordToString(rec));
    };
}
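So, to satisfy the "log to an error file" requirement from the question, it should be enough to pass a recoverer lambda that writes to a dedicated logger. A minimal sketch, assuming a logger named "error-file" that your logging configuration routes to a file appender (the logger name is a placeholder, not part of the original answer):

private static final Logger errorFileLogger = LoggerFactory.getLogger("error-file"); // hypothetical logger name

@Bean
ErrorHandler errorHandler() {
    // after 2 retries with no delay, write the failed record and cause to the error file
    return new SeekToCurrentErrorHandler(
            (rec, ex) -> errorFileLogger.error(ListenerUtils.recordToString(rec, true) + "\n" + ex.getMessage()),
            new FixedBackOff(0L, 2L));
}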
Back off (and a limit to the number of retries) was added to the error handler after retry was added in the listener adapter, so the error-handler approach is "newer" (and preferred).
Also, using in-memory retry can cause issues with rebalancing if long BackOffs are employed.
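Concretely, the worst-case time spent retrying one record must stay below the consumer's max.poll.interval.ms, or the broker will consider the consumer dead and trigger a rebalance. The figures below are illustrative, not from the original answer:

# e.g. 2 retries x 5 s back off = 10 s, well below the 5 minute default
spring.kafka.consumer.properties.max.poll.interval.ms=300000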
Finally, only the SeekToCurrentErrorHandler can deal with deserialization problems (via the ErrorHandlingDeserializer).
EDIT
Use the ErrorHandlingDeserializer together with a SeekToCurrentErrorHandler. Deserialization exceptions are considered fatal and the recoverer is called immediately.
See the documentation.
Here is a simple Spring Boot application that demonstrates it:
@SpringBootApplication
public class So63236346Application {

    private static final Logger log = LoggerFactory.getLogger(So63236346Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So63236346Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so63236346").partitions(1).replicas(1).build();
    }

    @Bean
    ErrorHandler errorHandler() {
        return new SeekToCurrentErrorHandler((rec, ex) -> log.error(ListenerUtils.recordToString(rec, true) + "\n"
                + ex.getMessage()));
    }

    @KafkaListener(id = "so63236346", topics = "so63236346")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("so63236346", "{\"field\":\"value1\"}");
            template.send("so63236346", "junk");
            template.send("so63236346", "{\"field\":\"value2\"}");
        };
    }

}
package com.example.demo;

public class Thing {

    private String field;

    public Thing() {
    }

    public Thing(String field) {
        this.field = field;
    }

    public String getField() {
        return this.field;
    }

    public void setField(String field) {
        this.field = field;
    }

    @Override
    public String toString() {
        return "Thing [field=" + this.field + "]";
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.demo.Thing
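For reference, the same configuration could be set programmatically on the consumer factory, using the constants these property keys map to; this sketch is an assumption, not part of the original answer:

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Thing.class);
// then: new DefaultKafkaConsumerFactory<>(props)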
Result
Thing [field=value1]
2020-08-10 14:30:14.780 ERROR 78857 --- [o63236346-0-C-1] com.example.demo.So63236346Application : so63236346-0@7
Listener failed; nested exception is org.springframework.kafka.support.serializer.DeserializationException: failed to deserialize; nested exception is org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[106, 117, 110, 107]] from topic [so63236346]
2020-08-10 14:30:14.782 INFO 78857 --- [o63236346-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-so63236346-1, groupId=so63236346] Seeking to offset 8 for partition so63236346-0
Thing [field=value2]
Answer 2

Score: 0
The expectation was to log any exception that we might get at the container level as well as at the listener level.

Without retrying, the following is the way I have done the error handling:

If we encounter any exception at the container level, we should be able to log the message payload with the error description, then seek past that offset, skip it, and go ahead receiving the next offset. Though the code below does this only for DeserializationException, the offsets of records failing with other exceptions also need to be sought past and skipped.
@Component
public class KafkaContainerErrorHandler implements ErrorHandler {

    private static final Logger logger = LoggerFactory.getLogger(KafkaContainerErrorHandler.class);

    @Override
    public void handle(Exception thrownException, List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        // brittle: recovers the topic/partition/offset by parsing the exception message
        String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
        // modify below logic according to your topic nomenclature
        String topics = s.substring(0, s.lastIndexOf('-'));
        int offset = Integer.parseInt(s.split("offset ")[1]);
        int partition = Integer.parseInt(s.substring(s.lastIndexOf('-') + 1).split(" at")[0]);
        logger.error("...");
        TopicPartition topicPartition = new TopicPartition(topics, partition);
        logger.info("Skipping {} - {} offset {}", topics, partition, offset);
        consumer.seek(topicPartition, offset + 1);
    }

    @Override
    public void handle(Exception e, ConsumerRecord<?, ?> consumerRecord) {
    }
}
factory.setErrorHandler(kafkaContainerErrorHandler);
If we get any exception at the @KafkaListener level, then I configure my listener with my custom error handler and log the exception with the message, as can be seen below:
@Bean("customErrorHandler")
public KafkaListenerErrorHandler listenerErrorHandler() {
return (m, e) -> {
logger.error(...);
return m;
};
}
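The handler is then referenced by bean name on the listener. A sketch; the topic name and processing method are placeholders:

@KafkaListener(topics = "my-topic", errorHandler = "customErrorHandler")
public void listen(String in) {
    // any exception thrown here is passed to customErrorHandler
    process(in); // hypothetical processing method
}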