英文:
ActiveMQ Artemis and 5.x listeners at same time - NullPointerException
问题
我有一个使用遗留的 Spring 4.2.1.RELEASE 应用程序,连接到 ActiveMQ 5.x 作为监听器,现在我们正在添加与 ActiveMQ Artemis 的连接。对于 Artemis,我们正在使用持久订阅,因为我们不希望在订阅者关闭时在主题上丢失消息,并且我们还使用了共享订阅,因为我们想要在订阅中处理消息时具有集群或使用并发的选项。我有单独的 ConnectionFactory
和 ListenerContainer
,但从这个不断重复的 WARN
日志来看,似乎 Artemis DMLC 无法启动,原因是以下 NPE(空指针异常):
java.lang.NullPointerException
at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
at java.lang.Thread.run(Unknown Source)
表面上看,它似乎找不到 createSharedDurableConsumer
方法。查看我所拥有的 AbstractMessageListenerContainer,第 856 行调用了 method.invoke
方法:
/** 如果可用,JMS 2.0 Session.createSharedDurableConsumer 方法 */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
...
Method method = (isSubscriptionDurable() ?
createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}
Artemis 配置如下:
@Configuration
public class ArtemisConfig {
@Autowired
private Environment env;
@Bean
public ConnectionFactory artemisConnectionFactory() {
ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
.createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());
artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
// 其他属性的设置...
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);
// 其他属性的设置...
return cachingConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory artemisContainerFactory(ConnectionFactory artemisConnectionFactory,
JmsTransactionManager artemisJmsTransactionManager,
MappingJackson2MessageConverter mappingJackson2MessageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// 其他属性的设置...
return factory;
}
// 其他方法...
private TransportConfiguration[] createTransportConfigurations() {
// 配置传输参数...
return new TransportConfiguration[] {
new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
}
}
我的 pom 文件中使用的是 Artemis 的 2.10.0 版本。
如何修复这个问题?
英文:
I have a legacy Spring 4.2.1.RELEASE application that connects to ActiveMQ 5.x as a listener and now we're adding connectivity to ActiveMQ Artemis. For Artemis we're using durable subscriptions because we don't want message loss on a topic when the subscribers go down and shared subscriptions because we wanted the option of clustering or using concurrency to asynchronously process the messages in the subscription. I have separate ConnectionFactory
s and ListenerContainer
s, but from this WARN
log that keeps repeating it looks like the Artemis DMLC can't start due to the following NPE:
java.lang.NullPointerException
at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
at java.lang.Thread.run(Unknown Source)
On the surface it looks like it can't find the method createSharedDurableConsumer
. Looking at the AbstractMessageListenerContainer I have, line 856 is calling method.invoke
/** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);
...
Method method = (isSubscriptionDurable() ?
createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}
Artemis configuration:
@Configuration
public class ArtemisConfig {
@Autowired
private Environment env;
@Bean
public ConnectionFactory artemisConnectionFactory() {
ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
.createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());
artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
artemisConnectionFactory.setConnectionTTL(env.getRequiredProperty("artemis.connection.ttl.millis", Long.class));
artemisConnectionFactory
.setCallFailoverTimeout(env.getRequiredProperty("artemis.call.failover.timeout.millis", Long.class));
artemisConnectionFactory.setInitialConnectAttempts(
env.getRequiredProperty("artemis.connection.attempts.initial", Integer.class));
artemisConnectionFactory
.setReconnectAttempts(env.getRequiredProperty("artemis.connection.attempts.reconnect", Integer.class));
artemisConnectionFactory.setRetryInterval(env.getRequiredProperty("artemis.retry.interval.millis", Long.class));
artemisConnectionFactory
.setRetryIntervalMultiplier(env.getRequiredProperty("artemis.retry.interval.multiplier", Double.class));
artemisConnectionFactory.setBlockOnAcknowledge(true);
artemisConnectionFactory.setBlockOnDurableSend(true);
artemisConnectionFactory.setCacheDestinations(true);
artemisConnectionFactory.setConsumerWindowSize(0);
artemisConnectionFactory.setMinLargeMessageSize(1024 * 1024);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);
cachingConnectionFactory
.setSessionCacheSize(env.getRequiredProperty("artemis.session.cache.size", Integer.class));
cachingConnectionFactory.setReconnectOnException(true);
return cachingConnectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory artemisContainerFactory(ConnectionFactory artemisConnectionFactory,
JmsTransactionManager artemisJmsTransactionManager,
MappingJackson2MessageConverter mappingJackson2MessageConverter) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
factory.setConnectionFactory(artemisConnectionFactory);
factory.setDestinationResolver(new DynamicDestinationResolver());
factory.setMessageConverter(mappingJackson2MessageConverter);
factory.setSubscriptionDurable(Boolean.TRUE);
factory.setSubscriptionShared(Boolean.TRUE);
factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
factory.setSessionTransacted(Boolean.TRUE);
factory.setTransactionManager(artemisJmsTransactionManager);
return factory;
}
private TransportConfiguration[] createTransportConfigurations() {
String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
Map<String, Object> primaryTransportParameters = new HashMap<>(2, 1F);
String primaryHostname = env.getRequiredProperty("artemis.primary.hostname");
Integer primaryPort = env.getRequiredProperty("artemis.primary.port", Integer.class);
primaryTransportParameters.put("host", primaryHostname);
primaryTransportParameters.put("port", primaryPort);
return new TransportConfiguration[] {
new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
}
}
My pom uses version 2.10.0 of Artemis.
How do I fix this?
答案1
得分: 2
JMS 2.0规范与JMS 1.1向后兼容,因此请确保类路径上只有JMS 2规范。我的直觉是Spring代码中的反射调用出现问题,因为它们正在访问JMS 1.1规范的类,而不是正确的JMS 2规范类。
英文:
The JMS 2.0 spec is backwards compatible with JMS 1.1 so make sure you only have the JMS 2 spec on your classpath. My hunch is that the reflection calls in the Spring code are getting messed up because they're hitting the JMS 1.1 spec classes instead of the proper JMS 2 spec classes.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论