英文:
NATS JetStream: Is it possible to explicitly ask from JetStream to (re)send the last few messages it received in subject foo.*?
问题
基本上,主题说的是:
我想知道是否可以以一种方式查询JetStream,以便我们可以重新获取主题为"foo."的最后15条消息,或者获取JetStream在过去1.5秒内接收到的主题为"foo."的消息。
如果可能的话,如果能提供代码示例或代码示例的链接,将不胜感激。
英文:
Essentially what the subject says.
I'm wondering if JetStream can be queried in a way that allows us to refetch either the last 15 messages of subject "foo.*" or the messages that JetStream received on subject "foo.*" in the last 1.5 seconds.
If that's possible any code-samples or links to code-samples are appreciated.
答案1
得分: 1
根据官方文档1:
-
可以从特定时间开始获取消息:在过去的1.5秒内。
DeliverByStartTime
当首次消费消息时,从此时间或之后的消息开始。消费者需要指定OptStartTime,即要开始的流中的时间。它将接收到最接近该时间或之后的可用消息。 -
另一个要求是获取最后的15条消息,我认为这是不可能的。
英文:
According to the official docs
-
It is possible to grab message starting from a certain time: in the last 1.5 seconds.
> DeliverByStartTime
When first consuming messages, start with messages on or after this time. The consumer is required to specify OptStartTime, the time in the stream to start at. It will receive the closest available message on or after that time. -
The other requirement, the last 15 messages, I think it's not possible
答案2
得分: 1
-
在JetStream中实现与时间相关的检索有一种方法。
now := time.Now() oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500) js, _ := nc.JetStream() sub, err := js.SubscribeSync( "foo.*", nats.OrderedConsumer(), nats.StartTime(oneAndHalfSecondAgo), ) for { msg, err := sub.NextMsg(10 * time.Second) //从最旧到最新的消息 if err != nil { log.Fatal(err) } // 1. 检查消息的时间戳,如果在“now”之后,我们在这里跳出for循环 // 2. 如果消息在now之前,我们可以在这里将其推入一个数组 }
需要注意的是,尽管这种技术很有用,但效率相对较低,因为我们逐个获取消息。
我们可以使用.Subscribe()进行修改(它是异步的),但这样会出现另一个问题:
我们会从JetStream中过度拉取超过当前时刻的消息,然后我们必须确保我们抓取的缓冲消息确实会返回到JetStream。据我所知,目前没有配置选项可以告诉JetStream关于“MaxTime”。
-
至于如何“获取最新的N条消息”,可以修改上面的代码示例,以便获取一段相当长的消息(例如最近5秒、10秒或30秒内的所有消息),然后在获取到当前时刻的所有消息后,再获取最新的'N'条消息。
当然,这种技术并不理想,但目前似乎没有其他方法可以实现这一点。
英文:
-
There is a way to achieve the time-related retrieval in JetStream.
now := time.Now() oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500) js, _ := nc.JetStream() sub, err := js.SubscribeSync( "foo.*", nats.OrderedConsumer(), nats.StartTime(oneAndHalfSecondAgo), ) for { msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones if err != nil { log.Fatal(err) } // 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here // 2. if the message is before now we can push it in an array here }
Note that this technique, though useful, is quite inefficient because we grab messages one by one.
We could modify it using .Subscribe() (which is asynchronous) but then we would have a different problem:
We would overpull from JetStream past the present moment and then we would have to make sure that the buffered messages that we grabbed will indeed be returned back to JetStream. As far as I can tell there is no configuration option to tell JetStream about the "MaxTime".
-
As for how to "get the latest N-Messages" one could modify the above code-sample so that he will get a reasonably high chunk of messages (p.e. all messages in the last 5secs or 10secs or 30secs) and after getting all the messages up to the present moment he can then grab the latest 'N' messages.
This technique is not ideal ofcourse but there doesn't seem to be another way to do this - at least not at the time of this writing.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论