Spring Kafka没有KafkaListener的错误处理

huangapple go评论61阅读模式
英文:

spring kafka error handling without KafkaListener

问题

以下是要翻译的代码部分:

@Component
public class Consumer implements AcknowledgingMessageListener<String, String> {

    @Override
    @KafkaListener(topics = { "ff808081672c17c8016730733d020001.CBS_PROFILE" })
    public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {

        log.info("Computed ");
        throw new RuntimeException();

    }

}

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    /** 处理错误和恢复逻辑 **/
    public DefaultErrorHandler errorHandler() {

        System.out.println("Inside error handler");

        // 如果在消费者中发生这些异常,记录将不会重试
        List<Class<? extends Exception>> exceptionsToIgnore = List.of(JsonSchemaValidationException.class);

        // 如果在消费者中发生这些异常,记录将会重试
        List<Class<? extends Exception>> exceptionsToRetry = List.of(DataEngineNotAvailableException.class, ListenerExecutionFailedException.class);

        // 固定间隔一秒后重试两次
        var fixedBackOff = new FixedBackOff(500L, 1);
        var expBackoff = new ExponentialBackOffWithMaxRetries(2);
        expBackoff.setInitialInterval(1000L);
        var errorHandler = new DefaultErrorHandler(fixedBackOff);

        errorHandler.setRetryListeners((consumerRecord, ex, deliveryAttempt) -> {
            log.info("Failed record {} in retry listeners, Exception : {}, delivery attempt : {}", consumerRecord,
                    ex.getMessage(), deliveryAttempt);
        });

        exceptionsToIgnore.forEach(errorHandler::addNotRetryableExceptions);
        exceptionsToRetry.forEach(errorHandler::addRetryableExceptions);
        return errorHandler;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);

        // 应该等于分区数
        factory.setConcurrency(3);
        factory.setCommonErrorHandler(errorHandler());
        return factory;
    }

}

但是如果按以下方式实现消费者我无法实现重试机制期望重试 `dataProcessor.readRecord(r, consumer)` 方法

```java
@PostConstruct
public void onLoad() {

    configService.setConfigurations();

    //加载消费者
    consumerRunnerV2.loadConsumers();

}

public void loadConsumers() {

    consumerServiceV2.fillConsumerMap();
    Map<String, LRConsumer> consumersMap = consumerServiceV2.getConsumersMap();

    for (Map.Entry<String, LRConsumer> consumer : consumersMap.entrySet()) {
        consumerTaskExecutor.execute(() -> runConsumer(consumer.getValue()));
    }

}

private void runConsumer(LRConsumer consumer) {

    while (true) {

        // 检查消费者的状态
        consumer = consumerServiceV2.getConsumerById(consumer.getConsumerId());

        if (consumer.getState() != ConsumerState.PAUSED) {

            log.info("Waiting to receive messages.");
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

            for (ConsumerRecord<String, String> r : records)

                // 期望重试这个方法
                dataProcessor.readRecord(r, consumer);
        } else {
            Thread.sleep(1000);
            log.info("Consumer is in paused state.");
        }

    }

}

希望这有助于您的工作。

英文:

I am using the spring kafka in my project and have implemented a retry functionality. A message is consumed and if it fails I need to retry. Below code is working fine if I am using and doing retry for some exceptions.

@Component
public class Consumer implements AcknowledgingMessageListener&lt;String, String&gt; {
@Override
@KafkaListener(topics = { &quot;ff808081672c17c8016730733d020001.CBS_PROFILE&quot;})
public void onMessage(ConsumerRecord&lt;String, String&gt; consumerRecord, Acknowledgment acknowledgment) {
log.info(&quot;Computed &quot;);
throw new RuntimeException();
}
}
@Configuration
@Slf4j
public class KafkaConsumerConfig {
/** handles error and recovery logic **/
public DefaultErrorHandler errorHandler() {
System.out.println(&quot;Inside error handler&quot;);
// if these exceptions occur in the consumer, record will not be retried
List&lt;Class&lt;? extends Exception&gt;&gt; exceptionsToIgnore = List.of(JsonSchemaValidationException.class);
// if these exceptions occur in the consumer, record will be retried
List&lt;Class&lt;? extends Exception&gt;&gt; exceptionsToRetry = List.of(DataEngineNotAvailableException.class, ListenerExecutionFailedException.class);
// retry twice after fixed interval of one second
var fixedBackOff = new FixedBackOff(500L, 1);
var expBackoff = new ExponentialBackOffWithMaxRetries(2);
expBackoff.setInitialInterval(1000L);
var errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners((consumerRecord, ex, deliveryAttempt) -&gt; {
log.info(&quot;Failed record {} in retry listeners, Exception : {}, delivery attempt : {}&quot;, consumerRecord,
ex.getMessage(), deliveryAttempt);
});
exceptionsToIgnore.forEach(errorHandler::addNotRetryableExceptions);
exceptionsToRetry.forEach(errorHandler::addRetryableExceptions);
return errorHandler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory&lt;?, ?&gt; kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory&lt;Object, Object&gt; kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory&lt;Object, Object&gt; factory = new ConcurrentKafkaListenerContainerFactory&lt;&gt;();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
// should be equal to number of partitions
factory.setConcurrency(3);
factory.setCommonErrorHandler(errorHandler());
return factory;
}
}

But I am not able to implement retry mechanism if I implement consumer like below.
Expectation is to retry the dataProcessor.readRecord(r, consumer) method.

@PostConstruct
public void onLoad() {
configService.setConfigurations();
//load consumer
consumerRunnerV2.loadConsumers();
}
public void loadConsumers() {
consumerServiceV2.fillConsumerMap();
Map&lt;String, LRConsumer&gt; consumersMap = consumerServiceV2.getConsumersMap();
for (Map.Entry&lt;String, LRConsumer&gt; consumer : consumersMap.entrySet()) {
consumerTaskExecutor.execute(() -&gt; runConsumer(consumer.getValue()));
}
}
private void runConsumer(LRConsumer consumer) {
while (true) {
// to check state of consumer
consumer = consumerServiceV2.getConsumerById(consumer.getConsumerId());
if (consumer.getState() != ConsumerState.PAUSED) {
log.info(&quot;Waiting to receive messages.&quot;);
ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord&lt;String, String&gt; r : records)
//expectation is to retry this method
dataProcessor.readRecord(r, consumer);
}
else {
Thread.sleep(1000);
log.info(&quot;Consumer is in paused state.&quot;);
}
}
}

答案1

得分: 1

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

当然,这不会起作用;您完全绕过了Spring代码,直接与KafkaConsumer进行交互。

在这种情况下,您将不得不实现自己的错误处理。

但是,在这种情况下,您不必使用@KafkaListener来使用容器,您可以使用容器工厂来创建一个容器,并使用setMessageListener将您的监听器添加到容器属性中。

在这种情况下,您应该移除@KafkaListener,这样框架不会自动为其配置容器。

英文:

>ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(1000));

Of course it won't work with that; you are completely bypassing the Spring code and interacting with the KafkaConsumer yourself.

In that case you will have to implement your own error handling.

However, you don't have to use @KafkaListener to use a container, you can use the container factory to create a container and use setMessageListener to add your listener to the container properties.

You should remove the @KafkaListener in that case, so the framework doesn't automatically configure a container for it.

huangapple
  • 本文由 发表于 2023年4月4日 13:25:56
  • 转载请务必保留本文链接:https://go.coder-hub.com/75925778.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定