英文:
Spring boot kafka how to dynamically increase/decrease consumer thread count
问题
我有一个Kafka主题,有一些高数量的分区,比如100 (固定),我有一个Spring Boot应用程序,其中有Kafka监听器(或者说消费者)从主题中消费。现在我知道可以使用concurrency
属性来调整消费者线程的数量,类似这样(通过application.properties
文件进行控制):
@KafkaListener(groupId = "${someGroupID}",
topics = "${someTopic}",
containerFactory = "someKafkaListenerContainerFactory",
concurrency = "${concurrencyProperty}")
{
//some logic
}
在@KafkaListener
上使用concurrency
属性可以固定消费者线程的数量,但是如果我想在运行时根据请求负载动态调整该数量(比如说如果所有消费者线程都被占用,生成5个新的消费者线程),有没有办法做到这一点?
或者让我知道是否有任何更好的实践方法来根据负载管理消费者线程。谢谢。
注意: 我将主题的分区数量保持为固定值,因为增加该数量可能会导致消费者重新平衡延迟,据我所知,这可能非常耗时,所以我计划保持分区数量不变。
英文:
I have one kafka topic with some high number of partitions, say 100 (fixed), and I have a spring boot application which has kafka listeners(or say consumer) consuming from the topic. Now I know that using concurrency
property you can adjust the number of consumer threads, something like this (controlling it via application.properties
file):
@KafkaListener(groupId = "${someGroupID}",
topics = "${someTopic}",
containerFactory = "someKafkaListenerContainerFactory",
concurrency = "${concurrencyProperty}")
{
//some logic
}
Using concurrency
property on @KafkaListener
I can fix the number of consumer threads but if I want to dynamically adjust that number during runtime based on the request load (say if all consumer threads are occupied, render 5 new consumer threads), is there anyway to do this?
Or let me know if there are any better practices to manage the consumer threads according to the load. Thanks.
NOTE: I have kept topic partition count as fixed to a high number since increasing that number could bring in consumer re-balancing delays which as far as I know can be very time consuming, so I am planning to keep the partition count as fixed.
答案1
得分: 1
你不能在容器运行时动态更改 concurrency
;但你可以停止容器,更改 concurrency
然后重新启动它。
要停止/启动容器,使用 KafkaListenerEndpointRegistry
bean,并给监听器分配一个 id
属性,这样你可以从注册表中获取特定容器的引用。
不应该在其中一个监听器线程上停止容器。
英文:
You cannot change the concurrency
dynamically while the container is running; you can, however, stop the container, change the concurrency
and restart it.
To stop/start the container, use the KafkaListenerEndpointRegistry
bean, and give the listener(s) an id
property so you can get a reference to the specific container from the registry.
You should not stop a container on one of its listener threads.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论