英文:
How do I trigger retries when using Spring AMQP @RabbitListener in reactive Spring-Webflux app
问题
我有一个使用spring-webflux的应用程序,必须从rabbitMQ消费消息。在以前的应用程序中,当不使用spring-webflux时,我已经能够:
- 在声明队列时配置重试策略
- 使用@RabbitListener注解设置Rabbit监听器
- 通过在处理程序函数中抛出异常来触发重试
在spring-webflux中,我不能抛出错误,我只有一个MonoError,如何触发重试?
我的代码当前看起来像这样:
@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {
private final VehicleService service;
private final OperationFactory operationFactory;
@RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
void handleVehicleUpdated(Message message) {
Mono.just(message)
.map(operationFactory::generateOperationFromMessage)
.flatMap(service::handleOperation) // 想要在下游应用程序宕机时进行重试
.subscribe();
}
}
编辑
我现在已经找到了解决方法。例如,如果客户端代码返回一个Mono<Exception>
,那么这将触发重试。同样,我可以通过映射到Mono<Exception>
来有条件地触发重试。例如,如果我想在消息中的产品不存在时触发重试,我可以这样做:
repository.findByProductId(product.getProductId())
.hasElement()
.filter(exists -> !exists)
.flatMap(missing -> Mono.error(new Exception("我的异常")))
.then(...) // 如果存在就继续
希望对你有所帮助。
英文:
I have a spring-webflux app which must consume messages from rabbitMQ. In previous apps when NOT using spring-webflux I've been able to:
- Configure a retry policy when declaring the queue
- Setup a rabbit listener using the @RabbitListener annotation
- Trigger a retry by throwing an exception in the handler function
In spring-webflux I'm not able to throw an error, I just have a MonoError, how do I trigger a retry?
My code looks like something like this currently
@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {
private final VehicleService service;
private final OperationFactory operationFactory;
@RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
void handleVehicleUpdated(Message message) {
Mono.just(message)
.map(operationFactory::generateOperationFromMessage)
.flatMap(service::handleOperation) // want to retry if downstream app is down
.subscribe();
}
}
EDIT
I have now worked out that it is possible. If client code for example returns a Mono<Exception>
then this will trigger retries. Likewise I could conditionally trigger retries my mapping to a Mono<Exception>
. For example if I want to trigger a retry when a product from a message does not exist, I could do the following
repository.findByProductId(product.getProductId())
.hasElement()
.filter(exists -> !exists)
.flatMap(missing -> Mono.error(new Exception("my exception")))
.then(...) // carry on if it does exist
答案1
得分: 1
使用非响应式监听容器与反应堆存在许多挑战。
- 您必须使用手动确认(acks),并在反应式流完成后确认/拒绝(ack/nack)传递。
- 您必须使用反应堆的重试机制。
考虑查看 https://github.com/reactor/reactor-rabbitmq 项目,而不是使用 Spring AMQP。在将来的某个时候,我们希望构建反应式的 @RabbitListener
,但目前还没有实现。
英文:
Using reactor with a non-reactive listener container has many challenges.
- You must use MANUAL acks and ack/nack the delivery after the reactive flow completes.
- You must use reactor's retry mechanisms.
Consider looking at the https://github.com/reactor/reactor-rabbitmq project instead of Spring AMQP. At some time in the future we hope to build reactive @RabbitListener
s, but they are not there yet.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论