Spring Kafka batch error handler - Deserializer error handling with manual commit

Question

My service gets stuck in an infinite loop when trying to handle a JSON deserialization error. The service uses the MANUAL_IMMEDIATE acknowledge mode with automatic offset commits disabled. In the main code I use acknowledge.acknowledge() to commit the batch records, but in the batch error handler I am unable to commit the offsets of invalid messages. I have tried ConsumerAwareBatchErrorHandler and BatchErrorHandler, but neither the isAckAfterHandle() method nor consumer.commitSync() works.

Issue 1: I need to understand the process of acknowledging a batch / committing its offsets.
Issue 2: The data argument arrives as null. I tried to read the original message from data (which is null) or from thrownException, but failed.

Can someone please walk me through committing the offset and moving on to the next batch? I want to insert the failed messages into a dead-letter or error queue and continue with the next batch of messages.

Code tried:

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMsConfig);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    // Wrap both deserializers in ErrorHandlingDeserializer so a bad record
    // does not make the delegate deserializer throw inside poll()
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
    return props;
}

@Bean
public DefaultKafkaConsumerFactory<String, MyDTO> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(),
            new JsonDeserializer<>(MyDTO.class));
}

@Bean(KAFKA_LISTENER)
public ConcurrentKafkaListenerContainerFactory<String, MyDTO> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, MyDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    factory.setBatchErrorHandler(new ConsumerAwareBatchErrorHandler() {
        @Override
        public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consumer<?, ?> consumer) {

            if (thrownException instanceof SerializationException) {
                // Fragile: recover topic, partition and offset by parsing the exception message
                String s = thrownException.getMessage()
                        .split("Error deserializing key/value for partition ")[1]
                        .split(". If needed, please seek past the record to continue consumption.")[0];
                String topic = s.split("-")[0];
                long offset = Long.parseLong(s.split("offset ")[1]);
                int partition = Integer.parseInt(s.split("-")[1].split(" at")[0]);

                // Skip the bad record and resume from the next offset
                TopicPartition topicPartition = new TopicPartition(topic, partition);
                consumer.seek(topicPartition, offset + 1);
            }
            // Code to push data to an error queue
            //consumer.commitSync();
        }

        @Override
        public boolean isAckAfterHandle() {
            return true;
        }
    });
    return factory;
}

Answer 1

Score: 1

You have to deal with deserialization exceptions in the listener instead of in the error handler, and commit the batch offsets normally.
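
With ErrorHandlingDeserializer in effect, a record whose value cannot be deserialized reaches the listener with a null value() and a serialized DeserializationException in a record header, so the listener can divert it to an error topic and then acknowledge the whole batch as usual. One caveat: deserializer instances passed to the DefaultKafkaConsumerFactory constructor take precedence over the ErrorHandlingDeserializer configured in the properties map, so the constructor arguments in the question may be bypassing it. A minimal sketch of the listener side, not tested against the poster's setup; MyDTO, my-topic, my-topic.ERR, errorTemplate and process() are placeholder names:

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;

public class MyBatchListener {

    private final KafkaTemplate<String, byte[]> errorTemplate; // hypothetical error-queue template

    public MyBatchListener(KafkaTemplate<String, byte[]> errorTemplate) {
        this.errorTemplate = errorTemplate;
    }

    @KafkaListener(topics = "my-topic", containerFactory = KAFKA_LISTENER) // same factory bean as above
    public void listen(List<ConsumerRecord<String, MyDTO>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, MyDTO> record : records) {
            Header failure = record.headers()
                    .lastHeader(ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER);
            if (failure != null) {
                // Deserialization failed: value() is null and the header holds a serialized
                // DeserializationException (its getData() carries the raw payload bytes)
                errorTemplate.send("my-topic.ERR", record.key(), failure.value());
            } else {
                process(record.value()); // normal processing path
            }
        }
        ack.acknowledge(); // MANUAL_IMMEDIATE: commits the whole batch, failed records included
    }

    private void process(MyDTO dto) { /* business logic */ }
}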

Or consider using the new RecoveringBatchErrorHandler instead.
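
That handler (added in spring-kafka 2.5) can be paired with a DeadLetterPublishingRecoverer, which by default publishes a failed record to <topic>.DLT on the same partition. A hedged sketch of the wiring, assuming an injectable KafkaTemplate<Object, Object> bean named template (an assumption, not from the question):

import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.RecoveringBatchErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Inside kafkaListenerContainerFactory(), replacing the hand-rolled handler above;
// "template" is an assumed KafkaTemplate<Object, Object> bean
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// Retry twice, one second apart, then hand the failing record to the recoverer
factory.setBatchErrorHandler(new RecoveringBatchErrorHandler(recoverer, new FixedBackOff(1000L, 2L)));

For the handler to know which record in the batch failed, the listener throws new BatchListenerFailedException("could not process", record); offsets for the records before it are then committed, and the failed record is retried and eventually passed to the recoverer.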
