How can I get the last offset in a topic from Kafka using a PHP library?
Question
I'm using the php-rdkafka library to write a Kafka consumer for an API project. I need to find the last offset in a topic and read the value stored there for further processing. For example, if the last offset in the topic is 5, I need to read the message at offset 5 and keep sending it through the API until a new offset is added. Here is what I'm trying to run:
$conf = new RdKafka\Conf();
$settings = [
    'socket.keepalive.enable' => true,
    'log_level' => LOG_WARNING,
    'enable.auto.offset.store' => 'true',
    'auto.offset.reset' => 'earliest',
    'enable.partition.eof' => 'false',
    'enable.auto.commit' => 'false',
    'max.poll.interval.ms' => 300000,
    'session.timeout.ms' => 45000,
    'group.id' => 'test-group',
    'group.instance.id' => uniqid('', true),
    'metadata.broker.list' => 'stat-kafka-1:9092,stat-kafka-2:9092,stat-kafka-3:9092',
];
foreach ($settings as $key => $value) {
    $conf->set($key, $value);
}
// The consumer must be created from the configuration before it can be queried.
$consumer = new RdKafka\KafkaConsumer($conf);
$topicName = 'userstatistics_12345';
$partition = 0;
$topicPartition = new RdKafka\TopicPartition($topicName, $partition);
$topicPartitionsWithOffsets = $consumer->getOffsetPositions([$topicPartition]);
var_dump($topicPartitionsWithOffsets);
But this returns a strange result with a negative offset:
array(1) { [0]=> object(RdKafka\TopicPartition)#6 (4) { ["topic"]=> string(20) "userstatistics_12345" ["partition"]=> int(0) ["offset"]=> int(-1001) ["err"]=> int(0) } }
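For context on the -1001 in that output: librdkafka reserves a few negative values as logical offsets (in rdkafka.h, RD_KAFKA_OFFSET_BEGINNING = -2, RD_KAFKA_OFFSET_END = -1, RD_KAFKA_OFFSET_STORED = -1000, RD_KAFKA_OFFSET_INVALID = -1001). getOffsetPositions() reports the consumer's current position (last consumed offset + 1), not the end of the partition, so a consumer that has not consumed anything yet gets -1001, meaning "no position". The sketch below decodes these values; describeOffset() and the OFFSET_* constants are hypothetical names that mirror the librdkafka values in plain PHP, so it runs without the extension.

```php
<?php
// Hypothetical constants mirroring librdkafka's logical offsets (rdkafka.h).
const OFFSET_BEGINNING = -2;
const OFFSET_END = -1;
const OFFSET_STORED = -1000;
const OFFSET_INVALID = -1001;

// Hypothetical helper: explain what an offset value reported by librdkafka means.
function describeOffset(int $offset): string
{
    return match ($offset) {
        OFFSET_BEGINNING => 'BEGINNING (logical: start of partition)',
        OFFSET_END => 'END (logical: offset of the next produced message)',
        OFFSET_STORED => 'STORED (logical: use the committed offset)',
        OFFSET_INVALID => 'INVALID (no position yet)',
        default => $offset >= 0 ? "absolute offset $offset" : "other logical offset $offset",
    };
}

echo describeOffset(-1001), "\n";
```

So the result in the question is not a wrong offset; it is the "nothing consumed yet" marker. The end of the partition comes from the high watermark instead.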
In fact, the current last offset is 59. My idea is to get the last offset number and then read the value using:
$consumer->assign([
    new RdKafka\TopicPartition($topicName, $partition, $lastOffset)
]);
I also don't want to use a while(true) loop, because the script needs to run quickly.
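One way to avoid an unbounded while(true) while still tolerating a consume() call that comes back with a timeout or EOF event instead of a message is to poll a fixed number of times. This is a minimal sketch: pollBounded() is a hypothetical helper, and it accepts any callable returning an object with an `err` property, so it can be exercised without a broker. With php-rdkafka the callable would be something like `fn () => $kafkaConsumer->consume(1000)`.

```php
<?php
// Hypothetical helper: poll at most $maxAttempts times instead of looping forever.
// Returns the first object whose err is 0, or null if every attempt failed.
function pollBounded(callable $poll, int $maxAttempts): ?object
{
    for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
        $message = $poll();
        // err 0 is RD_KAFKA_RESP_ERR_NO_ERROR: a real message was delivered.
        if ($message !== null && $message->err === 0) {
            return $message;
        }
        // Any other code (e.g. -185 timed out, -191 partition EOF) is a failed attempt.
    }
    return null;
}

// Usage with a fake queue; `use (&$queue)` lets array_shift() consume the real array.
$queue = [
    (object) ['err' => -185, 'payload' => null],
    (object) ['err' => 0, 'payload' => 'hello'],
];
$msg = pollBounded(function () use (&$queue) { return array_shift($queue); }, 3);
echo $msg?->payload ?? 'no message', "\n";
```

Each attempt blocks for at most the timeout given to consume(), so the script's total wait is bounded by roughly the number of attempts times that timeout.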
That's all. Thanks.
Answer 1
Score: 1
I found an answer that works fine for me:
$conf = new RdKafka\Conf();
// Configure the group.id. All consumers with the same group.id will consume
// different partitions.
$conf->set('group.id', 'test-group');
// Initial list of Kafka brokers
$conf->set('metadata.broker.list', 'kafka-1:9092');
// Set where to start consuming messages when there is no initial offset in
// the offset store or the desired offset is out of range.
// 'earliest': start from the beginning; 'latest': start from the end
$conf->set('auto.offset.reset', 'latest');
// Emit EOF event when reaching the end of a partition
$conf->set('enable.partition.eof', 'true');
$kafkaConsumer = new RdKafka\KafkaConsumer($conf);
$topicName = 'topic_name';
$partition = 0;
$timeoutMs = 100000;
// queryWatermarkOffsets() fills $low and $high by reference and returns nothing.
// $high is the offset that the next produced message will receive.
$low = 0;
$high = 0;
$kafkaConsumer->queryWatermarkOffsets($topicName, $partition, $low, $high, $timeoutMs);
if ($high <= $low) {
    // The partition holds no messages, so there is no last offset to read.
    echo "Partition $partition of $topicName is empty\n";
} else {
    // The newest existing message sits one below the high watermark.
    $offset = $high - 1;
    $kafkaConsumer->assign([new RdKafka\TopicPartition($topicName, $partition, $offset)]);
    // consume() always returns an RdKafka\Message; check err to tell a real
    // message apart from a timeout or EOF event.
    $message = $kafkaConsumer->consume(1000);
    if ($message->err === RD_KAFKA_RESP_ERR_NO_ERROR) {
        // Process the message
        $payload = $message->payload;
        echo "Message at offset $offset: $payload\n";
    }
}
$kafkaConsumer->close();
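The watermark arithmetic in this answer can be factored into a small function so the empty-partition case is handled explicitly: without a guard, an empty partition gives `$high - 1 = -1`, which librdkafka interprets as the logical END offset rather than a real message. A sketch, assuming php-rdkafka's KafkaConsumer::queryWatermarkOffsets(), assign() and consume() as used in the answer; lastOffsetFromWatermarks() and readLastMessage() are hypothetical names. readLastMessage() is defined but not called, so the file runs without the rdkafka extension or a broker.

```php
<?php
// Hypothetical helper: offset of the newest message, or null if the partition is empty.
// $high is the offset the next produced message will get, so the last one is $high - 1.
function lastOffsetFromWatermarks(int $low, int $high): ?int
{
    return $high > $low ? $high - 1 : null;
}

// Hypothetical wrapper around php-rdkafka; defined only, never invoked in this file.
function readLastMessage(
    RdKafka\KafkaConsumer $consumer,
    string $topic,
    int $partition,
    int $timeoutMs
): ?RdKafka\Message {
    $low = 0;
    $high = 0;
    $consumer->queryWatermarkOffsets($topic, $partition, $low, $high, $timeoutMs);
    $offset = lastOffsetFromWatermarks($low, $high);
    if ($offset === null) {
        return null; // nothing stored in this partition
    }
    $consumer->assign([new RdKafka\TopicPartition($topic, $partition, $offset)]);
    $message = $consumer->consume($timeoutMs);
    // Only hand back a real message, not a timeout or EOF event.
    return $message->err === RD_KAFKA_RESP_ERR_NO_ERROR ? $message : null;
}

var_dump(lastOffsetFromWatermarks(0, 60)); // the question's case: last offset 59
```

Returning null for "no message" keeps the caller free of librdkafka's negative sentinel values.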