Nats Jetstream 精确一次投递

huangapple go评论89阅读模式
英文:

Nats Jetstream Exactly Once Delivery

问题

我想使用Nats Jetstream实现一种仅一次传递的系统。文档中提到Jetstream有这个选项,但是没有关于它如何工作以及客户端如何实现的示例或详细信息。我知道在发布者端我们可以设置MsgId并在创建流时指定重复窗口,但是消费者端呢?

英文:

I want to implement an exactly once delivery system with Nats Jetstream. Documentation says that Jetstream has this option, but there is no samples or details about that how it's work and how clients can implement this. I know that in publisher side we can set MsgId and specify duplication window when creating Stream, but what about consumer side?

答案1

得分: 4

以下是exactly-once delivery的文档。这个术语有点误导,因为实际上需要的是“仅处理一次”。正如你所指出的,这是服务器在接收到发布的消息时进行去重,并且订阅者在接收到消息后进行双重确认调用(如果需要,还会进行重试)的组合。

这是一个示例(为简洁起见省略了多余的错误处理)。使用启用了JetStream的服务器运行以下代码:nats-server --js,然后运行此代码(假设使用的是nats.go v1.16+)。

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func failOnErr(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	// 连接并获取JetStream上下文。
	nc, _ := nats.Connect(nats.DefaultURL)
	js, _ := nc.JetStream()

	// 创建一个测试流。
	_, err := js.AddStream(&nats.StreamConfig{
		Name:       "test",
		Storage:    nats.MemoryStorage,
		Subjects:   []string{"test.>"},
		Duplicates: time.Minute,
	})
	failOnErr(err)

	defer js.DeleteStream("test")

	// 发布一些带有重复消息的消息。
	js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
	js.Publish("test.2", []byte("world"), nats.MsgId("2"))
	js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
	js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
	js.Publish("test.2", []byte("world"), nats.MsgId("2"))
	js.Publish("test.2", []byte("world"), nats.MsgId("2"))

	// 在流上创建一个显式的拉取消费者。
	_, err = js.AddConsumer("test", &nats.ConsumerConfig{
		Durable:       "test",
		AckPolicy:     nats.AckExplicitPolicy,
		DeliverPolicy: nats.DeliverAllPolicy,
	})
	failOnErr(err)
	defer js.DeleteConsumer("test", "test")

	// 在拉取消费者上创建一个订阅。
	// 由于默认绑定到流的所有主题,因此主题可以为空。
	sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
	failOnErr(err)

	// 只有两个消息会被传递。
	batch, _ := sub.Fetch(10)
	log.Printf("%d messages", len(batch))

	// 使用AckSync确保服务器接收到确认。
	batch[0].AckSync()
	batch[1].AckSync()

	// 应该为零。
	batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
	log.Printf("%d messages", len(batch))
}

值得注意的是,如果AckSync失败(可能会返回错误),则代码需要重试确认,直到收到响应为止。客户端的冗余确认是无操作的。

英文:

Here are the docs for exactly-once delivery. This is a bit of a misnomer since what is actually needed (and what this feature provides) is exactly-once processing.

As you point out, it is a combination of de-duplication by the server when receiving a published message as well as a double ack call by the subscription that had received the message (plus retries if necessary).

Here is an example (excess error handling elided for brevity). Start the server with JetStream enabled: nats-server --js and then run this code (it assuming nats.go v1.16+).

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func failOnErr(err error) {
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	// Connect and get the JetStream context.
	nc, _ := nats.Connect(nats.DefaultURL)
	js, _ := nc.JetStream()

	// Create a test stream.
	_, err := js.AddStream(&nats.StreamConfig{
		Name:       "test",
		Storage:    nats.MemoryStorage,
		Subjects:   []string{"test.>"},
		Duplicates: time.Minute,
	})
	failOnErr(err)

	defer js.DeleteStream("test")

	// Publish some messages with duplicates.
	js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
	js.Publish("test.2", []byte("world"), nats.MsgId("2"))
	js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
	js.Publish("test.1", []byte("hello"), nats.MsgId("1"))
	js.Publish("test.2", []byte("world"), nats.MsgId("2"))
	js.Publish("test.2", []byte("world"), nats.MsgId("2"))

	// Create an explicit pull consumer on the stream.
	_, err = js.AddConsumer("test", &nats.ConsumerConfig{
		Durable:       "test",
		AckPolicy:     nats.AckExplicitPolicy,
		DeliverPolicy: nats.DeliverAllPolicy,
	})
	failOnErr(err)
	defer js.DeleteConsumer("test", "test")

	// Create a subscription on the pull consumer.
	// Subject can be empty since it defaults to all subjects bound to the stream.
	sub, err := js.PullSubscribe("", "test", nats.BindStream("test"))
	failOnErr(err)

	// Only two should be delivered.
	batch, _ := sub.Fetch(10)
	log.Printf("%d messages", len(batch))

	// AckSync both to ensure the server received the ack.
	batch[0].AckSync()
	batch[1].AckSync()

	// Should be zero.
	batch, _ = sub.Fetch(10, nats.MaxWait(time.Second))
	log.Printf("%d messages", len(batch))
}

It is worth noting that if an AckSync does fail (an error can be returned from it) then its on this code to retry the ack again until a response is received. A redundant ack from the client is a no-op.

huangapple
  • 本文由 发表于 2022年6月30日 18:50:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/72814502.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定