如何在使用Spring AMQP的@RabbitListener时,在响应式Spring-Webflux应用中触发重试?

huangapple go评论150阅读模式
英文:

How do I trigger retries when using Spring AMQP @RabbitListener in reactive Spring-Webflux app

问题

我有一个使用spring-webflux的应用程序,必须从rabbitMQ消费消息。在以前的应用程序中,当不使用spring-webflux时,我已经能够:

  1. 在声明队列时配置重试策略
  2. 使用@RabbitListener注解设置Rabbit监听器
  3. 通过在处理程序函数中抛出异常来触发重试

在spring-webflux中,我不能抛出错误,我只有一个MonoError,如何触发重试?

我的代码当前看起来像这样:

@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {

  private final VehicleService service;
  private final OperationFactory operationFactory;

  @RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
  void handleVehicleUpdated(Message message) {
    Mono.just(message)
      .map(operationFactory::generateOperationFromMessage)
      .flatMap(service::handleOperation) // 想要在下游应用程序宕机时进行重试
      .subscribe();
  }
}

编辑

我现在已经找到了解决方法。例如,如果客户端代码返回一个Mono<Exception>,那么这将触发重试。同样,我可以通过映射到Mono<Exception>来有条件地触发重试。例如,如果我想在消息中的产品不存在时触发重试,我可以这样做:

repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> !exists)
        .flatMap(missing -> Mono.error(new Exception("我的异常")))
        .then(...) // 如果存在就继续

希望对你有所帮助。

英文:

I have a spring-webflux app which must consume messages from rabbitMQ. In previous apps when NOT using spring-webflux I've been able to:

  1. Configure a retry policy when declaring the queue
  2. Setup a rabbit listener using the @RabbitListener annotation
  3. Trigger a retry by throwing an exception in the handler function

In spring-webflux I'm not able to throw an error, I just have a MonoError, how do I trigger a retry?

My code looks like something like this currently

@Component
@RequiredArgsConstructor
public class vehicleUpdateListener {

  private final VehicleService service;
  private final OperationFactory operationFactory;

  @RabbitListener(queues = VEHICLE_UPDATE_QUEUE)
  void handleVehicleUpdated(Message message) {
    Mono.just(message)
      .map(operationFactory::generateOperationFromMessage)
      .flatMap(service::handleOperation) // want to retry if downstream app is down
      .subscribe();
  }
}

EDIT

I have now worked out that it is possible. If client code for example returns a Mono<Exception> then this will trigger retries. Likewise I could conditionally trigger retries my mapping to a Mono<Exception>. For example if I want to trigger a retry when a product from a message does not exist, I could do the following

repository.findByProductId(product.getProductId())
        .hasElement()
        .filter(exists -> !exists)
        .flatMap(missing -> Mono.error(new Exception("my exception")))
        .then(...) // carry on if it does exist

答案1

得分: 1

使用非响应式监听容器与反应堆存在许多挑战。

  1. 您必须使用手动确认(acks),并在反应式流完成后确认/拒绝(ack/nack)传递。
  2. 您必须使用反应堆的重试机制。

考虑查看 https://github.com/reactor/reactor-rabbitmq 项目,而不是使用 Spring AMQP。在将来的某个时候,我们希望构建反应式的 @RabbitListener,但目前还没有实现。

英文:

Using reactor with a non-reactive listener container has many challenges.

  1. You must use MANUAL acks and ack/nack the delivery after the reactive flow completes.
  2. You must use reactor's retry mechanisms.

Consider looking at the https://github.com/reactor/reactor-rabbitmq project instead of Spring AMQP. At some time in the future we hope to build reactive @RabbitListeners, but they are not there yet.

huangapple
  • 本文由 发表于 2020年7月30日 03:10:05
  • 转载请务必保留本文链接:https://go.coder-hub.com/63160819.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定