How to auto-declare an exchange when using the Camel spring-rabbitmq component

Question

I'm trying to migrate from Camel 3.x to Camel 4.x, so I need to move from the rabbitmq component to its replacement, spring-rabbitmq. With the rabbitmq component I was using the declare option to let Camel automatically create and bind queues and exchanges. Now I'm looking for the equivalent option in the spring-rabbitmq component.

From the documentation it looks like this is possible (https://camel.apache.org/components/3.20.x/spring-rabbitmq-component.html#_auto_declare_exchanges_queues_and_bindings):

> Before you can send or receive messages from RabbitMQ, then exchanges,
> queues and bindings must be setup first.
>
> In development mode it may be desirable to let Camel automatic do
> this. You can enable this by setting autoDeclare=true on the
> SpringRabbitMQComponent.
>
> Then Spring RabbitMQ will automatic necessary declare the elements and
> setup the binding between the exchange, queue and routing keys.

So I set the camel.component.spring-rabbitmq.auto-declare property to true in my application.properties, but it seems that this affects only the RabbitMQ consumer endpoints, not the producer.

Using this simple route:

<route id="receive-send-rabbitmq">
    <from uri="spring-rabbitmq:in-exchange?queues=queue-in"/>
    <log message="message received" />
    <to uri="spring-rabbitmq:out-exchange"/>
</route>

The in-exchange and queue-in objects are properly auto-declared by Camel when the route starts, and messages are received correctly. But the sending part fails because the exchange out-exchange does not exist. Stack trace:

2023-07-10T11:54:12.094+02:00 DEBUG 3796 --- [pool-2-thread-5] o.s.amqp.rabbit.core.RabbitTemplate      : Publishing message [(Body:'[B@149e60f3(byte[3])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=3, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])] on exchange [out-exchange], routingKey = []
2023-07-10T11:54:12.099+02:00  INFO 3796 --- [ 127.0.0.1:5672] com.rabbitmq.client.impl.AMQConnection   : Received a frame on an unknown channel, ignoring it
2023-07-10T11:54:12.099+02:00 DEBUG 3796 --- [pool-2-thread-5] o.s.amqp.rabbit.connection.RabbitUtils   : Unexpected exception on closing RabbitMQ Channel

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'out-exchange' in vhost '/', class-id=60, method-id=40)
	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)

	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'out-exchange' in vhost '/', class-id=60, method-id=40)
	at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:517)
	at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:341)
	at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:182)
	at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:114)
	at com.rabbitmq.client.impl.AMQConnection.readFrame(AMQConnection.java:743)
	at com.rabbitmq.client.impl.AMQConnection.access$300(AMQConnection.java:47)
	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:670)
	... 1 common frames omitted

I know I could declare these exchanges using RabbitAdmin, but I don't want to handle this manually in my application code: the routes are delivered as XML files in a dedicated folder and loaded dynamically by the application at startup.

Using Camel 4.0.0-rc1 with Spring Boot 3.1.1.

Answer 1

Score: 1

Simply because this component option applies only to consumer endpoints, as clearly stated in the documentation. I quote:

> Specifies whether the consumer should auto declare binding between exchange, queue and routing key when starting. Enabling this can be good for development to make it easy to standup exchanges, queues and bindings on the broker.

For a producer endpoint, you will need to create it manually by declaring your Queue as a regular bean; Spring will then automatically create the Queue for you.

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;

@Bean
public Queue myDurableQueue() {
    // This queue has the following properties:
    // name: my_durable
    // durable: true
    // exclusive: false
    // auto_delete: false
    return new Queue("my_durable", true, false, false);
}

See this answer for more details.
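
Since the failing element in the question is the exchange out-exchange rather than a queue, the same bean-based approach can be sketched for it. This is a minimal sketch, not a confirmed recipe for this setup. It assumes a RabbitAdmin (AmqpAdmin) bean is present in the context, which Spring Boot's AMQP auto-configuration normally provides, and that the producer uses a direct exchange. The class name ProducerExchangeConfig and the bean method names are hypothetical. The empty routing key mirrors the routingKey = [] shown in the log.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class: the names are illustrative only.
@Configuration
public class ProducerExchangeConfig {

    // Exchange targeted by the producer endpoint spring-rabbitmq:out-exchange.
    // The single-argument constructor creates a durable, non-auto-delete exchange.
    @Bean
    public DirectExchange outExchange() {
        return new DirectExchange("out-exchange");
    }

    // Same queue as in the snippet above: durable, non-exclusive, not auto-deleted.
    @Bean
    public Queue myDurableQueue() {
        return new Queue("my_durable", true, false, false);
    }

    // Binds the queue to the exchange with an empty routing key (assumption:
    // the producer publishes without a routing key, as in the log output).
    @Bean
    public Binding outBinding(Queue myDurableQueue, DirectExchange outExchange) {
        return BindingBuilder.bind(myDurableQueue).to(outExchange).with("");
    }
}
```

With a RabbitAdmin in the context, these beans should be declared on the broker when the first connection is opened, so the exchange exists before the producer publishes. Without the binding the 404 would still go away, but messages sent to out-exchange would not be routed to any queue.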

huangapple
  • Posted on July 10, 2023 at 18:02:18
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76652673.html