英文:
Can't do a simple Request-reply with RabbitMQ (RPC) and Apache Camel - not working
问题
希望你一切都好!首先,我是EIP世界的新手。我正在尝试使用以下工具进行简单的请求-响应:
- 一个使用Golang编写的RabbitMQ客户端
- 一个使用Kotlin编写的Apache Camel路由,充当RabbitMQ服务器
我已经尝试阅读了所有可能的文档并搜索答案,但是没有找到任何有用的信息。我感到非常绝望。主要参考了这个链接,但是还没有成功。
我的目标是实现如下图所示的同步请求-响应模式:
我的Golang客户端代码如下:
func (r *RabbitMQConn) GetQueue(name string) *amqp.Queue {
ch := r.GetChannel()
defer ch.Close()
q, err := ch.QueueDeclare(
name,
false,
false,
true,
false,
nil,
)
if err != nil {
panic(err)
}
return &q
}
func (r *RabbitMQConn) PublishAndWait(routingKey string, correlationId string, event domain.SyncEventExtSend) (domain.SyncEventExtReceive, error) {
message, err := json.Marshal(event)
if err != nil {
return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}
ch := r.GetChannel()
defer ch.Close()
q := r.GetQueue("response")
h, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
return domain.SyncEventExtReceive{}, err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = ch.PublishWithContext(
ctx,
"",
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: message,
CorrelationId: correlationId,
ReplyTo: q.Name,
},
)
if err != nil {
return domain.SyncEventExtReceive{}, err
}
for d := range h {
fmt.Println("Received a message:", string(d.Body))
if d.CorrelationId == correlationId {
var event domain.SyncEventExtReceive
err = json.Unmarshal(d.Body, &event)
return event, err
}
}
return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}
基本上,这段代码从默认交换机中消费消息,并使用一个命名的响应队列。我将队列名作为ReplyTo参数发送,并给它一个关联ID。在这个例子中,发送的路由键是daily-weather
。
在服务器端,我尝试使用默认交换机创建服务器,但是Apache Camel不允许我对默认交换机进行任何操作。
from("rabbitmq:?queue=daily-weather&autoAck=true&autoDelete=false")
所以,我将其指定为amq.direct
交换机。然而,这也没有起作用。
"rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false"
然后,我添加了第二个RabbitMQ端点来查看是否发送了消息,但是仍然没有成功。
from("rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false")
.log(LoggingLevel.INFO, "weather-daily", "Received message: ${body}")
.to("rabbitmq:amq.direct?queue=response&autoAck=true&autoDelete=false")
我想知道是否有人有使用Apache Camel实现这个功能的简单示例,因为我感到非常迷茫。如果你联系我,我可以提供更多详细信息。
非常感谢!!!
英文:
I hope you are well! First, I am new to the EIP world. I am trying to do a simple request reply with:
- A Golang rabbitMQ client
- An apache Camel route in Kotlin acting as a RabbitMQ server
I have tried to read all the docs I could and search for answers but I could't find nothing. I am basically desperate. Mainly I saw this and nothing has worked yet.
My goal is to do a sync request-reply as the image.
My Golang client looks like this:
func (r *RabbitMQConn) GetQueue(name string) *amqp.Queue {
ch := r.GetChannel()
defer ch.Close()
q, err := ch.QueueDeclare(
name,
false,
false,
true,
false,
nil,
)
if err != nil {
panic(err)
}
return &q
}
func (r *RabbitMQConn) PublishAndWait(routingKey string, correlationId string, event domain.SyncEventExtSend) (domain.SyncEventExtReceive, error) {
message, err := json.Marshal(event)
if err != nil {
return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}
ch := r.GetChannel()
defer ch.Close()
q := r.GetQueue("response")
h, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
return domain.SyncEventExtReceive{}, err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = ch.PublishWithContext(
ctx,
"",
routingKey,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: message,
CorrelationId: correlationId,
ReplyTo: q.Name,
},
)
if err != nil {
return domain.SyncEventExtReceive{}, err
}
for d := range h {
fmt.Println("Received a message:", string(d.Body))
if d.CorrelationId == correlationId {
var event domain.SyncEventExtReceive
err = json.Unmarshal(d.Body, &event)
return event, err
}
}
return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}
Basically, just consuming from the default exchange with a named response queue. Also, I send the queue name as the ReplyTo parameter and I give it a correlation id. The routing-key that is sent is daily-weather
in this case.
On the server side, I tried to do the server with the default exchange, but Apache Camel forbids me to do nothing with that exchange.
from("rabbitmq:?queue=daily-weather&autoAck=true&autoDelete=false")
So, I assigned it the amq.direct
exchange. However, that didn't also worked.
"rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false"
Then, I added a second RabbitMQ endpoint to see if it would sent it, but nothing.
from("rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false")
.log(LoggingLevel.INFO, "weather-daily", "Received message: \${body}")
.to("rabbitmq:amq.direct?queue=response&autoAck=true&autoDelete=false")
I ask if anybody has any simple example to do this with Apache Camel, because I am ultra lost. Any further detail can be shared if you contact me.
Thank you very much!!!!
答案1
得分: 0
已解决
嗨!经过一段时间后,我决定看一下spring-rabbitmq Camel组件。我意识到Camel有交换模式,而rabbitmq默认将其设置为inOut
。这样,它会自动将信息返回到replyTo
属性。
val RABBIMQ_ROUTE =
"spring-rabbitmq:default?queues={{rabbitmq.weather.daily.routing_key}}"
default
指的是默认交换队列。
英文:
SOLVED
Hi! After some time I decided to take a look to the spring-rabbitmq Camel component. I realised that Camel has exchange patterns, and rabbitmq, by default, sets it to inOut
. This way, automatically returns the information back to the replyTo
property.
val RABBIMQ_ROUTE =
"spring-rabbitmq:default?queues={{rabbitmq.weather.daily.routing_key}}"
default
refers to the default exchange queue.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论