在RabbitMQ中定时(延迟)发送消息

huangapple go评论75阅读模式
英文:

Schedule a message in Rabbitmq

问题

我正在尝试按照官方教程操作,同时添加延迟/定时发送消息到RabbitMQ的功能。我在Docker中使用rabbitmq:3-management-alpine进行设置,并尝试设置x-delay头部,但消息仍然立即发送。

send.go

package main

import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)

// failOnError does nothing when err is nil. Otherwise it logs msg followed by
// the error text and panics with that same formatted string.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Panicf("%s: %s", msg, err)
}

// main publishes a single "Hello World!" message to the "hello" queue of the
// RabbitMQ broker at localhost:5672, attaching an "x-delay" header of 5000.
// Every connection, channel, declaration, or publish error aborts the program
// through failOnError.
func main() {
	connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer connection.Close()

	channel, err := connection.Channel()
	failOnError(err, "Failed to open a channel")
	defer channel.Close()

	// Positional arguments: name, durable, delete when unused, exclusive,
	// no-wait, arguments.
	queue, err := channel.QueueDeclare("hello", false, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	// The publish call is bounded by a five second timeout.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	const payload = "Hello World!"
	message := amqp.Publishing{
		Headers:     map[string]interface{}{"x-delay": 5000},
		ContentType: "text/plain",
		Body:        []byte(payload),
	}

	// Positional arguments: exchange (""), routing key (the queue name),
	// mandatory, immediate.
	err = channel.PublishWithContext(ctx, "", queue.Name, false, false, message)
	failOnError(err, "Failed to publish a message")
	log.Printf(" [x] Sent %s\n", payload)
}

receive.go

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// main registers an auto-acknowledging consumer on the "hello" queue of the
// RabbitMQ broker at localhost:5672 and logs the body of every delivery it
// receives. Setup errors abort the program through failOnError.
func main() {
	connection, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer connection.Close()

	channel, err := connection.Channel()
	failOnError(err, "Failed to open a channel")
	defer channel.Close()

	// Positional arguments: name, durable, delete when unused, exclusive,
	// no-wait, arguments.
	queue, err := channel.QueueDeclare("hello", false, false, false, false, nil)
	failOnError(err, "Failed to declare a queue")

	// Positional arguments: queue, consumer, auto-ack, exclusive, no-local,
	// no-wait, args.
	deliveries, err := channel.Consume(queue.Name, "", true, false, false, false, nil)
	failOnError(err, "Failed to register a consumer")

	logDeliveries := func() {
		for delivery := range deliveries {
			log.Printf("Received a message: %s", delivery.Body)
		}
	}
	go logDeliveries()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

	// block is deliberately left nil: a receive on a nil channel never
	// completes, so main stays parked while the goroutine logs deliveries.
	var block chan struct{}
	<-block
}

以上是你提供的代码的翻译。

英文:

I'm trying to follow the official tutorial while also adding the ability to delay/schedule a message in RabbitMQ. I have my setup running in Docker with rabbitmq:3-management-alpine, and I've been trying to set the x-delay header, but messages still get delivered instantly.

send.go

package main
import (
	"context"
	"log"
	"time"

	amqp "github.com/rabbitmq/amqp091-go"
)
// failOnError logs msg together with the error text and panics when err is
// non-nil; it does nothing for a nil error.
func failOnError(err error, msg string) {
	if err != nil {
		log.Panicf("%s: %s", msg, err)
	}
}
func main() {
conn, err := amqp.Dial(&quot;amqp://guest:guest@localhost:5672/&quot;)
failOnError(err, &quot;Failed to connect to RabbitMQ&quot;)
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, &quot;Failed to open a channel&quot;)
defer ch.Close()
q, err := ch.QueueDeclare(
&quot;hello&quot;, // name
false,   // durable
false,   // delete when unused
false,   // exclusive
false,   // no-wait
nil,     // arguments
)
failOnError(err, &quot;Failed to declare a queue&quot;)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
body := &quot;Hello World!&quot;
err = ch.PublishWithContext(ctx,
&quot;&quot;,     // exchange
q.Name, // routing key
false,  // mandatory
false,  // immediate
amqp.Publishing{
Headers: map[string]interface{}{
&quot;x-delay&quot;: 5000,
},
ContentType: &quot;text/plain&quot;,
Body:        []byte(body),
})
failOnError(err, &quot;Failed to publish a message&quot;)
log.Printf(&quot; [x] Sent %s\n&quot;, body)
}

receive.go

package main
import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)
func main() {
conn, err := amqp.Dial(&quot;amqp://guest:guest@localhost:5672/&quot;)
failOnError(err, &quot;Failed to connect to RabbitMQ&quot;)
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, &quot;Failed to open a channel&quot;)
defer ch.Close()
q, err := ch.QueueDeclare(
&quot;hello&quot;, // name
false,   // durable
false,   // delete when unused
false,   // exclusive
false,   // no-wait
nil,     // arguments
)
failOnError(err, &quot;Failed to declare a queue&quot;)
msgs, err := ch.Consume(
q.Name, // queue
&quot;&quot;,     // consumer
true,   // auto-ack
false,  // exclusive
false,  // no-local
false,  // no-wait
nil,    // args
)
failOnError(err, &quot;Failed to register a consumer&quot;)
var forever chan struct{}
go func() {
for d := range msgs {
log.Printf(&quot;Received a message: %s&quot;, d.Body)
}
}()
log.Printf(&quot; [*] Waiting for messages. To exit press CTRL+C&quot;)
&lt;-forever
}

答案1

得分: 1

要定时(延迟)发送消息,您需要把消息发布到一个声明了特定属性的交换机,而您提供的代码中没有这样做。
以下是声明支持延迟投递的交换机的示例(摘自官方文档;该交换机类型由 rabbitmq_delayed_message_exchange 插件提供,需要先在服务器上安装并启用该插件):

// Declare a durable exchange of type "x-delayed-message". The
// "x-delayed-type" argument names the exchange type whose routing rules
// should apply to the delayed messages (here "direct").
// Positional flags: durable=true, autoDelete=false, internal=false, noWait=false.
args := amqp.Table{"x-delayed-type": "direct"}
err := channel.ExchangeDeclare("my-exchange", "x-delayed-message", true, false, false, false, args)
// Surface a failed declaration instead of silently dropping the error.
failOnError(err, "Failed to declare an exchange")

然后,这是发布消息的方式(同样摘自官方文档):

messageBodyBytes := []byte("delayed payload")
props := amqp.Publishing{
    Headers: amqp.Table{
        "x-delay": 5000,
    },
}
channel.Publish("my-exchange", "", false, false, props, messageBodyBytes)

PS:很抱歉没有提供Go语言的示例,但我相信您可以通过Go库来设置所需的内容。

英文:

In order to schedule a message, you need to publish it to an exchange declared with specific properties, which the code you provided doesn't do.
Here is an example of how to declare an exchange that supports scheduling (taken from the official documentation; this exchange type is provided by the rabbitmq_delayed_message_exchange plugin, which must be installed and enabled on the broker):

Map&lt;String, Object&gt; args = new HashMap&lt;String, Object&gt;();
args.put(&quot;x-delayed-type&quot;, &quot;direct&quot;);
channel.exchangeDeclare(&quot;my-exchange&quot;, &quot;x-delayed-message&quot;, true, false, args);

Then this is how you publish (again taken from the official documentation):

// Publish "delayed payload" to the delayed exchange, requesting the delay
// through the "x-delay" header (5000, in milliseconds per the plugin docs).
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);

PS: Sorry for not providing examples in Go; I'm sure you can figure out how to set things up using the Go library.

huangapple
  • 本文由 发表于 2022年11月9日 05:56:16
  • 转载请务必保留本文链接:https://go.coder-hub.com/74367596.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定