jms MessageListenerContainer.stop() called but subscriber count doesn't decrease in amq console

huangapple go评论70阅读模式
英文:

jms MessageListenerContainer.stop() called but subscriber count doesn't decrease in amq console

问题

我有一个Spring Boot应用程序,需要动态订阅和取消订阅(非持久化)ActiveMQ主题。订阅工作正常,取消订阅也按照代码运行,但ActiveMQ主题中的订阅者计数没有减少。我漏掉或不理解的是什么?

以下是一些精简的代码:

@Component
@Scope("prototype")
public class DataStreamController {

    @Autowired
    @Qualifier("pubsubNonDurableJmsListenerContainer")
    private DefaultJmsListenerContainerFactory listenerContainerFactory;
    @Autowired
    private JmsListenerEndpointRegistry jmsListenerEndpointRegistry;
    private DataStreamListener listener;
    private String customer;
    private final String uuidStream1 = UUID.randomUUID().toString();
    private final String uuidStream2 = UUID.randomUUID().toString();

    public DataStreamController(String customer, DataStreamListener listener) {
        this.customer = customer;
        this.listener = listener;
    }

    @PostConstruct
    public void init() {
        subscribeToTopics();
    }

    public void subscribeToTopics() {
        SimpleJmsListenerEndpoint stream1TopicEndpoint = new SimpleJmsListenerEndpoint();
        stream1TopicEndpoint.setDestination("stream1." + customer);
        stream1TopicEndpoint.setId(uuidStream1);
        stream1TopicEndPoint.setMessageListener(message ->
            onStream1(message));

        SimpleJmsListenerEndpoint stream2TopicEndpoint = new SimpleJmsListenerEndpoint();
        stream2TopicEndpoint.setDestination("stream2." + customer);
        stream2TopicEndpoint.setId(uuidStream2);
        stream2TopicEndPoint.setMessageListener(message ->
            onStream2(message));

        jmsListenerEndpointRegistry.registerListenerContainer(stream1TopicEndpoint, listenerContainerFactory, true);
        jmsListenerEndpointRegistry.registerListenerContainer(stream2TopicEndpoint, listenerContainerFactory, true);
    }

    public void onStream1(final Message message) {
        // 做一些操作
        listener.onData(// JSON对象);
    }

    public void onStream2(final Message message) {
        // 做一些操作
        listener.onData(// JSON对象);
    }

    public void close() {
        jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).stop();
        jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).stop();
        logger.info("stream1 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).isRunning());
        logger.info("stream2 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).isRunning());
    }
}

当调用close()时,我的日志显示:stream1 isRunning? false,然而(如前所述),amq控制台显示订阅者仍然处于活动状态。

英文:

I have a spring-boot app which needs to dynamically subscribe and unsubscribe (non-durable) to an ActiveMQ topic. The subscription all works and so does the unsubscribe (according to the code) but the subscriber count on the topic in ActiveMQ doesn't decrease. What is it I'm missing/not understanding?

Here's some stripped down code:

@Component
@Scope("prototype")
public class DataStreamController {
@Autowired
@Qualifier("pubsubNonDurableJmsListenerContainer")
private DefaultJmsListenerContainerFactory listenerContainerFactory;
@Autowired
private JmsListenerEndpointRegistry jmsListenerEndpointRegistry;
private DataStreamListener listener;
private String customer;
private final String uuidStream1 = UUID.randomUUID().toString();
private final String uuidStream2 = UUID.randomUUID().toString();
public DataStreamController(String customer, DataStreamListener listener) {
this.customer = customer;
this.listener = listener;
}
@PostConstruct
public void init() {
subscribeToTopics();
}
public void subscribeToTopics() {
SimpleJmsListenerEndpoint stream1TopicEndpoint = new SimpleJmsListenerEndpoint();
stream1TopicEndpoint.setDestination("stream1." + customer);
stream1TopicEndpoint.setId(uuidStream1);
stream1TopicEndPoint.setMessageListener(message ->
onStream1(message);
SimpleJmsListenerEndpoint stream2TopicEndpoint = new SimpleJmsListenerEndpoint();
stream2TopicEndpoint.setDestination("stream2." + customer);
stream2TopicEndpoint.setId(uuidStream2);
stream2TopicEndPoint.setMessageListener(message ->
onStream2(message);
jmsListenerEndpointRegistry.registerListenerContainer(stream1TopicEndpoint, listenerContainerFactory, true);
jmsListenerEndpointRegistry.registerListenerContainer(stream2TopicEndpoint, listenerContainerFactory, true);
}
public void onStream1(final Message message) {
// do some stuff
listener.onData(// json object);
}
public void onStream2(final Message message) {
// do some stuff
listener.onData(// json object);
}
public void close() {
jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).stop();
jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).stop();
logger.info("stream1 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).isRunning());
logger.info("stream2 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).isRunning());
}
}

When calling close() my logs say: stream1 isRunning? false and yet (as stated earlier) the amq console shows the subscriber is still active.

答案1

得分: 2

Stopping the container just stops the container from listening, it does not close the consumer. You need to call shutDown() to close the consumer.

https://stackoverflow.com/questions/73725269/not-able-to-stop-an-ibm-mq-jms-consumer-in-spring-boot/73731402#73731402

英文:

Stopping the container just stops the container from listening, it does not close the consumer. You need to call shutDown() to close the consumer.

https://stackoverflow.com/questions/73725269/not-able-to-stop-an-ibm-mq-jms-consumer-in-spring-boot/73731402#73731402

huangapple
  • 本文由 发表于 2023年6月6日 17:00:50
  • 转载请务必保留本文链接:https://go.coder-hub.com/76412989.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定