英文:
ThreadPoolTaskExecutor with just one thread on pool not processing messages from AWS queue
问题
我创建了一个按需的ChannelAdapter、AsyncTaskExecutor以及针对应用程序上注册的每个队列的Channel。我注意到,当AsyncTaskExecutor的maxPoolSize
数量等于1时,消息不会被处理。以下是创建AsyncTaskExecutor bean的代码部分。
static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
final TaskExecutor executor = consumer.getExecutor();
final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
builder.addPropertyValue("corePoolSize", executor.getCorePoolSize());
builder.addPropertyValue("maxPoolSize", executor.getMaxPoolSize());
builder.addPropertyValue("threadNamePrefix", consumer.getName() + "-");
final String beanName = executor.getName();
final BeanDefinition beanDefinition = builder.getBeanDefinition();
registry.registerBeanDefinition(beanName, beanDefinition);
}
另一件我注意到的事情是,当调用这个方法java.util.concurrent.ThreadPoolExecutor#execute
时,条件workerCountOf(c) < corePoolSize
始终为假。完整项目链接在这里 1。
英文:
I've created an on demand ChannelAdapter, AsyncTaskExecutor and a Channel for every queue registered on the application. I noticed that when the number of maxPoolSize
of the AsyncTaskExecutor is equal to one, the messages are not being processed. This is how the AsyncTaskExecutor bean is created.
static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
final TaskExecutor executor = consumer.getExecutor();
final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
builder.addPropertyValue("corePoolSize", executor.getCorePoolSize());
builder.addPropertyValue("maxPoolSize", executor.getMaxPoolSize());
builder.addPropertyValue("threadNamePrefix", consumer.getName() + "-");
final String beanName = executor.getName();
final BeanDefinition beanDefinition = builder.getBeanDefinition();
registry.registerBeanDefinition(beanName, beanDefinition);
}
Another thing that I noticed is when this method is called java.util.concurrent.ThreadPoolExecutor#execute
this condition workerCountOf(c) < corePoolSize
is always false.
The full project link is over here https://github.com/LeoFuso/spring-integration-aws-demo
答案1
得分: 2
在为某个可管理的组件提供线程池时,仅使用一个线程始终是不良实践。您可能不清楚该组件将如何使用您的线程池,事实上,您的单个线程可能会被某个长时间运行的任务内部占用,而所有新任务只会在队列中等待该单个线程空闲,而可能永远不会发生空闲。
实际上,这正是Spring Cloud AWS中的AsynchronousMessageListener
所具有的情况,它被提到的SqsMessageDrivenChannelAdapter
使用:
public void run() {
while (isQueueRunning()) {
因此,要么依赖于默认的执行程序,要么提供足够的线程给自己。
看起来,那里的逻辑是基于线程数的:
int spinningThreads = this.getRegisteredQueues().size();
if (spinningThreads > 0) {
threadPoolTaskExecutor.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
因此,我们的线程数与我们提供的SQS队列数完全相同,加上2
个乘数用于工作线程。看起来,我们需要为每个队列分配一个线程来进行轮询,以及额外的线程来处理来自这些队列的消息。
(虽然不是Spring Integration的问题,更像是Spring Cloud AWS的问题)。
英文:
It is always bad practice to to provide a thread pool just with one thread to some manageable component. You may not know what that component is going to do with your thread pool and it is really could be a fact that your single thread is taken by some long-living task internally and all new tasks are just going to stall in the queue waiting for that single thread to be free, which is might not going to happen.
In fact that is really what we have with the AsynchronousMessageListener
from Spring Cloud AWS which is used by the mentioned SqsMessageDrivenChannelAdapter
:
public void run() {
while (isQueueRunning()) {
So, or rely on the the default executor or provide enough threads into your own.
Looks like the logic over there is like this for the number of threads:
int spinningThreads = this.getRegisteredQueues().size();
if (spinningThreads > 0) {
threadPoolTaskExecutor
.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);
So, we have the exact number of thread as we provide SQS queue, plus 2
multiplier for workers. Looks like we need a thread for each queue to poll and extra thread to process messages from them.
(Not Spring Integration question though - more like Spring Cloud AWS).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论