Connection.start is not needed for JMS MessageProducer but needed for MessageConsumer

Question

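<h1>A - Question</h1>

I know there is a similar question on Stack Overflow, but it is not quite the same.

I'm trying to understand what goes on under the hood with **MessageProducer** and **MessageConsumer** in JMS. Using the **ActiveMQ** implementation, I've written a simple **MessageProducer** example that sends a message to a queue and a **MessageConsumer** example that consumes the message from the queue, while running **ActiveMQ** locally.

My understanding was that **Connection#start** is needed to send a message to a queue. Calling **Connection#start** triggers **ActiveMQSession#start**; see the following debug point at `org.apache.activemq.ActiveMQSession#start`:

[![ActiveMQ Debug Point](https://i.stack.imgur.com/VWKVl.png)](https://i.stack.imgur.com/VWKVl.png)

The problem is that **Connection#start** is not explicitly needed for a **MessageProducer** but is needed for a **MessageConsumer**. In both examples, however, we still have to release the resources (**session** and **connection**). If I remove the **Connection#start** call from the producer, the code still runs, the debug point is never hit (not even under the hood), and I can see the message in the queue. But if I remove the **Connection#start** call from the consumer, the code does not work. So the question is: why is it unnecessary for a **MessageProducer**, where the code runs successfully, but required for a **MessageConsumer**? Also, why can we skip **Connection#start** for a **MessageProducer** even though we still need to close the connection to release the resources? It looks like a code smell.

I see that the **started** field is an `AtomicBoolean`. I'm not an expert on concurrency and multithreading, so maybe there is some logic someone can explain for why **Connection#start** is not mandatory for a **MessageProducer**:

[![org.apache.activemq.ActiveMQSession - started field](https://i.stack.imgur.com/tdtGh.png)](https://i.stack.imgur.com/tdtGh.png)
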
<h1>B - Example Code for JMS MessageProducer with ActiveMQ</h1>

```java
package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSSendMessageToQueue {

    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";
        String messageContent = "Hello StackOverflow!";

        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);

        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();

        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Send Message to Queue (note: connection.start() is never called)
        Queue queue = session.createQueue(queueName);
        TextMessage msg = session.createTextMessage(messageContent);
        MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.send(msg);

        // Clear resources
        session.close();
        connection.close();
    }
}
```

<h1>C - Example Code for JMS MessageConsumer with ActiveMQ</h1>

```java
package com.bzdgn.jms.stackoverflow;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumeMessageFromQueue {

    private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";

    public static void main(String[] args) throws JMSException {
        String queueName = "test_queue";

        // Connection Factory from ActiveMQ Implementation
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);

        // Get connection from Connection Factory
        Connection connection = connectionFactory.createConnection();

        // Create session
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // Consume Message from the Queue
        Queue queue = session.createQueue(queueName);
        MessageConsumer messageConsumer = session.createConsumer(queue);

        // Start the connection so that messages are delivered to the consumer
        connection.start();

        // Wait up to 500 ms for a message
        Message message = messageConsumer.receive(500);

        if (message != null) {
            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                String messageContent = textMessage.getText();
                System.out.println("Message Content: " + messageContent);
            }
        } else {
            System.out.println("No message in the queue: " + queueName);
        }

        // Clear resources
        session.close();
        connection.close();
    }
}
```

<h1>D - Configuration and Maven Dependency</h1>

The **JDK** version is `1.8`. I'm running `ActiveMQ 5.15.12` and using the same version for the client dependency:

```xml
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.15.12</version>
</dependency>
```

[1]: https://i.stack.imgur.com/VWKVl.png
[2]: https://i.stack.imgur.com/tdtGh.png

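# Answer 1

**Score**: 2

The behavior here is dictated by the JMS specification. Simply put, `javax.jms.Connection.start()` applies to consumers, not producers. It tells the broker to begin delivering messages to the consumers associated with the connection. The [JavaDoc for `Connection`](https://docs.oracle.com/javaee/7/api/javax/jms/Connection.html) says this:

> It is typical to leave the connection in stopped mode until setup is complete (that is, until all message consumers have been created). At that point, the client calls the connection's start method, and messages begin arriving at the connection's consumers. This setup convention minimizes any client confusion that may result from asynchronous message delivery while the client is still in the process of setting itself up.
>
> A connection can be started immediately, and the setup can be done afterwards. Clients that do this must be prepared to handle asynchronous message delivery while they are still in the process of setting up.

The `start()` method has no impact on producers. You're seeing the expected behavior.

It's worth noting that this behavior is a bit different if you're using the simplified API that is part of JMS 2. If you use a `JMSContext` to create a `JMSConsumer`, message delivery starts automatically. To be clear, ActiveMQ 5.x doesn't implement JMS 2, but [ActiveMQ Artemis](http://activemq.apache.org/components/artemis/) does.
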
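The stopped/started distinction described in the answer can be mimicked with a small toy model in plain Java. This is only a sketch for intuition: `ToyBroker` and `ToyConnection` are hypothetical classes invented here, not JMS or ActiveMQ types, and the real client is considerably more involved. The one idea it tries to show is that sending never consults the started flag, while receiving does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model only: hypothetical classes, not JMS or ActiveMQ code.
public class StoppedConnectionDemo {

    /** Stands in for the broker-side queue. */
    static class ToyBroker {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    }

    /** Stands in for a connection, which begins life in stopped mode. */
    static class ToyConnection {
        private final ToyBroker broker;
        private volatile boolean started = false;

        ToyConnection(ToyBroker broker) {
            this.broker = broker;
        }

        void start() {
            started = true;
        }

        // Sending does not look at the started flag at all.
        void send(String text) {
            broker.queue.add(text);
        }

        // Receiving yields nothing while the connection is stopped.
        String receive(long timeoutMillis) {
            if (!started) {
                // Toy simplification: return at once instead of waiting out the timeout.
                return null;
            }
            try {
                return broker.queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return null;
            }
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();

        // Producer side: start() is never called, yet the message is queued.
        ToyConnection producerSide = new ToyConnection(broker);
        producerSide.send("Hello StackOverflow!");
        System.out.println("queued: " + broker.queue.size());

        // Consumer side: nothing arrives until start() is called.
        ToyConnection consumerSide = new ToyConnection(broker);
        System.out.println("stopped: " + consumerSide.receive(50));
        consumerSide.start();
        System.out.println("started: " + consumerSide.receive(50));
    }
}
```

Running `main` prints `queued: 1`, then `stopped: null`, then `started: Hello StackOverflow!`, which mirrors what the question observed with the real producer and consumer.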
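On the `started` field raised in the question: an `AtomicBoolean` flag of this kind is generally about thread safety rather than about producers versus consumers. A common pattern is to guard one-time startup work with `compareAndSet`, so that concurrent or repeated `start()` calls run that work only once. The sketch below illustrates the general pattern with a hypothetical `ToySession` class; it is an assumption for illustration and is not taken from the ActiveMQ source.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class for illustration; not ActiveMQ source code.
public class StartOnceDemo {

    static class ToySession {
        private final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicInteger startupRuns = new AtomicInteger();

        void start() {
            // compareAndSet flips false -> true atomically, so only one caller wins.
            if (started.compareAndSet(false, true)) {
                // Stands in for one-time work such as beginning delivery to consumers.
                startupRuns.incrementAndGet();
            }
        }

        boolean isStarted() {
            return started.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ToySession session = new ToySession();
        int threads = 8;
        CountDownLatch done = new CountDownLatch(threads);

        // Several threads race to start the same session.
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                session.start();
                done.countDown();
            }).start();
        }
        done.await();

        System.out.println("started=" + session.isStarted()
                + ", startupRuns=" + session.startupRuns.get());
    }
}
```

However many threads call `start()`, the startup work runs once, so the program prints `started=true, startupRuns=1`. Nothing in this pattern concerns sending, which fits the answer's point that `start()` has no impact on producers.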

huangapple
  • Published on 2020-10-02 18:23:33