ActiveMQ 5在网络断开后无法正常工作的重传机制未生效。

huangapple go评论83阅读模式
英文:

ActiveMQ 5 redelivery not working after network disconnect

问题

Frameworks:

  • Java 1.7.0_191 和 1.8.0_181
  • Spring 4.3.18.RELEASE
  • ActiveMQ 5.14.5

Scenario:

两个客户端和一个服务器。客户端1失去连接(因为网络断开)。一条消息被发送并被客户端2消费。客户端1恢复网络连接。我期望消息会被重新发送到客户端1。虽然客户端1现在工作正常(获取所有新消息),但存在一个间隙,客户端1没有收到更新的消息,所以客户端不再可信。

这是按设计还是我配置错了?

ServerBroker:

final String brokerURI = String.format("broker://(tcp://%s:%s)?brokerName=clientBroker", host, port);
final BrokerService brokerService = BrokerFactory.createBroker(brokerURI);
brokerService.setUseJmx(true);
brokerService.setDataDirectory(dataDirectory);
final LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin();
loggingBrokerPlugin.setLogConsumerEvents(true);
loggingBrokerPlugin.setLogProducerEvents(true);
brokerService.setPlugins(new BrokerPlugin[] { loggingBrokerPlugin });
brokerService.start();

ServerProducer:

final JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setPubSubDomain(true);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
jmsTemplate.setTimeToLive(600_000L);
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
activeMQConnectionFactory.setBrokerURL(String.format("tcp://%s:%s", host, port));
jmsTemplate.setConnectionFactory(new PooledConnectionFactory(activeMQConnectionFactory));
jmsTemplate.convertAndSend(JmsQueueConstants.SERVER_UPDATE, new NotificationQueueEntry());

ClientConsumer:

@JmsListener(destination = JmsQueueConstants.SERVER_UPDATE)
public void receive(final NotificationQueueEntry msg) {
	process(msg);
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
	final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
	factory.setConnectionFactory(jmsConnectionFactory());
	factory.setPubSubDomain(true);
	factory.setSessionTransacted(true);
	factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
	factory.setConcurrency("2");
	return factory;
}

private ConnectionFactory jmsConnectionFactory() {
	final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
	activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
	activeMQConnectionFactory.setBrokerURL(String.format("failover:(tcp://%s:%s)?jms.closeTimeout=%d", host, port, 600_000));
	final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
	redeliveryPolicy.setInitialRedeliveryDelay(10_000L);
	redeliveryPolicy.setRedeliveryDelay(1_000L);
	redeliveryPolicy.setMaximumRedeliveries(600);
	activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
	activeMQConnectionFactory.setTransportListener(new TransportListener() {
		
		@Override
		public void onCommand(final Object command) {
		}
		
		@Override
		public void onException(final IOException error) {
			connected = false;
		}
		
		@Override
		public void transportInterupted() {
			connected = false;
		}
		
		@Override
		public void transportResumed() {
			if (!connected) {
				reconnect();
			}
		}
		
	});
	return new PooledConnectionFactory(activeMQConnectionFactory);
}

connectedreconnect() 仅用于在客户端中可见地显示断开连接状态,不会主动重新连接ActiveMQ连接。

Log:

2020-09-09 11:39:52,415 [ActiveMQ Transport: tcp:///<client-1-ip>:54049@1079] DEBUG o.a.a.b.T.Transport:241 Transport Connection to: tcp://<client-1-ip>:54049 failed: java.net.SocketException: Connection reset
2020-09-09 11:39:56,339 [RMI TCP Connection(310)-<client-2-ip>] INFO m.p.s.DatabaseServiceImplementation:98 saving entity to database
2020-09-09 11:39:56,342 [server-pool-1-thread-16] INFO m.p.c.JmsConnectionFactoryCache:73 Create connection for address: client-2:1079
...
...
...

(因为日志内容较多,以上只是部分内容。)

英文:

Frameworks:

  • Java 1.7.0_191 and 1.8.0_181
  • Spring 4.3.18.RELEASE
  • ActiveMQ 5.14.5

Scenario:

Two clients and a server. Client 1 loses connection (due to network disconnect). A message is sent and consumed by client 2. Client 1 regains network connection. Now I was expecting a redelivery of the message to client 1. Although client 1 works fine now (getting all new messages) there is a gap where client 1 didn't get the update messages, so the client is not trustable anymore.

Is this by design or did I configure something wrong?

ServerBroker:

final String brokerURI = String.format("broker://(tcp://%s:%s)?brokerName=clientBroker", host, port);
final BrokerService brokerService = BrokerFactory.createBroker(brokerURI);
brokerService.setUseJmx(true);
brokerService.setDataDirectory(dataDirectory);
final LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin();
loggingBrokerPlugin.setLogConsumerEvents(true);
loggingBrokerPlugin.setLogProducerEvents(true);
brokerService.setPlugins(new BrokerPlugin[] { loggingBrokerPlugin });
brokerService.start();

ServerProducer:

final JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setPubSubDomain(true);
jmsTemplate.setExplicitQosEnabled(true);
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
jmsTemplate.setTimeToLive(600_000L);
final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
activeMQConnectionFactory.setBrokerURL(String.format("tcp://%s:%s", host, port));
jmsTemplate.setConnectionFactory(new PooledConnectionFactory(activeMQConnectionFactory));
jmsTemplate.convertAndSend(JmsQueueConstants.SERVER_UPDATE, new NotificationQueueEntry());

ClientConsumer:

@JmsListener(destination = JmsQueueConstants.SERVER_UPDATE)
public void receive(final NotificationQueueEntry msg) {
	process(msg);
}

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
	final DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
	factory.setConnectionFactory(jmsConnectionFactory());
	factory.setPubSubDomain(true);
	factory.setSessionTransacted(true);
	factory.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
	factory.setConcurrency("2");
	return factory;
}

private ConnectionFactory jmsConnectionFactory() {
	final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
	activeMQConnectionFactory.setTrustedPackages(ImmutableList.of("my.package", "java"));
	activeMQConnectionFactory.setBrokerURL(String.format("failover:(tcp://%s:%s)?jms.closeTimeout=%d", host, port, 600_000));
	final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
	redeliveryPolicy.setInitialRedeliveryDelay(10_000L);
	redeliveryPolicy.setRedeliveryDelay(1_000L);
	redeliveryPolicy.setMaximumRedeliveries(600);
	activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
	activeMQConnectionFactory.setTransportListener(new TransportListener() {
		
		@Override
		public void onCommand(final Object command) {
		}
		
		@Override
		public void onException(final IOException error) {
			connected = false;
		}
		
		@Override
		public void transportInterupted() {
			connected = false;
		}
		
		@Override
		public void transportResumed() {
			if (!connected) {
				reconnect();
			}
		}
		
	});
	return new PooledConnectionFactory(activeMQConnectionFactory);
}

connected and reconnect() are just to visibly show the disconnected state in the client, not actively reconnecting the ActiveMQ connection.

Log:

2020-09-09 11:39:52,415 [ActiveMQ Transport: tcp:///<client-1-ip>:54049@1079] DEBUG o.a.a.b.T.Transport:241 Transport Connection to: tcp://<client-1-ip>:54049 failed: java.net.SocketException: Connection reset
2020-09-09 11:39:52,416 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-1-ip>:54049@1079
2020-09-09 11:39:52,417 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.b.TransportConnection:1233 Cleaning up connection resources: tcp://<client-1-ip>:54049
2020-09-09 11:39:56,339 [RMI TCP Connection(310)-<client-2-ip>] INFO m.p.s.DatabaseServiceImplementation:98 saving entity to database
2020-09-09 11:39:56,342 [server-pool-1-thread-16] INFO m.p.c.JmsConnectionFactoryCache:73 Create connection for address: client-2:1079
2020-09-09 11:39:56,343 [server-pool-1-thread-16] INFO m.p.s.ServerProducerImplementation:268 Send message to client-2:1079. Data: NotificationQueueEntry
2020-09-09 11:39:56,343 [ActiveMQ Transport: tcp:///<client-2-ip>:63125@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:257 Removing Producer: ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:client-2-61550-1599640739463-4:5:1:1, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 1}
2020-09-09 11:39:56,345 [ActiveMQ BrokerService[clientBroker] Task-17] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp:///<client-2-ip>:63125@1079
2020-09-09 11:39:56,345 [server-pool-1-thread-16] DEBUG o.a.a.util.ThreadPoolUtils:155 Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@f78a38[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
2020-09-09 11:39:56,346 [server-pool-1-thread-16] DEBUG o.a.a.t.tcp.TcpTransport:549 Stopping transport tcp://client-2/<client-2-ip>:1079@63125
2020-09-09 11:39:56,348 [server-pool-1-thread-16] DEBUG o.a.a.t.WireFormatNegotiator:82 Sending: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,350 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, Host=client-2, TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp://client-2/<client-2-ip>:1079@52652 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-2-ip>:52652@1079 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp://client-2/<client-2-ip>:1079@52652] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp://client-2/<client-2-ip>:1079@52652 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,351 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-2-ip>:52652@1079 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:39:56,354 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:192 Adding Producer: ProducerInfo {commandId = 4, responseRequired = true, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = null, brokerPath = null, dispatchAsync = false, windowSize = 0, sentCount = 0}
2020-09-09 11:39:56,357 [ActiveMQ Transport: tcp:///<client-2-ip>:52652@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:285 Sending message: ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@11eb3c5, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
2020-09-09 11:39:56,358 [ActiveMQ BrokerService[clientBroker] Task-18] INFO o.a.a.b.u.LoggingBrokerPlugin:428 preProcessDispatch: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:client-2-52500-1599644321472-1:1:1:1, destination = topic://ServerUpdate, message = ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 1599644396358, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@162a6d1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1936, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}, redeliveryCounter = 0}
2020-09-09 11:39:56,359 [ActiveMQ BrokerService[clientBroker] Task-18] INFO o.a.a.b.u.LoggingBrokerPlugin:436 postProcessDispatch: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:client-2-52500-1599644321472-1:1:1:1, destination = topic://ServerUpdate, message = ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:client-2-61550-1599640739463-4:6:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:client-2-61550-1599640739463-4:6:1:1, destination = topic://ServerUpdate, transactionId = null, expiration = 1599644996357, timestamp = 1599644396357, arrival = 0, brokerInTime = 1599644396358, brokerOutTime = 1599644396359, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@162a6d1, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1936, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}, redeliveryCounter = 0}
2020-09-09 11:39:56,419 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] INFO o.a.a.b.u.LoggingBrokerPlugin:157 Acknowledging message for client ID: ID:client-2-52500-1599644321472-0:1, ID:client-2-61550-1599640739463-4:6:1:1:1
2020-09-09 11:39:56,421 [ActiveMQ Transport: tcp:///<client-2-ip>:52501@1079] DEBUG o.a.a.t.LocalTransaction:48 commit: TX:ID:client-2-52500-1599644321472-1:1:1 syncCount: 1
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.InactivityMonitor:103 Using min of local: WireFormatInfo { version=12, properties={TcpNoDelayEnabled=true, SizePrefixDisabled=false, CacheSize=1024, ProviderName=ActiveMQ, StackTraceEnabled=true, PlatformDetails=JVM: 1.8.0_181, 25.181-b13, Oracle Corporation, OS: Windows 10, 10.0, x86, CacheEnabled=true, TightEncodingEnabled=true, MaxFrameSize=9223372036854775807, MaxInactivityDuration=30000, MaxInactivityDurationInitalDelay=10000, ProviderVersion=5.14.5}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=12, properties={CacheSize=1024, ProviderName=ActiveMQ, SizePrefixDisabled=false, TcpNoDelayEnabled=true, PlatformDetails=JVM: 1.7.0_191, 24.191-b08, Oracle Corporation, OS: Windows 10, 10.0, x86, StackTraceEnabled=true, CacheEnabled=true, MaxFrameSize=9223372036854775807, TightEncodingEnabled=true, MaxInactivityDuration=30000, ProviderVersion=3.2.0.0-SNAPSHOT, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:130 Received WireFormat: WireFormatInfo { version=12, properties={CacheSize=1024, ProviderName=ActiveMQ, SizePrefixDisabled=false, TcpNoDelayEnabled=true, PlatformDetails=JVM: 1.7.0_191, 24.191-b08, Oracle Corporation, OS: Windows 10, 10.0, x86, StackTraceEnabled=true, CacheEnabled=true, MaxFrameSize=9223372036854775807, TightEncodingEnabled=true, MaxInactivityDuration=30000, ProviderVersion=3.2.0.0-SNAPSHOT, MaxInactivityDurationInitalDelay=10000}, magic=[A,c,t,i,v,e,M,Q]}
2020-09-09 11:41:04,215 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:137 tcp:///<client-1-ip>:50446@1079 before negotiation: OpenWireFormat{version=12, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
2020-09-09 11:41:04,216 [ActiveMQ Transport: tcp:///<client-1-ip>:50446@1079] DEBUG o.a.a.t.WireFormatNegotiator:152 tcp:///<client-1-ip>:50446@1079 after negotiation: OpenWireFormat{version=12, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}

答案1

得分: 0

这是非持久主题订阅者的预期行为。当这样的订阅者未连接时,它将不会接收到发送到代理的任何消息。

还值得注意的是,这不是通常所称的“重新投递”。消息重新投递是指当消息在事务中被消耗并且该事务被回滚时,消息会被重新投递给客户端以再次尝试。

英文:

This is the expected behavior for a non-durable topic subscriber. When such a subscriber is not connected it will not receive any messages sent to the broker.

It's also worth noting that this is not what is typically referred to as "redelivery." Message redelivery is what happens when, for example, a message is consumed in a transaction and that transaction is rolled back and the message is then redelivered to the client to try again.

huangapple
  • 本文由 发表于 2020年9月9日 18:03:35
  • 转载请务必保留本文链接:https://go.coder-hub.com/63809305.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定