Trying to release connection to avoid maximum client connection error

Question

I am trying to figure out a solution for the following error message without increasing the default connection size from 1000 to 2000 or more.

Recently I came across the following error when around 1000 messages were sent to the broker with a delay of 5 minutes as shown in the code below.

WARN  | Could not accept connection  : Exceeded the maximum number of allowed client connections. See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml) | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport Server Thread Handler: tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600

The following code listens to ActiveMQ continuously. As soon as it sees COMPLETE, it sends an email to the user after generating a file. Otherwise, it goes inside the else block and sends the message to the broker again.

Inside the else block, I want to test by closing the connection after I am done sending the message. So I have closed the connection inside a finally block as shown below. Is this a correct way to go about it?

@Component
public class DownloadConsumer {

    @Autowired
    private JavaMailSender javaMailSender;

    // one instance, reuse
    private final CloseableHttpClient httpClient = HttpClients.createDefault();

    Connection connection;

    // Working Code with JMS 2.0
    @JmsListener(destination = "MessageProducerJMSV1")
    public void processBrokerQueues(String message) throws DaoException, JMSException {

        try {

            RequestDao requestDao = (RequestDao) context.getBean("requestDao");

            String receivedStatus = requestDao.getRequestStatus(message);

            //Retrieve Username from the message to include in an email
            String[] parts = message.split("#");
            String userName = parts[1].trim();

            //Retrieve personnelID from the message to include in the webservice calls
            String personnelID = parts[3].trim();

            //Before sending this message, do the check for COMPLETE or ERROR etc
            if(receivedStatus.equals("COMPLETE")) {

                String latestUUID = requestDao.getUUID();

                logger.info("Received UUID in Controller is as follows! ");
                logger.info(latestUUID);

                requestDao.sendMessage(message,latestUUID);
                logger.info("Received status is COMPLETE! ");
                logger.info("Sending email to the user! ");
                String emailMessage = "Dear "+userName+",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
                String recipientEmail = userName+"@organization.com";

                /*****************************************************\
                 // START: EMAIL Related Code
                 *******************************************************/

                MimeMessage msg = javaMailSender.createMimeMessage();
                MimeMessageHelper helper = new MimeMessageHelper(msg, true);
                helper.setFrom("ABCResearch@organization.com");
                helper.setTo(recipientEmail);
                helper.setSubject("Requested Files !");
                helper.setText(emailMessage,true);

                javaMailSender.send(msg);

            }
            else {

                // Getting JMS connection from the server and starting it
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                connection = connectionFactory.createConnection();
                connection.start();

                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);

                // Destination represents here our queue 'MessageProducerJMSV1' on the  JMS server
                Destination destination = session.createQueue(subject);

                MessageProducer producer = session.createProducer(destination);

                //Sending message to the queue
                TextMessage toSendMessage = session.createTextMessage(message);

                long delay = 300 * 1000;
                long period =300 * 1000;

                toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);

                producer.send(toSendMessage);

            }

        }
        catch(Throwable th){
            th.printStackTrace();
        }
        finally {
            connection.close();
        }

    }

    // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "MessageProducerJMSV1"; //Queue Name
    // default broker URL is : tcp://localhost:61616"

    private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
    private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
}

Answer 1

Score: 2

The reason you're getting "Exceeded the maximum number of allowed client connections" is because you're creating connections and not closing them. In other words, your application is "leaking" connections. In order to fix the leak you need to close the connection. Closing a JMS connection in a finally block is the generally accepted practice so your code looks good there. However, you need to check for null in case there is a problem before the connection is actually created, e.g.:

finally {
    if (connection != null) {
        connection.close();
    }                
}
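
The null check matters because an exception thrown before the connection is created leaves the variable unassigned, so an unguarded close() would fail with a NullPointerException inside finally. A minimal, broker-free sketch of the pattern is shown here. FakeConnection and NullSafeCloseDemo are hypothetical stand-ins (they are not part of JMS or ActiveMQ), and keeping the connection in a local variable rather than a field is an assumption made for this sketch so that concurrent listener invocations do not share it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a JMS Connection so the sketch runs without a broker.
class FakeConnection implements AutoCloseable {
    // Number of connections currently open; a leak would leave this above zero.
    static final AtomicInteger OPEN = new AtomicInteger();

    FakeConnection() {
        OPEN.incrementAndGet();
    }

    void send(String message) {
        // Pretend to send; a real producer would talk to the broker here.
    }

    @Override
    public void close() {
        OPEN.decrementAndGet();
    }
}

class NullSafeCloseDemo {
    // Returns true if the message was sent. Never leaks a connection and never
    // throws a NullPointerException from the finally block.
    static boolean sendOnce(String message, boolean failBeforeConnect) {
        FakeConnection connection = null;
        try {
            if (failBeforeConnect) {
                // Simulates a failure (e.g. a DAO error) before the connection exists.
                throw new IllegalStateException("failed before the connection was created");
            }
            connection = new FakeConnection();
            connection.send(message);
            return true;
        } catch (RuntimeException e) {
            return false;
        } finally {
            // Guard: the connection may never have been assigned.
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(sendOnce("a", false));
        System.out.println(sendOnce("b", true));
        System.out.println("open connections: " + FakeConnection.OPEN.get());
    }
}
```

With a real javax.jms.Connection the shape is the same; the main difference is that close() declares JMSException, so the enclosing method has to declare or handle it.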

That said, it's worth noting that creating & closing a JMS connection & session & producer to send a single message is a well-known anti-pattern. It would be better if you cached the connection (e.g. in a static variable) and re-used it. For example:

@Component
public class DownloadConsumer {

   @Autowired
   private JavaMailSender javaMailSender;

   // one instance, reuse
   private final CloseableHttpClient httpClient = HttpClients.createDefault();

   private static Connection connection;

   private static Object connectionLock = new Object();

   // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
   private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
   private static String subject = "MessageProducerJMSV1"; //Queue Name
   // default broker URL is : tcp://localhost:61616"

   private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
   private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);

   // Working Code with JMS 2.0
   @JmsListener(destination = "MessageProducerJMSV1")
   public void processBrokerQueues(String message) throws DaoException, JMSException {

      try {

         RequestDao requestDao = (RequestDao) context.getBean("requestDao");

         String receivedStatus = requestDao.getRequestStatus(message);

         //Retrieve Username from the message to include in an email
         String[] parts = message.split("#");
         String userName = parts[1].trim();

         //Retrieve personnelID from the message to include in the webservice calls

         String personnelID = parts[3].trim();

         //Before sending this message, do the check for COMPLETE or ERROR etc
         if (receivedStatus.equals("COMPLETE")) {

            String latestUUID = requestDao.getUUID();

            logger.info("Received UUID in Controller is as follows! ");
            logger.info(latestUUID);

            requestDao.sendMessage(message, latestUUID);
            logger.info("Received status is COMPLETE! ");
            logger.info("Sending email to the user! ");
            String emailMessage = "Dear " + userName + ",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
            String recipientEmail = userName + "@organization.com";

            /*****************************************************\
             // START: EMAIL Related Code
             *******************************************************/

            MimeMessage msg = javaMailSender.createMimeMessage();
            MimeMessageHelper helper = new MimeMessageHelper(msg, true);
            helper.setFrom("ABCResearch@organization.com");
            helper.setTo(recipientEmail);
            helper.setSubject("Requested Files !");
            helper.setText(emailMessage, true);

            javaMailSender.send(msg);

         } else {
            try {
               createConnection();

               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

               // Destination represents here our queue 'MessageProducerJMSV1' on the  JMS server
               Destination destination = session.createQueue(subject);

               MessageProducer producer = session.createProducer(destination);

               //Sending message to the queue
               TextMessage toSendMessage = session.createTextMessage(message);

               long delay = 300 * 1000;
               long period = 300 * 1000;

               toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);

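               producer.send(toSendMessage);

               // Close the per-message session (this also closes its producer);
               // the connection itself stays open for re-use
               session.close();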
            } catch (Throwable th) {
               th.printStackTrace();
               
               synchronized (connectionLock) {
                  // if there are any problems close the connection and it will be re-created next time
                  if (connection != null) {
                     connection.close();
                     connection = null;
                  }
               }
            }
         }
      } catch (Throwable th) {
         th.printStackTrace();
      }
   }
   
   private void createConnection() throws JMSException {
      synchronized (connectionLock) {
         if (connection == null) {
            // Getting JMS connection from the server
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
         }
      }
   }
}

You'll note that in this code there is no finally block to close the connection. That's intentional because the whole point of this code is to keep the connection open so that it's not opening & closing the connection to send a single message. The connection is being re-used between invocations. The only time the connection is closed is when a Throwable is caught.
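
The effect of caching can be checked without a broker. The sketch below is a simplified illustration of the lazy-create, re-use, and reset-on-failure pattern described above, not the ActiveMQ API: CountingConnection and CachedConnectionSender are hypothetical names, and the counter exists only to show how many connections get opened.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a JMS Connection; counts how many were ever created.
class CountingConnection {
    static final AtomicInteger CREATED = new AtomicInteger();
    private boolean closed;

    CountingConnection() {
        CREATED.incrementAndGet();
    }

    void send(String message) {
        // Simulated failure path: a null message is rejected.
        if (message == null) {
            throw new IllegalArgumentException("null message");
        }
    }

    void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

class CachedConnectionSender {
    private static final Object LOCK = new Object();
    private static CountingConnection connection;

    // Lazily creates the shared connection; later calls return the cached one.
    private static CountingConnection getConnection() {
        synchronized (LOCK) {
            if (connection == null) {
                connection = new CountingConnection();
            }
            return connection;
        }
    }

    // Sends on the shared connection; on failure the connection is closed and
    // dropped so that the next call creates a fresh one.
    static boolean send(String message) {
        CountingConnection current = getConnection();
        try {
            current.send(message);
            return true;
        } catch (RuntimeException e) {
            synchronized (LOCK) {
                if (connection == current) {
                    current.close();
                    connection = null;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            send("message " + i);
        }
        System.out.println("connections created: " + CountingConnection.CREATED.get());
    }
}
```

Sending a thousand messages this way opens a single connection, whereas the code in the question opens one per message, which is how the broker's maximumConnections limit ends up being reached.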

Also, keep in mind that there's no reason to call start() on the JMS connection if it's just sending a message. The start() method only impacts consumers.

Answer 2

Score: 0

The issue is the code below, which opens a new connection for each message. Ideally you should invoke it only once (and again if the connection expires).

// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();

Then get a session from that connection for each send and close the session afterwards. Depending on usage, you can even keep the session open for longer.
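
One way to picture this split between a long-lived connection and short-lived sessions is the single-threaded sketch below. DemoConnection, DemoSession, and SessionPerSendDemo are hypothetical stand-ins written for illustration only. Whether the real javax.jms.Session can be used in try-with-resources depends on the JMS API version on the classpath (it is AutoCloseable as of JMS 2.0); with an older API, an explicit finally block that calls session.close() does the same job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a JMS Connection that tracks its open sessions.
class DemoConnection {
    int openSessions;
    final List<String> sent = new ArrayList<>();

    DemoSession createSession() {
        return new DemoSession(this);
    }
}

// Hypothetical stand-in for a JMS Session; cheap to open and close.
class DemoSession implements AutoCloseable {
    private final DemoConnection owner;

    DemoSession(DemoConnection owner) {
        this.owner = owner;
        owner.openSessions++;
    }

    void send(String message) {
        owner.sent.add(message);
    }

    @Override
    public void close() {
        owner.openSessions--;
    }
}

class SessionPerSendDemo {
    // Opened once and kept for the lifetime of the application.
    static final DemoConnection CONNECTION = new DemoConnection();

    static void send(String message) {
        // try-with-resources closes the session even if send() throws.
        try (DemoSession session = CONNECTION.createSession()) {
            session.send(message);
        }
    }

    public static void main(String[] args) {
        send("first");
        send("second");
        System.out.println("messages sent: " + CONNECTION.sent.size());
        System.out.println("open sessions: " + CONNECTION.openSessions);
    }
}
```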

Posted by huangapple on August 7, 2020 at 03:10:38
When reposting, please keep the link to this article: https://go.coder-hub.com/63290283.html