如何在Java中逐条从Kafka消费者获取消息?

huangapple go评论68阅读模式
英文:

How to get messages from Kafka Consumer one by one in java?

问题

我正在使用Apache Kafka API,尝试一次只获取一条消息。我只向一个主题写入。我可以通过一个带有文本框的弹出式UI屏幕发送和接收消息。我在文本框中输入字符串,然后点击“发送”。我可以随意发送多条消息。假设我发送了3条消息,我的3条消息分别是“hi”,“lol”和“bye”。

还有一个“接收”按钮。现在,使用TutorialsPoint中的传统代码,当我点击接收按钮时,控制台会一次打印出所有3条消息(hi,lol,bye)。然而,我希望每次我在UI上点击“接收”时只打印一条消息。例如,我第一次点击接收按钮,它会打印出“hi”,第二次会是“lol”,第三次会是“bye”。

我对Kafka还不熟悉,不清楚如何做到这一点。我尝试将代码中的循环都删除,所以只剩下这两行代码:

ConsumerRecords<String, String> records = consumer.poll(100);
System.out.printf(records.iterator().next().value());

如果我只有这两行代码,第一次点击接收按钮,它会打印出“hi”,但第二次按下它时,会得到消息“尝试进行心跳失败,因为组正在重新平衡kafka”。当我将max.poll.records设置为1时,我也会得到错误,因为我希望所有的消息最终都能被处理,但只有其中一条消息在按下接收按钮时会被记录到控制台。下一次,主题中未记录的下一条消息将被记录。

希望这样能有意义!感谢任何帮助!提前谢谢!

编辑:在包括队列并且我们可以在发送和接收消息之间进行交替的情况下,更新代码以及在有新消息时更新队列:

if (payloadQueue.isEmpty()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    if (records.isEmpty()) {
        log.info("没有更多记录要添加");
        consumer.close();
    } else {
        records.iterator().forEachRemaining(record -> {
            log.info("记录:" + record);
            payloadQueue.offer(record);
        });
        payload = payloadQueue.poll().value();
        log.info("从KAFKA收到来自主题 {} 的事件,载荷为 \"{}\"", subject, payload);
    }
} else {
    payload = payloadQueue.poll().value();
    log.info("从KAFKA收到来自主题 {} 的事件,载荷为 \"{}\"", subject, payload);
}
英文:

I'm using Apache Kafka API and trying to get only one message at a time. I'm only writing to one topic. I can send and receive messages by having a pop UI screen with a textbox. I input a string in the textbox and click "send." I can send as many messages as I want. Let's say I send 3 messages and my 3 messages were "hi," "lol," "bye." There is also a "receive" button. Right now, using the traditional code found in TutorialsPoint, I get all 3 messages (hi, lol, bye) at once printed on the console when I click on the receive button. However, I want only want one message to be printed at a time when I click "receive" on the UI. For example, the first time I hit the receive button, it would print "hi," the second time would be "lol," and the third time would be "bye." I am new to Kafka and am confused on how to do this. I tried removing both the loops from the code so it just has

ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);
System.out.printf(records.iterator().next().value());

If I just have those 2 lines of code, the first time I hit the receive button, it would print "hi" but the second time I press it, get the message "attempt to heartbeat failed since group is rebalancing kafka." I'm getting errors when I set the max.poll.records = 1 too as I want all my messages eventually but just one of them needs to be logged to the console when the receive button is pressed. The next time, the next message in the topic not logged would be logged.

Hope that makes sense!
Appreciate any help!
Thanks in advance!

EDIT:
New code after including queues and also so we can alternate between sending and receiving messages & update the queue whenever there is a new message:

        if (payloadQueue.isEmpty()){
            ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(500));
            if (records.isEmpty()) {
                log.info(&quot;No More Records to Add&quot;);
                consumer.close();
            }
            else {
                records.iterator().forEachRemaining(record -&gt; {
                    log.info(&quot;RECORD: &quot; + record);
                    payloadQueue.offer(record);
                });
                payload = payloadQueue.poll().value();
                log.info(&quot;Received event from KAFKA on subject {} with payload \&quot;{}\&quot;&quot;, subject, payload);
            }
        }
        else {
            payload = payloadQueue.poll().value();
            log.info(&quot;Received event from KAFKA on subject {} with payload \&quot;{}\&quot;&quot;, subject, payload);
        }

答案1

得分: 3

Kafka使用批量获取以提升性能设置max.poll.records=1并不是真正必要的
你想要的可以通过一些变通方法轻松实现
# 解决方案
你可以使用一个队列来存储消息每次按下接收按钮时从队列中轮询一个消息如果队列为空就调用`consumer.poll`来填充队列
# 代码
```java
    private Queue<ConsumerRecord<String, String>> queue = new LinkedList<>();
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    public void buttonPressed() {
        if (queue.isEmpty()) {
            consumer.poll(100).iterator().forEachRemaining(record -> queue.offer(record));
        } else {
            System.out.println(queue.poll());
        }
    }

<details>
<summary>英文:</summary>

Kafka use batch get to improve performance, it&#39;s really not necessary to set max.poll.records=1.  
What you want can be easily achieved with some workaround.  
# solution
You can have a `Queue` to store the message, each time the receive button is pressed, you poll one message from the queue, if the queue is empty, you call `consumer.poll` to fill the queue.
# code
private Queue&lt;ConsumerRecord&lt;String,String&gt;&gt; queue=new LinkedList&lt;&gt;();
KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(properties);
public void buttonPressed(){
    if (queue.isEmpty()){
        consumer.poll(100).iterator().forEachRemaining(record-&gt;queue.offer(record));
    }else {
        System.out.println(queue.poll());
    }
}

</details>



# 答案2
**得分**: 0

你应该按照 @haoyu 的建议添加一个队列,但要手动提交已消耗的偏移量。否则,应用程序重置可能会导致数据丢失(因为消息已经从主题中消耗,尽管没有被打印到用户界面)。

建议阅读 [KafkaConsumer][1] 的“手动偏移量控制”和“在 Kafka 之外存储偏移量”部分。


  [1]: https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

<details>
<summary>英文:</summary>

You should add a Queue as suggested by @haoyu, but manually commiting consumed offsets. Otherwise an application reset may lead to data loss (because the messages have been consumed from the topic, despite not being printed into the UI).

It is recommended to read the section &quot;Manual Offset Control&quot; and &quot;Storing Offsets Outside Kafka&quot; of [KafkaConsumer][1] javadoc.


  [1]: https://kafka.apache.org/25/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

</details>



huangapple
  • 本文由 发表于 2020年10月13日 12:08:32
  • 转载请务必保留本文链接:https://go.coder-hub.com/64328346.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定