如何在RabbitMQ上每次消耗固定数量的消息?

huangapple go评论82阅读模式
英文:

How to consume fixed messages per time on RabbitMQ?

问题

我正在使用spring-boot-starter-amqp模块,可以使用@RabbitListener注解在后台读取消息。如何在调用方法时仅消耗队列中的固定数量消息?

我尝试构建的是一个“按需”消费者,我将在一个端点上接收HTTP请求,其中包含消息数量,并且我只想消耗定义的这个数量。

英文:

Currently, I'm using the spring-boot-starter-amqp module and can use the annotation @RabbitListener to read messages on background, How can I consume a fixed number of messages from a queue only when a method is called?

What I'm trying to build is an "on-demand" consumer for a queue, I'll receive an HTTP request on an endpoint with the number of messages and I want to consume only this amount defined.

答案1

得分: 1

为此,您不应使用@RabbitListener,而应使用专用的API来进行按需接收。请参阅RabbitTemplate

/**
 * 如果有消息,则接收默认队列中的消息。立即返回,可能为null。
 *
 * @return 如果没有等待的消息,则返回消息或null
 * @throws AmqpException 如果出现问题
 */
@Nullable
Message receive() throws AmqpException;

要优化此调用以进行批量请求,您可以考虑使用作用域操作:

/**
 * 调用回调并在专用的线程绑定通道上运行模板参数上的所有操作,然后可靠地关闭通道。
 * @param action 回调函数
 * @param <T> 返回类型
 * @return 来自 {@link OperationsCallback#doInRabbit(RabbitOperations operations)} 的结果。
 * @throws AmqpException 如果发生错误。
 * @since 2.0
 */
@Nullable
default <T> T invoke(OperationsCallback<T> action) throws AmqpException {

了解更多信息,请参阅文档:https://docs.spring.io/spring-amqp/reference/html/#amqp-template

英文:

For that purpose you must not use a @RabbitListener, but rather a dedicated API for on-demand receive. See RabbitTemplate:

/**
 * Receive a message if there is one from a default queue. Returns immediately,
 * possibly with a null value.
 *
 * @return a message or null if there is none waiting
 * @throws AmqpException if there is a problem
 */
@Nullable
Message receive() throws AmqpException;

To optimize this call for a batch request you may consider to use a scoped operation:

/**
 * Invoke the callback and run all operations on the template argument in a dedicated
 * thread-bound channel and reliably close the channel afterwards.
 * @param action the call back.
 * @param &lt;T&gt; the return type.
 * @return the result from the
 * {@link OperationsCallback#doInRabbit(RabbitOperations operations)}.
 * @throws AmqpException if one occurs.
 * @since 2.0
 */
@Nullable
default &lt;T&gt; T invoke(OperationsCallback&lt;T&gt; action) throws AmqpException {

See more in docs: https://docs.spring.io/spring-amqp/reference/html/#amqp-template

huangapple
  • 本文由 发表于 2023年3月4日 00:16:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/75629476.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定