ActiveMQ使用管理器内存限制

huangapple go评论56阅读模式
英文:

ActiveMQ Usage Manager Memory Limit

问题

我在我的机器人中使用ActiveMQ代理来向我的订阅者发送消息,但我收到了以下错误消息:

2023-06-09 12:57:57,650 | INFO  | Usage Manager Memory Limit (1336252826) reached on queue://VIBER, size 0. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info. | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.255.102.101:45200@61616
2023-06-09 12:58:27,735 | INFO  | Usage(default:memory:queue://VIBER:memory) percentUsage=48%, usage=646397829, limit=1336252826, percentUsageMinDelta=1%;Parent:Usage(default:memory) percentUsage=100%, usage=1336254815, limit=1336252826, percentUsageMinDelta=1%: Usage Manager Memory Limit reached. Producer (ID:49-bot-a.ukrposhta.loc-36283-1685673275857-17607:1:1:1) stopped to prevent flooding queue://VIBER. See http://activemq.apache.org/producer-flow-control.html for more info (blocking for: 8s) | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.255.102.101:45200@61616

一段时间后,我的ActiveMQ内存耗尽,我收到了Broken pipe错误:

Transport Connection to: tcp://10.255.102.101:35652 failed: java.net.SocketException: Broken pipe (Write failed) | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker

这是描述我机器人发送新闻逻辑的代码:

@Override
public void sendNews(News news) {
    telegramLog.info(this.toString() + " start send news.");
    produceNews(telegramUserDao, telegramLog, QUEUE, this, news);

    //send news to users
    consumeNews(telegramLog, QUEUE, sender);
}

这是我如何启动生产者的方式:

protected void produceNews(UserDao userDao, BotLogger log, String queueName, Bot bot, News news) {
    List<BotUser> users = userDao.getAllUsers();
    Set<String> usersId = null;
    if (news.getFilter() != null) {
        if (news.getFilter() == NewsFilter.SHKI) {
            String botType = bot.toString();
            List<ShkiMonitoring> allByBotType = monitorDao.getAllByBotType(botType);
            if (allByBotType == null || allByBotType.isEmpty()) return;
            Comparison comparison = news.getComparison();
            if (Comparison.PATTERN == comparison) {
                Pattern pattern = Pattern.compile(news.getFilterValue());
                usersId = allByBotType.stream().filter(shkiMonitoring -> {
                    Matcher matcher = pattern.matcher(shkiMonitoring.getBarcode());
                    return matcher.find();
                }).map(ShkiMonitoring::getUserId).collect(Collectors.toSet());
            }
        }
    } else {
        usersId = users.stream().map(BotUser::getId).collect(Collectors.toSet());
    }
    if (usersId == null || usersId.isEmpty()) return;

    log.info("Amount of Users: " + usersId.size());
    Set<String> newsLetters = usersId.stream()
            .map(id -> new NewsLetter(id, news))
            .map(newsLetter -> new Gson().toJson(newsLetter))
            .collect(Collectors.toSet());
    log.info("Amount of NewsLetters: " + newsLetters.size());
    //write set of users id to queue
    Thread producer = new Thread(beanFactory.getBean(NewsProducer.class, log, newsLetters, queueName, bot, news));
    producer.setName("News " + bot + " Producer Thread");
    producer.start();
    log.info("wait producer writing the messages...");
    try {
        producer.join();
    } catch (InterruptedException e) {
        log.error(e.getMessage());
    }
}

这是我根据机器人类型创建消费者的方式:

public void consumeNews(BotLogger log, String queueName, Object sender) {
    if (this.newsConsumer == null) {
        this.newsConsumer = beanFactory.getBean(
                NewsConsumer.class, log, queueName, sender);
    }
    LocalTime startSendTime = newsConsumer.getStartSendTime();
    LocalTime endSendTime = newsConsumer.getEndSendTime();
    if ((startSendTime == null || startSendTime.isBefore(LocalTime.now())) &&
            (endSendTime == null || endSendTime.isAfter(LocalTime.now()))) {
        log.info(sender.getClass().getSimpleName() + " starting consum \"" + queueName + "\" queue...");
        int threadsCount = (sender instanceof FacebookSender) ? 4 : (sender instanceof TelegramSender) ? 8 : 10;
        ExecutorService executorService = Executors.newFixedThreadPool(threadsCount);
        List<Callable<Void>> tasks = new ArrayList<>();
        int i;
        for (i = 0; i < threadsCount; i++) {
            tasks.add(Executors.callable(this.newsConsumer, null));
        }
        log.info(i + " consumers created for " + queueName + " queue.");
        try {
            List<Future<Void>> futures = executorService.invokeAll(tasks);
        } catch (InterruptedException e) {
            executorService.shutdown();
        } finally {
            if (!executorService.isShutdown()) {
                executorService.shutdown();
            }
        }
    }
    log.info("end consuming \"" + queueName + "\" queue.");
}

我有NewsProducer bean的以下方法:

@Override
public void run() {
    log.info("producer start, queue name : " + queueName);
    log.info("broker url : " + brokerUrl);
    Connection connection = null;
    Session session = null;
    MessageProducer producer = null;

    try {
        log.info("producer# initializing activeMQ factory...");
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
        log.info("producer# initializing connection...");
        connection = factory.createConnection();
        connection.start();
        log.info("connection started " + connection.toString());
        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        log.info("session started " + session.toString());
        Destination destination = session.createQueue(queueName);
        producer = session.createProducer(destination);
        log.info("producer started " + producer.toString());
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        log.info("news letters size : " + newsLetters.size());
        for (String newsLetter : newsLetters) {
            TextMessage textMessage = session.createTextMessage(newsLetter);
            producer.send(textMessage);
        }
        session.commit();
        log.info(newsLetters.size() + " news letters committed.");
        //update news
        log.info("updating news status...");
        news.setBotStatus(bot.toString(), false);
        log.info(bot.toString() + " set bot status "

<details>
<summary>英文:</summary>

I am using ActiveMQ broker in my bot for sending message to my subscribers, and I&#39;m receiving this error:

2023-06-09 12:57:57,650 | INFO | Usage Manager Memory Limit (1336252826) reached on queue://VIBER, size 0. Producers will be throttled to the rate at which messages are removed from this destination to prevent flooding it. See http://activemq.apache.org/producer-flow-control.html for more info. | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.255.102.101:45200@61616
2023-06-09 12:58:27,735 | INFO | Usage(default:memory:queue://VIBER:memory) percentUsage=48%, usage=646397829, limit=1336252826, percentUsageMinDelta=1%;Parent:Usage(default:memory) percentUsage=100%, usage=1336254815, limit=1336252826, percentUsageMinDelta=1%: Usage Manager Memory Limit reached. Producer (ID:49-bot-a.ukrposhta.loc-36283-1685673275857-17607:1:1:1) stopped to prevent flooding queue://VIBER. See http://activemq.apache.org/producer-flow-control.html for more info (blocking for: 8s) | org.apache.activemq.broker.region.Queue | ActiveMQ Transport: tcp:///10.255.102.101:45200@61616

After in some time memory of my ActiveMQ is ending and I am getting `Broken pipe`:

Transport Connection to: tcp://10.255.102.101:35652 failed: java.net.SocketException: Broken pipe (Write failed) | org.apache.activemq.broker.TransportConnection.Transport | ActiveMQ InactivityMonitor Worker

This is the code describing the logic for sending news in my bot:
```java
@Override
public void sendNews(News news) {
telegramLog.info(this.toString() + &quot; start send news.&quot;);
produceNews(telegramUserDao, telegramLog, QUEUE, this, news);
//send news to users
consumeNews(telegramLog, QUEUE, sender);
}

This is how I am starting the producers:

    protected void produceNews(UserDao userDao, BotLogger log, String queueName, Bot bot, News news) {
        List&lt;BotUser&gt; users = userDao.getAllUsers();
        Set&lt;String&gt; usersId = null;
        if (news.getFilter() != null) {
            if (news.getFilter() == NewsFilter.SHKI) {
                String botType = bot.toString();
                List&lt;ShkiMonitoring&gt; allByBotType = monitorDao.getAllByBotType(botType);
                if (allByBotType == null || allByBotType.isEmpty()) return;
                Comparison comparison = news.getComparison();
                if (Comparison.PATTERN == comparison) {
                    Pattern pattern = Pattern.compile(news.getFilterValue());
                    usersId = allByBotType.stream().filter(shkiMonitoring -&gt; {
                        Matcher matcher = pattern.matcher(shkiMonitoring.getBarcode());
                        return matcher.find();
                    }).map(ShkiMonitoring::getUserId).collect(Collectors.toSet());
                }
            }
        } else {
            usersId = users.stream().map(BotUser::getId).collect(Collectors.toSet());
        }
        if (usersId == null || usersId.isEmpty()) return;

        log.info(&quot;Amount of Users: &quot; + usersId.size());
        Set&lt;String&gt; newsLetters = usersId.stream()
                .map(id -&gt; new NewsLetter(id, news))
                .map(newsLetter -&gt; new Gson().toJson(newsLetter))
                .collect(Collectors.toSet());
        log.info(&quot;Amount of NewsLetters: &quot; + newsLetters.size());
        //write set of users id to queue
        Thread producer = new Thread(beanFactory.getBean(NewsProducer.class, log, newsLetters, queueName, bot, news));
        producer.setName(&quot;News &quot; + bot + &quot; Producer Thread&quot;);
        producer.start();
        log.info(&quot;wait producer writing the messages...&quot;);
        try {
            producer.join();
        } catch (InterruptedException e) {
            log.error(e.getMessage());
        }
    }

And this is how I create the consumers according from bot type:

    public void consumeNews(BotLogger log, String queueName, Object sender) {
        if (this.newsConsumer == null) {
            this.newsConsumer = beanFactory.getBean(
                    NewsConsumer.class, log, queueName, sender);
        }
        LocalTime startSendTime = newsConsumer.getStartSendTime();
        LocalTime endSendTime = newsConsumer.getEndSendTime();
        if ((startSendTime == null || startSendTime.isBefore(LocalTime.now())) &amp;&amp;
                (endSendTime == null || endSendTime.isAfter(LocalTime.now()))) {
            log.info(sender.getClass().getSimpleName() + &quot; starting consum \&quot;&quot; + queueName + &quot;\&quot; queue...&quot;);
            int threadsCount = (sender instanceof FacebookSender) ? 4 : (sender instanceof TelegramSender) ? 8 : 10;
            ExecutorService executorService = Executors.newFixedThreadPool(threadsCount);
            List&lt;Callable&lt;Void&gt;&gt; tasks = new ArrayList&lt;&gt;();
            int i;
            for (i = 0; i &lt; threadsCount; i++) {
                tasks.add(Executors.callable(this.newsConsumer, null));
            }
            log.info(i + &quot; consumers created for &quot; + queueName + &quot; queue.&quot;);
            try {
                List&lt;Future&lt;Void&gt;&gt; futures = executorService.invokeAll(tasks);
            } catch (InterruptedException e) {
                executorService.shutdown();
            } finally {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                }
            }
        }
        log.info(&quot;end consuming \&quot;&quot; + queueName + &quot;\&quot; queue.&quot;);
    }

I have the next method of my NewsProducer bean:

    @Override
    public void run() {
        log.info(&quot;producer start, queue name : &quot; + queueName);
        log.info(&quot;broker url : &quot; + brokerUrl);
        Connection connection = null;
        Session session = null;
        MessageProducer producer = null;

        try {
            log.info(&quot;producer# initializing activeMQ factory...&quot;);
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
            log.info(&quot;producer# initializing connection...&quot;);
            connection = factory.createConnection();
            connection.start();
            log.info(&quot;connection started &quot; + connection.toString());
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            log.info(&quot;session started &quot; + session.toString());
            Destination destination = session.createQueue(queueName);
            producer = session.createProducer(destination);
            log.info(&quot;producer started &quot; + producer.toString());
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            log.info(&quot;news letters size : &quot; + newsLetters.size());
            for (String newsLetter : newsLetters) {
                TextMessage textMessage = session.createTextMessage(newsLetter);
                producer.send(textMessage);
            }
            session.commit();
            log.info(newsLetters.size() + &quot; news letters committed.&quot;);
            //update news
            log.info(&quot;updating news status...&quot;);
            news.setBotStatus(bot.toString(), false);
            log.info(bot.toString() + &quot; set bot status &quot; + news.isBotStatus(bot.toString()));
            newsDao.updateStatus(bot.toString(), news);
            log.info(&quot;News status updated for &quot; + bot);
        } catch (Exception e) {
            log.warn(e.getMessage());
            try {
                if (session != null) {
                    session.rollback();
                }
            } catch (JMSException e1) {
                log.warn(e1.getMessage());
            }
        } finally {
            try {
                if (producer != null) {
                    producer.close();
                }
                if (session != null) {
                    session.close();
                }
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException e) {
                log.warn(e.getMessage());
            }
        }
    }

All news for sending are saved in the database. Every half of hour to start the thread, which checks the queue and news in my database. If news is there it start sending process:

@Override
    public void run() {
        botLoggerT.info(&quot;RUN ___ BEFORE QUEUE&quot;);
        bot.checkQueue();
        botLoggerT.info(&quot;RUN ___ AFTER QUEUE&quot;);
        List&lt;News&gt; news = newsDao.getNewNews(bot.toString());
        botLoggerT.info(&quot;RUN NEWS &quot; + news.size());
        if (news != null &amp;&amp; news.size() &gt; 0) {
            news.forEach(n -&gt; bot.sendNews(n));
        }
    }

But I can not undesrstand why i have this error. What am I doing wrong? In viber I have 500,000 subscribers and 250,000 subscribers in Telegram.

This is the systemUsage settings of my ActiveMQ:

          &lt;systemUsage&gt;
            &lt;systemUsage&gt;
                &lt;memoryUsage&gt;
                    &lt;memoryUsage percentOfJvmHeap=&quot;70&quot; /&gt;
                &lt;/memoryUsage&gt;
                &lt;storeUsage&gt;
                    &lt;storeUsage limit=&quot;100 gb&quot;/&gt;
                &lt;/storeUsage&gt;
                &lt;tempUsage&gt;
                    &lt;tempUsage limit=&quot;50 gb&quot;/&gt;
                &lt;/tempUsage&gt;
            &lt;/systemUsage&gt;
        &lt;/systemUsage&gt;

and destinationPolicy :

  &lt;destinationPolicy&gt;
    &lt;policyMap&gt;
      &lt;policyEntries&gt;
        &lt;policyEntry topic=&quot;&gt;&quot; &gt;
            &lt;!-- The constantPendingMessageLimitStrategy is used to prevent
                 slow topic consumers to block producers and affect other consumers
                 by limiting the number of messages that are retained
                 For more information, see:

                 http://activemq.apache.org/slow-consumer-handling.html

            --&gt;
          &lt;pendingMessageLimitStrategy&gt;
            &lt;constantPendingMessageLimitStrategy limit=&quot;1000&quot;/&gt;
          &lt;/pendingMessageLimitStrategy&gt;
        &lt;/policyEntry&gt;
      &lt;/policyEntries&gt;
    &lt;/policyMap&gt;
  &lt;/destinationPolicy&gt;

答案1

得分: 1

最好将您的事务批处理分为较小的块进行提交。大多数消息系统都围绕着每个单一事务的消息计数上限为10,000条,并且最常用的提交大小为128到1024。

英文:

Most likely you need to commit your transacted batch in smaller chunks. Most messaging systems are geared around the 10k message count as an upper end for a single transaction and a commit size in the 128 to 1024 is most commonly used.

答案2

得分: 0

@Matt Pavlovich是正确的。
问题出在这里,在代码的这部分:

for (String newsLetter : newsLetters) {
TextMessage textMessage = session.createTextMessage(newsLetter);
producer.send(textMessage);
}
session.commit();

由于要发送大量消息,这个版本的代码创建了一个非常大的批次来发送。
通过将代码的这部分更改为:

for (String newsLetter : newsLetters) {
TextMessage textMessage = session.createTextMessage(newsLetter);
producer.send(textMessage);
countLetter++;
if (countLetter%1000 == 0) {
session.commit();System.out.println(countLetter);
}
}
session.commit();

一切都正常。

英文:

@Matt Pavlovich is right.
The problem was here , in this part of code :

 for (String newsLetter : newsLetters) {
TextMessage textMessage = session.createTextMessage(newsLetter);
producer.send(textMessage);
}
session.commit();

Since there were a large number of messages to send, this version of the code created a very large batch to send.
By changing this part of code to as :

for (String newsLetter : newsLetters) {
TextMessage textMessage = session.createTextMessage(newsLetter);
producer.send(textMessage);
countLetter++;
if (countLetter%1000 == 0) {
session.commit();System.out.println(countLetter);
}
}
session.commit();

everything was fine.

huangapple
  • 本文由 发表于 2023年6月12日 14:33:55
  • 转载请务必保留本文链接:https://go.coder-hub.com/76454098.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定