AWS SQS JMS SDK MessageListener onMessage() Not Assigning Different Thread For Each Invocation

Question

I have a requirement to write an SQS consumer that consumes messages asynchronously from AWS SQS. My assumption is that JMS is multi-threaded and that each invocation of the MessageListener's `onMessage()` is assigned a new thread.

`SQSConnectionManager.java`:
```java
import com.amazon.sqs.javamessaging.ProviderConfiguration;
import com.amazon.sqs.javamessaging.SQSConnection;
import com.amazon.sqs.javamessaging.SQSConnectionFactory;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SQSConnectionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(SQSConnectionManager.class);

    private SQSConnectionFactory sqsConnectionFactory;
    private SQSConnection sqsConnection;
    private Session sqsSession;

    public SQSConnectionManager() {
    }

    public void createSQSConnection(final String queueName) throws JMSException {

        LOGGER.info("Initializing sqs connection");
        sqsConnectionFactory = new SQSConnectionFactory(
            new ProviderConfiguration(),
            AmazonSQSClientBuilder.standard()
                                  .build()
        );

        sqsConnection = sqsConnectionFactory.createConnection();

        // One non-transacted session with automatic acknowledgement
        sqsSession = sqsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Queue queue = sqsSession.createQueue(queueName);

        MessageConsumer sqsConsumer = sqsSession.createConsumer(queue);

        sqsConsumer.setMessageListener(new MyCustomListener());

        // Message delivery to the listener begins once the connection is started
        sqsConnection.start();
        LOGGER.info("SQS Connection started");
    }
}
```

`MyCustomListener.java`:
```java
import com.amazon.sqs.javamessaging.message.SQSTextMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyCustomListener implements MessageListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(MyCustomListener.class);

    public MyCustomListener() {}

    @Override
    public void onMessage(Message message) {
        try {
            LOGGER.info("onMessage() Thread name : {}", Thread.currentThread().getName());
            LOGGER.info("onMessage() Thread id : {}", Thread.currentThread().getId());
            LOGGER.info("Reading incoming sqs message");
            final SQSTextMessage sqsTextMessage = (SQSTextMessage) message;
            final String receivedMessage = sqsTextMessage.getText();
            LOGGER.info("Received sqs message : {}", receivedMessage);
            helper(receivedMessage);
        } catch (JMSException e) {
            // Pass the exception itself so the stack trace is logged
            LOGGER.error("Failed to read incoming sqs message", e);
        }
    }

    // Called directly from onMessage(), so it runs on the caller's thread
    private void helper(final String sqsMessage) {
        LOGGER.info("helper() Thread name : {}", Thread.currentThread().getName());
        LOGGER.info("helper() Thread id : {}", Thread.currentThread().getId());
        LOGGER.info("sqs message : {}", sqsMessage);
    }
}
```

`Application.java`:
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Application {

    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) throws Exception {
        SQSConnectionManager sqsConnectionManager = new SQSConnectionManager();
        sqsConnectionManager.createSQSConnection("test-queue");
    }
}
```

AWS Maven dependency:
```xml
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-sqs-java-messaging-lib</artifactId>
    <version>1.1.0</version>
</dependency>
```

This Java app is deployed to AWS as an Elastic Beanstalk application.

When I check the logs in CloudWatch, I see the same thread ID for both `onMessage()` and `helper()`.

Can anyone help me understand how a JMS listener handles threading? Does it ensure that execution is multi-threaded?

Answer 1

Score: 1

Your assumption about multi-threaded support in JMS is incorrect. In particular, the `Session` and `MessageConsumer` objects are not thread-safe. Asynchronous message consumption by a `MessageListener` is serial within a session (i.e. not parallel/concurrent), so you're seeing the expected behavior.

This is described in more detail in section 2.14 of the Jakarta Messaging 3.1 specification, which gives the following rationale for restricting concurrent use of a `Session`:

> There are two reasons for restricting concurrent access to sessions. First, sessions are the Jakarta Messaging entity that supports transactions. It is very difficult to implement transactions that are multi-threaded. Second, sessions support asynchronous message consumption. It is important that Jakarta Messaging not require that client code used for asynchronous message consumption be capable of handling multiple, concurrent messages. In addition, if a session has been set up with multiple, asynchronous consumers, it is important that the client is not forced to handle the case where these separate consumers are concurrently executing. These restrictions make Jakarta Messaging easier to use for typical clients. More sophisticated clients can get the concurrency they desire by using multiple sessions. In the classic API and the domain-specific APIs this means using multiple session objects. In the simplified API this means using multiple JMSContext objects.

Therefore, if you want concurrent consumption in your case, you need to create additional sessions, each with its own consumer.
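To make that rule concrete, here is a rough JDK-only model in which each "session" owns exactly one delivery thread. It is not how the SQS library is implemented, and `SessionModel`, `deliver`, and the thread names are hypothetical. In the question's code, the corresponding step would be to call `sqsConnection.createSession(...)` and `createConsumer(...)` once per desired concurrent consumer before calling `sqsConnection.start()`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** JDK-only model (hypothetical): one "session" owns exactly one delivery thread. */
class SessionModel {
    private final ExecutorService deliveryThread;
    private final Set<String> threadsSeen = ConcurrentHashMap.newKeySet();

    SessionModel(String name) {
        // A single-threaded executor runs queued deliveries one at a time, in order.
        this.deliveryThread = Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    /** Queues a message; the "listener" body runs on this session's only thread. */
    void deliver(String message) {
        deliveryThread.execute(() -> {
            // Stand-in for onMessage(): record which thread handled the message.
            threadsSeen.add(Thread.currentThread().getName());
        });
    }

    /** Stops accepting messages, waits for queued ones, and returns the thread names used. */
    Set<String> closeAndGetThreads() {
        deliveryThread.shutdown();
        try {
            deliveryThread.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadsSeen;
    }

    public static void main(String[] args) {
        SessionModel a = new SessionModel("session-a");
        SessionModel b = new SessionModel("session-b");
        for (int i = 0; i < 10; i++) {
            a.deliver("msg-" + i);
            b.deliver("msg-" + i);
        }
        // Each model reports a single thread name, and the two names differ.
        System.out.println(a.closeAndGetThreads());
        System.out.println(b.closeAndGetThreads());
    }
}
```

Within one model every delivery lands on the same thread, which matches the log output in the question; concurrency only appears once a second instance exists.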

That said, you can certainly manage the concurrency yourself if you like (e.g. using the classes in `java.util.concurrent`). That is fine for simple use cases involving `AUTO_ACKNOWLEDGE`. However, if you ever move beyond such a simple use case to something more complex involving `CLIENT_ACKNOWLEDGE` or transactions, your code will get more complicated because you'll have to manage thread-safe access to the `Session` object to deal with acknowledgements, commits, rollbacks, etc. Creating multiple sessions and consumers is often simpler to implement and more straightforward to maintain.
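A minimal sketch of the do-it-yourself approach, using only the JDK so that it runs without a broker. `WorkerPoolHandOff`, `submit`, and the pool size are hypothetical; in the question's listener, `onMessage()` would extract the text and pass it to `submit()`. Keep in mind that under `AUTO_ACKNOWLEDGE` a message is acknowledged once `onMessage()` returns successfully, so after a hand-off like this the acknowledgement no longer waits for the work to finish.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: hands message bodies from the delivery thread to a worker pool. */
class WorkerPoolHandOff {
    private final ExecutorService pool;
    private final Consumer<String> worker;

    WorkerPoolHandOff(int threads, Consumer<String> worker) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.worker = worker;
    }

    /** Intended to be called from onMessage(); queues the work and returns immediately. */
    void submit(String body) {
        pool.execute(() -> worker.accept(body));
    }

    /** Stops accepting work and waits; returns true if everything finished in time. */
    boolean shutdown(long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> handledBy = new ConcurrentHashMap<>();
        WorkerPoolHandOff handOff = new WorkerPoolHandOff(4,
            body -> handledBy.put(body, Thread.currentThread().getName()));
        for (int i = 0; i < 8; i++) {
            handOff.submit("msg-" + i);
        }
        handOff.shutdown(5);
        // Shows which pool thread processed each message body.
        handledBy.forEach((body, thread) -> System.out.println(body + " -> " + thread));
    }
}
```

The worker receives only the message body, never the `Session`, which is what keeps this variant simple; acknowledging or committing from pool threads would require the synchronization described above.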

huangapple
  • Posted on April 13, 2023, 21:41:11
  • Please keep this link when reposting: https://go.coder-hub.com/76006153.html