Limit number of messages from Kafka in Spring Batch
Question
I am storing the data from a KafkaListener in a ConcurrentLinkedQueue to be processed. Currently it consumes as much data as it can and completely fills up RAM. How do I limit the number of messages in the queue so that the KafkaListener pauses when it reaches the limit?
ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<>();

@KafkaListener(
    topics = "topic",
    id = "topic-kafka-listener",
    groupId = "batch-processor",
    containerFactory = "kafkaListenerContainerFactory"
)
public void receive(@NotNull @Payload List<Message> messages) {
    queue.addAll(messages);
}
How do I limit the queue size to say 1 million?
Whenever the queue is polled and there is free space it should start listening again.
OR
How do I limit the rate at which Kafka consumes messages to say 100,000 messages per second?
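The limit-then-pause behaviour asked about here is usually implemented with two thresholds rather than one, so the listener does not rapidly flap on and off around a single limit. Below is a minimal sketch of that decision logic in plain Java; the half-capacity resume point is an assumption, not something from the question. In Spring Kafka, the pause/resume actions would map to `registry.getListenerContainer("topic-kafka-listener").pause()` and `.resume()` on an injected `KafkaListenerEndpointRegistry`.

```java
// Hysteresis back-pressure: pause at a high-water mark, resume only after the
// queue has drained below a lower mark. The LOW_WATER choice is illustrative.
class QueueBackpressure {
    static final int HIGH_WATER = 1_000_000;      // the 1 million limit from the question
    static final int LOW_WATER  = HIGH_WATER / 2; // resume point (an assumed choice)

    private boolean paused = false;

    /** Call after adding messages; true means the listener container should be paused now. */
    boolean shouldPause(int queueSize) {
        if (!paused && queueSize >= HIGH_WATER) {
            paused = true;  // in Spring Kafka: registry.getListenerContainer(id).pause()
            return true;
        }
        return false;
    }

    /** Call from a periodic check; true means the listener container should be resumed now. */
    boolean shouldResume(int queueSize) {
        if (paused && queueSize < LOW_WATER) {
            paused = false; // in Spring Kafka: registry.getListenerContainer(id).resume()
            return true;
        }
        return false;
    }
}
```

The two-threshold design means a single message polled off a full queue does not immediately restart consumption, which would otherwise cause a pause/resume call on nearly every batch.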
Answer 1 (score: 0)
Instead of using the annotation, I used the KafkaConsumer object to poll for data manually. This gives more control.
Map<String, Object> consumerConfig = Map.of(
    "bootstrap.servers", "localhost:9092",
    "key.deserializer", StringDeserializer.class,
    // note: for KafkaConsumer<String, Message> the value deserializer must
    // actually produce Message (e.g. a JSON deserializer), not String
    "value.deserializer", StringDeserializer.class,
    "group.id", "batch-processor",
    "max.poll.records", 480000
);
KafkaConsumer<String, Message> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
kafkaConsumer.subscribe(List.of("topic"));

public void receive() {
    ConsumerRecords<String, Message> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
    consumerRecords.forEach(record -> queue.add(record.value()));
}
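The answer sets max.poll.records to 480000 without saying where the number comes from. If the goal is the question's alternative (cap throughput at roughly 100,000 messages per second), one hedged way to size it is from the target rate and the 1000 ms poll interval already used above:

```java
// Sizing max.poll.records from a target rate: with at most one poll() per
// interval, the consumer returns at most maxPollRecords records per interval,
// so the average rate is capped at maxPollRecords / intervalSeconds. This is
// only an approximate cap: poll() can return early when records are already
// buffered, so the calling loop must also be throttled to one iteration per
// interval for the cap to hold.
class PollSizing {
    static int maxPollRecords(int targetPerSecond, int pollIntervalMillis) {
        return (int) ((long) targetPerSecond * pollIntervalMillis / 1000);
    }
}
```

For example, a 100,000 msg/s target with a 1000 ms poll interval gives max.poll.records = 100000; halving the interval halves the batch size.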
Answer 2 (score: 0)

The Kafka consumer can be paused and resumed through its API; check the pause and resume methods for details.
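The real entry points on org.apache.kafka.clients.consumer.KafkaConsumer are pause(Collection&lt;TopicPartition&gt;), resume(Collection&lt;TopicPartition&gt;), paused() and assignment(). While partitions are paused, subsequent poll() calls return no records for them but the consumer stays in the group, so no rebalance is triggered; note you must still keep calling poll() so the consumer is not evicted after max.poll.interval.ms. Since no broker is available here, the class below is a tiny stand-in (NOT the Kafka API) that mimics only that pause/resume contract so the control flow can be exercised:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stand-in mimicking the pause/resume contract of the Kafka consumer:
// paused partitions are remembered and skipped by poll(), and resume()
// makes them visible to poll() again.
class PausableSource {
    private final Set<String> paused = new HashSet<>();

    void pause(String partition)    { paused.add(partition); }
    void resume(String partition)   { paused.remove(partition); }
    boolean isPaused(String partition) { return paused.contains(partition); }

    // Like poll(): yields records only for partitions that are not paused.
    List<String> poll(List<String> partitions) {
        List<String> out = new ArrayList<>();
        for (String p : partitions) {
            if (!paused.contains(p)) {
                out.add("record@" + p);
            }
        }
        return out;
    }
}
```

In a real loop the queue-size check from the question would drive these calls: pause the assignment when the queue is full, resume the paused set once it has drained.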