Spring Kafka batch error handler: deserializer error handling with manual commit
Question
My service gets stuck in an infinite loop when it tries to handle a JSON deserialization error. It uses the MANUAL_IMMEDIATE acknowledgment mode with auto-commit disabled (enable.auto.commit=false). In the main code I call acknowledge.acknowledge() to commit each batch of records, but in the batch error handler I cannot commit the offset of the invalid message. I tried both ConsumerAwareBatchErrorHandler and BatchErrorHandler, but neither isAckAfterHandle() nor consumer.commitSync() works.
Issue 1: I need to understand how to acknowledge a batch and commit its offsets.
Issue 2: The data argument is null. I tried to read the original message from data (which is null) and from thrownException, but failed.
Can someone please explain how to commit the offset and move on to the next batch? I want to send failed messages to a dead-letter or error queue and continue with the next batch of messages.
Code tried:
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMsConfig);
    // Offsets are committed manually, so auto-commit is disabled.
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // ErrorHandlingDeserializer wraps the real key and value deserializers.
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    return props;
}

@Bean
public DefaultKafkaConsumerFactory consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer(LocationRecordDto.class));
}

@Bean(KAFKA_LISTENER)
public ConcurrentKafkaListenerContainerFactory<String, MyDTO> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MyDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setBatchErrorHandler(new ConsumerAwareBatchErrorHandler() {
        @Override
        public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {
            if (thrownException instanceof SerializationException) {
                // Extract "<topic>-<partition> at offset <offset>" from the exception message.
                String s = thrownException.getMessage().split("Error deserializing key/value for partition ")[1].split(". If needed, please seek past the record to continue consumption.")[0];
                String topics = s.split("-")[0];
                int offset = Integer.valueOf(s.split("offset ")[1]);
                int partition = Integer.valueOf(s.split("-")[1].split(" at")[0]);
                TopicPartition topicPartition = new TopicPartition(topics, partition);
                // Skip the record that could not be deserialized.
                consumer.seek(topicPartition, offset + 1);
            }
            //Code to push data in error queue
            //consumer.commitSync();
        }

        @Override
        public boolean isAckAfterHandle() {
            return true;
        }
    });
    return factory;
}
Answer 1
Score: 1
You have to handle deserialization exceptions in the listener rather than in the error handler, and commit the batch offsets normally.
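To illustrate the listener-side approach, here is a minimal plain-Java sketch of the control flow, not Spring Kafka code. It assumes the ErrorHandlingDeserializer from the question's properties is actually in effect (as far as I know, deserializer instances passed to the DefaultKafkaConsumerFactory constructor take precedence over those properties), so that a failed record arrives with a null value plus an error header. The Rec class is a hypothetical stand-in for ConsumerRecord, and the header name springDeserializerExceptionValue is an assumption that should be checked against the Spring Kafka version in use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchDeserializationSketch {

    // Assumed name of the header that marks a value deserialization failure.
    static final String VALUE_ERROR_HEADER = "springDeserializerExceptionValue";

    // Hypothetical stand-in for ConsumerRecord: offset, value, headers.
    static final class Rec {
        final long offset;
        final String value;
        final Map<String, byte[]> headers;

        Rec(long offset, String value, Map<String, byte[]> headers) {
            this.offset = offset;
            this.value = value;
            this.headers = headers;
        }
    }

    // A record failed deserialization if its value is null AND the error header is present.
    // A null value without the header is an ordinary tombstone.
    static boolean failedDeserialization(Rec r) {
        return r.value == null && r.headers.containsKey(VALUE_ERROR_HEADER);
    }

    // Processes the good records and returns the failed ones for the error queue.
    // In a real listener, acknowledgment.acknowledge() would follow once for the whole batch.
    static List<Rec> processBatch(List<Rec> batch, List<String> processed) {
        List<Rec> deadLetters = new ArrayList<>();
        for (Rec r : batch) {
            if (failedDeserialization(r)) {
                deadLetters.add(r);          // would be published to the dead-letter topic
            } else if (r.value != null) {
                processed.add(r.value);      // normal business logic
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        Map<String, byte[]> errorHeaders = new HashMap<>();
        errorHeaders.put(VALUE_ERROR_HEADER, new byte[] {1});
        List<Rec> batch = Arrays.asList(
                new Rec(0L, "{\"id\":1}", new HashMap<>()),
                new Rec(1L, null, errorHeaders),
                new Rec(2L, "{\"id\":2}", new HashMap<>()));
        List<String> processed = new ArrayList<>();
        List<Rec> dead = processBatch(batch, processed);
        System.out.println("processed=" + processed.size() + ", deadLetters=" + dead.size());
    }
}
```

Because the bad record never throws out of the listener, the whole batch can be acknowledged in the usual way and the consumer moves on to the next poll.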
Alternatively, consider using the newer RecoveringBatchErrorHandler.
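Separately, if the seek-based handler from the question is kept, its string splitting is fragile: a topic name containing a hyphen breaks s.split("-")[0], and the offset is parsed as an int although Kafka offsets are longs. A hedged sketch of a stricter parse follows. The message format is taken from the strings the question's code splits on and may differ between client versions; the class name FailedRecordPosition is hypothetical. As far as I know, newer kafka-clients versions expose the same information directly on RecordDeserializationException, which would make parsing unnecessary.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FailedRecordPosition {

    // Greedy (.+) keeps hyphens inside the topic name; the last "-<digits>" is the partition.
    private static final Pattern POSITION = Pattern.compile(
            "Error deserializing key/value for partition (.+)-(\\d+) at offset (\\d+)\\.");

    final String topic;
    final int partition;
    final long offset;

    FailedRecordPosition(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    // Returns the position of the failed record, or empty if the message does not match.
    static Optional<FailedRecordPosition> parse(String message) {
        if (message == null) {
            return Optional.empty();
        }
        Matcher m = POSITION.matcher(message);
        if (!m.find()) {
            return Optional.empty();
        }
        return Optional.of(new FailedRecordPosition(
                m.group(1), Integer.parseInt(m.group(2)), Long.parseLong(m.group(3))));
    }

    public static void main(String[] args) {
        String msg = "Error deserializing key/value for partition my-topic-3 at offset 5000000000. "
                + "If needed, please seek past the record to continue consumption.";
        // The offset to seek to is the failed offset plus one.
        parse(msg).ifPresent(p ->
                System.out.println(p.topic + " / " + p.partition + " / " + (p.offset + 1)));
    }
}
```

Returning an Optional lets the handler fall back to its default behaviour when the message does not match, instead of failing with an ArrayIndexOutOfBoundsException inside the error handler itself.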