Why am I getting an older timestamp than specified when using KafkaConsumer.offsetsForTimes() in Java?

Question

I'm trying to re-read messages starting from a specific point in time by providing the target timestamp as a UTC Instant ("2023-05-31T00:00:00.00Z"). However, the selective reset does not work: I always get the first offset (0) of the topic back. According to the Javadoc (https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html - offsetsForTimes), I should receive the offset of the first message whose timestamp is equal to or greater than the specified one:

> public Map<TopicPartition,OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition,Long> timestampsToSearch)
>
> Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. This is a blocking call.

However, what I get back is offset 0 together with a timestamp older than the one I specified.
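The contract quoted from the Javadoc can be pictured with a small stand-alone sketch. This is only a model of the documented behaviour, not Kafka's implementation: the class name `OffsetLookupSketch`, the method `earliestOffsetAtOrAfter`, and the sample timestamps are all hypothetical, and array indices stand in for offsets.

```java
import java.util.OptionalInt;

class OffsetLookupSketch {

    // Hypothetical helper: returns the index (standing in for the offset) of the
    // first record whose timestamp is >= target, or empty if no such record exists.
    // The real offsetsForTimes returns null for a partition in that case.
    public static OptionalInt earliestOffsetAtOrAfter(long[] recordTimestamps, long target) {
        for (int offset = 0; offset < recordTimestamps.length; offset++) {
            if (recordTimestamps[offset] >= target) {
                return OptionalInt.of(offset);
            }
        }
        return OptionalInt.empty();
    }

    public static void main(String[] args) {
        // Made-up record timestamps, one per offset 0..3.
        long[] timestamps = {100L, 200L, 300L, 400L};

        // The first timestamp >= 250 is 300, which sits at offset 2.
        System.out.println(earliestOffsetAtOrAfter(timestamps, 250L));

        // A target smaller than every timestamp matches the very first record.
        System.out.println(earliestOffsetAtOrAfter(timestamps, 1L));

        // A target larger than every timestamp matches nothing.
        System.out.println(earliestOffsetAtOrAfter(timestamps, 500L));
    }
}
```

Under this model, offset 0 is returned whenever the target is less than or equal to the first record's timestamp.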

I'm a bit reluctant to consider a bug in the Apache Kafka client, so I'd like to ask for some extra pairs of eyes to verify I'm not trying some utter nonsense here.

Following is a short summary of what I did with code snippets.
The main method resets the offsets and polls for the records (and prints them to console):

resetOffsets(consumer, resetTime);

while (true) {
    records = consumer.poll(timeout);

The resetOffsets method first converts the timestamp to epoch seconds and adds it to a map for each assigned TopicPartition (I'm using only a single-partition topic for my test):

private static void resetOffsets(Consumer<Object, Object> consumer, String resetTime) {
    Long resetTimeEpoch = Long.valueOf(Instant.parse(resetTime).getEpochSecond());
    Map<TopicPartition, Long> partitionTimestamps = consumer.assignment().stream().collect(Collectors.toMap(tp -> tp, tp -> resetTimeEpoch));

Then I call offsetsForTimes to get the offsets for each partition and print both the argument timestamp and the resulting map to console for debugging:

    Map<TopicPartition, OffsetAndTimestamp> resetOffsets = consumer.offsetsForTimes(partitionTimestamps);

    System.out.println(partitionTimestamps);
    System.out.println(resetOffsets);

Afterwards I seek to that offset and process the messages in the main loop.

There are currently eight messages in the topic, five from yesterday (31.05.2023) and three from the day before yesterday (30.05.2023). So I would expect to process the five newer messages when resetting to "2023-05-31T00:00:00.00Z", but I got all eight again.
Console outputs are {replicated-topic-0=1685491200} for System.out.println(partitionTimestamps); and {replicated-topic-0=(timestamp=1685455042425, leaderEpoch=0, offset=0)} for System.out.println(resetOffsets);, indicating that I call the method with the correct timestamp. I'm getting an older one back though.

I'm at a bit of a loss as to what I may be doing wrong and would be grateful for any input. I can provide the full code of my small test program for reproduction if requested.

Answer 1

Score: 0

> to verify I'm not trying some utter nonsense here

Exactly that happened. The weirder the problem, the greater the probability that it's caused by oneself.

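The problem was that the requested timestamp was in epoch seconds, while Kafka record timestamps, including the one returned by offsetsForTimes, are in epoch milliseconds. Read as milliseconds, 1685491200 is a moment in January 1970, so every record in the topic has a greater timestamp and the lookup correctly returns offset 0. Please excuse any wasted time.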
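A minimal sketch of the unit mismatch, using only java.time so it runs without a broker. The class name `ResetTimestamp` and the helper `toEpochMillis` are hypothetical names chosen for illustration; the two numeric values are the ones printed in the question.

```java
import java.time.Instant;

class ResetTimestamp {

    // Hypothetical helper: converts an ISO-8601 instant to epoch milliseconds,
    // the unit Kafka uses for record timestamps.
    public static long toEpochMillis(String isoInstant) {
        return Instant.parse(isoInstant).toEpochMilli();
    }

    public static void main(String[] args) {
        String resetTime = "2023-05-31T00:00:00.00Z";

        long seconds = Instant.parse(resetTime).getEpochSecond(); // value the question passed
        long millis = toEpochMillis(resetTime);                    // value in the expected unit

        // Interpret both numbers the way the lookup does: as epoch milliseconds.
        System.out.println(seconds + " as millis -> " + Instant.ofEpochMilli(seconds));
        System.out.println(millis + " as millis -> " + Instant.ofEpochMilli(millis));

        // Timestamp of the record at offset 0, taken from the question's console output.
        System.out.println("offset 0 record   -> " + Instant.ofEpochMilli(1685455042425L));
    }
}
```

In the question's resetOffsets method, the corresponding change would presumably be to call toEpochMilli() instead of getEpochSecond() when building the map passed to offsetsForTimes.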
