Is it possible to create multiple queues and exchanges from config using RabbitMQ in Java?

Question

I have the following configuration:

rabbitQueueProps: 
    myQueue1:
      routingKey: route1
      exchangeName: myExchange
      exchangeType: DIRECT
      maxPriority: 10
    myQueue2:
      routingKey: route2
      exchangeName: myExchange
      exchangeType: DIRECT
      maxPriority: 7

For the above config, I want to create the exchanges and queues dynamically.

I tried the following, but it did not create any exchange or queue.

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class DynamicQueueSetUp {

    @Autowired
    private RabbitAdmin rabbitAdmin;
    @Autowired
    RabbitAdminConfiguration rabbitTemplate;
    @Autowired
    private RabbitQueueConfiguration rabbitQueueConfig;

    @EventListener(ApplicationReadyEvent.class)
    public void init() {
        log.info("validating/creating new queues");
        Map<String, RabbitProps> rabbitQueueprops = rabbitQueueConfig.getRabbitQueueProps();

        rabbitQueueprops.keySet().parallelStream().forEach(queueName -> {
            RabbitProps rabbitProps = rabbitQueueConfig.getRabbitQueueProps().get(queueName);
            Optional<Properties> queueProps = Optional.ofNullable(rabbitAdmin.getQueueProperties(queueName));
            if (!queueProps.isPresent()) {
                log.info("rabbitProps {} , queueName {} ", rabbitProps, queueName);
                Map<String, Object> args = new HashMap<>();
                args.put("x-max-priority", rabbitProps.getMaxPriority());
                if (DIRECT.equalsIgnoreCase(rabbitProps.getExchangeName())) {
                    DirectExchange directExchange = new DirectExchange(rabbitProps.getExchangeName(), true, false);
                    Queue queue = new Queue(queueName, true, false, false, args);
                    rabbitAdmin.declareQueue(queue);
                    rabbitAdmin.declareExchange(directExchange);
                    Binding binding = BindingBuilder.bind(queue).to(directExchange).with(rabbitProps.getRoutingKey());
                    rabbitAdmin.declareBinding(binding);
                }
            }
        });
    }

    @RabbitListener(queues = { "myQueue1", "myQueue2" })
    public void listen(String in) {
        System.out.println(in);
    }
}

Is it possible to create queues and exchanges dynamically from config, or do I need to declare an Exchange, Queue and Binding @Bean for each entry individually?

Answer 1

Score: 0

There is no native way to declare Declarable objects such as Exchange, Queue and Binding all together directly from your configuration.

The best you can do is use Declarables:

@Bean
public Declarables declarables() {
    // The binding must name the same queue and exchange that are declared alongside it.
    return new Declarables(
        new DirectExchange("exchangeName", false, true),
        new Queue("queueName", false, false, true),
        new Binding("queueName", Binding.DestinationType.QUEUE, "exchangeName", "routingKey", null));
}

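This declares your AMQP objects on RabbitMQ together in Java code. However, you still need to read "queueName", "exchangeName" and so on from config, so I suggest creating a helper class like the following: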

@Configuration
public class QueueHelper {

    // Comma-separated queue names, e.g. my.queue.names=queue1,queue2
    @Value("${my.queue.names}")
    private List<String> queueNames;

    @Bean
    public Declarables queueDeclarable() {
        // Build one Queue per configured name and pass them all to Declarables.
        List<Queue> queues = queueNames.stream()
                .map(queueName -> new Queue(queueName, false, false, true))
                .collect(Collectors.toList());
        return new Declarables(queues);
    }
}

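This way, without changing the code, you can define your queues dynamically just by modifying my.queue.names in your configuration.

So if you have my.queue.names=queue1,queue2 you'll get two queues, queue1 and queue2; if you have my.queue.names=queue1,queue2,queue3 you'll get three queues named queue1, queue2 and queue3.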
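The same idea can be extended from a list of names to the per-queue map in the question. The following is only a dependency-free sketch of that mapping, not Spring AMQP code: `DeclarationPlanner`, `QueueProps` and `Plan` are hypothetical stand-ins invented for illustration. It assumes the config has already been bound to a map, branches on `exchangeType`, and plans a shared exchange only once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch: the nested types are hypothetical stand-ins, not Spring AMQP classes.
class DeclarationPlanner {

    // One entry of rabbitQueueProps from the question's config.
    static final class QueueProps {
        final String routingKey;
        final String exchangeName;
        final String exchangeType;
        final int maxPriority;

        QueueProps(String routingKey, String exchangeName, String exchangeType, int maxPriority) {
            this.routingKey = routingKey;
            this.exchangeName = exchangeName;
            this.exchangeType = exchangeType;
            this.maxPriority = maxPriority;
        }
    }

    // Everything that would have to be declared on the broker for a given config.
    static final class Plan {
        final Set<String> directExchanges = new LinkedHashSet<>();             // unique exchange names
        final Map<String, Map<String, Object>> queues = new LinkedHashMap<>(); // queue name -> arguments
        final List<String[]> bindings = new ArrayList<>();                     // {queue, exchange, routingKey}
    }

    static Plan planDeclarations(Map<String, QueueProps> config) {
        Plan plan = new Plan();
        config.forEach((queueName, props) -> {
            // Branch on the exchange type; this sketch only handles DIRECT.
            if (!"DIRECT".equalsIgnoreCase(props.exchangeType)) {
                return;
            }
            plan.directExchanges.add(props.exchangeName); // a shared exchange is planned once
            plan.queues.put(queueName,
                    Collections.<String, Object>singletonMap("x-max-priority", props.maxPriority));
            plan.bindings.add(new String[] {queueName, props.exchangeName, props.routingKey});
        });
        return plan;
    }

    public static void main(String[] args) {
        Map<String, QueueProps> config = new LinkedHashMap<>();
        config.put("myQueue1", new QueueProps("route1", "myExchange", "DIRECT", 10));
        config.put("myQueue2", new QueueProps("route2", "myExchange", "DIRECT", 7));

        Plan result = planDeclarations(config);
        System.out.println(result.directExchanges); // [myExchange]
        System.out.println(result.queues);          // {myQueue1={x-max-priority=10}, myQueue2={x-max-priority=7}}
        System.out.println(result.bindings.size()); // 2
    }
}
```

In a Spring AMQP application, each planned item would correspond to one DirectExchange, Queue or Binding object handed to Declarables, in the same way the helper class above hands over its queues.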

huangapple
  • Posted on August 14, 2020, 21:53:06
  • When reposting, please keep this link: https://go.coder-hub.com/63414125.html