如何检查是否与 MQTT 代理失去连接?

huangapple go评论111阅读模式
英文:

How I can check if I lose connection with mqtt broker?

问题

我正在使用paho包尝试使用Golang构建MQTT订阅客户端,但是当代理断开连接时,我的客户端出现了问题。我认为应该会出现丢失消息的情况appear,但实际上并没有发生。如果我启动代理,MQTT订阅客户端无法接收到MQTT发布客户端发送的消息。

为什么会发生这种情况,我该如何解决?

代码:

package main

import (
	"fmt"
	"os"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

var (
	broker                     = "tcp://localhost:1883"
	f      mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Printf("TOPIC: %s\n", msg.Topic())
		fmt.Printf("MSG: %s\n", msg.Payload())
	}
)

func main() {
	// 创建ClientOptions
	opts := mqtt.NewClientOptions().AddBroker(broker)
	opts.SetClientID("group-one")
	opts.SetDefaultPublishHandler(f)

	// 使用上述ClientOptions创建并启动客户端
	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	if token := c.Subscribe("test", 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	for {

	}
}
英文:

I am trying with paho pkg to build mqtt sub client by golang,
and I have a problem with my client when the broker disconnect, I think should lost message appear, but this not happen, and if I start the broker,
mqtt sub client can't able to get message sent by mqtt pub client.

why this happens and how I can fix that?

Code

package main

import (
	"fmt"
	"os"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

var (
	broker                     = "tcp://localhost:1883"
	f      mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		fmt.Printf("TOPIC: %s\n", msg.Topic())
		fmt.Printf("MSG: %s\n", msg.Payload())
	}
)

func main() {
	//create a ClientOptions
	opts := mqtt.NewClientOptions().AddBroker(broker)
	opts.SetClientID("group-one")
	opts.SetDefaultPublishHandler(f)

	//create and start a client using the above ClientOptions
	c := mqtt.NewClient(opts)
	if token := c.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	if token := c.Subscribe("test", 0, nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	for {

	}
}

答案1

得分: 8

为了捕获连接丢失事件并在客户端失去连接时执行其他操作,您可以分配一个自定义的OnConnectionLostHandler。如果将AutoReconnect选项设置为true(这是默认行为),客户端将在连接丢失后自动重新连接到代理。请注意,在连接丢失后,代理可能不会保存您的订阅状态/信息,因此您将无法接收任何消息。为了解决这个问题,将主题订阅移动到OnConnect处理程序中。以下是一个示例实现:

package main

import (
    "fmt"
    "os"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func messageHandler(c mqtt.Client, msg mqtt.Message) {
    fmt.Printf("TOPIC: %s\n", msg.Topic())
    fmt.Printf("MSG: %s\n", msg.Payload())
}

func connLostHandler(c mqtt.Client, err error) {
    fmt.Printf("连接丢失,原因:%v\n", err)

    //执行其他操作...
}

func main() {
    //创建ClientOptions
    opts := mqtt.NewClientOptions().
        AddBroker("tcp://localhost:1883").
        SetClientID("group-one").
        SetDefaultPublishHandler(messageHandler).
        SetConnectionLostHandler(connLostHandler)

    //将OnConnect处理程序设置为匿名函数
    //连接后,订阅主题
    opts.OnConnect = func(c mqtt.Client) {
        fmt.Printf("客户端已连接,订阅:test/topic\n")

        //在此处订阅,否则在连接丢失后,
        //您可能无法接收任何消息
        if token := c.Subscribe("test/topic", 0, nil); token.Wait() && token.Error() != nil {
            fmt.Println(token.Error())
            os.Exit(1)
        }
    }

    //使用上述ClientOptions创建和启动客户端
    c := mqtt.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    for {
        //懒惰...
        time.Sleep(500 * time.Millisecond)
    }
}

希望对您有所帮助!

英文:

Assign a custom OnConnectionLostHandler to catch connection lost event, so you can perform additional action whenever the client loses connection. If you set the AutoReconnect option to true (which is the default behavior), the client will automatically reconnects to the broker after connection lost. Please note that after connection lost, your subscription state/info may not being saved by the broker, so you won't be able to receive any message. To deal with this issue, move topic subscription to OnConnect handler. Below is an example implementation:

package main
import (
"fmt"
"os"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
func messageHandler(c mqtt.Client, msg mqtt.Message) {
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
func connLostHandler(c mqtt.Client, err error) {
fmt.Printf("Connection lost, reason: %v\n", err)
//Perform additional action...
}
func main() {
//create a ClientOptions
opts := mqtt.NewClientOptions().
AddBroker("tcp://localhost:1883").
SetClientID("group-one").
SetDefaultPublishHandler(messageHandler).
SetConnectionLostHandler(connLostHandler)
//set OnConnect handler as anonymous function
//after connected, subscribe to topic
opts.OnConnect = func(c mqtt.Client) {
fmt.Printf("Client connected, subscribing to: test/topic\n")
//Subscribe here, otherwise after connection lost, 
//you may not receive any message
if token := c.Subscribe("test/topic", 0, nil); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
}
//create and start a client using the above ClientOptions
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
for {
//Lazy...
time.Sleep(500 * time.Millisecond)
}
}

huangapple
  • 本文由 发表于 2017年5月3日 19:58:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/43759445.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定