Spring Boot Kafka如何动态增加/减少消费者线程数

huangapple go评论99阅读模式
英文:

Spring boot kafka how to dynamically increase/decrease consumer thread count

问题

我有一个Kafka主题,有一些高数量的分区,比如100 (固定),我有一个Spring Boot应用程序,其中有Kafka监听器(或者说消费者)从主题中消费。现在我知道可以使用concurrency属性来调整消费者线程的数量,类似这样(通过application.properties文件进行控制):

@KafkaListener(groupId = "${someGroupID}",
        topics = "${someTopic}",
        containerFactory = "someKafkaListenerContainerFactory",
        concurrency = "${concurrencyProperty}")
{
        //some logic
}

@KafkaListener上使用concurrency属性可以固定消费者线程的数量,但是如果我想在运行时根据请求负载动态调整该数量(比如说如果所有消费者线程都被占用,生成5个新的消费者线程),有没有办法做到这一点?

或者让我知道是否有任何更好的实践方法来根据负载管理消费者线程。谢谢。

注意: 我将主题的分区数量保持为固定值,因为增加该数量可能会导致消费者重新平衡延迟,据我所知,这可能非常耗时,所以我计划保持分区数量不变。

英文:

I have one kafka topic with some high number of partitions, say 100 (fixed), and I have a spring boot application which has kafka listeners(or say consumer) consuming from the topic. Now I know that using concurrency property you can adjust the number of consumer threads, something like this (controlling it via application.properties file):

    @KafkaListener(groupId = "${someGroupID}",
            topics = "${someTopic}",
            containerFactory = "someKafkaListenerContainerFactory",
            concurrency = "${concurrencyProperty}")
    {
            //some logic
    }

Using concurrency property on @KafkaListener I can fix the number of consumer threads but if I want to dynamically adjust that number during runtime based on the request load (say if all consumer threads are occupied, render 5 new consumer threads), is there anyway to do this?

Or let me know if there are any better practices to manage the consumer threads according to the load. Thanks.

NOTE: I have kept topic partition count as fixed to a high number since increasing that number could bring in consumer re-balancing delays which as far as I know can be very time consuming, so I am planning to keep the partition count as fixed.

答案1

得分: 1

你不能在容器运行时动态更改 concurrency;但你可以停止容器,更改 concurrency 然后重新启动它。

要停止/启动容器,使用 KafkaListenerEndpointRegistry bean,并给监听器分配一个 id 属性,这样你可以从注册表中获取特定容器的引用。

不应该在其中一个监听器线程上停止容器。

英文:

You cannot change the concurrency dynamically while the container is running; you can, however, stop the container, change the concurrency and restart it.

To stop/start the container, use the KafkaListenerEndpointRegistry bean, and give the listener(s) an id property so you can get a reference to the specific container from the registry.

You should not stop a container on one of its listener threads.

huangapple
  • 本文由 发表于 2023年7月10日 21:28:17
  • 转载请务必保留本文链接:https://go.coder-hub.com/76654238.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定