如何从Java程序创建到IBM MQ AMQP主题的持久订阅?

huangapple go评论74阅读模式
英文:

How to create durable subscription to ibm mq amqp topic from java program?

问题

我们通过使用createDurableSubscriber方法,通过提供clientId和订阅者名称,以编程方式创建了订阅者来订阅IBM MQ AMQP主题。

我们启动程序,使其订阅主题,然后停止程序。然后将消息发送到主题,再次启动接收程序,但我们无法接收到已发送的消息,并且丢失了不应该在持久订阅的情况下发生的消息。

我们可以通过使用mqsc命令DISPLAY TOPICDISPLAY TPSTATUSDISPLAY TPSTATUS SUBDISPLAY SUB SUBID在订阅者连接时看到AMQP主题及其持久订阅,但在订阅者程序停止时无法看到。我们已经定义了属性DEFPSIST(YES),且客户端(发布者到主题)正在发送持久消息。

消息去哪了?因为我们在订阅者的持久队列中看不到消息?这是否取决于到期属性?

以下是我们的订阅者在连接时的DISPLAY SUB SUBID输出。

AMQ8096: 已查询WebSphere MQ订阅。


SUBID("十六进制订阅ID")
   SUB(:private:CLINET01:TOPIC01)            TOPICSTR(TOPIC01)
   TOPICOBJ(SYSTEM.BASE.TOPIC)             DISTYPE(已解析)
   DEST(SYSTEM.MANAGED.DURABLE.5F6B5C2524FB9AED)
   DESTQMGR(qm.name)                   PUBAPPID( )
   SELECTOR( )                             SELTYPE(NONE)
   USERDATA(010)
   PUBACCT(***************************************************)
   DESTCORL(***************************************************)
   DESTCLAS(MANAGED)                       DURABLE(是的)
   EXPIRY(0)                               PSPROP(MSGPROP)
   PUBPRTY(ASPUB)                          REQONLY(NO)
   SUBSCOPE(ALL)                           SUBLEVEL(1)
   SUBTYPE(API)                            VARUSER(FIXED)
   WSCHEMA(TOPIC)                          SUBUSER(mqm)
   CRDATE(2020-09-28)                      CRTIME(04:14:09)
   ALTDATE(2020-09-28)                     ALTTIME(04:14:09)

订阅者ID具有私有(原因不明)和客户端ID,但没有订阅者名称,而订阅者名称是sub4。

以下是从jndi.properties文件中获取的上下文工厂和提供程序URL的值。

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.Queue;

import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.String;
import javax.jms.Destination;
import javax.naming.Context;
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.DeliveryMode;
import javax.naming.InitialContext;
import javax.jms.Message;

public class AMQPQueueExample1 implements Runnable  {
private static final int DELIVERY_MODE = DeliveryMode.PERSISTENT;

public void run(){
try{
 Connection connection = null;
 Context context = new InitialContext();
 ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myFactoryLookup");
 connection = connectionFactory.createConnection();
 connection.setClientID("123");
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 Topic priceTopic = (Topic) context.lookup("myTopicLookup1");
 MessageConsumer subscriber1 = session.createDurableSubscriber(priceTopic,"sub420");
System.out.println("TOPIC " + priceTopic);

connection.start();
while(true){
TextMessage message1 = (TextMessage) subscriber1.receive(1000);
if(message1!=null)
           System.out.println("Subscriber 1 received: " + message1.getText());


}
}catch(Exception e){
e.printStackTrace();
}
}

 public static void main(String[] args)  {

AMQPQueueExample1 amp=new AMQPQueueExample1();
 Thread thread = new Thread(amp);
thread.start();


 }
}

以上是从jndi.properties文件中获取上下文工厂和提供程序URL的值。

英文:

We have programmatically created subscriber to IBM MQ AMQP TOPIC with createDurableSubscriber by providing clientId and subscriber name.

We start the program so it subscribes to TOPIC and stop the program. Then send the msgs to topic and again start the receiver program again but we cannot receive the msgs sent and loose the messages which should not happen in case of durable subscription..

We can see amqp topic and its durable subscription when subscriber is connected using mqsc commands DISPLAY TOPIC, DISPLAY TPSTATUS, DISPLAY TPSTATUS SUB, DISPLAY SUB SUBID but not when subscriber program is stopped. We have defined attribute DEFPSIST(YES) and client(producer to topic) is sending persistent messages.

Where are the messages gone as we cannot see messages in durable queues of subscriber? Does it depends on expiry attribute?

Output of DISPLAY SUB SUBID for our subscriber when it is connected.

AMQ8096: WebSphere MQ subscription inquired.
SUBID("hex sub id")
SUB(:private:CLINET01:TOPIC01)            TOPICSTR(TOPIC01)
TOPICOBJ(SYSTEM.BASE.TOPIC)             DISTYPE(RESOLVED)
DEST(SYSTEM.MANAGED.DURABLE.5F6B5C2524FB9AED)
DESTQMGR(qm.name)                   PUBAPPID( )
SELECTOR( )                             SELTYPE(NONE)
USERDATA(010)
PUBACCT(***************************************************)
DESTCORL(***************************************************)
DESTCLAS(MANAGED)                       DURABLE(YES)
EXPIRY(0)                               PSPROP(MSGPROP)
PUBPRTY(ASPUB)                          REQONLY(NO)
SUBSCOPE(ALL)                           SUBLEVEL(1)
SUBTYPE(API)                            VARUSER(FIXED)
WSCHEMA(TOPIC)                          SUBUSER(mqm)
CRDATE(2020-09-28)                      CRTIME(04:14:09)
ALTDATE(2020-09-28)                     ALTTIME(04:14:09)

Subscriber id has private(not sure why) and client id but not subscriber name which is sub4

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.Queue;

import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.String;
import javax.jms.Destination;
import javax.naming.Context;
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.DeliveryMode;
import javax.naming.InitialContext;
import javax.jms.Message;

public class AMQPQueueExample1 implements Runnable  {
private static final int DELIVERY_MODE = DeliveryMode.PERSISTENT;

public void run(){
try{
 Connection connection = null;
 Context context = new InitialContext();
 ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myFactoryLookup");
 connection = connectionFactory.createConnection();
 connection.setClientID("123");//("WHATS_MY_PURPOSE3"); // Why do we need clientID while publishing the TOPIC from consumer / publisher
 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 Topic priceTopic = (Topic) context.lookup("myTopicLookup1");
 MessageConsumer subscriber1 = session.createDurableSubscriber(priceTopic,"sub420"); //"sub3");
System.out.println("TOPIC "+priceTopic);

connection.start();
while(true){
TextMessage   message1 = (TextMessage) subscriber1.receive(1000);
if(message1!=null)
           System.out.println("Subscriber 1 received : " + message1.getText());


}
}catch(Exception e){
e.printStackTrace();
}
}

 public static void main(String[] args)  {

AMQPQueueExample1 amp=new AMQPQueueExample1();
 Thread thread = new Thread(amp);
thread.start();


 }
}

Values are taken from jndi.properties file for context factory and provider url.

答案1

得分: 2

从评论中看,你正在使用MQ 8.0.0.5版本?如果是这样的话,那么Apache Qpid JMS客户端不支持该版本的MQ。我认为在那个版本中,一个非持久订阅 可能 能够工作,但是其他任何JMS方法都不太可能。

我怀疑发生的情况是来自Qpid JMS的AMQP 1.0流在那个MQ版本中并没有被完全理解,因此订阅的过期时间被设置为0,而不是无限大。

MQ 9.2添加了对JMS 2.0规范的更多支持 - 尽管不是每个JMS特性都支持。关于支持的方法,这里有更多信息:

https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.2.0/com.ibm.mq.dev.doc/q125050_.htm

创建持久订阅者和/或消费者应该可以按你的预期工作。

英文:

It looks from the comments like you're using MQ 8.0.0.5? If that's the case then Apache Qpid JMS clients aren't supported with that version of MQ. I believe with that version a very basic non-durable subscribe might work, but any other JMS methods are unlikely to.

I suspect what is happening is the AMQP 1.0 flows from Qpid JMS aren't fully understood by that version of MQ, so the expiry of the subscription is being set to 0 rather than unlimited.

MQ 9.2 added support for more of the JMS 2.0 spec - although not every JMS feature. There is more information about the methods which are supported here:

https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.2.0/com.ibm.mq.dev.doc/q125050_.htm

Creating durable subscribers and/or consumers should work as you expect.

答案2

得分: 0

《Matthew Whitehead 的文章“MQ Light messaging from Microsoft®.NET™(第四部分)”》中提到:

AMQP 通道不支持为 MQ Light 订阅设置无限期到期时间。虽然可以创建具有很长存活时间的订阅,但不可能创建永久存在的订阅。

如果您希望创建永不过期的订阅,可以通过创建 MQ 管理的订阅,并让 MQ Light 客户端加入和离开订阅来实现。这还可以帮助确保在第一个订阅者连接之前发布到主题的任何消息不会完全丢失。阅读我的先前文章,了解有关将 MQ Light 客户端加入管理订阅的信息。

相关的 AMQP 字段

为了提供上述描述的到期功能,MQ Light 使用了 AMQP 1.0 的两个特性:

  • 源超时(Source Timeout)
  • 源到期策略(Source Expiry Policy)

源超时用于指定订阅将过期的时间(以秒为单位)。

源到期策略用于确定何时开始到期计时器。MQ AMQP 通道仅支持链接分离的到期策略,这意味着计时器在最后一个链接从订阅中分离时立即启动。


我搜索了一下,没有找到关于如何在 Apache QPID 中设置源超时或源到期策略的参考,但是链接的博客提到可以通过管理定义的订阅来设置到期时间。根据您问题中的信息,我认为您可以事先定义类似以下的内容。我没有指定 EXPIRY,因为这会从 SYSTEM.DEFAULT.SUB 中获取 EXPIRY(UNLIMITED)

DEFINE SUB(':private:CLINET01:TOPIC01') TOPICOBJ(SYSTEM.BASE.TOPIC) TOPICSTR('TOPIC01') DESTCLAS(MANAGED)

然后,当您连接 AMQP 订阅者时,它将恢复这个现有的订阅,并将到期时间设置为 UNLIMITED

英文:

An article from Matthew Whitehead "MQ Light messaging from Microsoft®.NET™ (Part 4)" states the following:

> AMQP channels don’t support setting an unlimited expiry time for MQ Light subscriptions. While it is possible to create subscriptions that have a very long time-to-live, it isn’t possible to create subscriptions to exist forever.
>
> If you wish to create subscriptions that never expire you can do so by creating an MQ administered subscription and having MQ Light clients join and leave the subscription. This can also help to ensure that any messages published to a topic before the first subscribers have connected, aren’t lost completely. Read my previous article on joining MQ Light clients to administered subscriptions.
>
>Related AMQP Fields
>
> To provide the expiry capabilities described above MQ Light uses 2 features of AMQP 1.0:
>
> * Source Timeout
> * Source Expiry Policy
>
> The source timeout is used to specify the time in seconds that the subscription will expire.
>
> The source expiry policy is used to determine what causes the expiry timer to begin. MQ AMQP channels only support an expiry policy of link-detach, which means the timer starts as soon as the last link detaches from the subscription.


I searched and couldn't find a reference on how to set Source Timeout or Source Expiry Policy in Apache QPID, but the linked blog references setting expiry via a administratively defined subscription. Based on the info in your question I think you can just define something like this ahead of time. I have not specified EXPIRY because this will pick up EXPIRY(UNLIMITED) from SYSTEM.DEFAULT.SUB:

DEFINE SUB(':private:CLINET01:TOPIC01') TOPICOBJ(SYSTEM.BASE.TOPIC) TOPICSTR('TOPIC01') DESTCLAS(MANAGED)

When you then connect your AMQP subscriber it will resume this existing subscription with the expiry set to UNLIMITED.

huangapple
  • 本文由 发表于 2020年9月28日 11:46:25
  • 转载请务必保留本文链接:https://go.coder-hub.com/64095699.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定