Apache Pulsar:从指定的消息 ID 读取/消费消息到结束消息 ID?

huangapple go评论90阅读模式
英文:

Apache Pulsar: Read / Consume messages from an integer specified message id to an end message Id?

问题

使用Kafka,我可以指定一个整数消息ID来开始消费,并指定一个结束消息来停止消费,例如:

kafkacat -b kafka:9092 -t messages -o 11000 -c 11333

然而,似乎在Apache Pulsar中没有提供指定整数起始和结束消息的相同功能!

公平地说,如果这些消息已经以字节格式被跟踪和保存,可以通过一个非常复杂的过程来指定起始消息ID和结束消息ID,但这必然会影响性能和代码复杂性。

例如:

// Example from the question: publish ten messages, keep their message IDs,
// then open readers positioned at the fifth message.
client, err := NewClient(pulsar.ClientOptions{
    URL: lookupURL,
})

if err != nil {
    log.Fatal(err)
}
defer client.Close()

topic := "topic-1"
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
    Topic:           topic,
    DisableBatching: true,
})
if err != nil {
    log.Fatal(err)
}
defer producer.Close()

// send 10 messages, remembering the ID returned for each one
msgIDs := [10]MessageID{}
for i := 0; i < 10; i++ {
    msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
        Payload: []byte(fmt.Sprintf("hello-%d", i)),
    })
    assert.NoError(t, err)
    assert.NotNil(t, msgID)
    msgIDs[i] = msgID
}

// create reader on 5th message (not included)
reader, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:          topic,
    StartMessageID: msgIDs[4],
})

if err != nil {
    log.Fatal(err)
}
defer reader.Close()

// receive the remaining 5 messages
for i := 5; i < 10; i++ {
    msg, err := reader.Next(context.Background())
    if err != nil {
        log.Fatal(err)
    }
}

// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
    Topic:                   topic,
    StartMessageID:          msgIDs[4],
    StartMessageIDInclusive: true,
})

if err != nil {
    log.Fatal(err)
}
defer readerInclusive.Close()

然而,这对于多个并发读取器来说是复杂且不可靠的,并且需要使用外部结构来跟踪已处理的消息,然后才能使用起始/结束语义检索它。

是否有任何方法可以实现这一点(最好通过golang)?

英文:

With Kafka, I can specify an integer message id to begin consuming from and an end message to stop at, e.g. as follows:

 kafkacat -b kafka:9092 -t messages -o 11000 -c 11333

However, it appears the same functionality to specify integer start and stop messages is not available in Apache Pulsar!

To be fair, it's possible to specify a start message id and end message id, if these have been tracked and saved in a byte format, using a very convoluted process which is bound to affect performance and code complexity.

As in this example:

// Example from the question: publish ten messages, keep their message IDs,
// then open readers positioned at the fifth message.
client, err := NewClient(pulsar.ClientOptions{
	URL: lookupURL,
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

topic := "topic-1"
ctx := context.Background()

// create producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic:           topic,
	DisableBatching: true,
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

// send 10 messages, remembering the ID returned for each one
msgIDs := [10]MessageID{}
for i := 0; i < 10; i++ {
	msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
		Payload: []byte(fmt.Sprintf("hello-%d", i)),
	})
	assert.NoError(t, err)
	assert.NotNil(t, msgID)
	msgIDs[i] = msgID
}

// create reader on 5th message (not included)
reader, err := client.CreateReader(pulsar.ReaderOptions{
	Topic:          topic,
	StartMessageID: msgIDs[4],
})
if err != nil {
	log.Fatal(err)
}
defer reader.Close()

// receive the remaining 5 messages
for i := 5; i < 10; i++ {
	msg, err := reader.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	_ = msg // the excerpt does not inspect the received message
}

// create reader on 5th message (included)
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
	Topic:                   topic,
	StartMessageID:          msgIDs[4],
	StartMessageIDInclusive: true,
})
if err != nil {
	log.Fatal(err)
}
defer readerInclusive.Close()

However, this is complicated and unreliable for multiple concurrent readers, and it requires an external construct to track the processed messages before they can be retrieved using the start/end semantics.

Is there any way to achieve this (preferably via golang)?

答案1

得分: 1

我发现以下简单的方法足够(概念验证脚本):

package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// writeBytesToFile writes byteSlice to the file named f inside the ./data/
// directory and returns the number of bytes written.
//
// The directory is created when missing, and the file is created or truncated.
// Any failure is fatal: the error is logged and the process exits.
func writeBytesToFile(f string, byteSlice []byte) int {
	const dataDir = "./data/"

	// os.OpenFile does not create missing parent directories, so make sure
	// the target directory exists before opening the file.
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		log.Fatal(err)
	}

	// Open the file write-only, creating it or truncating previous content.
	file, err := os.OpenFile(
		dataDir+f,
		os.O_WRONLY|os.O_TRUNC|os.O_CREATE,
		0666,
	)
	if err != nil {
		log.Fatal(err)
	}

	bytesWritten, err := file.Write(byteSlice)
	if err != nil {
		// Best-effort close; the write error is the one worth reporting.
		_ = file.Close()
		log.Fatal(err)
	}

	// Close explicitly (not via defer) so a close failure on a freshly
	// written file is reported; log.Fatal would skip deferred calls anyway.
	if err := file.Close(); err != nil {
		log.Fatal(err)
	}

	log.Printf("写入了 %d 个字节。\n", bytesWritten)

	return bytesWritten
}

// readBackByEntryId returns the content of the file <msgDir>/<msgIndex>.dat.
//
// When the file cannot be read, the failure (including the underlying error)
// is logged and nil is returned, so callers must be prepared for a nil slice.
func readBackByEntryId(msgDir string, msgIndex string) (yourBytes []byte) {
	// The file name is known by convention: "<index>.dat" inside msgDir.
	fname := msgDir + "/" + msgIndex + ".dat"

	data, err := ioutil.ReadFile(fname)
	if err != nil {
		// Log the cause as well; the path alone does not say why it failed.
		log.Printf("读取 %s 时出错: %v", fname, err)
		return nil
	}

	return data
}

// getFiles returns the names (not full paths) of all entries in directory
// aDir, in the order ioutil.ReadDir reports them (sorted by file name).
//
// An empty aDir falls back to "./data/", the directory this function used
// to read unconditionally. A directory that cannot be read is fatal.
func getFiles(aDir string) []string {
	if aDir == "" {
		aDir = "./data/"
	}

	files, err := ioutil.ReadDir(aDir)
	if err != nil {
		log.Fatal(err)
	}

	var theFiles []string
	for _, f := range files {
		theFiles = append(theFiles, f.Name())
	}

	return theFiles
}

// streamAll reads messages from reader and, for every message whose entry ID
// lies between startMsgIndex and stopMsgIndex (both inclusive), stores the
// serialized message ID in "<entry id>.dat" via writeBytesToFile.
//
// Capturing starts at the message whose entry ID equals startMsgIndex and
// ends at the first message whose entry ID exceeds stopMsgIndex. Once a range
// has been captured the function returns instead of draining the rest of the
// topic. Messages outside the range are only reported as skipped. A read
// error is fatal.
//
// NOTE(review): this relies on entry IDs being seen in increasing order while
// the range is being captured — confirm for topics spanning several ledgers.
func streamAll(reader pulsar.Reader, startMsgIndex int64, stopMsgIndex int64) {
	ctx := context.Background()
	read := false

	for reader.HasNext() {
		msg, err := reader.Next(ctx)
		if err != nil {
			log.Fatal(err)
		}

		msgID := msg.ID()
		msgIndex := msgID.EntryID()

		fmt.Printf("%v -> %#v\n", msgIndex, msgID)

		if msgIndex == startMsgIndex {
			fmt.Println("开始读取:", msgIndex)
			read = true
		}

		if msgIndex > stopMsgIndex {
			fmt.Println("停止读取:", msgIndex)
			if read {
				// The requested range is complete; nothing after this
				// point would be stored, so stop scanning.
				return
			}
		}

		if !read {
			fmt.Println("跳过:", msgIndex)
			continue
		}

		// Persist the serialized message ID so the message can later be
		// located again by its entry ID.
		fname := strconv.FormatInt(msgIndex, 10) + ".dat"
		fmt.Println("写入的字节:", writeBytesToFile(fname, msgID.Serialize()))

		fmt.Printf("接收到的消息 msgId: %#v -- 内容: '%s' 发布于 %v\n",
			msgID, string(msg.Payload()), msg.PublishTime())
	}

	/*
		FYI - to save and re-read a msgId from a store: https://githubmemory.com/@storm-5
		msgId := msg.ID()
		msgIdBytes := msgId.Serialize()
		idNew, _ := pulsar.DeserializeMessageID(msgIdBytes)

		readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
			Topic:                   "ragnarok/transactions/requests",
			StartMessageID:          idNew,
			StartMessageIDInclusive: true,
		})
	*/
}

// retrieveRange re-reads every message whose serialized ID was stored under
// ./data/: for each file it derives the index from the file name and fetches
// the corresponding message again through a dedicated reader.
func retrieveRange(client pulsar.Client) {
	for _, f := range getFiles("./data/") {
		// "55.dat" -> "55"
		fIndex := strings.Split(f, ".")[0]

		fmt.Println("重新读取消息索引 ->", fIndex)

		retrieveByIndex(client, fIndex)
	}
}

// retrieveByIndex loads the stored message ID for fIndex, opens a reader that
// starts at (and includes) that ID, and prints the first message it yields.
// It is a separate function so that the deferred Close runs after each file
// rather than piling up until retrieveRange returns.
func retrieveByIndex(client pulsar.Client, fIndex string) {
	msgIDBytes := readBackByEntryId("./data", fIndex)
	if msgIDBytes == nil {
		// readBackByEntryId already logged the read failure; skip this file.
		return
	}

	fmt.Printf("boom -> %#v\n", msgIDBytes)

	idNew, err := pulsar.DeserializeMessageID(msgIDBytes)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println("获得消息条目 ID ->", idNew.EntryID())

	readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:                   "ragnarok/transactions/requests",
		StartMessageID:          idNew,
		StartMessageIDInclusive: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer readerInclusive.Close()

	fmt.Println("bleep!")

	msg, err := readerInclusive.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("检索到的消息 ID msgId: %#v -- 内容: '%s' 发布于 %v\n",
		msg.ID(), string(msg.Payload()), msg.PublishTime())
}

// main connects to the local Pulsar broker, streams the topic from its
// earliest message while storing the IDs of the messages whose entry IDs fall
// in the selected range, and then fetches exactly those messages again.
func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://localhost:6650",
		OperationTimeout:  30 * time.Second,
		ConnectionTimeout: 30 * time.Second,
	})
	if err != nil {
		log.Fatalf("无法实例化 Pulsar 客户端:%v", err)
	}
	defer client.Close()

	reader, err := client.CreateReader(pulsar.ReaderOptions{
		Topic:          "ragnarok/transactions/requests",
		StartMessageID: pulsar.EarliestMessageID(),
	})
	if err != nil {
		log.Fatal(err)
	}
	defer reader.Close()

	// Inclusive entry-ID bounds of the range to pick out of the stream.
	const (
		startMsgID int64 = 55
		stopMsgID  int64 = 66
	)

	// Stream the messages from the earliest one and pick the subset
	// between the start and stop IDs.
	streamAll(reader, startMsgID, stopMsgID)

	// Retrieve the picked range.
	retrieveRange(client)
}
英文:

I've found that the following simple approach is sufficient (proof of concept script):

package main
import (
	"context"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)
// writeBytesToFile writes byteSlice to the file named f inside the ./data/
// directory and returns the number of bytes written.
//
// The directory is created when missing, and the file is created or truncated.
// Any failure is fatal: the error is logged and the process exits.
func writeBytesToFile(f string, byteSlice []byte) int {
	const dataDir = "./data/"

	// os.OpenFile does not create missing parent directories, so make sure
	// the target directory exists before opening the file.
	if err := os.MkdirAll(dataDir, 0755); err != nil {
		log.Fatal(err)
	}

	// Open the file write-only, creating it or truncating previous content.
	file, err := os.OpenFile(
		dataDir+f,
		os.O_WRONLY|os.O_TRUNC|os.O_CREATE,
		0666,
	)
	if err != nil {
		log.Fatal(err)
	}

	bytesWritten, err := file.Write(byteSlice)
	if err != nil {
		// Best-effort close; the write error is the one worth reporting.
		_ = file.Close()
		log.Fatal(err)
	}

	// Close explicitly (not via defer) so a close failure on a freshly
	// written file is reported; log.Fatal would skip deferred calls anyway.
	if err := file.Close(); err != nil {
		log.Fatal(err)
	}

	log.Printf("Wrote %d bytes.\n", bytesWritten)

	return bytesWritten
}
// readBackByEntryId returns the content of the file <msgDir>/<msgIndex>.dat.
//
// When the file cannot be read, the failure (including the underlying error)
// is logged and nil is returned, so callers must be prepared for a nil slice.
func readBackByEntryId(msgDir string, msgIndex string) (yourBytes []byte) {
	// The file name is known by convention: "<index>.dat" inside msgDir.
	fname := msgDir + "/" + msgIndex + ".dat"

	data, err := ioutil.ReadFile(fname)
	if err != nil {
		// Log the cause as well; the path alone does not say why it failed.
		log.Printf("error reading %s: %v", fname, err)
		return nil
	}

	return data
}
// getFiles returns the names (not full paths) of all entries in directory
// aDir, in the order ioutil.ReadDir reports them (sorted by file name).
//
// An empty aDir falls back to "./data/", the directory this function used
// to read unconditionally. A directory that cannot be read is fatal.
func getFiles(aDir string) []string {
	if aDir == "" {
		aDir = "./data/"
	}

	files, err := ioutil.ReadDir(aDir)
	if err != nil {
		log.Fatal(err)
	}

	var theFiles []string
	for _, f := range files {
		theFiles = append(theFiles, f.Name())
	}

	return theFiles
}
func streamAll(reader pulsar.Reader, startMsgIndex int64, stopMsgIndex int64) {
read := false
for reader.HasNext() {
msg, err := reader.Next(context.Background())
if err != nil {
log.Fatal(err)
}
//can I access the details of the message ? yes
fmt.Printf(&quot;%v -&gt; %#v\n&quot;, msg.ID().EntryID(), msg.ID())
//Can i serialize into bytes? Yes
myBytes := msg.ID().Serialize()
//Can I store it somewhere? Perhaps a map ? or even on disk in a file ?
//In other words: Can I write a byte[] slice to a file? Yes!
msgIndex := msg.ID().EntryID()
if msgIndex == startMsgIndex {
fmt.Println(&quot;start read: &quot;, msgIndex)
read = true
}
if msgIndex &gt; stopMsgIndex {
fmt.Println(&quot;stop reading: &quot;, msgIndex)
read = false
}
if read == false {
fmt.Println(&quot;skipping &quot;, msgIndex)
} else {
fname := strconv.FormatInt(msgIndex, 10) + &quot;.dat&quot;
fmt.Println(&quot;written bytes: &quot;, writeBytesToFile(fname, myBytes))
fmt.Printf(&quot;Received message msgId: %#v -- content: &#39;%s&#39; published at %v\n&quot;,
msg.ID(), string(msg.Payload()), msg.PublishTime())
}
/*
//FYI - to save and reread a msgId from store: https://githubmemory.com/@storm-5
msgId := msg.ID()
msgIdBytes := msgId.Serialize()
idNew, _ := pulsar.DeserializeMessageID(msgIdBytes)
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
Topic:                   &quot;ragnarok/transactions/requests&quot;,
StartMessageID:          idNew,
StartMessageIDInclusive: true,
})
*/
}
}
func retrieveRange(client pulsar.Client) {
someFiles := getFiles(&quot;./data/&quot;)
for _, f := range someFiles {
fIndex := strings.Split(f, &quot;.&quot;)[0]
fmt.Println(&quot;re-reading message index -&gt; &quot;, fIndex)
msgIdBytes := readBackByEntryId(&quot;./data&quot;, fIndex)
fmt.Printf(&quot;boom -&gt; %#v\n&quot;, msgIdBytes)
idNew, err := pulsar.DeserializeMessageID(msgIdBytes)
if err != nil {
log.Fatal(err)
}
fmt.Println(&quot;Got message entry id =&gt; &quot;, idNew.EntryID())
readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
Topic:                   &quot;ragnarok/transactions/requests&quot;,
StartMessageID:          idNew,
StartMessageIDInclusive: true,
})
if err != nil {
log.Fatal(err)
}
defer readerInclusive.Close()
//defer readerInclusive.Close()
fmt.Println(&quot;bleep!&quot;)
msg, err := readerInclusive.Next(context.Background())
if err != nil {
log.Fatal(err)
}
//fmt.Println(&quot;retrieved message -&gt; &quot;, string(msg.Payload()))
fmt.Printf(&quot;Retrieved message ID message msgId: %#v -- content: &#39;%s&#39; published at %v\n&quot;,
msg.ID(), string(msg.Payload()), msg.PublishTime())
}
}
func main() {
client, err := pulsar.NewClient(
pulsar.ClientOptions{
URL:               &quot;pulsar://localhost:6650&quot;,
OperationTimeout:  30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf(&quot;Could not instantiate Pulsar client: %v&quot;, err)
}
defer client.Close()
reader, err := client.CreateReader(pulsar.ReaderOptions{
Topic:          &quot;ragnarok/transactions/requests&quot;,
StartMessageID: pulsar.EarliestMessageID(),
})
if err != nil {
log.Fatal(err)
}
defer reader.Close()
if err != nil {
log.Fatal(err)
}
var startMsgId int64 = 55
var stopMsgId int64 = 66
//stream all the messages from the earliest to latest
//pick a subset between a start and stop id
streamAll(reader, startMsgId, stopMsgId)
//retrieve the picked range
retrieveRange(client)
}

huangapple
  • 本文由 发表于 2022年2月8日 14:28:24
  • 转载请务必保留本文链接:https://go.coder-hub.com/71029427.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定