英文:
Apache Pulsar - What is the behaviour of the Consumer.seek() method by timestamp?
问题
https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-
调用消费者的 seek(long timestamp) 方法时,timestamp 是否必须等于消息发布的确切时间?
例如,如果我在 t=1、5、7 时发送了三条消息,如果我调用 consumer.seek(3),会出现错误吗?还是我的消费者会重置为 t=3,这样如果我调用 consumer.next(),我会收到第二条消息?
提前致谢,
英文:
https://pulsar.apache.org/api/client/2.4.0/org/apache/pulsar/client/api/Consumer.html#seek-long-
When calling seek(long timestamp) method on the consumer, does timestamp have to equal the exact time a message was published?
For example, if i sent three messages at t=1, 5, 7 and if i call consumer.seek(3), will i get an error? or will my consumer get reset to t=3, so that if i call consumer.next(), i'll get my second message?
Thanks in advance,
答案1
得分: 3
Consumer#seek(long timestamp)
允许您将订阅重置到给定的时间戳。在执行 seek 操作后,消费者将开始接收那些发布时间等于或大于传递给 seek
方法的时间戳的消息。
以下示例展示了如何将消费者重置到前一个小时:
try (
// 创建 PulsarClient
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// 创建消费者订阅
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscribe()
) {
// 将消费者定位到前一个小时
consumer.seek(Instant.now().minus(Duration.ofHours(1)).toEpochMilli());
while (true) {
final Message<String> msg = consumer.receive();
System.out.printf(
"接收到消息:key=%s, value=%s, topic=%s, id=%s%n",
msg.getKey(),
msg.getValue(),
msg.getTopicName(),
msg.getMessageId().toString());
consumer.acknowledge(msg);
}
}
请注意,如果您有多个属于同一订阅(例如,Key_Shared)的消费者,则所有消费者都将被重置。
英文:
The Consumer#seek(long timestamp)
allows you to reset your subscription to a given timestamp. After seeking the consumer will start receiving messages with a publish time equal to or greater than the timestamp passed to the seek
method.
The below example show how to reset a consumer to the previous hour:
try (
// Create PulsarClient
PulsarClient client = PulsarClient
.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// Create Consumer subscription
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
.subscribe()
) {
// Seek consumer to previous hour
consumer.seek(Instant.now().minus( Duration.ofHours(1)).toEpochMilli());
while (true) {
final Message<String> msg = consumer.receive();
System.out.printf(
"Message received: key=%s, value=%s, topic=%s, id=%s%n",
msg.getKey(),
msg.getValue(),
msg.getTopicName(),
msg.getMessageId().toString());
consumer.acknowledge(msg);
}
}
Note that if you have multiple consumers that belong to the same subscriptio ( e.g., Key_Shared) then all consumers will be reset.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论