Is it possible to create multiple queues and exchanges from config using RabbitMQ in Java?
Question
I have a config:
rabbitQueueProps:
  myQueue1:
    routingKey: route1
    exchangeName: myExchange
    exchangeType: DIRECT
    maxPriority: 10
  myQueue2:
    routingKey: route2
    exchangeName: myExchange
    exchangeType: DIRECT
    maxPriority: 7
For the above config, I want to create the exchanges and queues dynamically.
I tried the following, but it didn't create any exchange or queue.
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class DynamicQueueSetUp {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    RabbitAdminConfiguration rabbitTemplate;

    @Autowired
    private RabbitQueueConfiguration rabbitQueueConfig;

    @EventListener(ApplicationReadyEvent.class)
    public void init() {
        log.info("validating/creating new queues");
        Map<String, RabbitProps> rabbitQueueprops = rabbitQueueConfig.getRabbitQueueProps();
        rabbitQueueprops.keySet().parallelStream().forEach(queueName -> {
            RabbitProps rabbitProps = rabbitQueueConfig.getRabbitQueueProps().get(queueName);
            Optional<Properties> queueProps = Optional.ofNullable(rabbitAdmin.getQueueProperties(queueName));
            if (!queueProps.isPresent()) {
                log.info("rabbitProps {} , queueName {} ", rabbitProps, queueName);
                Map<String, Object> args = new HashMap<>();
                args.put("x-max-priority", rabbitProps.getMaxPriority());
                if (DIRECT.equalsIgnoreCase(rabbitProps.getExchangeName())) {
                    DirectExchange directExchange = new DirectExchange(rabbitProps.getExchangeName(), true, false);
                    Queue queue = new Queue(queueName, true, false, false, args);
                    rabbitAdmin.declareQueue(queue);
                    rabbitAdmin.declareExchange(directExchange);
                    Binding binding = BindingBuilder.bind(queue).to(directExchange).with(rabbitProps.getRoutingKey());
                    rabbitAdmin.declareBinding(binding);
                }
            }
        });
    }

    @RabbitListener(queues = { "myQueue1", "myQueue2" })
    public void listen(String in) {
        System.out.println(in);
    }
}
Is it possible to create queues and exchanges dynamically from config, or do I need to declare an Exchange, Queue and Binding @Bean for each entry individually?
Answer 1
Score: 0
There's no native way to declare Declarable objects such as Exchange, Queue and Binding directly from your configuration.
The best you can do is to use Declarables:
@Bean
public Declarables declarables() {
    return new Declarables(
            new DirectExchange("exchangeName", false, true),
            new Queue("queueName", false, false, true),
            // The binding must name the same queue and exchange declared above.
            new Binding("queueName", Binding.DestinationType.QUEUE, "exchangeName", "routingKey", null));
}
This declares all of your AMQP objects on RabbitMQ together in Java code. You still need to read "queueName", "exchangeName", etc. from the config, so I suggest creating a helper class as follows:
@Configuration
public class QueueHelper {

    // Comma-separated queue names, e.g. my.queue.names=queue1,queue2
    @Value("${my.queue.names}")
    private List<String> queueNames;

    @Bean
    public Declarables queueDeclarable() {
        // One non-durable, non-exclusive, auto-delete queue per configured name.
        return new Declarables(queueNames.stream()
                .map(queueName -> new Queue(queueName, false, false, true))
                .collect(Collectors.toList()));
    }
}
So, without changing the code, you can define your queues dynamically just by modifying my.queue.names in your configuration.
For example, my.queue.names=queue1,queue2 gives you two queues, queue1 and queue2, while my.queue.names=queue1,queue2,queue3 gives you three queues named queue1, queue2 and queue3.
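For the map-shaped config in the question (one entry per queue, each with its own exchange, routing key and priority), the same idea can be extended. The following is only a minimal sketch under stated assumptions: RabbitProps, ExchangeSpec, QueueSpec, BindingSpec and DeclarationPlan are hypothetical names, and plain records (Java 16+) stand in for the Spring AMQP DirectExchange, Queue and Binding types so that the example runs without any dependencies. It shows how each config entry could be turned into one queue, one binding and a de-duplicated exchange. Note that it branches on the exchange type; the question's code compares against getExchangeName(), which may be why nothing was declared there.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class DeclarationPlan {

    // Hypothetical mirror of one entry under rabbitQueueProps in the question's config.
    public record RabbitProps(String routingKey, String exchangeName, String exchangeType, int maxPriority) {}

    // Plain stand-ins (assumptions) for Spring AMQP's DirectExchange, Queue and Binding.
    public record ExchangeSpec(String name, String type) {}
    public record QueueSpec(String name, Map<String, Object> arguments) {}
    public record BindingSpec(String queue, String exchange, String routingKey) {}

    public final Set<ExchangeSpec> exchanges = new LinkedHashSet<>();
    public final List<QueueSpec> queues = new ArrayList<>();
    public final List<BindingSpec> bindings = new ArrayList<>();

    // Builds the list of things to declare from the queue-name -> properties map.
    public static DeclarationPlan from(Map<String, RabbitProps> config) {
        DeclarationPlan plan = new DeclarationPlan();
        config.forEach((queueName, props) -> {
            // Branch on the exchange *type*, not on the exchange name.
            if (!"DIRECT".equalsIgnoreCase(props.exchangeType())) {
                throw new IllegalArgumentException("Unsupported exchange type: " + props.exchangeType());
            }
            // The Set keeps an exchange shared by several queues from being listed twice.
            plan.exchanges.add(new ExchangeSpec(props.exchangeName(), "direct"));
            plan.queues.add(new QueueSpec(queueName, Map.of("x-max-priority", props.maxPriority())));
            plan.bindings.add(new BindingSpec(queueName, props.exchangeName(), props.routingKey()));
        });
        return plan;
    }

    public static void main(String[] args) {
        // Same values as the config in the question.
        Map<String, RabbitProps> config = new LinkedHashMap<>();
        config.put("myQueue1", new RabbitProps("route1", "myExchange", "DIRECT", 10));
        config.put("myQueue2", new RabbitProps("route2", "myExchange", "DIRECT", 7));

        DeclarationPlan plan = from(config);
        // prints: 1 exchange(s), 2 queue(s), 2 binding(s)
        System.out.println(plan.exchanges.size() + " exchange(s), "
                + plan.queues.size() + " queue(s), " + plan.bindings.size() + " binding(s)");
    }
}
```

In a Spring application, each spec would be converted to the matching Spring AMQP object and the whole collection returned from a single Declarables bean, as in the helper class above.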