英文:
How to check a partition is assigned in Kafka
问题
我正在尝试找到一种在执行某项操作之前检查是否已分配分区的方法。我们知道通过检查 consumer.assignment().isEmpty
可以找出消费者是否已分配分区。
但是,我要如何对分区执行类似的操作呢?我的代码如下:
List<PartitionInfo> myPartitionInfo = consumer.partitionsFor(..);
mypartitionInfo.stream()
// 在执行此映射之前,我需要检查此分区是否已分配
.map(part->new TopicPartition(subTopicName,part.partition()))
.......;
是否有任何方法可以做到这一点?请提供建议。
英文:
I am trying to find out a way of checking if a partition is assigned before I do something on that partition. We know the way to find if the consumer is assigned or not by checking
consumer.assignment().isEmpty
But how I will do it to the partitions, my code is below
List<PartitionInfo> myPartitionInfo = consumer.partitionsFor(..);
mypartitionInfo.stream()
// before coming to this map i need to check if this partition is assigned
.map(part->new TopicPartition(subTopicName,part.partition()))
.......;
Is there any way i can do that? please suggest.
答案1
得分: 1
你已经回答了你自己的问题。使用 consumer.assignment()
// 要检查的数据
String topic = "...";
int partition = 0;
// 进行检查
boolean isAssigned = consumer.assignment()
.stream()
.anyMatch(tp -> tp.topic().equals(topic) && tp.partition() == partition)
另外,在你的消费者轮询循环中,你也可以访问每条记录来自的分区,并在那里进行检查。
英文:
You've already answered your own question. Use consumer.assignment()
// Data to check
String topic = "...";
int partition = 0;
// Check it
boolean isAssigned = consumer.assignment()
.stream()
.anyMatch(tp -> tp.topic().equals(topic) && tp.partition() == partition)
Otherwise, in your consumer poll loop, you also have acesss to the partition that each record comes from, and can check there.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论