ActiveMQ Artemis and 5.x listeners at same time - NullPointerException

Question

I have a legacy Spring 4.2.1.RELEASE application that connects to ActiveMQ 5.x as a listener, and we are now adding connectivity to ActiveMQ Artemis. For Artemis we use durable subscriptions, because we don't want to lose messages on a topic when the subscribers go down, and shared subscriptions, because we want the option of clustering or using concurrency to process the messages in a subscription asynchronously. I have separate ConnectionFactory and ListenerContainer instances, but a WARN log that keeps repeating suggests the Artemis DMLC can't start because of the following NPE:

java.lang.NullPointerException
    at org.springframework.jms.listener.AbstractMessageListenerContainer.createConsumer(AbstractMessageListenerContainer.java:856)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:213)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1173)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1149)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1142)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1039)
    at java.lang.Thread.run(Unknown Source)

On the surface it looks like it can't find the method createSharedDurableConsumer. In the AbstractMessageListenerContainer I have, line 856 is the method.invoke call:

/** The JMS 2.0 Session.createSharedDurableConsumer method, if available */
private static final Method createSharedDurableConsumerMethod = ClassUtils.getMethodIfAvailable(
		Session.class, "createSharedDurableConsumer", Topic.class, String.class, String.class);

...

Method method = (isSubscriptionDurable() ?
						createSharedDurableConsumerMethod : createSharedConsumerMethod);
try {
	return (MessageConsumer) method.invoke(session, destination, getSubscriptionName(), getMessageSelector());
}

Artemis configuration:

@Configuration
public class ArtemisConfig {

	@Autowired
	private Environment env;

	@Bean
	public ConnectionFactory artemisConnectionFactory() {
		ActiveMQConnectionFactory artemisConnectionFactory = ActiveMQJMSClient
				.createConnectionFactoryWithHA(JMSFactoryType.CF, createTransportConfigurations());

		artemisConnectionFactory.setUser(env.getRequiredProperty("artemis.username"));
		artemisConnectionFactory.setPassword(env.getRequiredProperty("artemis.password"));
		artemisConnectionFactory.setCallTimeout(env.getRequiredProperty("artemis.call.timeout.millis", Long.class));
		artemisConnectionFactory.setConnectionTTL(env.getRequiredProperty("artemis.connection.ttl.millis", Long.class));
		artemisConnectionFactory
				.setCallFailoverTimeout(env.getRequiredProperty("artemis.call.failover.timeout.millis", Long.class));
		artemisConnectionFactory.setInitialConnectAttempts(
				env.getRequiredProperty("artemis.connection.attempts.initial", Integer.class));
		artemisConnectionFactory
				.setReconnectAttempts(env.getRequiredProperty("artemis.connection.attempts.reconnect", Integer.class));
		artemisConnectionFactory.setRetryInterval(env.getRequiredProperty("artemis.retry.interval.millis", Long.class));
		artemisConnectionFactory
				.setRetryIntervalMultiplier(env.getRequiredProperty("artemis.retry.interval.multiplier", Double.class));
		artemisConnectionFactory.setBlockOnAcknowledge(true);
		artemisConnectionFactory.setBlockOnDurableSend(true);
		artemisConnectionFactory.setCacheDestinations(true);
		artemisConnectionFactory.setConsumerWindowSize(0);
		artemisConnectionFactory.setMinLargeMessageSize(1024 * 1024);

		CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(artemisConnectionFactory);

		cachingConnectionFactory
				.setSessionCacheSize(env.getRequiredProperty("artemis.session.cache.size", Integer.class));
		cachingConnectionFactory.setReconnectOnException(true);

		return cachingConnectionFactory;
	}

	@Bean
	public DefaultJmsListenerContainerFactory artemisContainerFactory(ConnectionFactory artemisConnectionFactory,
			JmsTransactionManager artemisJmsTransactionManager,
			MappingJackson2MessageConverter mappingJackson2MessageConverter) {
		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

		factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
		factory.setConnectionFactory(artemisConnectionFactory);
		factory.setDestinationResolver(new DynamicDestinationResolver());
		factory.setMessageConverter(mappingJackson2MessageConverter);
		factory.setSubscriptionDurable(Boolean.TRUE);
		factory.setSubscriptionShared(Boolean.TRUE);
		factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
		factory.setSessionTransacted(Boolean.TRUE);
		factory.setTransactionManager(artemisJmsTransactionManager);

		return factory;
	}

	private TransportConfiguration[] createTransportConfigurations() {
		String connectorFactoryFqcn = NettyConnectorFactory.class.getName();
		Map<String, Object> primaryTransportParameters = new HashMap<>(2, 1F);
		String primaryHostname = env.getRequiredProperty("artemis.primary.hostname");
		Integer primaryPort = env.getRequiredProperty("artemis.primary.port", Integer.class);

		primaryTransportParameters.put("host", primaryHostname);
		primaryTransportParameters.put("port", primaryPort);

		return new TransportConfiguration[] {
				new TransportConfiguration(connectorFactoryFqcn, primaryTransportParameters),
				new TransportConfiguration(connectorFactoryFqcn, backupTransportParameters) };
	}
}

My pom uses version 2.10.0 of Artemis.

How do I fix this?

Answer 1

Score: 2

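The JMS 2.0 spec is backwards compatible with JMS 1.1, so make sure the JMS 2.0 spec is the only JMS API on your classpath. My hunch is that the reflection calls in the Spring code are failing because they resolve against the JMS 1.1 spec classes instead of the JMS 2.0 ones. The JMS 1.1 Session interface has no createSharedDurableConsumer method, so ClassUtils.getMethodIfAvailable would return null and the later method.invoke call would throw the NullPointerException.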
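
One way to test this hunch is to ask the JVM which jar javax.jms.Session was actually loaded from and whether that class exposes createSharedDurableConsumer. The following is a minimal diagnostic sketch, not part of Spring or Artemis: the class name JmsApiCheck and the helper describe are made up for illustration, and it assumes you run it with the same classpath as the application.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;

// Hypothetical diagnostic helper (not a Spring or Artemis class).
class JmsApiCheck {

    /**
     * Reports where the named class was loaded from and whether it
     * exposes a public method with the given name.
     */
    static String describe(String className, String methodName) {
        Class<?> type;
        try {
            type = Class.forName(className);
        } catch (ClassNotFoundException e) {
            return className + " is not on the classpath";
        }

        // CodeSource is null for classes that come from the JDK itself.
        CodeSource source = type.getProtectionDomain().getCodeSource();
        String location = (source != null && source.getLocation() != null)
                ? source.getLocation().toString()
                : "the JDK itself";

        // Match by name only, so no javax.jms types are needed at compile time.
        boolean found = false;
        for (Method method : type.getMethods()) {
            if (method.getName().equals(methodName)) {
                found = true;
                break;
            }
        }
        return className + " loaded from " + location + "; "
                + methodName + (found ? " is present" : " is MISSING");
    }

    public static void main(String[] args) {
        System.out.println(describe("javax.jms.Session", "createSharedDurableConsumer"));
    }
}
```

If the reported location is a JMS 1.1 spec jar and the method is reported as missing, that jar is shadowing the JMS 2.0 API. Running mvn dependency:tree should show which dependency pulls it in, so that it can be excluded.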

Posted by huangapple on September 12, 2020
https://go.coder-hub.com/63850508.html