在Java中检查Kafka消费者是否没有任何记录要返回并且为空的好方法是什么?

huangapple go评论65阅读模式
英文:

Good way to check if Kafka Consumer doesn't have any records to return and is empty in java?

问题

我正在使用 Apache KafkaConsumer。我希望在不进行轮询的情况下检查消费者是否有任何要返回的消息。如果我轮询消费者并且没有任何消息,那么我会在超时期满之前的无限循环中收到消息“由于组正在重新平衡,尝试心跳失败”。尽管我有一个records.isEmpty()条件。这是我的代码片段:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
if (records.isEmpty()) {
    log.info("没有更多记录");
    consumer.close();
} else {
    records.iterator().forEachRemaining(record -> log.info("记录:" + record));
}

这在records不为空的情况下运行良好。一旦它为空,它会多次记录“由于组正在重新平衡,尝试心跳失败”,一次记录“没有更多记录”,然后继续记录心跳错误。我该如何解决这个问题,以及如何优雅地检查(无需任何心跳消息)是否没有更多要轮询的记录?

编辑:我问了另一个问题,完整的代码和上下文在此链接中:https://stackoverflow.com/questions/64328346/how-to-get-messages-from-kafka-consumer-one-by-one-in-java/

提前感谢!

英文:

I'm using Apache KafkaConsumer. I want to check if the consumer has any messages to return without polling. If I poll the consumer and there aren't any messages, then I get the message "Attempt to heartbeat failed since the group is rebalancing" in an infinite loop until the timeout expires, even though I have a records.isEmpty() clause. This is a snippet of my code:

ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofSeconds(10));
if (records.isEmpty()) {
      log.info(&quot;No More Records&quot;);
      consumer.close();
    }
else {
      records.iterator().forEachRemaining(record -&gt; log.info(&quot;RECORD: &quot; + record);
);

This works fine until records are empty. Once it is empty, it logs "Attempt to heartbeat failed since the group is rebalancing" many times, logs "No More Records" once, and then continues to log the heartbeat error. What can I do to combat this and how can I elegantly check (without any heartbeat messages) that there are no more records to poll?

Edit: I asked another question and the full code and context is on this link: https://stackoverflow.com/questions/64328346/how-to-get-messages-from-kafka-consumer-one-by-one-in-java/

Thanks in advance!

答案1

得分: 1

不需要翻译代码部分,只翻译注释部分:

"Since I have a UI and want to receive a message one by one by clicking the 'receive' button, there might be a case when there are no more messages to be polled."

In that case you need to create a new KafkaConsumer every time someone clicks on the 'receive' button and then close it afterwards.

If you want to use the same KafkaConsumer for the lifetime of your client, you need to let the broker know that it is still alive (by sending a heartbeat, which is implicitly done through calling the poll method). Otherwise, as you have already experienced, the broker thinks your KafkaConsumer is dead and will initiate a rebalancing. As there is no other active Consumer available, this rebalancing will not stop.

英文:

> Out of comment: "Since I have a UI and want to receive a message one by one by clicking the "receive" button, there might be a case when there are no more messages to be polled."

In that case you need to create a new KafkaConsumer every time someone clicks on the "receive" button and then close it afterwards.

If you want to use the same KafkaConsumer for the lifetime of your client, you need to let the broker know that it is still alive (by sending a heartbeat, which is implicitly done through calling the poll method). Otherwise, as you have already experienced, the broker thinks your KafkaConsumer is dead and will initiate a rebalancing. As there is no other active Consumer available this rebalancing will not stop.

huangapple
  • 本文由 发表于 2020年10月14日 23:51:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/64357095.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定