如何使用Confluent Kafka Python包消费Kafka中的最后5分钟数据?

huangapple go评论73阅读模式
英文:

How to consume the last 5 minutes data in Kafka using confluent kakfa python package?

问题

我目前能够消费Kafka中的最新实时数据,但是否有一种优化的方式可以消费每个分区中过去5分钟的数据?

目前的做法是将auto.offset.reset设置为earliest,然后消费直到达到每个分区中5分钟时间戳的末尾偏移量。但这需要很长时间。

如果有一种以相反的顺序来做到这一点以减少消耗时间的方法,那将非常有帮助!

英文:

I am currently able to consume the lastest real-time data in kafka but is there a way to consume the last 5 mins data in each partition in an optimized way?

The current way of doing this is setting auto.offset.reset to earliest and then consuming till it reaches the end of the offset in each partion that lies in the 5 minute timestamp. But this takes a long time.

If there is a way to do this but in reverse order so as to reduce the cosumption time, it would be really helpful!

答案1

得分: 3

The confluent_kafka.Consumer.offsets_for_times() 函数提供了一种获取 TopicPartition 对象的最早偏移量的机制,其中时间戳大于或等于以毫秒提供的 POSIX 时间戳。

在订阅主题时,您可以为 on_assign 事件注册回调函数,该回调函数使用 Consumer.offsets_for_times()Consumer.assign() 来在消费消息之前将分配的分区的偏移量重置为所需位置。

例如,您可以这样做:

import datetime
import math
from confluent_kafka import Consumer, TopicPartition

def get_time_offset():
  '''返回距离调用时5分钟的 POSIX 纪元表示(毫秒)的日期时间'''
  delta = datetime.timedelta(minutes=5)  
  now = datetime.datetime.now(datetime.timezone.utc)  # 用于简化 POSIX 纪元转换的 TZ-aware 对象
  prior = now - delta  
  return math.floor(prior.timestamp() * 1000)  # 将秒转换为毫秒以供 Consumer.offsets_for_times() 使用

def reset_offsets(consumer, partitions):
  '''将提供的分区的偏移量重置为时间戳大于或等于5分钟前找到的最早偏移量'''
  time_offset = get_time_offset()
  search_partitions = [TopicPartition(p.topic, p.partition, time_offset) for p in partitions]  # 使用偏移量= time_offset 创建新的 TP
  time_offset_partitions = consumer.offsets_for_times(search_partitions)  # 查找时间戳大于或等于 time_offset 的最早偏移量的 TP
  consumer.assign(time_offset_partitions)  # (重新)设置消费者的分区分配并开始消费

topics = ['my-topic-of-interest']

c = Consumer({
  'bootstrap.servers': 'server-fqdn',
  'group.id': 'group-name'
})

c.subscribe(topics, on_assign=reset_offsets)  # 在接收到分区分配后,调用 reset_offsets()
# 处理从重置偏移量(5分钟前)到现在(以及未来)的所有消息
while True:
  try:
    msg = c.poll()  # 第一次调用触发 on_assign 回调函数的执行,重置偏移量
  except RuntimeError as e:
    print("Consumer is closed.")
    break
  # 处理消息并提交...

c.close()
英文:

The confluent_kafka.Consumer.offsets_for_times() function provides a mechanism to obtain the earliest offsets for TopicPartition objects where the timestamps are greater than or equal to a POSIX timestamp provided in milliseconds.

You could register a callback function for the on_assign event when subscribing to your topic(s) that uses Consumer.offsets_for_times() and Consumer.assign() to reset offsets on your assigned partitions to the desired positions prior to consuming messages.

For example, you might do something like this:

import datetime
import math
from confluent_kafka import Consumer, TopicPartition

def get_time_offset():
  '''Returns the POSIX epoch representation (in milliseconds) of the datetime 5 minutes prior to being called'''
  delta = datetime.timedelta(minutes=5)  
  now = datetime.datetime.now(datetime.timezone.utc)  # TZ-aware object to simplify POSIX epoch conversion
  prior = now - delta  
  return math.floor(prior.timestamp() * 1000)  # convert seconds to milliseconds for Consumer.offsets_for_times()

def reset_offsets(consumer, partitions):
  '''Resets the offsets of the provided partitions to the first offsets found corresponding to timestamps greater than or equal to 5 minutes ago.'''
  time_offset = get_time_offset()
  search_partitions = [TopicPartition(p.topic, p.partition, time_offset) for p in partitions]  # new TPs with offset= time_offset
  time_offset_partitions = consumer.offsets_for_times(search_partitions)  # find TPs with timestamp of earliest offset >= time_offset
  consumer.assign(time_offset_partitions)  # (re-)set consumer partition assignments and start consuming

topics = ['my-topic-of-interest']

c = Consumer({
  'bootstrap.servers': 'server-fqdn',
  'group.id': 'group-name'
})

c.subscribe(topics, on_assign=reset_offsets)  # reset_offsets() called when partition assignment received after c.poll()

# Process all messages from reset offsets (5 min. ago) to present (and ongoing)
while True:
  try:
    msg = c.poll()  # first call triggers execution of on_assign callback function, resetting offsets
  except RuntimeError as e:
    print("Consumer is closed.")
    break
  # process message and commit...

c.close()

huangapple
  • 本文由 发表于 2023年4月20日 00:23:40
  • 转载请务必保留本文链接:https://go.coder-hub.com/76056824.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定