Trying to release connection to avoid maximum client connection error
Question
I am trying to find a fix for the following error without raising the connection limit from 1000 to 2000 or more.
I recently hit this error after around 1000 messages had been sent to the broker, each with a delay of 5 minutes, as shown in the code below.
WARN | Could not accept connection : Exceeded the maximum number of allowed client connections. See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml) | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport Server Thread Handler: tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
The following code listens to ActiveMQ continuously. As soon as it sees COMPLETE, it sends an email to the user after generating a file. Otherwise, it goes into the else block and sends the message to the broker again.
Inside the else block, I want to try closing the connection once I am done sending the message, so I close the connection in a finally block as shown below. Is this the correct way to go about it?
@Component
public class DownloadConsumer {
@Autowired
private JavaMailSender javaMailSender;
// one instance, reuse
private final CloseableHttpClient httpClient = HttpClients.createDefault();
Connection connection;
// Working Code with JMS 2.0
@JmsListener(destination = "MessageProducerJMSV1")
public void processBrokerQueues(String message) throws DaoException, JMSException {
try {
RequestDao requestDao = (RequestDao) context.getBean("requestDao");
String receivedStatus = requestDao.getRequestStatus(message);
//Retrieve Username from the message to include in an email
String[] parts = message.split("#");
String userName = parts[1].trim();
//Retrieve personnelID from the message to include in the webservice calls
String personnelID = parts[3].trim();
//Before sending this message, do the check for COMPLETE or ERROR etc
if(receivedStatus.equals("COMPLETE")) {
String latestUUID = requestDao.getUUID();
logger.info("Received UUID in Controller is as follows! ");
logger.info(latestUUID);
requestDao.sendMessage(message,latestUUID);
logger.info("Received status is COMPLETE! ");
logger.info("Sending email to the user! ");
String emailMessage = "Dear "+userName+",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
String recipientEmail = userName+"@organization.com";
/*****************************************************\
// START: EMAIL Related Code
*******************************************************/
MimeMessage msg = javaMailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(msg, true);
helper.setFrom("ABCResearch@organization.com");
helper.setTo(recipientEmail);
helper.setSubject("Requested Files !");
helper.setText(emailMessage,true);
javaMailSender.send(msg);
}
else {
// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// Destination represents here our queue 'MessageProducerJMSV1' on the JMS server
Destination destination = session.createQueue(subject);
MessageProducer producer = session.createProducer(destination);
//Sending message to the queue
TextMessage toSendMessage = session.createTextMessage(message);
long delay = 300 * 1000;
long period =300 * 1000;
toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
producer.send(toSendMessage);
}
}
catch(Throwable th){
th.printStackTrace();
}
finally {
connection.close();
}
}
// URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static String subject = "MessageProducerJMSV1"; //Queue Name
// default broker URL is : tcp://localhost:61616"
private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
}
Answer 1
Score: 2
You're getting "Exceeded the maximum number of allowed client connections" because you're creating connections and not closing them. In other words, your application is leaking connections. To fix the leak you need to close the connection. Closing a JMS connection in a finally block is the generally accepted practice, so your code looks good there. However, you need to check for null in case something goes wrong before the connection is actually created, e.g.:
finally {
if (connection != null) {
connection.close();
}
}
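To see the null check in context, here is a minimal sketch. It assumes a stand-in FakeConnection class (hypothetical, not part of JMS) so that it runs without a broker. It also keeps the reference in a local variable rather than a field, which is an assumption of this sketch and not something the answer prescribes. The point is that the COMPLETE path never creates a connection, so the finally block has to tolerate null.

```java
// Hypothetical stand-in for javax.jms.Connection; it only counts open handles.
class FakeConnection {
    static int open = 0;

    FakeConnection() { open++; }

    void close() { open--; }
}

class NullSafeCloseDemo {
    // Mirrors the listener: a connection is created only on the "else" path.
    static boolean handle(boolean needsResend) {
        FakeConnection connection = null; // local, so each call owns its own reference
        try {
            if (needsResend) {
                connection = new FakeConnection();
                // ... create a session and producer, then send the message ...
            }
            return true;
        } finally {
            // Without this check the COMPLETE path would throw NullPointerException.
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) {
        handle(false); // COMPLETE path: nothing to close
        handle(true);  // resend path: opened, then closed
        System.out.println("open connections: " + FakeConnection.open);
    }
}
```

With a real JMS connection the same shape applies; only the stand-in class changes.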
That said, creating and closing a JMS connection, session, and producer to send a single message is a well-known anti-pattern. It would be better to cache the connection (e.g. in a static variable) and re-use it. For example:
@Component
public class DownloadConsumer {
@Autowired
private JavaMailSender javaMailSender;
// one instance, reuse
private final CloseableHttpClient httpClient = HttpClients.createDefault();
private static Connection connection;
private static Object connectionLock = new Object();
// URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static String subject = "MessageProducerJMSV1"; //Queue Name
// default broker URL is : tcp://localhost:61616"
private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
// Working Code with JMS 2.0
@JmsListener(destination = "MessageProducerJMSV1")
public void processBrokerQueues(String message) throws DaoException, JMSException {
try {
RequestDao requestDao = (RequestDao) context.getBean("requestDao");
String receivedStatus = requestDao.getRequestStatus(message);
//Retrieve Username from the message to include in an email
String[] parts = message.split("#");
String userName = parts[1].trim();
//Retrieve personnelID from the message to include in the webservice calls
String personnelID = parts[3].trim();
//Before sending this message, do the check for COMPLETE or ERROR etc
if (receivedStatus.equals("COMPLETE")) {
String latestUUID = requestDao.getUUID();
logger.info("Received UUID in Controller is as follows! ");
logger.info(latestUUID);
requestDao.sendMessage(message, latestUUID);
logger.info("Received status is COMPLETE! ");
logger.info("Sending email to the user! ");
String emailMessage = "Dear " + userName + ",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
String recipientEmail = userName + "@organization.com";
/*****************************************************\
// START: EMAIL Related Code
*******************************************************/
MimeMessage msg = javaMailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(msg, true);
helper.setFrom("ABCResearch@organization.com");
helper.setTo(recipientEmail);
helper.setSubject("Requested Files !");
helper.setText(emailMessage, true);
javaMailSender.send(msg);
} else {
try {
createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Destination represents here our queue 'MessageProducerJMSV1' on the JMS server
Destination destination = session.createQueue(subject);
MessageProducer producer = session.createProducer(destination);
//Sending message to the queue
TextMessage toSendMessage = session.createTextMessage(message);
long delay = 300 * 1000;
long period = 300 * 1000;
toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
producer.send(toSendMessage);
} catch (Throwable th) {
th.printStackTrace();
synchronized (connectionLock) {
// if there are any problems close the connection and it will be re-created next time
if (connection != null) {
connection.close();
connection = null;
}
}
}
}
} catch (Throwable th) {
th.printStackTrace();
}
}
private void createConnection() throws JMSException {
synchronized (connectionLock) {
if (connection == null) {
// Getting JMS connection from the server
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
}
}
}
}
You'll note that this code has no finally block to close the connection. That's intentional: the whole point is to keep the connection open so that it isn't opened and closed for every single message. The connection is re-used between invocations, and it is closed only when a Throwable is caught.
Also, keep in mind that there's no reason to call start() on the JMS connection if it's only sending messages. The start() method only affects consumers.
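The reuse pattern from this answer can be tried without a broker. The sketch below is a simulation under stated assumptions: TinyBroker, TinyConnection, and CachedSender are hypothetical stand-ins, not ActiveMQ or JMS classes, and the limit of 3 stands in for maximumConnections. Unlike the answer's code, it holds the lock for the whole send, a simplification that keeps the example safe if several listener threads call it.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical broker stand-in that enforces a connection limit.
class TinyBroker {
    private final int maxConnections;
    private final AtomicInteger open = new AtomicInteger();

    TinyBroker(int maxConnections) { this.maxConnections = maxConnections; }

    // Hands out a connection, or rejects it once the limit is reached.
    TinyConnection connect() {
        if (open.incrementAndGet() > maxConnections) {
            open.decrementAndGet();
            throw new IllegalStateException("Exceeded the maximum number of allowed client connections");
        }
        return new TinyConnection(this);
    }

    void release() { open.decrementAndGet(); }

    int openCount() { return open.get(); }
}

// Hypothetical connection stand-in; close() is idempotent.
class TinyConnection {
    private final TinyBroker broker;
    private boolean closed;

    TinyConnection(TinyBroker broker) { this.broker = broker; }

    void send(String message) {
        if (closed) throw new IllegalStateException("connection is closed");
    }

    void close() {
        if (!closed) {
            closed = true;
            broker.release();
        }
    }
}

// Creates the connection lazily, re-uses it, and drops it only after a failure.
class CachedSender {
    private final TinyBroker broker;
    private final Object lock = new Object();
    private TinyConnection connection; // guarded by lock

    CachedSender(TinyBroker broker) { this.broker = broker; }

    void send(String message) {
        synchronized (lock) {
            if (connection == null) {
                connection = broker.connect();
            }
            try {
                connection.send(message);
            } catch (RuntimeException e) {
                connection.close(); // re-created on the next call
                connection = null;
                throw e;
            }
        }
    }
}

class ConnectionReuseDemo {
    public static void main(String[] args) {
        TinyBroker broker = new TinyBroker(3);
        CachedSender sender = new CachedSender(broker);
        for (int i = 0; i < 10; i++) {
            sender.send("msg-" + i);
        }
        System.out.println("open after cached sends: " + broker.openCount());
    }
}
```

Ten sends leave one connection open, whereas calling connect() once per message without closing would hit the limit on the fourth message.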
Answer 2
Score: 0
The issue is the code below, which opens a new connection for each message. Ideally you should invoke it once (and again only if the connection expires).
// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
Then get a session from the connection and close the session when you are done. Depending on usage, you can even keep the session open for longer.
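The idea of one long-lived connection with a short-lived session per send can be sketched as follows. DemoConnection and DemoSession are hypothetical stand-ins that only record what was opened and closed. Whether real JMS sessions can be used in try-with-resources depends on the JMS API version on the classpath (the 2.0 interfaces extend AutoCloseable); with an older API, a finally block that calls close() does the same job.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical session stand-in; AutoCloseable so try-with-resources can close it.
class DemoSession implements AutoCloseable {
    private final DemoConnection owner;

    DemoSession(DemoConnection owner) {
        this.owner = owner;
        owner.openSessions++;
    }

    void send(String text) { owner.sent.add(text); }

    @Override
    public void close() { owner.openSessions--; }
}

// Hypothetical connection stand-in that tracks its sessions and sent messages.
class DemoConnection {
    int openSessions = 0;
    final List<String> sent = new ArrayList<>();

    DemoSession createSession() { return new DemoSession(this); }
}

class SessionPerSendDemo {
    // Opened once and shared by every send.
    static final DemoConnection CONNECTION = new DemoConnection();

    static void send(String text) {
        // The session is closed when the block exits, even if send() throws.
        try (DemoSession session = CONNECTION.createSession()) {
            session.send(text);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            send("msg-" + i);
        }
        System.out.println(CONNECTION.sent.size() + " sent, "
                + CONNECTION.openSessions + " sessions open");
    }
}
```

Closing the session per send keeps sessions and producers from accumulating on the shared connection, while the connection count seen by the broker stays at one.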