How to pass a message from Go and consume it in NestJS with RabbitMQ?

Question

I have a Go service that publishes a message to RabbitMQ. The code responsible for that part is below:

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func main() {
	conn, amqError := amqp.Dial("amqp://localhost:5672/")
	if amqError != nil {
		panic(amqError)
	}

	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	q, err := ch.QueueDeclare(
		"default", // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		nil,       // arguments
	)
	failOnError(err, "Failed to declare a queue")

	body := "{ \"body\":\"Hello...\", \"pattern\":\"test\",  \"age\":\"20\"}"

	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		},
	)
	failOnError(err, "Failed to publish a message")

}

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

The NestJS part that consumes the message looks like this:

import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, Payload, RmqContext } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor() { }

  @EventPattern("test")
  getEventMessage(@Payload() data: any, @Ctx() context: RmqContext) {
    console.log("data is -> ", data) // always undefined
    console.log(
      "content of message is -> ",
      JSON.parse(
        context.getMessage().content.toString() // from buffer to string
      )
    )
  }
}

The problem is that data is always undefined, so I have to parse the message from the context instead. I would also like to build the JSON message in Go without having to escape every double quote like this: \"

Answer 1

Score: 1

Here are answers to both of your questions:

  1. The NestJS documentation examples do not show how the payload data gets populated. Instead, they say:

> To access the original RabbitMQ message (with the properties, fields,
> and content), use the getMessage() method of the RmqContext

Given that statement, reading the message through context.getMessage() is a valid way to parse what is in the queue.

  2. Rather than building the body string by hand, marshal a struct to JSON. For example:
  • Create a struct that contains the information to send to the queue
  • Marshal the struct, which produces the []byte body

type MessageQueue struct {
	Body    string `json:"body"`
	Pattern string `json:"pattern"`
	Age     string `json:"age"`
	Data    string `json:"data"`
}

// NewMessageQueue builds the message to publish.
func NewMessageQueue(body, pattern, age, data string) *MessageQueue {
	return &MessageQueue{
		Body:    body,
		Pattern: pattern,
		Age:     age,
		Data:    data,
	}
}

// Marshal encodes the message as JSON (requires importing "encoding/json").
func (m *MessageQueue) Marshal() ([]byte, error) {
	return json.Marshal(m)
}

func main() {
	...

	message := NewMessageQueue("Hello...", "test", "20", "data...")
	body, err := message.Marshal()
	failOnError(err, "Failed to marshal the message")

	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        body,
		},
	)

	...
}
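
The marshaling step can be checked on its own, without a running broker. The following is a minimal sketch: encodeMessage is a hypothetical helper introduced only for this illustration, and the struct is copied from the answer. It prints the JSON that would be placed in the message body, with all quoting handled by encoding/json.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// MessageQueue mirrors the struct shown in the answer.
type MessageQueue struct {
	Body    string `json:"body"`
	Pattern string `json:"pattern"`
	Age     string `json:"age"`
	Data    string `json:"data"`
}

// encodeMessage is a hypothetical helper (not part of the original answer).
// It returns the JSON text that would be used as the AMQP message body.
func encodeMessage(m MessageQueue) (string, error) {
	b, err := json.Marshal(m)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := encodeMessage(MessageQueue{
		Body:    "Hello...",
		Pattern: "test",
		Age:     "20",
		Data:    "data...",
	})
	if err != nil {
		panic(err)
	}
	// Prints: {"body":"Hello...","pattern":"test","age":"20","data":"data..."}
	fmt.Println(out)
}
```

If a hand-written literal is still preferred for a quick test, a Go raw string (delimited by backticks) also avoids escaping the double quotes.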

Update:

  3. The data parameter that the controller receives in getEventMessage is filled from the data field of the message, so the Go service has to send that field in the body for NestJS to deserialize it. The struct should therefore look like this:

type MessageQueue struct {
	Body    string `json:"body"`
	Pattern string `json:"pattern"`
	Age     string `json:"age"`
	Data    string `json:"data"`
}
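
If the payload is more than a single string, one option is to nest it under data. The sketch below assumes, as the update above implies, that NestJS reads pattern and data from the top level of the JSON body. Envelope, Greeting, and buildEvent are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Greeting is a hypothetical payload type; any JSON-serializable value works.
type Greeting struct {
	Body string `json:"body"`
	Age  int    `json:"age"`
}

// Envelope is a hypothetical wrapper: "pattern" is matched against
// @EventPattern on the NestJS side, and "data" is assumed to be what
// arrives in the @Payload() parameter.
type Envelope struct {
	Pattern string      `json:"pattern"`
	Data    interface{} `json:"data"`
}

// buildEvent marshals a payload under the given pattern.
func buildEvent(pattern string, payload interface{}) ([]byte, error) {
	return json.Marshal(Envelope{Pattern: pattern, Data: payload})
}

func main() {
	body, err := buildEvent("test", Greeting{Body: "Hello...", Age: 20})
	if err != nil {
		panic(err)
	}
	// Prints: {"pattern":"test","data":{"body":"Hello...","age":20}}
	fmt.Println(string(body))
}
```

The resulting bytes can be passed as Body in amqp.Publishing exactly as in the publishing code above.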

Posted by huangapple on 2021-12-29 06:09:51
Source: https://go.coder-hub.com/70513069.html