无法使用RabbitMQ(RPC)和Apache Camel进行简单的请求-回复操作,不起作用。

huangapple go评论79阅读模式
英文:

Can't do a simple Request-reply with RabbitMQ (RPC) and Apache Camel - not working

问题

希望你一切都好!首先,我是EIP世界的新手。我正在尝试使用以下工具进行简单的请求-响应:

  • 一个使用Golang编写的RabbitMQ客户端
  • 一个使用Kotlin编写的Apache Camel路由,充当RabbitMQ服务器

我已经尝试阅读了所有可能的文档并搜索答案,但是没有找到任何有用的信息。我感到非常绝望。主要参考了这个链接,但是还没有成功。

我的目标是实现如下图所示的同步请求-响应模式:

无法使用RabbitMQ(RPC)和Apache Camel进行简单的请求-回复操作,不起作用。

我的Golang客户端代码如下:

func (r *RabbitMQConn) GetQueue(name string) *amqp.Queue {
	ch := r.GetChannel()
	defer ch.Close()
	q, err := ch.QueueDeclare(
		name,
		false,
		false,
		true,
		false,
		nil,
	)
	if err != nil {
		panic(err)
	}
	return &q
}
func (r *RabbitMQConn) PublishAndWait(routingKey string, correlationId string, event domain.SyncEventExtSend) (domain.SyncEventExtReceive, error) {
	message, err := json.Marshal(event)
	if err != nil {
		return domain.SyncEventExtReceive{}, apperrors.ErrInternal
	}
	ch := r.GetChannel()
	defer ch.Close()
	q := r.GetQueue("response")
	h, err := ch.Consume(
		q.Name,
		"",
		true,
		false,
		false,
		false,
		nil,
	)

	if err != nil {
		return domain.SyncEventExtReceive{}, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	err = ch.PublishWithContext(
		ctx,
		"",
		routingKey,
		false,
		false,
		amqp.Publishing{
			ContentType:   "application/json",
			Body:          message,
			CorrelationId: correlationId,
			ReplyTo:       q.Name,
		},
	)
	if err != nil {
		return domain.SyncEventExtReceive{}, err
	}

	for d := range h {
		fmt.Println("Received a message:", string(d.Body))
		if d.CorrelationId == correlationId {
			var event domain.SyncEventExtReceive
			err = json.Unmarshal(d.Body, &event)
			return event, err
		}
	}
	return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}

基本上,这段代码从默认交换机中消费消息,并使用一个命名的响应队列。我将队列名作为ReplyTo参数发送,并给它一个关联ID。在这个例子中,发送的路由键是daily-weather


在服务器端,我尝试使用默认交换机创建服务器,但是Apache Camel不允许我对默认交换机进行任何操作。

from("rabbitmq:?queue=daily-weather&autoAck=true&autoDelete=false")

所以,我将其指定为amq.direct交换机。然而,这也没有起作用。

"rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false"

然后,我添加了第二个RabbitMQ端点来查看是否发送了消息,但是仍然没有成功。

from("rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false")
    .log(LoggingLevel.INFO, "weather-daily", "Received message: ${body}")
    .to("rabbitmq:amq.direct?queue=response&autoAck=true&autoDelete=false")

我想知道是否有人有使用Apache Camel实现这个功能的简单示例,因为我感到非常迷茫。如果你联系我,我可以提供更多详细信息。

非常感谢!!! 无法使用RabbitMQ(RPC)和Apache Camel进行简单的请求-回复操作,不起作用。

英文:

I hope you are well! First, I am new to the EIP world. I am trying to do a simple request reply with:

  • A Golang rabbitMQ client
  • An apache Camel route in Kotlin acting as a RabbitMQ server

I have tried to read all the docs I could and search for answers but I could't find nothing. I am basically desperate. Mainly I saw this and nothing has worked yet.

My goal is to do a sync request-reply as the image.

无法使用RabbitMQ(RPC)和Apache Camel进行简单的请求-回复操作,不起作用。

My Golang client looks like this:

func (r *RabbitMQConn) GetQueue(name string) *amqp.Queue {
ch := r.GetChannel()
defer ch.Close()
q, err := ch.QueueDeclare(
name,
false,
false,
true,
false,
nil,
)
if err != nil {
panic(err)
}
return &q
}
func (r *RabbitMQConn) PublishAndWait(routingKey string, correlationId string, event domain.SyncEventExtSend) (domain.SyncEventExtReceive, error) {
message, err := json.Marshal(event)
if err != nil {
return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}
ch := r.GetChannel()
defer ch.Close()
q := r.GetQueue("response")
h, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
if err != nil {
return domain.SyncEventExtReceive{}, err
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
err = ch.PublishWithContext(
ctx,
"",
routingKey,
false,
false,
amqp.Publishing{
ContentType:   "application/json",
Body:          message,
CorrelationId: correlationId,
ReplyTo:       q.Name,
},
)
if err != nil {
return domain.SyncEventExtReceive{}, err
}
for d := range h {
fmt.Println("Received a message:", string(d.Body))
if d.CorrelationId == correlationId {
var event domain.SyncEventExtReceive
err = json.Unmarshal(d.Body, &event)
return event, err
}
}
return domain.SyncEventExtReceive{}, apperrors.ErrInternal
}

Basically, just consuming from the default exchange with a named response queue. Also, I send the queue name as the ReplyTo parameter and I give it a correlation id. The routing-key that is sent is daily-weather in this case.


On the server side, I tried to do the server with the default exchange, but Apache Camel forbids me to do nothing with that exchange.

from("rabbitmq:?queue=daily-weather&autoAck=true&autoDelete=false")

So, I assigned it the amq.direct exchange. However, that didn't also worked.

"rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false"

Then, I added a second RabbitMQ endpoint to see if it would sent it, but nothing.


from("rabbitmq:amq.direct?queue=daily-weather&autoAck=true&autoDelete=false")
.log(LoggingLevel.INFO, "weather-daily", "Received message: \${body}")
.to("rabbitmq:amq.direct?queue=response&autoAck=true&autoDelete=false")

I ask if anybody has any simple example to do this with Apache Camel, because I am ultra lost. Any further detail can be shared if you contact me.

Thank you very much!!!! 无法使用RabbitMQ(RPC)和Apache Camel进行简单的请求-回复操作,不起作用。

答案1

得分: 0

已解决

嗨!经过一段时间后,我决定看一下spring-rabbitmq Camel组件。我意识到Camel有交换模式,而rabbitmq默认将其设置为inOut。这样,它会自动将信息返回到replyTo属性。

  val RABBIMQ_ROUTE =
"spring-rabbitmq:default?queues={{rabbitmq.weather.daily.routing_key}}"

default指的是默认交换队列。

英文:

SOLVED

Hi! After some time I decided to take a look to the spring-rabbitmq Camel component. I realised that Camel has exchange patterns, and rabbitmq, by default, sets it to inOut. This way, automatically returns the information back to the replyTo property.

  val RABBIMQ_ROUTE =
"spring-rabbitmq:default?queues={{rabbitmq.weather.daily.routing_key}}"

default refers to the default exchange queue.

huangapple
  • 本文由 发表于 2022年11月1日 03:09:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/74268096.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定