如何使用Stomp.py强制断开ActiveMQ连接

huangapple go评论63阅读模式
英文:

How to force disconnect from ActiveMQ connection with Stomp.py

问题

当使用持久连接监听消息队列时,我在监听器中遇到了错误。我通过按<kbd>CTRL</kbd>-<kbd>Z</kbd>来模拟退出程序。尝试重新连接时,出现以下错误:

on_error! : "javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: <Client-id> already connected from tcp://10.18.57.69:4241
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
        ...
        at java.lang.Thread.run(Thread.java:745)

我尝试使用以下方法取消订阅并断开连接,但它不会使我断开连接:

class MyListener(stomp.ConnectionListener):
    """This is a listener class that listens for new messages using the STOMP protocol"""

    def __init__(self, conn):
        self.conn = conn
        self.client_id = client_id

    def on_error(self, headers, message):
        self.disconnect()

    def on_message(self, headers, message):
        ...

    def on_disconnected(self):
        self.disconnect()

    def disconnect(self):
        try:
            self.conn.unsubscribe(
                destination="/topic/bmrsTopic",
                id=self.client_id
            )
        except:
            print('unsubscribe failed')
        # first disconnect before trying to reconnect
        print('first disconnect before trying to reconnect')
        try:
            self.conn.disconnect()
        except:
            print('disconnect failed')

如何强制AMQ服务器忘记我的先前连接?

英文:

When listening to a message queue using a durable connection I get an error in the listener. I simulate this by hitting <kbd>CTRL</kbd>-<kbd>Z</kbd> to quit the program. Trying to re-connect gives me a an error that says:

on_error! : &quot;javax.jms.InvalidClientIDException: Broker: BMRSBROKER - Client: &lt;Client-id&gt; already connected from tcp://10.18.57.69:4241
        at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:255)
        at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:116)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.security.JaasAuthenticationBroker.addConnection(JaasAuthenticationBroker.java:75)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.plugin.AbstractRuntimeConfigurationBroker.addConnection(AbstractRuntimeConfigurationBroker.java:118)
        at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:98)
        at org.apache.activemq.broker.MutableBrokerFilter.addConnection(MutableBrokerFilter.java:103)
        at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)
        at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)
        at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)
        at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:333)
        at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:197)
        at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
        at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:300)
        at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:97)
        at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:202)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompConnect(ProtocolConverter.java:774)
        at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:265)
        at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:85)
        at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
        at org.apache.activemq.transport.tcp.SslTransport.doConsume(SslTransport.java:108)
        at org.apache.activemq.transport.stomp.StompSslTransportFactory$1$1.doConsume(StompSslTransportFactory.java:70)
        at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:233)
        at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
        at java.lang.Thread.run(Thread.java:745)
&quot;

I have tried to unsubscribe and disconnect my connection using the method below but it won't disconnect me.

class MyListener(stomp.ConnectionListener):
    &quot;&quot;&quot;This is a listener class that listens for new messages using the STOMP protocol&quot;&quot;&quot;

    def __init__(self, conn):
        self.conn = conn
        self.client_id = client_id

    def on_error(self, headers, message):
        self.disconnect()

    def on_message(self, headers, message):
        ...

    def on_disconnected(self):
        self.disconnect()

    def disconnect(self):
        try:
            self.conn.unsubscribe(
                destination=&quot;/topic/bmrsTopic&quot;,
                id=self.client_id
            )
        except:
            print(&#39;unsubscribe failed&#39;)
        # first disconnect before trying to reconnect
        print(&#39;first disconnect before trying to reconnect&#39;)
        try:
            self.conn.disconnect()
        except:
            print(&#39;disconnect failed&#39;)

How can I force the AMQ server to forget my previous connection?

答案1

得分: 1

你不能让代理断开另一个连接的其他客户端资源。 相反,你应该为客户端和代理配置一个空闲超时值,以便双方都可以处理远程断开连接,而不让套接字检测到关闭。

你还可以为不支持心跳的stomp客户端配置代理传输连接器的心跳宽限期值,请参阅ActiveMQ代理STOMP文档。

STOMP规范概述了心跳的工作原理。

英文:

You cannot for the broker to disconnect other client resources from another connection. Instead you should be configuring the connection with an idle timeout value for both the client and the broker so that both sides handle the remote dropping without the socket detecting the close.

You can also configure the broker transport connector with Heart-Beat Grace Period values for stomp clients that don't advertise heart beats, refer to the ActiveMQ broker STOMP documentation.

The STOMP specification outlines how heart beats work:

huangapple
  • 本文由 发表于 2020年1月6日 15:10:51
  • 转载请务必保留本文链接:https://go.coder-hub.com/59608020.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定