WSO2队列消费

huangapple go评论65阅读模式
英文:

WSO2 queue consume

问题

我在Spring TS中开发了一个Web服务,它创建了一个ActiveMQ队列和一个API,使用WSO2 Integration Studio 8.0来消费队列并接收队列消息。

我的Spring TS Web服务功能正常,它启动后,我可以使用POSTMAN运行它,然后Web服务连接到ActiveMQ以发送一个整数(两个数字求和的结果),到这一点一切都正常。

接下来,我有一个WSO2 Integration Studio API,用于消费ActiveMQ队列以接收消息(在这种情况下,是两个数字求和的结果),这一切都进行得很正常,但我遇到了一个问题,当我想要部署它到WSO2 Enterprise Integrator 7.1.0时,我无法做到,因为我的项目是基于Java应用程序的,而Enterprise Integrator不支持Java应用程序,而且由于我的项目性质,也不能有一个用于生成CAR文件以将我的集成导入WSO2 Enterprise Integrator的Composite Exporter文件夹,我已经了解到,要在Micro Integrator环境中实现流程,我应该使用中介器集成流和端点在一个Integration Project中开发它,以便我可以拥有一个Composite Exporter文件夹,其中我可以生成一个CAR文件,然后将我的项目导入Enterprise Integrator环境,那么下面是获取队列消息以进行队列消费的Java应用程序代码:

package getQueueMsg;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumer {

    public static void main(String[] args) {
        String brokerUrl = "tcp://localhost:61616"; // ActiveMQ Broker URL

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // 设置要查询的队列名称
            Destination destination = session.createQueue("SumResult");

            MessageConsumer consumer = session.createConsumer(destination);

            // 读取队列消息
            while (true) {
                Message message = consumer.receive();
                // 根据您的需求处理消息
                System.out.println("Received message: " + message.toString());
            }

            // 关闭连接并释放资源
            //session.close();
            //connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

使用这个Java代码,我可以正确地消费队列,但问题在于,由于项目的性质是基于Java应用程序的,我无法生成CAR文件,然后在Enterprise Integrator环境中部署它。有人可以指导我如何开始创建一个流程,以执行与我通过我的Java代码执行相同的操作,但是使用基于中介器的集成流在Integration Project中,以便以这种方式生成CAR文件,然后在Enterprise Integrator环境中部署我的项目?

英文:

i developed an Web Service in Spring TS that create an ActiveMQ Queue and API with WSO2 Integration Studio 8.0 for consume the queue and receive the queue message.

My Spring TS Web Service functions correctly, he starts and with POSTMAN i run it and then the Web Service connects to ActiveMQ for send a integer number (the result of 2 numbers sum), all is okay at this point.

After, i have and WSO2 Integration Studio API that consume a ActiveMQ queue for receive the message (in this case is about of the 2 numbers sum result) and this are do correctly but i have a problem, i when wanna deploy it in WSO2 Enterprise Integrator 7.1.0 i cant do it because my project is Java Application based and Enterprise Integrator dont have support to Java Application and for the nature of my project cant either have a Compositer Exporter folder for generate CAR file to import my integration to WSO2 Enterprise Integrator, i have understanted that for can implement a flow in a Micro Integrator enviroment i should develop it with mediators integration flows and endpoints in a Integration Project for i can have a Compositer Exporter folder in where i can generate a CAR file with which i can import my project to the Enterprise Integrator enviroment, well the JAVA Application code with which get the queue message for queue consume is the next:

package getQueueMsg;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQConsumer {

    public static void main(String[] args) {
        String brokerUrl = "tcp://localhost:61616"; // Broker URL ActiveMQ

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

        try {
            Connection connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            // Set the Queue name to consult
            Destination destination = session.createQueue("SumResult");

            MessageConsumer consumer = session.createConsumer(destination);

            // Read the queue messages
            while (true) {
                Message message = consumer.receive();
                // Process the message about your needs
                System.out.println("Mensaje recibido: " + message.toString());
            }

            // Close the connection and free the resources
            //session.close();
            //connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

with this java code i consume correctly the queue but the problem is for the nature of the project which is Java Application based i cant generate CAR file to then deploy it in Enterprise Integrator enviroment.
Can someone guide to me to starts do a flow to do the same that i do trought mi JAVA code but using a integration flow based in mediators on a Integration Project for this way i can generate an CAR file for deploy it in a Enterprise Integrator enviroment?

答案1

得分: 1

以下是您要翻译的内容:

无法在Micro Integrator中像提到的那样使用Java代码。为了消费来自ActiveMQ的JMS消息,您将需要开发一个集成流程。您可以使用Listener Proxy,也可以创建一个Inbound Endpoint来监听ActiveMQ队列。我建议使用后者而不是代理。

英文:

You can't Java code like the one mentioned within Micro Integrator. Inorder to consume JMS messages from ActiveMQ you will have to develop an integration flow. You can either user a Listener Proxy.

<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="JMStoHTTPStockQuoteProxy"
       transports="jms"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="transactionID"
                   expression="get-property('MessageID')"
                   scope="default"/>
         <property name="sourceMessageID"
                   expression="get-property('MessageID')"
                   scope="default"/>
         <property name="proxyMessageID"
                   expression="get-property('MessageID')"
                   scope="default"/>
         <log level="full">
            <property name="transactionID" expression="get-property('transactionID')"/>
            <property name="sourceMessageID" expression="get-property('sourceMessageID')"/>
            <property name="MessageID" expression="get-property('proxyMessageID')"/>
         </log>
         <property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>
         <drop/>               
      </inSequence>
      <faultSequence name="jms_fault"/>
   </target>
   <parameter name="redeliveryPolicy.maximumRedeliveries">1</parameter>
   <parameter name="transport.jms.DestinationType">queue</parameter>
   <parameter name="transport.jms.SessionTransacted">true</parameter>
   <parameter name="transport.jms.Destination">JMStoHTTPStockQuoteProxy</parameter>
   <parameter name="redeliveryPolicy.redeliveryDelay">2000</parameter>
   <parameter name="transport.jms.CacheLevel">consumer</parameter>
   <description/>
</proxy>

Or you can create an Inbound Endpoint to listen to the ActiveMQ queue. I would recommend this option over the proxy.

huangapple
  • 本文由 发表于 2023年6月9日 02:27:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/76434720.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定