How to consume the last 5 minutes of data in Kafka using the confluent_kafka Python package?
Question
I am currently able to consume the latest real-time data in Kafka, but is there an optimized way to consume only the last 5 minutes of data in each partition?

My current approach is to set `auto.offset.reset` to `earliest` and then consume each partition until it reaches the end, keeping the messages whose timestamps fall within the last 5 minutes. But this takes a long time, because every older message is read as well.

If there is a way to do this in reverse order to reduce the consumption time, that would be really helpful!
Answer 1
Score: 3
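The `confluent_kafka.Consumer.offsets_for_times()` function provides a mechanism to obtain, for each `TopicPartition` object, the earliest offset whose timestamp is greater than or equal to a POSIX timestamp provided in milliseconds. The timestamp is passed in the `offset` field of each `TopicPartition` you query; if it is later than the last message in a partition, the returned offset is -1 (the end of the partition).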
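To make the lookup semantics concrete, here is a small pure-Python model of what `offsets_for_times()` returns for a single partition. It is illustrative only: the function name `offset_for_time` and the in-memory lists are hypothetical, the broker answers the real query from its own time index, and the model assumes timestamps that never decrease (producer-set `CreateTime` timestamps do not guarantee this).

```python
from bisect import bisect_left

# Hypothetical model of one partition: parallel lists of offsets and their
# message timestamps in milliseconds (assumed non-decreasing for bisect).
OFFSET_END = -1  # returned when no message is at or after the target timestamp


def offset_for_time(offsets, timestamps_ms, target_ms):
    """Return the earliest offset whose timestamp is >= target_ms, else -1."""
    i = bisect_left(timestamps_ms, target_ms)  # first index with timestamp >= target
    return offsets[i] if i < len(offsets) else OFFSET_END


offsets = [100, 101, 102, 103]
timestamps = [1_000, 2_000, 3_000, 4_000]
print(offset_for_time(offsets, timestamps, 2_500))  # 102
print(offset_for_time(offsets, timestamps, 500))    # 100
print(offset_for_time(offsets, timestamps, 5_000))  # -1
```

The -1 result matches `confluent_kafka.OFFSET_END`, so a partition with no messages in the window starts at its end and simply waits for new data.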
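You could register a callback function for the `on_assign` event when subscribing to your topic(s). The callback uses `Consumer.offsets_for_times()` and `Consumer.assign()` to reset the offsets of your assigned partitions to the desired positions before any messages are consumed. The consumer then starts directly at the first message of the 5-minute window instead of reading each partition from the beginning.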
For example, you might do something like this:
```python
import datetime
import math

from confluent_kafka import Consumer, TopicPartition


def get_time_offset():
    '''Returns the POSIX epoch representation (in milliseconds) of the datetime 5 minutes prior to being called'''
    delta = datetime.timedelta(minutes=5)
    now = datetime.datetime.now(datetime.timezone.utc)  # TZ-aware object to simplify POSIX epoch conversion
    prior = now - delta
    return math.floor(prior.timestamp() * 1000)  # convert seconds to milliseconds for Consumer.offsets_for_times()


def reset_offsets(consumer, partitions):
    '''Resets the offsets of the provided partitions to the first offsets found corresponding to timestamps greater than or equal to 5 minutes ago.'''
    time_offset = get_time_offset()
    search_partitions = [TopicPartition(p.topic, p.partition, time_offset) for p in partitions]  # new TPs with offset = time_offset
    time_offset_partitions = consumer.offsets_for_times(search_partitions)  # find TPs with timestamp of earliest offset >= time_offset
    consumer.assign(time_offset_partitions)  # (re-)set consumer partition assignments and start consuming


topics = ['my-topic-of-interest']
c = Consumer({
    'bootstrap.servers': 'server-fqdn',
    'group.id': 'group-name'
})
c.subscribe(topics, on_assign=reset_offsets)  # reset_offsets() called when partition assignment received after c.poll()

# Process all messages from reset offsets (5 min. ago) to present (and ongoing)
try:
    while True:
        msg = c.poll(1.0)  # early calls trigger the on_assign callback, resetting offsets
        if msg is None:
            continue  # no message arrived within the 1-second timeout
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        # process message and commit...
except KeyboardInterrupt:
    pass
finally:
    c.close()  # leave the consumer group cleanly
```
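`get_time_offset()` hard-codes the 5-minute window and reads the clock itself. A parameterized variant makes the window size configurable and lets you pass a fixed clock for testing. This is a sketch; the name `window_start_ms` and its parameters are hypothetical, not part of confluent_kafka.

```python
import datetime
import math


def window_start_ms(minutes=5, now=None):
    """Epoch milliseconds for `minutes` before `now` (defaults to the current UTC time)."""
    if now is None:
        # TZ-aware "now" so .timestamp() is not interpreted as local time
        now = datetime.datetime.now(datetime.timezone.utc)
    prior = now - datetime.timedelta(minutes=minutes)
    return math.floor(prior.timestamp() * 1000)  # seconds -> milliseconds


fixed_now = datetime.datetime(2024, 1, 1, 0, 5, tzinfo=datetime.timezone.utc)
print(window_start_ms(5, fixed_now))   # 1704067200000 (2024-01-01T00:00:00Z)
print(window_start_ms(15, fixed_now))  # 1704066600000 (2023-12-31T23:50:00Z)
```

Inside `reset_offsets()`, `time_offset = window_start_ms(5)` would behave the same as the original `get_time_offset()` call. Any `now` you pass should be timezone-aware.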