JMS MessageListenerContainer.stop() called, but subscriber count doesn't decrease in the ActiveMQ console
Question
I have a Spring Boot app that needs to dynamically subscribe to and unsubscribe from (non-durable) ActiveMQ topics. Subscribing works, and so does unsubscribing (according to the code), but the subscriber count on the topic in ActiveMQ doesn't decrease. What am I missing or misunderstanding?
Here's some stripped-down code:
// Imports assume javax.* (Spring Boot 2.x) and SLF4J; on Spring Boot 3+ the packages are jakarta.*
import java.util.UUID;
import javax.annotation.PostConstruct;
import javax.jms.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Scope;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.jms.config.SimpleJmsListenerEndpoint;
import org.springframework.stereotype.Component;

@Component
@Scope("prototype")
public class DataStreamController {

    private static final Logger logger = LoggerFactory.getLogger(DataStreamController.class);

    @Autowired
    @Qualifier("pubsubNonDurableJmsListenerContainer")
    private DefaultJmsListenerContainerFactory listenerContainerFactory;

    @Autowired
    private JmsListenerEndpointRegistry jmsListenerEndpointRegistry;

    private DataStreamListener listener;
    private String customer;

    // Each prototype instance registers its endpoints under unique ids
    private final String uuidStream1 = UUID.randomUUID().toString();
    private final String uuidStream2 = UUID.randomUUID().toString();

    public DataStreamController(String customer, DataStreamListener listener) {
        this.customer = customer;
        this.listener = listener;
    }

    @PostConstruct
    public void init() {
        subscribeToTopics();
    }

    public void subscribeToTopics() {
        SimpleJmsListenerEndpoint stream1TopicEndpoint = new SimpleJmsListenerEndpoint();
        stream1TopicEndpoint.setDestination("stream1." + customer);
        stream1TopicEndpoint.setId(uuidStream1);
        stream1TopicEndpoint.setMessageListener(message -> onStream1(message));

        SimpleJmsListenerEndpoint stream2TopicEndpoint = new SimpleJmsListenerEndpoint();
        stream2TopicEndpoint.setDestination("stream2." + customer);
        stream2TopicEndpoint.setId(uuidStream2);
        stream2TopicEndpoint.setMessageListener(message -> onStream2(message));

        // The final argument (true) starts each container immediately
        jmsListenerEndpointRegistry.registerListenerContainer(stream1TopicEndpoint, listenerContainerFactory, true);
        jmsListenerEndpointRegistry.registerListenerContainer(stream2TopicEndpoint, listenerContainerFactory, true);
    }

    public void onStream1(final Message message) {
        // do some stuff
        listener.onData(/* json object */);
    }

    public void onStream2(final Message message) {
        // do some stuff
        listener.onData(/* json object */);
    }

    public void close() {
        jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).stop();
        jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).stop();
        logger.info("stream1 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream1).isRunning());
        logger.info("stream2 isRunning? " + jmsListenerEndpointRegistry.getListenerContainer(uuidStream2).isRunning());
    }
}
When calling close(), my logs say "stream1 isRunning? false", and yet (as stated earlier) the AMQ console shows the subscriber is still active.
Answer 1
Score: 2
Stopping the container only stops it from listening; it does not close the consumer. You need to call shutdown() to close the consumer.
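The advice above can be sketched roughly as follows. Note that the registry hands back the MessageListenerContainer interface, which does not declare shutdown(); the method lives on AbstractJmsListeningContainer, the base class of the DefaultMessageListenerContainer that a DefaultJmsListenerContainerFactory creates, so a cast is needed. ListenerShutdown and closeConsumer are hypothetical names introduced for illustration, and this is a minimal sketch rather than a definitive implementation.

```java
import org.springframework.jms.config.JmsListenerEndpointRegistry;
import org.springframework.jms.listener.AbstractJmsListeningContainer;
import org.springframework.jms.listener.MessageListenerContainer;

/** Hypothetical helper for illustration; not part of Spring. */
final class ListenerShutdown {

    private ListenerShutdown() {
    }

    /**
     * Looks up the container registered under endpointId and shuts it down.
     * Returns true if shutdown() was invoked, false if no suitable container was found.
     */
    static boolean closeConsumer(JmsListenerEndpointRegistry registry, String endpointId) {
        return closeConsumer(registry.getListenerContainer(endpointId));
    }

    /** Shuts the given container down if it exposes shutdown(); a null container yields false. */
    static boolean closeConsumer(MessageListenerContainer container) {
        // shutdown() is declared on AbstractJmsListeningContainer, not on the interface,
        // so check the concrete type before casting.
        if (container instanceof AbstractJmsListeningContainer) {
            ((AbstractJmsListeningContainer) container).shutdown();
            return true;
        }
        return false;
    }
}
```

In the question's close() method, the two stop() calls would then become ListenerShutdown.closeConsumer(jmsListenerEndpointRegistry, uuidStream1) and the same for uuidStream2.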