英文:
How to consume fixed messages per time on RabbitMQ?
问题
我正在使用spring-boot-starter-amqp
模块,可以使用@RabbitListener
注解在后台读取消息。如何在调用方法时仅消耗队列中的固定数量消息?
我尝试构建的是一个“按需”消费者,我将在一个端点上接收HTTP请求,其中包含消息数量,并且我只想消耗定义的这个数量。
英文:
Currently, I'm using the spring-boot-starter-amqp
module and can use the annotation @RabbitListener
to read messages on background, How can I consume a fixed number of messages from a queue only when a method is called?
What I'm trying to build is an "on-demand" consumer for a queue, I'll receive an HTTP request on an endpoint with the number of messages and I want to consume only this amount defined.
答案1
得分: 1
为此,您不应使用@RabbitListener
,而应使用专用的API来进行按需接收。请参阅RabbitTemplate
:
/**
* 如果有消息,则接收默认队列中的消息。立即返回,可能为null。
*
* @return 如果没有等待的消息,则返回消息或null
* @throws AmqpException 如果出现问题
*/
@Nullable
Message receive() throws AmqpException;
要优化此调用以进行批量请求,您可以考虑使用作用域操作:
/**
* 调用回调并在专用的线程绑定通道上运行模板参数上的所有操作,然后可靠地关闭通道。
* @param action 回调函数
* @param <T> 返回类型
* @return 来自 {@link OperationsCallback#doInRabbit(RabbitOperations operations)} 的结果。
* @throws AmqpException 如果发生错误。
* @since 2.0
*/
@Nullable
default <T> T invoke(OperationsCallback<T> action) throws AmqpException {
了解更多信息,请参阅文档:https://docs.spring.io/spring-amqp/reference/html/#amqp-template
英文:
For that purpose you must not use a @RabbitListener
, but rather a dedicated API for on-demand receive. See RabbitTemplate
:
/**
* Receive a message if there is one from a default queue. Returns immediately,
* possibly with a null value.
*
* @return a message or null if there is none waiting
* @throws AmqpException if there is a problem
*/
@Nullable
Message receive() throws AmqpException;
To optimize this call for a batch request you may consider to use a scoped operation:
/**
* Invoke the callback and run all operations on the template argument in a dedicated
* thread-bound channel and reliably close the channel afterwards.
* @param action the call back.
* @param <T> the return type.
* @return the result from the
* {@link OperationsCallback#doInRabbit(RabbitOperations operations)}.
* @throws AmqpException if one occurs.
* @since 2.0
*/
@Nullable
default <T> T invoke(OperationsCallback<T> action) throws AmqpException {
See more in docs: https://docs.spring.io/spring-amqp/reference/html/#amqp-template
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论