英文:
RabbitMQ: Dynamic addition of queues to a listener at runtime
问题
我有一个使用案例,需要在运行时消费从队列中接收到的消息。
这里我有一个配置类和一个监听器类。我已经为两个现有队列定义了一个消费者,并希望能够消费在运行时可能发现的新队列,这些新队列遵循相同的命名约定,即队列.animals.*。
另外,我还有另一个服务,将在名为 "newQueues" 的队列上发送新发现的队列名称。如果不需要的话,可以更改这种方法,并且可以摆脱发送消息到 "newQueues" 的服务。
以下是代码示例:
@Configuration
@EnableRabbit
public class RabbitConfiguration implements RabbitListenerConfigurer {
public static final String queue1 = "queue.animals.cat";
public static final String queue2 = "queue.animals.dog";
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory);
}
@Bean
public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory() {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public RabbitListenerEndpointRegistry listenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setEndpointRegistry(listenerEndpointRegistry());
}
@Autowired
private RabbitListenerEndpointRegistry listenerEndpointRegistry;
@RabbitListener(id = "qEvent", queues = {"newQueues"})
public void processQueueEvents(String newQueueName) {
((DirectMessageListenerContainer) this.listenerEndpointRegistry.getListenerContainer("animalQContainer"))
.addQueueNames(newQueueName);
System.out.println("Received a message with the new queue name: " + newQueueName);
}
@RabbitListener(id = "animalQContainer", queues = {queue1, queue2})
public void processAnimals(Animal animalObj, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println("Received a message on queue: " + queue + "data: " + animalObj);
//处理 animalObj
}
}
我目前遇到以下异常:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
我对RabbitMQ非常陌生,所以不确定是否所有组件都配置正确。感谢您的帮助。
英文:
I have a use case where I need to consume messages from queues which are discovered at runtime.
Here I have a config class and the listener class. I have defined a consumer for the two existing queues and want to consume messages from new queues which may be discovered at runtime and follow the same naming convention i.e. queue.animals.*
Also, I have another service which will send me the newly discovered queue name on a queue named "newQueues". This approach can be changed if not needed and we can get rid of the service sending messages on "newQueues".
@EnableRabbit
public class RabbitConfiguration implements RabbitListenerConfigurer {
public static final String queue1= "queue.animals.cat";
public static final String queue2= "queue.animals.dog";
@Autowired
private ConnectionFactory connectionFactory;
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory);
}
@Bean
public DirectRabbitListenerContainerFactory rabbitListenerContainerFactory() {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public RabbitListenerEndpointRegistry listenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
}
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setEndpointRegistry(listenerEndpointRegistry());
}
@Autowired
private RabbitListenerEndpointRegistry listenerEndpointRegistry;
@RabbitListener(id = "qEvent", queues = {"newQueues"})
public void processQueueEvents(String newQueueName) {
((DirectMessageListenerContainer) this.listenerEndpointRegistry.getListenerContainer("animalQContainer"))
.addQueueNames(newQueueName);
System.out.println("Received a message with the new queue name: " + newQueueName);
}
@RabbitListener(id = "animalQContainer" , queues = { queue1, queue2 })
public void processAnimals(Animal animalObj, @Header(AmqpHeaders.CONSUMER_QUEUE) String queue) {
System.out.println("Received a message on queue: " + queue + "data: " + animalObj);
//process animalObj
}
I am getting the following exception currently:
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'queue': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
I am very new to RabbitMQ, so not sure if I have all the pieces correctly. Thank you for your help.
答案1
得分: 0
你的问题有点混淆,标题是关于添加新队列,但之后你提到了JSON异常。听起来你可能收到了非JSON内容。
问题具体在哪里?
要添加新队列,可以使用以下代码:
((AbstractMessageListenerContainer) rabbitListenEndpointRegistry.getMessageListenerContainer("animalQContainer"))
.addQueueNames("newQueue1", "newQueue2");
英文:
Your question is confusing - the headline is about adding new queues but then you talk about JSON exceptions. Sounds like you are receiving non-json content.
What is the problem exactly?
To add new queues,
((AbsrtactMessageListenerContainer) rabbitListenEndpointRegistry.getMessageListenerContainer("animalQContainer"))
.addQeueNames("newQueue1", "newQueue2");
答案2
得分: 0
Sure, here's the translation of the provided content:
由于在此处配置了Jackson2JsonMessageConverter,processQueueEvents方法无法解析字符串。创建了一个新的类并传入一个对象到processQueueEvent方法,以解决问题中提到的异常:
public void processQueueEvents(NewQueue newQueueName) {
System.out.println("Received a message on a new queue: " + newQueueName);
String name = newQueueName.toString();
<details>
<summary>英文:</summary>
Since the Jackson2JsonMessageConverter is configured here, the processQueueEvents method can not parse a string. Created a new class and passed in an object to the processQueueEvent method to get past the exception mentioned in the question:
``` @RabbitListener(id = "qEvent", queues = {"new_queues"})
public void processQueueEvents(NewQueue newQueueName) {
System.out.println("Received a message on a new queue: " + newQueueName);
String name = newQueueName.toString();
</details>
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论