如何检查Kafka中是否分配了一个分区

huangapple go评论54阅读模式
英文:

How to check a partition is assigned in Kafka

问题

我正在尝试找到一种在执行某项操作之前检查是否已分配分区的方法。我们知道通过检查 consumer.assignment().isEmpty 可以找出消费者是否已分配分区。

但是,我要如何对分区执行类似的操作呢?我的代码如下:

List<PartitionInfo> myPartitionInfo = consumer.partitionsFor(..);
mypartitionInfo.stream() 
   // 在执行此映射之前,我需要检查此分区是否已分配
   .map(part->new TopicPartition(subTopicName,part.partition()))
   .......;

是否有任何方法可以做到这一点?请提供建议。

英文:

I am trying to find out a way of checking if a partition is assigned before I do something on that partition. We know the way to find if the consumer is assigned or not by checking

 consumer.assignment().isEmpty

But how I will do it to the partitions, my code is below

List&lt;PartitionInfo&gt; myPartitionInfo = consumer.partitionsFor(..);
mypartitionInfo.stream() 
   // before coming to this map i need to check if this partition is assigned
   .map(part-&gt;new TopicPartition(subTopicName,part.partition()))
   .......;

Is there any way i can do that? please suggest.

答案1

得分: 1

你已经回答了你自己的问题。使用 consumer.assignment()

// 要检查的数据
String topic = "...";
int partition = 0;

// 进行检查
boolean isAssigned = consumer.assignment()
  .stream()
  .anyMatch(tp -> tp.topic().equals(topic) && tp.partition() == partition)

另外,在你的消费者轮询循环中,你也可以访问每条记录来自的分区,并在那里进行检查。

英文:

You've already answered your own question. Use consumer.assignment()

// Data to check
String topic = &quot;...&quot;;
int partition = 0;

// Check it
boolean isAssigned = consumer.assignment()
  .stream()
  .anyMatch(tp -&gt; tp.topic().equals(topic) &amp;&amp; tp.partition() == partition)

Otherwise, in your consumer poll loop, you also have acesss to the partition that each record comes from, and can check there.

huangapple
  • 本文由 发表于 2023年6月6日 01:03:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/76408577.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定