创建并动态绑定队列到单个交换机。

huangapple go评论61阅读模式
英文:

Create and bind queues dynamically to a single Exchange

问题

我有一个交换(exchange1),它可以根据路由键表达式路由到 n 个不同的队列。

  1. 所有图像消息应该发送到队列1
  2. 所有文档消息应该发送到队列2
  3. 所有视频消息应该发送到默认队列3

未来,队列数量可以增加,例如(所有视频和 mp4 扩展应该发送到 queue4

我们如何动态创建和绑定队列到一个特定的交换,并且应该只使用一个流监听器?

英文:

I have an exchange (exchange1) and it can be routed to n different queues based on routing-key-expression.

  1. All Images messages should go to queue1
  2. All Document messages should go to queue2
  3. All video messages should go to default queue3

and in future, the queue number can be increased like (all video and mp4 extensions should go to queue4)

How we can create and bind queues dynamically to one particular exchange and should use only one stream Listener?

答案1

得分: 0

不能直接使用Spring Cloud Stream的属性来完成。

您需要声明具有所需路由键的ExchangeQueueBinding @Bean,然后配置消费者绑定以不声明队列和绑定,并设置s.c.s.consumer.multiplex=trues.c.s.destination=queue1,queue2,queue3

请参阅使用现有队列/交换以了解如何禁用绑定器的自动配置。

英文:

It can't be done directly with Spring Cloud Stream via properties.

You would have to declare Exchange, Queue and Binding @Beans with the required routing keys and then configure the consumer binding to not declare the queue and binding) and then set s.c.s.consumer.multiplex=true and s.c.s.destination=queue1,queue2,queue3.

See Using Existing Queues/Exhanges for how to disable binder provisioning.

答案2

得分: 0

以下是您要翻译的内容:

"使用Spring Boot中的Spring-AMQP插件可以轻松完成。我在下面的代码片段中演示了如何实现它。 (我相信代码胜过理论):

package com.savk.workout.spring.rabbitmqconversendreceivefanoutproducer;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DynamicBindingDemo {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private boolean switchedOn = true;

    //Toggle binding programatically
    @Scheduled(fixedRate = 10000L)
    public void manipulateBinding() {
        Exchange exchange = ExchangeBuilder.directExchange("exchange").autoDelete().build();
        Queue queue = QueueBuilder.nonDurable("queue").build();

        Binding binding = BindingBuilder.bind(queue).to(exchange).with("routingkey").noargs();

        if(switchedOn) {
            rabbitAdmin.declareBinding(binding);
        } else {
            rabbitAdmin.removeBinding(binding);
        }
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(rabbitTemplate.getConnectionFactory());
    }

}

确保您还有相应的RabbitListener:

@RabbitListener(queues = "queuename")
@RabbitHandler
public String handle(String msg)  {
    System.out.println("RCVD :: " + msg);
    String response = "PONG { " + msg + " } from INSTANCE " + INSTANCE;
    return response;
}

您可以看到一切都可以动态完成。您只需操作绑定即可。"

英文:

It can be easily done using Spring-AMQP plugin in Spring boot. I have enclosed a below snippet how it can be achieved. (I believe code speaks more than theoritical words) :

package com.savk.workout.spring.rabbitmqconversendreceivefanoutproducer;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class DynamicBindingDemo {

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    private boolean switchedOn = true;

    //Toggle binding programatically
    @Scheduled(fixedRate = 10000L)
    public void manipulateBinding() {
        Exchange exchange = ExchangeBuilder.directExchange("exchange").autoDelete().build();
        Queue queue = QueueBuilder.nonDurable("queue").build();
    
        Binding binding = BindingBuilder.bind(queue).to(exchange).with("routingkey").noargs();
        
        if(switchedOn) {
            rabbitAdmin.declareBinding(binding);
        } else {
            rabbitAdmin.removeBinding(binding);
        }
    }

    @Bean
    public RabbitAdmin rabbitAdmin() {
        return new RabbitAdmin(rabbitTemplate.getConnectionFactory());
    }

}

Make sure you also have corresponding RabbitListener for it :

@RabbitListener(queues = "queuename")
@RabbitHandler
public String handle(String msg)  {
    System.out.println("RCVD :: " + msg);
    String response = "PONG { " + msg + " } from INSTANCE " + INSTANCE;
    return response;
}

You see everything can be done dynamically. All you need is to manipulate with bindings.

huangapple
  • 本文由 发表于 2020年8月13日 23:17:22
  • 转载请务必保留本文链接:https://go.coder-hub.com/63398170.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定