Golang-Paho MQTT订阅者在重新初始化后断开连接,出现EOF错误。

huangapple go评论114阅读模式
英文:

Golang-Paho MQTT Subscriber keeps disconnecting with error EOF after reinitialization of subscriber

问题

我正在尝试动态更改mqtt客户端处理程序和证书,这导致订阅者在订阅者和发布者连接时出现EOF错误。

这是我尝试做的事情:

1] 我正在初始化订阅者/发布者(使用firstPubHandler、firstConnectHandler和默认证书)

2] 使用发布者向服务器发送注册消息以获取新的证书详细信息

3] 服务器将以证书详细信息的形式回复,该回复将由firstConnectHandler处理,处理主题为***.../id/Certificate***,以下载证书。

4] firstPubHandler将处理服务器的响应并重新初始化发布者/订阅者(使用messagePubHandler、connectHandler和新下载的证书),connectHandler将监听所有主题***/id/+***

除了当我重新初始化订阅者/发布者时,订阅者会不断断开连接并显示"EOF"错误之外,一切都运行良好。

我在这里做错了什么?或者有没有更好的方法来实现这个目标?任何帮助都将不胜感激。

-- 主函数

var opt Params
var publisher mqtt.Client
var subscriber mqtt.Client

func main() {
    InitializeBroker(firstPubHandler, firstConnectHandler)

    // 最终会在"../id/Certificate"主题上触发消息,由firstConnectHandler处理
    PublishRegistrationMessage(publisher)

    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    done := make(chan os.Signal, 1)
    signal.Notify(done, os.Interrupt, syscall.SIGTERM)
    go func() {
        for {
        }
    }()
    <-done
    <-c
    DisconnectBrocker()
}

-- 处理程序

// 第一个处理程序
var firstPubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    DownloadCertificates(msg.Payload())
    InitializeBroker(messagePubHandler, connectHandler)
}

var firstConnectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
    if token := c.Subscribe(opt.SubClientId+"/id/Certificate", 0, firstPubHandler); token.Wait() && token.Error() != nil {
        log.Error(token.Error())
    }
}

// 第二个处理程序
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    ProcessMessage(msg.Payload())
}

var connectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
    if token := c.Subscribe(opt.SubClientId+"/id/+", 0, messagePubHandler); token.Wait() && token.Error() != nil {
        log.Error(token.Error())
    }
}

// 公共处理程序
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    log.Info(err)
}

-- Mqtt代理初始化

func InitializeBroker(lMessageHandler mqtt.MessageHandler, lConnectHandler mqtt.OnConnectHandler) {
    statusPublishTopic := opt.PubClientId + "/id"
    nodeSubscribeTopic := opt.SubClientId + "/id"

    // 为发布客户端构建选项
    publisherOptions := mqtt.NewClientOptions()
    publisherOptions.AddBroker(opt.Broker)
    publisherOptions.SetClientID(statusPublishTopic)
    publisherOptions.SetDefaultPublishHandler(lMessageHandler)
    publisherOptions.OnConnectionLost = connectLostHandler

    // 为订阅客户端构建选项
    subscriberOptions := mqtt.NewClientOptions()
    subscriberOptions.AddBroker(opt.Broker)
    subscriberOptions.SetClientID(nodeSubscribeTopic)
    subscriberOptions.SetDefaultPublishHandler(lMessageHandler)
    subscriberOptions.OnConnectionLost = connectLostHandler
    subscriberOptions.OnConnect = lConnectHandler

    if !opt.NoTLS {
        tlsconfig, err := NewTLSConfig()
        if err != nil {
            log.Fatalf(err)
        }

        subscriberOptions.SetTLSConfig(tlsconfig)
        publisherOptions.SetTLSConfig(tlsconfig)
    }

    publisher = mqtt.NewClient(publisherOptions)
    if token := publisher.Connect(); token.Wait() && token.Error() != nil {
        log.Fatalf(token.Error())
    }

    subscriber = mqtt.NewClient(subscriberOptions)
    if token := subscriber.Connect(); token.Wait() && token.Error() != nil {
        log.Fatalf(token.Error())
    }
}

func NewTLSConfig() (config *tls.Config, err error) {
    certpool := x509.NewCertPool()
    pemCerts, err := ioutil.ReadFile(rootCert)
    if err != nil {
        return nil, err
    }
    certpool.AppendCertsFromPEM(pemCerts)

    cert, err := tls.LoadX509KeyPair(nodeCertFilePath, pvtKeyFilePath)
    if err != nil {
        return nil, err
    }

    config = &tls.Config{
        RootCAs:      certpool,
        ClientAuth:   tls.NoClientCert,
        ClientCAs:    nil,
        Certificates: []tls.Certificate{cert},
    }
    return config, nil
}
英文:

I'm trying to change mqtt client handlers and certificates dynamically, which causing subscriber EOF, when subscriber and publisher is connected

This is what I'm trying to do,

1] I'm initializing subscriber/publisher (using firstPubHandler, firstConnectHandler and default certificates)

2] Sending registration message on server using publisher to get new certificates details

3] Server will response back with certificate details, that response will be handled by firstConnectHandler on topic .../id/Certificate to download certificates.

4] firstPubHandler will handle response by server and reinitialize publisher/subscriber (using messagePubHandler, connectHandler and newly downloaded certificates), connectHandler will listen for all topics /id/+

Everything works good, except when I reinitialize subscriber/publisher, subscriber keeps disconnecting with error "EOF"

Am I doing anything wrong here? or is there any better way to accomplish this?
Any help is appreciated

-- Main function

var opt Params
var publisher mqtt.Client
var subscriber mqtt.Client

func main() {
	InitializeBroker(firstPubHandler, firstConnectHandler)

    //Ultimately it will trigger message on &quot;.../id/Certificate&quot; topic which will be handled byfirstConnectHandler 
	PublishRegistrationMessage(publisher)

	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	done := make(chan os.Signal, 1)
	signal.Notify(done, os.Interrupt, syscall.SIGTERM)
	go func() {
		for {
		}
	}()
	&lt;-done
	&lt;-c
	DisconnectBrocker()
}

-- Handlers

// First handlers
var firstPubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	DownloadCertificates(msg.Payload())
	InitializeBroker(messagePubHandler, connectHandler)
}

var firstConnectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
	if token := c.Subscribe(opt.SubClientId+&quot;/id/Certificate&quot;, 0, firstPubHandler); token.Wait() &amp;&amp; token.Error() != nil {
		log.Error(token.Error())
	}
}

// Second handlers
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	ProcessMessage(msg.Payload())
}

var connectHandler mqtt.OnConnectHandler = func(c mqtt.Client) {
	if token := c.Subscribe(opt.SubClientId+&quot;/id/+&quot;, 0, messagePubHandler); token.Wait() &amp;&amp; token.Error() != nil {
		log.Error(token.Error())
	}
}

// Common handler
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	log.Info(err)
}

-- Mqtt broker initialization

func InitializeBroker(lMessageHandler mqtt.MessageHandler, lConnectHandler mqtt.OnConnectHandler) {
	statusPublishTopic := opt.PubClientId/id
	nodeSubscribeTopic := opt.SubClientId/id

	// Build the options for the publish client
	publisherOptions := mqtt.NewClientOptions()
	publisherOptions.AddBroker(opt.Broker)
	publisherOptions.SetClientID(statusPublishTopic)
	publisherOptions.SetDefaultPublishHandler(lMessageHandler)
	publisherOptions.OnConnectionLost = connectLostHandler

	// Build the options for the subscribe client
	subscriberOptions := mqtt.NewClientOptions()
	subscriberOptions.AddBroker(opt.Broker)
	subscriberOptions.SetClientID(nodeSubscribeTopic)
	subscriberOptions.SetDefaultPublishHandler(lMessageHandler)
	subscriberOptions.OnConnectionLost = connectLostHandler
	subscriberOptions.OnConnect = lConnectHandler

	if !opt.NoTLS {
		tlsconfig, err := NewTLSConfig()
		if err != nil {
			log.Fatalf(err)
		}

		subscriberOptions.SetTLSConfig(tlsconfig)
		publisherOptions.SetTLSConfig(tlsconfig)
	}

	publisher = mqtt.NewClient(publisherOptions)
	if token := publisher.Connect(); token.Wait() &amp;&amp; token.Error() != nil {
		log.Fatalf(token.Error())
	}

	subscriber = mqtt.NewClient(subscriberOptions)
	if token := subscriber.Connect(); token.Wait() &amp;&amp; token.Error() != nil {
		log.Fatalf(token.Error())
	}
}

func NewTLSConfig() (config *tls.Config, err error) {
	certpool := x509.NewCertPool()
	pemCerts, err := ioutil.ReadFile(rootCert)
	if err != nil {
		return nil, err
	}
	certpool.AppendCertsFromPEM(pemCerts)

	cert, err := tls.LoadX509KeyPair(nodeCertFilePath, pvtKeyFilePath)
	if err != nil {
		return nil, err
	}

	config = &amp;tls.Config{
		RootCAs:      certpool,
		ClientAuth:   tls.NoClientCert,
		ClientCAs:    nil,
		Certificates: []tls.Certificate{cert},
	}
	return config, nil
}

答案1

得分: 2

根据对你的代码的快速审查,这是我认为正在发生的事情(由于你没有提供所有的代码,所以有一些猜测):

  1. main() 调用 InitializeBroker,它创建了两个与代理的连接。默认的发布处理程序设置为 firstPubHandler,在 OnConnect 处理程序中,你订阅了 SubClientId+"/id/Certificate"
  2. 当接收到消息(firstPubHandler)时,你从消息中获取一个证书,并使用它建立了一组新的与代理的连接,使用相同的客户端ID,但不同的 OnConnect/默认发布处理程序。

所以,在第2点之后,你实际上有两组独立的与代理的连接(总共4个连接)。然而,MQTT-3.1.4-2(参见规范)规定:

>如果 ClientId 表示已经连接到服务器的客户端,则服务器必须断开现有的客户端。

因此,当建立第二组连接时,代理将断开第一组连接。这就是你看到的 "EOF" 断开连接。第二组连接仍然保持连接。由于你在第一组和第二组连接中使用相同的 connectLostHandler,你无法在日志中看到哪个连接被终止。

总结一下,我认为你的代码实际上是正常工作的。然而,你可能应该在 firstConnectHandler 中调用 c.Disconnect(),以便在建立第二组连接之前,干净地关闭初始连接。你还需要在某个地方存储 publisher,以便可以同时断开该连接。

注意:我很难理解你为什么要这样做。建立初始连接以检索证书似乎降低了系统的整体安全性。标准的方法是为每个客户端提供一个唯一的证书,然后在代理上使用 ACL(访问控制列表)来应用必要的限制。对于许多代理,你可以在 ACL 中使用证书的公共名称(从而消除了对第二个连接的需求)。

英文:

Based on a quick review of your code this is what I believe is happening (as you have not provided all of the code there is a little guesswork involved):

  1. main() calls InitializeBroker which creates two connections to the broker. The default publish handler is set to firstPubHandler and in the OnConnect handler you subscribe to SubClientId+&quot;/id/Certificate
  2. When a message is received (firstPubHandler) you grab a certificate from the message and use it to establish a new set of connections to the broker using the same client IDs but different OnConnect/default publish handler.

So after point 2 you actually have two separate sets of connections to the broker (4 connections in total). However MQTT-3.1.4-2 (see the spec) states:

>If the ClientId represents a Client already connected to the Server then the Server MUST disconnect the existing Client.

So when the second set of connections are established the broker will drop the first set of connections. This is the 'EOF' disconnection you are seeing. The second set of connections will still be up. As you are using the same connectLostHandler for the first and second sets of connections you cannot see which connection is being terminated in the logs.

In summary I believe your code is actually working. However you probably should call c.Disconnect() in firstConnectHandler so that the initial connection is cleanly closed before you establish the second set of connections. You would also need to store the publisher somewhere so you can disconnect that connection at the same time.

Note: I'm struggling to understand why you are doing this. Establishing an initial connection to retrieve a certificate appears to decrease the overall security of your system. The standard apprach would be to give each client a unique certificate and then use ACL's on the broker to apply whatever restrictions are necessary. With many brokers you can use the certificate common-name in ACL's (thus removing the need for a second connection).

huangapple
  • 本文由 发表于 2021年11月2日 05:52:12
  • 转载请务必保留本文链接:https://go.coder-hub.com/69803155.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定