Usage of Java Kafka Consumer in multiple threads

Question

I'm thinking of using a Kafka Consumer in a thread pool, and I came up with the approach below. It seems to work fine so far, but I'm wondering about the drawbacks and problems this approach can bring. Essentially, I need to decouple record processing from consuming. I also need a strong guarantee that commits happen only after all records are processed. Could someone suggest how to do this better?
final var consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(topics);
final var threadPool = Executors.newFixedThreadPool(32);

while (true) {
    ConsumerRecords<String, String> records;
    synchronized (consumer) {
        records = consumer.poll(Duration.ofMillis(100));
    }
    CompletableFuture.runAsync(this::processTask, threadPool).thenRun(() -> {
        synchronized (consumer) {
            consumer.commitSync();
        }
    });
}
Answer 1
Score: 1
Issue
This solution is not robust for the stated requirement:
> Also, I need to have a strong guarantee that commits happen only after all records are processed
Scenario:
- Poll reads 100 records and starts processing them asynchronously
- Poll reads 5 records and starts processing them asynchronously
- Processing of the 5 records finishes quickly, so the consumer commits while processing of the 100 records is still in progress
- Consumer crashes
When the consumer is brought up again, the last committed offset will correspond to the 105th record. It will therefore start processing from the 106th record, and we have lost the successful processing of records 1-100.
You would need to commit only the offsets that were processed in that poll, using the overload:
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
Also, the ordering would need to be guaranteed such that first poll is committed first, followed by second and so on. This would be fairly complicated.
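The rule for building that map is: for each partition, commit the highest processed offset plus one, because Kafka expects the offset of the next record to consume. Here is a minimal sketch of that bookkeeping, using plain String/Long maps as stand-ins for Kafka's TopicPartition and OffsetAndMetadata so it runs without the client library:

```java
import java.util.HashMap;
import java.util.Map;

class OffsetTracker {
    // Highest processed offset per partition. In real code the key would be
    // a TopicPartition and the committed value an OffsetAndMetadata.
    private final Map<String, Long> processed = new HashMap<>();

    // Record that the message at `offset` in `partition` finished processing.
    synchronized void markProcessed(String partition, long offset) {
        processed.merge(partition, offset, Math::max);
    }

    // Offsets to hand to commitSync: last processed offset + 1 per partition,
    // because Kafka expects the offset of the next record to consume.
    synchronized Map<String, Long> offsetsToCommit() {
        Map<String, Long> commit = new HashMap<>();
        processed.forEach((partition, off) -> commit.put(partition, off + 1));
        return commit;
    }
}
```

With the real client, each entry would become `new TopicPartition(topic, partition)` mapped to `new OffsetAndMetadata(offset + 1)` before calling `consumer.commitSync(offsets)`.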
Proposal
I believe you are trying to achieve concurrency in message processing. This can be achieved with a simpler solution: increase max.poll.records to read a decent-sized batch, break it into smaller batches, and process them asynchronously to achieve concurrency. Once all batches are done, commit through the Kafka consumer.
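That proposal can be sketched as follows. The record type, batch size, and pool size are placeholders of mine, and in real code the commit callback would call consumer.commitSync():

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BatchProcessor {
    // Split one large poll into fixed-size sub-batches.
    static <T> List<List<T>> split(List<T> records, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += batchSize) {
            batches.add(records.subList(i, Math.min(i + batchSize, records.size())));
        }
        return batches;
    }

    // Process all sub-batches concurrently; the commit callback runs only
    // after every batch has completed (consumer.commitSync() in real code).
    static <T> void processAndCommit(List<T> records, int batchSize,
                                     java.util.function.Consumer<List<T>> processBatch,
                                     Runnable commit) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            CompletableFuture<?>[] futures = split(records, batchSize).stream()
                    .map(b -> CompletableFuture.runAsync(() -> processBatch.accept(b), pool))
                    .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(futures).join(); // wait for every batch
            commit.run(); // safe: all records from this poll are processed
        } finally {
            pool.shutdown();
        }
    }
}
```

Because the commit happens on the polling thread after allOf(...).join(), the ordering problem from the answer disappears: no later poll can be committed ahead of an earlier one.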
Answer 2
Score: 0
英文:
I came across the following article which decouples the consumption and processing of records in kafka. You can achieve this by calling poll()
method explicitly and processing records with the help of pause()
and resume()
method.
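A rough outline of that pattern is below. The ControllableConsumer interface is a stand-in of mine so the sketch runs without the Kafka client; with the real API, pause() and resume() would be consumer.pause(consumer.assignment()) and consumer.resume(consumer.assignment()), and a paused consumer keeps calling poll() (which returns no records) so it stays alive in the group:

```java
import java.util.ArrayList;
import java.util.List;

class PauseResumeLoop {
    // Stand-in for the parts of KafkaConsumer this pattern touches.
    interface ControllableConsumer<T> {
        List<T> poll();   // returns no records while paused
        void pause();     // consumer.pause(consumer.assignment())
        void resume();    // consumer.resume(consumer.assignment())
        void commit();    // consumer.commitSync()
    }

    // One iteration of the decoupled loop: fetch, pause, process, commit,
    // resume. In a real implementation the processing would run on another
    // thread while this loop keeps calling poll() to stay in the group.
    static <T> List<T> drainOnce(ControllableConsumer<T> consumer) {
        List<T> records = consumer.poll();
        if (records.isEmpty()) {
            return records;
        }
        consumer.pause();                             // stop fetching while processing
        List<T> processed = new ArrayList<>(records); // placeholder for real work
        consumer.commit();                            // all records of this poll are done
        consumer.resume();                            // fetching continues on the next poll
        return processed;
    }
}
```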