Why am I getting an older timestamp than specified when using KafkaConsumer.offsetsForTimes() in Java?






I'm trying to re-read messages starting from a specific point in time by providing the target timestamp as a UTC Instant ("2023-05-31T00:00:00.00Z"). However, the selective reset does not work: I always get back the first offset (0) of the topic. According to the Javadoc (https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html - offsetsForTimes), I should receive the offset of the first message whose timestamp is equal to or greater than the specified one:
> `public Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)`
>
> Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. This is a blocking call.

However, I get back a timestamp older than the specified one, together with offset 0.
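
The contract quoted from the Javadoc can be modelled in plain Java without a broker. This is a minimal sketch: `OffsetLookupModel`, `Entry`, `earliestOffsetAtOrAfter` and `samplePartition` are hypothetical names, the first timestamp is copied from the console output further down while the other two are made up for illustration, and the linear scan only mirrors the documented behaviour, not how the broker actually performs the lookup.

```java
import java.util.List;

public class OffsetLookupModel {

    // Simplified stand-in for one record in a partition: its offset and its
    // timestamp in epoch milliseconds (the unit Kafka uses for record timestamps).
    static final class Entry {
        final long offset;
        final long timestampMs;

        Entry(long offset, long timestampMs) {
            this.offset = offset;
            this.timestampMs = timestampMs;
        }
    }

    // Hypothetical helper mirroring the documented contract of offsetsForTimes():
    // the earliest offset whose timestamp is >= the target, or null if there is none.
    static Long earliestOffsetAtOrAfter(List<Entry> partition, long targetMs) {
        for (Entry e : partition) {
            if (e.timestampMs >= targetMs) {
                return e.offset;
            }
        }
        return null;
    }

    // Hypothetical sample data: two records from 2023-05-30 and one from 2023-05-31 (UTC).
    static List<Entry> samplePartition() {
        return List.of(
                new Entry(0, 1685455042425L),  // 2023-05-30T13:57:22.425Z
                new Entry(1, 1685460000000L),  // 2023-05-30T15:20:00Z
                new Entry(2, 1685500000000L)); // 2023-05-31T02:26:40Z
    }

    public static void main(String[] args) {
        List<Entry> partition = samplePartition();
        // Target 2023-05-31T00:00:00Z in milliseconds: first matching record is offset 2.
        System.out.println(earliestOffsetAtOrAfter(partition, 1685491200000L)); // 2
        // A target after every record: no match, so null (as offsetsForTimes() documents).
        System.out.println(earliestOffsetAtOrAfter(partition, 1685600000000L)); // null
        // A target earlier than every record: the very first offset, 0.
        System.out.println(earliestOffsetAtOrAfter(partition, 1685491200L)); // 0
    }
}
```

In this model, offset 0 comes back only when the target lies at or before the first record's timestamp.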

I'm a bit reluctant to consider a bug in the Apache Kafka client, so I'd like to ask for some extra pairs of eyes to verify I'm not trying some utter nonsense here.

Below is a short summary of what I did, with code snippets.
The main method resets the offsets and polls for the records (and prints them to the console):

```java
resetOffsets(consumer, resetTime);
while (true) {
    records = consumer.poll(timeout);
    // ... print the records to the console
}
```

The resetOffsets method first converts the timestamp to epoch seconds and adds it to a map for each assigned TopicPartition (I'm using only a single-partition topic for my test):

```java
private static void resetOffsets(Consumer<Object, Object> consumer, String resetTime) {
    Long resetTimeEpoch = Long.valueOf(Instant.parse(resetTime).getEpochSecond());
    Map<TopicPartition, Long> partitionTimestamps = consumer.assignment().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> resetTimeEpoch));
```

Then I call offsetsForTimes to get the offsets for each partition and print both the argument timestamp and the resulting map to console for debugging:

```java
    Map<TopicPartition, OffsetAndTimestamp> resetOffsets = consumer.offsetsForTimes(partitionTimestamps);
    System.out.println(partitionTimestamps);
    System.out.println(resetOffsets);
```

Afterwards, I seek to that offset and process the messages in the main loop.

There are currently eight messages in the topic, five from yesterday (31.05.2023) and three from the day before yesterday (30.05.2023). So I would expect to process the five newer messages when resetting to "2023-05-31T00:00:00.00Z", but I got all eight again.
The console output is `{replicated-topic-0=1685491200}` for `System.out.println(partitionTimestamps);` and `{replicated-topic-0=(timestamp=1685455042425, leaderEpoch=0, offset=0)}` for `System.out.println(resetOffsets);`, which indicates that I call the method with the correct timestamp. Yet I get an older one back.
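
One way to check what the two printed numbers mean is to convert them back to instants with `java.time`. This is a small standalone check; the class name `DecodePrintedValues` is illustrative, and the two constants are copied from the console output quoted above.

```java
import java.time.Instant;

public class DecodePrintedValues {

    // Value printed for partitionTimestamps (what was passed to offsetsForTimes()).
    static final long REQUESTED = 1685491200L;
    // Timestamp printed in the OffsetAndTimestamp returned for offset 0.
    static final long RETURNED = 1685455042425L;

    public static void main(String[] args) {
        // The requested value, read as seconds and as milliseconds since the epoch.
        System.out.println(Instant.ofEpochSecond(REQUESTED)); // 2023-05-31T00:00:00Z
        System.out.println(Instant.ofEpochMilli(REQUESTED));  // 1970-01-20T12:11:31.200Z
        // The returned record timestamp, read as milliseconds since the epoch.
        System.out.println(Instant.ofEpochMilli(RETURNED));   // 2023-05-30T13:57:22.425Z
    }
}
```

Read as milliseconds, the returned timestamp falls on 30.05.2023, i.e. one of the three older messages. The requested value only means 2023-05-31 when it is read as seconds.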

I'm at a bit of a loss as to what I may be doing wrong and am grateful for any input. I can provide the full code of my small test program for reproduction if requested.


Score: 0




> to verify I'm not trying some utter nonsense here

Exactly that happened. The weirder the problem, the greater the probability that it's caused by oneself.

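The problem was that the requested timestamp had only epoch-second resolution, while the returned timestamp (like every Kafka record timestamp, and therefore the value offsetsForTimes() compares against) is in epoch milliseconds. The millisecond timestamp of the first message is thus of course greater than the second-based value, which caused the problem. Please excuse any wasted time.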
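
The difference can be checked without a broker. This is a minimal sketch using only `java.time`: the class and method names are illustrative, and 1685455042425 is the timestamp of offset 0 from the question's output.

```java
import java.time.Instant;

public class ResetTimestampUnits {

    // What the question's resetOffsets() computed: seconds since the epoch.
    static long asEpochSeconds(String resetTime) {
        return Instant.parse(resetTime).getEpochSecond();
    }

    // What offsetsForTimes() is compared against: milliseconds since the epoch,
    // the same unit as Kafka record timestamps.
    static long asEpochMillis(String resetTime) {
        return Instant.parse(resetTime).toEpochMilli();
    }

    public static void main(String[] args) {
        String resetTime = "2023-05-31T00:00:00.00Z";
        long returnedRecordTs = 1685455042425L; // timestamp of offset 0 from the question

        System.out.println(asEpochSeconds(resetTime)); // 1685491200
        System.out.println(asEpochMillis(resetTime));  // 1685491200000

        // With seconds, the 30.05 record counts as "at or after" the target,
        // so offsetsForTimes() correctly answers with offset 0.
        System.out.println(returnedRecordTs >= asEpochSeconds(resetTime)); // true
        // With milliseconds, the same record is before the target, as intended.
        System.out.println(returnedRecordTs >= asEpochMillis(resetTime));  // false
    }
}
```

In the question's resetOffsets this amounts to replacing getEpochSecond() with toEpochMilli() (the Long.valueOf wrapper is not needed). Note also that, per the Javadoc, offsetsForTimes() maps a partition to null when no message has a timestamp at or after the target, so the subsequent seek should check for null before calling offset().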

  • Posted on June 1, 2023, 22:43:48



