英文:
spring kafka error handling without KafkaListener
问题
以下是要翻译的代码部分:
@Component
public class Consumer implements AcknowledgingMessageListener<String, String> {
@Override
@KafkaListener(topics = { "ff808081672c17c8016730733d020001.CBS_PROFILE" })
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
log.info("Computed ");
throw new RuntimeException();
}
}
@Configuration
@Slf4j
public class KafkaConsumerConfig {
/** 处理错误和恢复逻辑 **/
public DefaultErrorHandler errorHandler() {
System.out.println("Inside error handler");
// 如果在消费者中发生这些异常,记录将不会重试
List<Class<? extends Exception>> exceptionsToIgnore = List.of(JsonSchemaValidationException.class);
// 如果在消费者中发生这些异常,记录将会重试
List<Class<? extends Exception>> exceptionsToRetry = List.of(DataEngineNotAvailableException.class, ListenerExecutionFailedException.class);
// 固定间隔一秒后重试两次
var fixedBackOff = new FixedBackOff(500L, 1);
var expBackoff = new ExponentialBackOffWithMaxRetries(2);
expBackoff.setInitialInterval(1000L);
var errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners((consumerRecord, ex, deliveryAttempt) -> {
log.info("Failed record {} in retry listeners, Exception : {}, delivery attempt : {}", consumerRecord,
ex.getMessage(), deliveryAttempt);
});
exceptionsToIgnore.forEach(errorHandler::addNotRetryableExceptions);
exceptionsToRetry.forEach(errorHandler::addRetryableExceptions);
return errorHandler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
// 应该等于分区数
factory.setConcurrency(3);
factory.setCommonErrorHandler(errorHandler());
return factory;
}
}
但是,如果按以下方式实现消费者,我无法实现重试机制。期望重试 `dataProcessor.readRecord(r, consumer)` 方法。
```java
@PostConstruct
public void onLoad() {
configService.setConfigurations();
//加载消费者
consumerRunnerV2.loadConsumers();
}
public void loadConsumers() {
consumerServiceV2.fillConsumerMap();
Map<String, LRConsumer> consumersMap = consumerServiceV2.getConsumersMap();
for (Map.Entry<String, LRConsumer> consumer : consumersMap.entrySet()) {
consumerTaskExecutor.execute(() -> runConsumer(consumer.getValue()));
}
}
private void runConsumer(LRConsumer consumer) {
while (true) {
// 检查消费者的状态
consumer = consumerServiceV2.getConsumerById(consumer.getConsumerId());
if (consumer.getState() != ConsumerState.PAUSED) {
log.info("Waiting to receive messages.");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> r : records)
// 期望重试这个方法
dataProcessor.readRecord(r, consumer);
} else {
Thread.sleep(1000);
log.info("Consumer is in paused state.");
}
}
}
希望这有助于您的工作。
英文:
I am using the spring kafka in my project and have implemented a retry functionality. A message is consumed and if it fails I need to retry. Below code is working fine if I am using and doing retry for some exceptions.
@Component
public class Consumer implements AcknowledgingMessageListener<String, String> {
@Override
@KafkaListener(topics = { "ff808081672c17c8016730733d020001.CBS_PROFILE"})
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
log.info("Computed ");
throw new RuntimeException();
}
}
@Configuration
@Slf4j
public class KafkaConsumerConfig {
/** handles error and recovery logic **/
public DefaultErrorHandler errorHandler() {
System.out.println("Inside error handler");
// if these exceptions occur in the consumer, record will not be retried
List<Class<? extends Exception>> exceptionsToIgnore = List.of(JsonSchemaValidationException.class);
// if these exceptions occur in the consumer, record will be retried
List<Class<? extends Exception>> exceptionsToRetry = List.of(DataEngineNotAvailableException.class, ListenerExecutionFailedException.class);
// retry twice after fixed interval of one second
var fixedBackOff = new FixedBackOff(500L, 1);
var expBackoff = new ExponentialBackOffWithMaxRetries(2);
expBackoff.setInitialInterval(1000L);
var errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners((consumerRecord, ex, deliveryAttempt) -> {
log.info("Failed record {} in retry listeners, Exception : {}, delivery attempt : {}", consumerRecord,
ex.getMessage(), deliveryAttempt);
});
exceptionsToIgnore.forEach(errorHandler::addNotRetryableExceptions);
exceptionsToRetry.forEach(errorHandler::addRetryableExceptions);
return errorHandler;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
// should be equal to number of partitions
factory.setConcurrency(3);
factory.setCommonErrorHandler(errorHandler());
return factory;
}
}
But I am not able to implement retry mechanism if I implement consumer like below.
Expectation is to retry the dataProcessor.readRecord(r, consumer)
method.
@PostConstruct
public void onLoad() {
configService.setConfigurations();
//load consumer
consumerRunnerV2.loadConsumers();
}
public void loadConsumers() {
consumerServiceV2.fillConsumerMap();
Map<String, LRConsumer> consumersMap = consumerServiceV2.getConsumersMap();
for (Map.Entry<String, LRConsumer> consumer : consumersMap.entrySet()) {
consumerTaskExecutor.execute(() -> runConsumer(consumer.getValue()));
}
}
private void runConsumer(LRConsumer consumer) {
while (true) {
// to check state of consumer
consumer = consumerServiceV2.getConsumerById(consumer.getConsumerId());
if (consumer.getState() != ConsumerState.PAUSED) {
log.info("Waiting to receive messages.");
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> r : records)
//expectation is to retry this method
dataProcessor.readRecord(r, consumer);
}
else {
Thread.sleep(1000);
log.info("Consumer is in paused state.");
}
}
}
答案1
得分: 1
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
当然,这不会起作用;您完全绕过了Spring代码,直接与KafkaConsumer
进行交互。
在这种情况下,您将不得不实现自己的错误处理。
但是,在这种情况下,您不必使用@KafkaListener
来使用容器,您可以使用容器工厂来创建一个容器,并使用setMessageListener
将您的监听器添加到容器属性中。
在这种情况下,您应该移除@KafkaListener
,这样框架不会自动为其配置容器。
英文:
>ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
Of course it won't work with that; you are completely bypassing the Spring code and interacting with the KafkaConsumer
yourself.
In that case you will have to implement your own error handling.
However, you don't have to use @KafkaListener
to use a container, you can use the container factory to create a container and use setMessageListener
to add your listener to the container properties.
You should remove the @KafkaListener
in that case, so the framework doesn't automatically configure a container for it.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论