Spring Kafka manual acknowledge using AckMode.RECORD

Question

I am trying to use Kafka for the use case below.


Our producer produces messages to a Kafka topic. Our consumer is slow by nature because processing time varies with the content of each message. We also batch the incoming messages based on some conditions and process the batched list in one go. We tried the two approaches below.


  1. Multiple partitions: With multiple partitions, we are able to batch and process the messages on the same consumer thread, which works fine. But when the batch holds only a small set of messages and our condition to process the batch is not met, we wait for new messages before processing the messages already batched. We tried a timeout, but since the consumer thread does not execute until a message arrives, we were never able to process the last chunk of messages.


  2. One partition, in-memory queue: We tried a single partition that writes all incoming messages to an in-memory concurrent queue, and we spawn 5 threads (say) using an ExecutorService. All the threads are able to read unique messages from the queue and process them. The problem is with acknowledgement: we save the ack along with the message in the queue and try to ack each message after processing. What we observed is that after partial processing from the internal queue, if the consumer is restarted, the remaining messages do not come back to the consumer. We are guessing this is because of the ack we are doing. Here the AckMode is MANUAL/MANUAL_IMMEDIATE.

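The symptom in approach 2 follows from how Kafka commits work: the committed value is a single per-partition offset, and acknowledging a record commits that record's offset + 1, which implicitly marks every earlier offset as consumed. When worker threads ack out of order, a late-offset ack commits past records that are still sitting in the in-memory queue. A toy simulation of that semantics (hypothetical class name, no Kafka client involved):

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of per-partition offset commit semantics (no Kafka involved).
// Committing offset N tells the broker "everything below N is done", so an
// out-of-order ack silently skips records still waiting in the local queue.
public class OffsetCommitDemo {

    static String simulate() {
        long committed = 0;

        // Workers drain the in-memory queue in arbitrary order; suppose offsets
        // 0, 1 and 4 finish (and ack) while 2 and 3 are still queued.
        long[] ackedInOrder = {0, 1, 4};
        for (long offset : ackedInOrder) {
            committed = offset + 1; // MANUAL_IMMEDIATE: each ack commits offset + 1
        }

        // After a restart, consumption resumes at the committed offset, so the
        // unprocessed offsets 2 and 3 are never redelivered.
        List<Long> redelivered = new ArrayList<>();
        for (long offset = committed; offset < 5; offset++) {
            redelivered.add(offset);
        }
        return "committed=" + committed + " redelivered=" + redelivered;
    }

    public static void main(String[] args) {
        System.out.println(simulate()); // committed=5 redelivered=[]
    }
}
```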

Code below:


@KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}", concurrency = "1")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) throws InterruptedException {
    JSONObject jsonObject = new JSONObject(record.value());
    JSONArray request_data = jsonObject.getJSONArray("request");
    JSONObject data = request_data.getJSONObject(0);
    MessagePair messagePair = new MessagePair();
    messagePair.setMessage(data);
    messagePair.setAcknowledgment(ack);
    linkedQueue.add(messagePair);
    System.out.println("Queue size is : " + linkedQueue.size());

    if (executorService == null) {
        System.out.println("Starting executor service");
        executorService = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            QueeuTask queeuTask = new QueeuTask(linkedQueue);
            executorService.execute(queeuTask);
        }
    }
}

The Printlist method below is called by each executor thread after it builds a batch based on custom conditions. Our actual processing happens in this method, and after processing we try to acknowledge each message.


public void Printlist(ArrayList<MessagePair> lst, String threadName) throws InterruptedException {
    //Process the messages
    for(MessagePair pair : lst) {
        System.out.println("Sending acknowledgement for message : " + pair.getMessage());
        pair.getAcknowledgment().acknowledge();
    }
    lst.clear();
}

With the above implementation, we do see missing messages: records that were sitting unprocessed in the local queue are not redelivered after a restart.
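One way to keep per-message processing while committing safely (a sketch, not from the question; the class and method names are hypothetical) is to have each worker report its completed offset to a shared tracker, and only commit up to the highest offset whose predecessors are all done. A restart can then only redeliver messages, never skip them:

```java
import java.util.TreeSet;

// Sketch: track completed offsets and expose the highest contiguous offset
// that is safe to commit. Workers call complete() when they finish a record;
// the actual acknowledge() would be issued only up to the returned offset.
public class ContiguousAckTracker {
    private final TreeSet<Long> done = new TreeSet<>();
    private long nextToCommit; // first offset not yet known to be processed

    public ContiguousAckTracker(long startOffset) {
        this.nextToCommit = startOffset;
    }

    // Called by a worker thread when it finishes processing one record.
    // Returns the offset that is now safe to commit (exclusive upper bound).
    public synchronized long complete(long offset) {
        done.add(offset);
        // Advance past every contiguous completed offset.
        while (done.contains(nextToCommit)) {
            done.remove(nextToCommit);
            nextToCommit++;
        }
        return nextToCommit;
    }
}
```

For example, starting at offset 0: completing 0 makes offset 1 safe to commit; completing 4 next changes nothing (2 and 3 are still outstanding); only after 1, 2 and 3 complete does the safe offset jump to 5.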


Can someone please help with how we can make this acknowledgement happen per message? We tried AckMode.RECORD, which fails with:

No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment

Appreciate the help.


Answer 1

Score: 0

We have changed the design to ensure that Kafka consumers will not be slow. We are also handling offset commits to ensure processing is completed. With the new use case, we are using a Kafka batch listener to read a fixed number of messages in one poll and distribute equal loads to the actual process. Kafka listeners are used to process these loads, which are not very processing intensive. Thanks for all the suggestions.
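The batch-listener approach described above can be wired in Spring Kafka roughly as follows (a sketch under assumed property names; the factory bean and processing logic would live in the application's own configuration):

```java
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.support.Acknowledgment;

public class BatchConsumerConfig {

    // Container factory that delivers each poll as one List (batch listener).
    // max.poll.records in the consumer config caps the batch size per poll.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL);
        return factory;
    }

    @KafkaListener(topics = "${kafka-consumer.topics}", containerFactory = "batchFactory")
    public void consume(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        // Process the whole batch, then commit once. The single ack covers
        // every record delivered in this poll, so per-record ordering
        // problems cannot skip unprocessed messages.
        records.forEach(r -> process(r.value()));
        ack.acknowledge();
    }

    private void process(String value) { /* actual work */ }
}
```

With this shape, an offset is committed only after the whole batch has been processed, so a restart re-reads at most one batch instead of losing queued records.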


huangapple
  • Published on 2023-07-06 18:01:46
  • When reposting, please keep this link: https://go.coder-hub.com/76627666.html