Limit number of messages from Kafka in Spring Batch

Question

I am storing the data from a KafkaListener in a ConcurrentLinkedQueue for processing. Currently it consumes as much data as it can and completely fills up RAM. How do I limit the number of messages in the queue so that the KafkaListener pauses when the limit is reached?

    ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<>();

    @KafkaListener(
            topics = "topic",
            id = "topic-kafka-listener",
            groupId = "batch-processor",
            containerFactory = "kafkaListenerContainerFactory"
    )
    public void receive(@NotNull @Payload List<Message> messages) {
        queue.addAll(messages);
    }

How do I limit the queue size to, say, 1 million messages? Whenever the queue is polled and there is free space, it should start listening again.

OR

How do I limit the rate at which Kafka consumes messages to, say, 100,000 messages per second?

Answer 1

Score: 0

Instead of using the annotation, I used a KafkaConsumer object to poll for data manually. This gives more control.

  1. Map<String, Object> consumerConfig = Map.of(
  2. "bootstrap.servers", "localhost:9092",
  3. "key.deserializer", StringDeserializer.class,
  4. "value.deserializer", StringDeserializer.class,
  5. "group.id", "batch-processor",
  6. "max.poll.records", 480000
  7. );
  8. KafkaConsumer<String, Message> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
  9. kafkaConsumer.subscribe(List.of("topic"));
  1. public void receive() {
  2. ConsumerRecords<String, Message> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
  3. consumerRecords.forEach(record -> queue.add(record.value()));
  4. }
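
Because polling is explicit, backpressure can be added by only calling poll() while the queue has free space. Continuing the example above, here is a minimal sketch of such a driver loop; MAX_QUEUE_SIZE and the backoff interval are assumptions, not part of the original answer:

    // Hypothetical driver loop: poll only while the queue has room.
    private static final int MAX_QUEUE_SIZE = 1_000_000;  // assumed limit

    public void run() {
        while (true) {
            if (queue.size() < MAX_QUEUE_SIZE) {
                receive();              // poll Kafka and enqueue up to max.poll.records records
            } else {
                try {
                    Thread.sleep(100);  // back off until the queue drains
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

Two caveats: ConcurrentLinkedQueue.size() traverses the whole queue (O(n)), so a capacity-bounded LinkedBlockingQueue may be a better fit for a hard limit; and if the loop backs off for longer than max.poll.interval.ms, the broker considers the consumer dead and rebalances the group.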

Answer 2

Score: 0

A Kafka consumer can be paused and resumed through the API; see the pause() and resume() methods for details.
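
For the @KafkaListener in the question, Spring Kafka exposes pause/resume on the listener container, which can be looked up by its id through the KafkaListenerEndpointRegistry. A minimal sketch, assuming the queue is shared with the listener and an assumed limit of 1 million messages (scheduling requires @EnableScheduling):

    import java.util.concurrent.ConcurrentLinkedQueue;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    @Component
    public class ListenerBackpressure {

        private static final int MAX_QUEUE_SIZE = 1_000_000;  // assumed limit

        private final KafkaListenerEndpointRegistry registry;
        private final ConcurrentLinkedQueue<Message> queue;   // the same queue the listener fills

        public ListenerBackpressure(KafkaListenerEndpointRegistry registry,
                                    ConcurrentLinkedQueue<Message> queue) {
            this.registry = registry;
            this.queue = queue;
        }

        // Periodically pause or resume the container based on queue occupancy.
        @Scheduled(fixedDelay = 1000)
        public void applyBackpressure() {
            // id matches @KafkaListener(id = "topic-kafka-listener") in the question
            MessageListenerContainer container = registry.getListenerContainer("topic-kafka-listener");
            if (container == null) {
                return;
            }
            if (queue.size() >= MAX_QUEUE_SIZE && !container.isPauseRequested()) {
                container.pause();   // stop fetching until the queue drains
            } else if (queue.size() < MAX_QUEUE_SIZE && container.isPauseRequested()) {
                container.resume();  // free space again: resume fetching
            }
        }
    }

A paused container keeps calling poll() internally so the consumer stays in its group, but no records are delivered until resume() is called.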
