ThreadPoolTaskExecutor只有一个线程在池中,无法处理来自AWS队列的消息。

huangapple go评论74阅读模式
英文:

ThreadPoolTaskExecutor with just one thread on pool not processing messages from AWS queue

问题

我创建了一个按需的ChannelAdapter、AsyncTaskExecutor以及针对应用程序上注册的每个队列的Channel。我注意到,当AsyncTaskExecutor的maxPoolSize数量等于1时,消息不会被处理。以下是创建AsyncTaskExecutor bean的代码部分。

 static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
        final TaskExecutor executor = consumer.getExecutor();

        final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
        builder.addPropertyValue("corePoolSize", executor.getCorePoolSize());
        builder.addPropertyValue("maxPoolSize", executor.getMaxPoolSize());
        builder.addPropertyValue("threadNamePrefix", consumer.getName() + "-");

        final String beanName = executor.getName();
        final BeanDefinition beanDefinition = builder.getBeanDefinition();
        registry.registerBeanDefinition(beanName, beanDefinition);
    }

另一件我注意到的事情是,当调用这个方法java.util.concurrent.ThreadPoolExecutor#execute时,条件workerCountOf(c) < corePoolSize始终为假。完整项目链接在这里 1

英文:

I've created an on demand ChannelAdapter, AsyncTaskExecutor and a Channel for every queue registered on the application. I noticed that when the number of maxPoolSize of the AsyncTaskExecutor is equal to one, the messages are not being processed. This is how the AsyncTaskExecutor bean is created.

 static void registerAsyncTaskExecutor(final Consumer consumer, final GenericApplicationContext registry) {
        final TaskExecutor executor = consumer.getExecutor();

        final BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(ThreadPoolTaskExecutor.class);
        builder.addPropertyValue(&quot;corePoolSize&quot;, executor.getCorePoolSize());
        builder.addPropertyValue(&quot;maxPoolSize&quot;, executor.getMaxPoolSize());
        builder.addPropertyValue(&quot;threadNamePrefix&quot;, consumer.getName() + &quot;-&quot;);

        final String beanName = executor.getName();
        final BeanDefinition beanDefinition = builder.getBeanDefinition();
        registry.registerBeanDefinition(beanName, beanDefinition);
    }

Another thing that I noticed is when this method is called java.util.concurrent.ThreadPoolExecutor#execute this condition workerCountOf(c) &lt; corePoolSize is always false.
The full project link is over here https://github.com/LeoFuso/spring-integration-aws-demo

答案1

得分: 2

在为某个可管理的组件提供线程池时,仅使用一个线程始终是不良实践。您可能不清楚该组件将如何使用您的线程池,事实上,您的单个线程可能会被某个长时间运行的任务内部占用,而所有新任务只会在队列中等待该单个线程空闲,而可能永远不会发生空闲。

实际上,这正是Spring Cloud AWS中的AsynchronousMessageListener所具有的情况,它被提到的SqsMessageDrivenChannelAdapter使用:

public void run() {
    while (isQueueRunning()) {

因此,要么依赖于默认的执行程序,要么提供足够的线程给自己。

看起来,那里的逻辑是基于线程数的:

int spinningThreads = this.getRegisteredQueues().size();

if (spinningThreads > 0) {
    threadPoolTaskExecutor.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);

因此,我们的线程数与我们提供的SQS队列数完全相同,加上2个乘数用于工作线程。看起来,我们需要为每个队列分配一个线程来进行轮询,以及额外的线程来处理来自这些队列的消息。

(虽然不是Spring Integration的问题,更像是Spring Cloud AWS的问题)。

英文:

It is always bad practice to to provide a thread pool just with one thread to some manageable component. You may not know what that component is going to do with your thread pool and it is really could be a fact that your single thread is taken by some long-living task internally and all new tasks are just going to stall in the queue waiting for that single thread to be free, which is might not going to happen.

In fact that is really what we have with the AsynchronousMessageListener from Spring Cloud AWS which is used by the mentioned SqsMessageDrivenChannelAdapter:

public void run() {
		while (isQueueRunning()) {

So, or rely on the the default executor or provide enough threads into your own.

Looks like the logic over there is like this for the number of threads:

    int spinningThreads = this.getRegisteredQueues().size();

	if (spinningThreads &gt; 0) {
		threadPoolTaskExecutor
				.setCorePoolSize(spinningThreads * DEFAULT_WORKER_THREADS);

So, we have the exact number of thread as we provide SQS queue, plus 2 multiplier for workers. Looks like we need a thread for each queue to poll and extra thread to process messages from them.

(Not Spring Integration question though - more like Spring Cloud AWS).

huangapple
  • 本文由 发表于 2020年9月11日 10:08:38
  • 转载请务必保留本文链接:https://go.coder-hub.com/63839869.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定