RabbitMQ:运行时向监听器动态添加队列

huangapple go评论112阅读模式
英文:

RabbitMQ: Dynamic addition of queues to a listener at runtime

问题

我有一个使用案例,需要在运行时消费从队列中接收到的消息。

这里我有一个配置类和一个监听器类。我已经为两个现有队列定义了一个消费者,并希望能够消费在运行时可能发现的新队列,这些新队列遵循相同的命名约定,即队列.animals.*。

另外,我还有另一个服务,将在名为 "newQueues" 的队列上发送新发现的队列名称。如果不需要的话,可以更改这种方法,并且可以摆脱发送消息到 "newQueues" 的服务。

以下是代码示例:

  1. @Configuration
  2. @EnableRabbit
  3. public class RabbitConfiguration implements RabbitListenerConfigurer {
  4. public static final String queue1 = "queue.animals.cat";
  5. public static final String queue2 = "queue.animals.dog";
  6. @Autowired
  7. private ConnectionFactory connectionFactory;
  8. @Bean
  9. public AmqpAdmin amqpAdmin() {
  10. return new RabbitAdmin(connectionFactory);
  11. }
  12. @Bean
  13. public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory() {
  14. DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
  15. factory.setConnectionFactory(connectionFactory);
  16. factory.setMessageConverter(new Jackson2JsonMessageConverter());
  17. return factory;
  18. }
  19. @Bean
  20. public RabbitListenerEndpointRegistry listenerEndpointRegistry() {
  21. return new RabbitListenerEndpointRegistry();
  22. }
  23. @Override
  24. public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
  25. registrar.setEndpointRegistry(listenerEndpointRegistry());
  26. }
  27. @Autowired
  28. private RabbitListenerEndpointRegistry listenerEndpointRegistry;
  29. @RabbitListener(id = "qEvent", queues = {"newQueues"})
  30. public void processQueueEvents(String newQueueName) {
  31. ((DirectMessageListenerContainer) this.listenerEndpointRegistry.getListenerContainer("animalQContainer"))
  32. .addQueueNames(newQueueName);
  33. System.out.println("Received a message with the new queue name: " + newQueueName);
  34. }
  35. @RabbitListener(id = "animalQContainer", queues = {queue1, queue2})
  36. public void processAnimals(Animal animalObj, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
  37. System.out.println("Received a message on queue: " + queue + "data: " + animalObj);
  38. //处理 animalObj
  39. }
  40. }

我目前遇到以下异常:

  1. Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

我对RabbitMQ非常陌生,所以不确定是否所有组件都配置正确。感谢您的帮助。

英文:

I have a use case where I need to consume messages from queues which are discovered at runtime.

Here I have a config class and the listener class. I have defined a consumer for the two existing queues and want to consume messages from new queues which may be discovered at runtime and follow the same naming convention i.e. queue.animals.*

Also, I have another service which will send me the newly discovered queue name on a queue named "newQueues". This approach can be changed if not needed and we can get rid of the service sending messages on "newQueues".

  1. @EnableRabbit
  2. public class RabbitConfiguration implements RabbitListenerConfigurer {
  3. public static final String queue1= "queue.animals.cat";
  4. public static final String queue2= "queue.animals.dog";
  5. @Autowired
  6. private ConnectionFactory connectionFactory;
  7. @Bean
  8. public AmqpAdmin amqpAdmin() {
  9. return new RabbitAdmin(connectionFactory);
  10. }
  11. @Bean
  12. public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory() {
  13. DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
  14. factory.setConnectionFactory(connectionFactory);
  15. factory.setMessageConverter(new Jackson2JsonMessageConverter());
  16. return factory;
  17. }
  18. @Bean
  19. public RabbitListenerEndpointRegistry listenerEndpointRegistry() {
  20. return new RabbitListenerEndpointRegistry();
  21. }
  22. @Override
  23. public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
  24. registrar.setEndpointRegistry(listenerEndpointRegistry());
  25. }
  1. @Autowired
  2. private RabbitListenerEndpointRegistry listenerEndpointRegistry;
  3. @RabbitListener(id = "qEvent", queues = {"newQueues"})
  4. public void processQueueEvents(String newQueueName) {
  5. ((DirectMessageListenerContainer) this.listenerEndpointRegistry.getListenerContainer("animalQContainer"))
  6. .addQueueNames(newQueueName);
  7. System.out.println("Received a message with the new queue name: " + newQueueName);
  8. }
  9. @RabbitListener(id = "animalQContainer" , queues = { queue1, queue2 })
  10. public void processAnimals(Animal animalObj, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
  11. System.out.println("Received a message on queue: " + queue + "data: " + animalObj);
  12. //process animalObj
  13. }

I am getting the following exception currently:

  1. Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')

I am very new to RabbitMQ, so not sure if I have all the pieces correctly. Thank you for your help.

答案1

得分: 0

你的问题有点混淆,标题是关于添加新队列,但之后你提到了JSON异常。听起来你可能收到了非JSON内容。

问题具体在哪里?

要添加新队列,可以使用以下代码:

  1. ((AbstractMessageListenerContainer) rabbitListenEndpointRegistry.getMessageListenerContainer("animalQContainer"))
  2. .addQueueNames("newQueue1", "newQueue2");
英文:

Your question is confusing - the headline is about adding new queues but then you talk about JSON exceptions. Sounds like you are receiving non-json content.

What is the problem exactly?

To add new queues,

  1. ((AbsrtactMessageListenerContainer) rabbitListenEndpointRegistry.getMessageListenerContainer("animalQContainer"))
  2. .addQeueNames("newQueue1", "newQueue2");

答案2

得分: 0

Sure, here's the translation of the provided content:

由于在此处配置了Jackson2JsonMessageConverter,processQueueEvents方法无法解析字符串。创建了一个新的类并传入一个对象到processQueueEvent方法,以解决问题中提到的异常:

  1. public void processQueueEvents(NewQueue newQueueName) {
  2. System.out.println("Received a message on a new queue: " + newQueueName);
  3. String name = newQueueName.toString();
  4. <details>
  5. <summary>英文:</summary>
  6. Since the Jackson2JsonMessageConverter is configured here, the processQueueEvents method can not parse a string. Created a new class and passed in an object to the processQueueEvent method to get past the exception mentioned in the question:
  7. ``` @RabbitListener(id = &quot;qEvent&quot;, queues = {&quot;new_queues&quot;})
  8. public void processQueueEvents(NewQueue newQueueName) {
  9. System.out.println(&quot;Received a message on a new queue: &quot; + newQueueName);
  10. String name = newQueueName.toString();
  11. </details>

huangapple
  • 本文由 发表于 2020年7月30日 05:38:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/63162812.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定