英文:
spring boot jms - send and receive message inside @JmsListner
问题
QUEUE_INI 使用 IBM MQ9,最新的 Spring Boot 和 IBM 的 spring-boot-starter。
我想通过 @JmsListener 接收消息,然后在不提交它的情况下,将另一条消息发送到另一个队列,接收响应,然后提交所有消息。
到目前为止,我有以下代码:
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest'")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
我卡在了 "Message sent" 那一步。看起来子请求似乎没有真正发送。我在 MQ UI 中看到队列深度为 1,但内部没有任何消息,我的子请求监听器也没有收到任何消息。
我还尝试过使用 sendAndReceive
方法:
Message reply = jmsTemplate.sendAndReceive(QUEUE_OUT, session -> {
Message msg = session.createTextMessage();
msg.setStringProperty("type", "com.example.SubRequest");
LOG.info("Sending msg: <{}> to {}", msg, QUEUE_OUT);
return msg;
});
但我没有权限访问模型队列。
有没有办法让这个工作起来?
更新:
在你们的共同帮助下,我使这个工作起来了。最终我创建了一个单独的服务,仅使用 @Transactional(propagation = Propagation.REQUIRES_NEW)
发送子请求。所有其他逻辑都保留在主监听器中。
同时打开事务开始/结束日志记录也很有帮助:
logging:
level:
org.springframework.transaction.interceptor: trace
英文:
QUEUE_INI use IBM MQ9, latest spring boot, and IBM's spring-boot-starter.
I'd like to receive a message with @JmsListner, then without committing it, send another message to another queue, receive response and then commit all.
So far I has next:
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
I'm stuck on 'Message sent'. It looks like sub-request hasn't really sent. I see in MQ UI that queue depth is 1 but there is no message inside, and my sub-request listener also doesn't see any messages.
I've also tried use sendAndReceive
method:
Message reply = jmsTemplate.sendAndReceive(QUEUE_OUT, session -> {
Message msg = session.createTextMessage();
msg.setStringProperty("type", "com.example.SubRequest");
LOG.info("Sending msg: <{}> to {}", msg, QUEUE_OUT);
return msg;
});
But I do not have permissions to access model queue.
Is there any way to make this work?
UPDATE:
I made this work with combined help from you all. I end up with separate service to only send sub request with @Transactional(propagation = Propagation.REQUIRES_NEW)
. All other logic remained within main listener.
Also turning on transactions start/end logs was helpful:
logging:
level:
org.springframework.transaction.interceptor: trace
答案1
得分: 3
你的出站消息发送方与消息接收方在同一事务中注册。因此,出站消息的接收方在事务提交之前不会看到该消息。我认为你需要启动一个新的事务来执行内部过程。
== 更新 ==
所以已经过了一段时间,我没有为此设置开发环境,但我建议做类似这样的更改。基本上,你将侦听器(Listener)和发送者/接收者(Sender/Receiver)拆分为两个独立的类。发送者/接收者应该被注入到你的侦听器中,以便@Transaction注解能够生效。
public class MyListener {
private final MySender sender;
public MyListener(MySender sender) {
this.sender = sender;
}
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest'")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
Message reply = sender.sendAndReceive()
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
}
public class MySender {
private final JmsTemplate jmsTemplate2;
private final Destination QUEUE_OUT;
public MySender(JmsTemplate jmsTemplate2, Destination QUEUE_OUT) {
this.jmsTemplate2 = jmsTemplate2;
this.QUEUE_OUT = QUEUE_OUT;
}
@Transactional(propagation=Propagation.NESTED) // Or REQUIRES_NEW, depending on usage
public Message sendAndReceive() throws JMSException {
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
return reply;
}
}
@Transactional(propagation=Propagation.NESTED)
会在已有事务运行时启动一个新的事务(在方法退出时进行提交/回滚),因此你应该能够发送/接收消息,并将其返回到你的侦听器。
英文:
Your outbound message sender is enrolled in the same transaction as your message receiver. So the receiver of the outbound message won't see the message until the transaction commits. I think you need to start a new transaction to perform the inner procedure.
== Update ==
So it's been a while, and I don't have a dev environment set up for this, but I suggest something like this. Essentially, you're splitting up the Listener and Sender/Receiver into two separate classes. The Sender/Receiver should be injected into your listener so the @Transactional annotation is honored.
public class MyListener {
private final MySender sender;
public MyListener(MySender sender) {
this.sender = sender;
}
@JmsListener(destination = QUEUE_IN, selector = "type='com.example.MainRequest")
public void receiveMessage(Message message) throws JMSException {
LOG.info("Received {} <{}>", QUEUE_IN, message);
Message reply = sender.sendAndReceive()
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
}
}
public class MySender {
private final JmsTemplate jmsTemplate2;
private final Destination QUEUE_OUT;
public MySender(JmsTemplate jmsTemplate2, Destination QUEUE_OUT) {
this.jmsTemplate2 = jmsTemplate2;
this.QUEUE_OUT = QUEUE_OUT;
}
@Transactional(propagation=Propagation.NESTED) // Or REQUIRES_NEW, edepending on usage
public Message sendAndReceive() throws JMSException {
jmsTemplate2.convertAndSend(QUEUE_OUT, "SUB Request", requestMsg -> {
requestMsg.setStringProperty("type", "com.example.SubRequest");
return requestMsg;
});
LOG.info("Message sent");
Message reply = jmsTemplate2.receiveSelected(QUEUE_IN, "type='com.example.SubResponse'");
LOG.info("Received reply from {}: <{}>", QUEUE_IN, reply);
return reply;
}
}
The @Transactional(propagation=Propagation.NESTED) will start a new transaction if one is already running (and commit/rollback when the method exits), so you should be able to send/receive the message and return it to you listener.
答案2
得分: 3
很遗憾,由于我没有足够的权限,无法给先前的回答添加评论。
正如@Nicholas所说,您收到的消息与消息发送在同一个事务中。在receiveMessage
方法完成之前,初始发送不会被提交到队列,以供接收者消费。我假设您可能已经将接收配置为类似于RECEIVE_TIMEOUT_INDEFINITE_WAIT
的内容,这将阻止方法的完成。
在此配置下,您已经将消息监听器绑定到了QUEUE_IN
。传递到该队列的消息将只传递给一个消费者。我看到您正在使用选择器来避免此问题。
一个选项可能是为类型为com.example.SubResponse
的消息引入另一个消息监听器,并从第一个监听器中移除阻塞接收的部分。
英文:
Unfortunately I am unable to add a comment to the previous answer as I don’t have sufficient privilege.
As @Nicholas says, your message receive is in the same transaction as the message send. Until the receiveMessage
method completes the initial send won’t be committed to the queue for the receiver to consume. I am assuming you might have the receive configured with something like RECEIVE_TIMEOUT_INDEFINITE_WAIT
which would then block completion of the method.
With this configuration, you already have a Message Listener bound to QUEUE_IN
. Messages delivered to that queue would be delivered to just one consumer. I see you are using selectors to avoid this problem.
One option might be to introduce another Message Listener for type = com.example.SubResponse
and remove the blocking receive from the first.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论