Golang Paho MQTT 丢失消息

huangapple go评论91阅读模式
英文:

golang paho mqtt dropping messages

问题

我有一个运行在防火墙后面的mosquitto代理。所需的端口是开放的,从外部,我使用以下命令检查它是否工作正常:

mosquitto_sub  -h [ip地址] -t "# " -u [用户名] -P [密码] -v


mosquitto_pub  -h [ip地址] -t "topic" -u [用户名] -P [密码] -m "hello"

我可以看到发布的消息。
我想用一个小的Go程序做同样的事情,代码如下:

package main

import (
    "crypto/tls"
    "fmt"
    "time"
    "strconv"
    MQTT "github.com/eclipse/paho.mqtt.golang"
)

func messageHandler(c MQTT.Client, msg MQTT.Message) {
    fmt.Printf("TOPIC: %s\n", msg.Topic())
    fmt.Printf("MSG: %s\n", msg.Payload())
}

func connLostHandler(c MQTT.Client, err error) {
    fmt.Printf("Connection lost, reason: %v\n", err)
}

func main() {

    opts := MQTT.NewClientOptions()

    skipVerify := true
    opts.AddBroker("tcp://[ip]:1883")
    opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: skipVerify})
    //opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: skipVerify, ClientAuth: tls.NoClientCert})
    opts.SetClientID("myclientid")
    opts.SetAutoReconnect(true)
    //opts.SetCleanSession(true)
    opts.SetDefaultPublishHandler(messageHandler)
    opts.SetConnectionLostHandler(connLostHandler)

    opts.OnConnect = func(c MQTT.Client) {
        fmt.Printf("Client connected\n")
    }

    client := MQTT.NewClient(opts)
    token := client.Connect()
    token.Wait()
    fmt.Println("connected")
    if token.Error() != nil {
        fmt.Println("problems with connection")
        panic(token.Error())
    }
    for i := 0; i < 10; i++ {
        str := "hello: " + strconv.Itoa(i)
        token := client.Publish("topic/temperature", 0, false, str)
        token.Wait()
        if token.Error() != nil {
            fmt.Println("problems with pub")
        }
        fmt.Println("published")
        time.Sleep(1000 * time.Millisecond)
    }
    fmt.Println("finished")
    client.Disconnect(1)
}

我期望的结果是每个带有编号的hello都被发布,但只有少数消息通过。这是我的代理配置文件,我认为它具有使其工作所需的最低配置:

# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example

log_dest file /var/log/mosquitto/mosquitto.log
log_type debug

allow_anonymous true

port 1883
listener 8883

我可以告诉我在使用paho go库时做错了什么,因为我使用python的paho mqtt库编写了类似的脚本,它完美地工作:

import paho.mqtt.client as mqtt
import time

topic1 = "topic/temperature"

mqttClient = mqtt.Client()

mqttClient.connect([ip], 1883, 60)
mqttClient.loop_start()
for i in range(10000):
    mqttClient.publish(topic1, "hello - " + str(i), qos= 0)
    #time.sleep(1)
mqttClient.loop_stop()
mqttClient.disconnect()

我也尝试过更改QOS,但结果是相似的。我错过了什么?

英文:

I've a mosquitto broker that is running in server behind a firewall. Ports needed are open and, from outside, I check that it's working with:

mosquitto_sub  -h [ip address] -t &quot;#&quot; -u [username] -P 
		
输入密码查看隐藏内容

-v mosquitto_pub -h [ip address] -t &quot;topic&quot; -u [username] -P
输入密码查看隐藏内容

-m &quot;hello&quot;

and I can see the message published.
I wanted to do the same with a small go program, the code is the following:

package main

import (
	&quot;crypto/tls&quot;
	&quot;fmt&quot;
	&quot;time&quot;
	&quot;strconv&quot;
	MQTT &quot;github.com/eclipse/paho.mqtt.golang&quot;
)

func messageHandler(c MQTT.Client, msg MQTT.Message) {
	fmt.Printf(&quot;TOPIC: %s\n&quot;, msg.Topic())
	fmt.Printf(&quot;MSG: %s\n&quot;, msg.Payload())
}

func connLostHandler(c MQTT.Client, err error) {
	fmt.Printf(&quot;Connection lost, reason: %v\n&quot;, err)
}

func main() {

	opts := MQTT.NewClientOptions()

	skipVerify := true
	opts.AddBroker(&quot;tcp://[ip]:1883&quot;)
	opts.SetTLSConfig(&amp;tls.Config{InsecureSkipVerify: skipVerify})
	//opts.SetTLSConfig(&amp;tls.Config{InsecureSkipVerify: skipVerify, ClientAuth: tls.NoClientCert})
	opts.SetClientID(&quot;myclientid&quot;)
	opts.SetAutoReconnect(true)
	//opts.SetCleanSession(true)
	opts.SetDefaultPublishHandler(messageHandler)
	opts.SetConnectionLostHandler(connLostHandler)

	opts.OnConnect = func(c MQTT.Client) {
		fmt.Printf(&quot;Client connected\n&quot;)
	}

	client := MQTT.NewClient(opts)
	token := client.Connect()
	token.Wait()
	fmt.Println(&quot;connected&quot;)
	if token.Error() != nil {
		fmt.Println(&quot;problems with connection&quot;)
		panic(token.Error())
	}
	for i := 0; i &lt; 10; i++ {
		str := &quot;hello: &quot; + strconv.Itoa(i)
		token := client.Publish(&quot;topic/temperature&quot;, 0, false, str)
		token.Wait()
		if token.Error() != nil {
			fmt.Println(&quot;problems with pub&quot;)
		}
		fmt.Println(&quot;published&quot;)
		time.Sleep(1000 * time.Millisecond)
	}
	fmt.Println(&quot;finished&quot;)
	client.Disconnect(1)
}

The result I expected was to have every numbered hello published, but only few messages make it through. This is my broker configuration file and I think it has the bare-minimum configurations to make it work:

# Place your local configuration in /etc/mosquitto/conf.d/
#
# A full description of the configuration file is at
# /usr/share/doc/mosquitto/examples/mosquitto.conf.example

log_dest file /var/log/mosquitto/mosquitto.log
log_type debug

allow_anonymous true

port 1883
listener 8883

I can tell I'm doing something wrong with the paho go library because I've done a similar script with python using the python paho mqtt library and it works perfectly:

import paho.mqtt.client as mqtt
import time

topic1 = &quot;topic/temperature&quot;

mqttClient = mqtt.Client()

mqttClient.connect([ip], 1883, 60)
mqttClient.loop_start()
for i in range(10000):
    mqttClient.publish(topic1, &quot;hello - &quot; + str(i), qos= 0)
    #time.sleep(1)
mqttClient.loop_stop()
mqttClient.disconnect()

I've also tried chaging QOS, but the results are similar. What am I missing?

答案1

得分: 1

根据评论,问题是由于另一个客户端使用相同的客户端ID引起的。检查这个问题最简单的方法是阅读代理日志(从客户端的角度来看,连接会突然中断而没有警告)。

MQTT规范中指出:

> 服务器必须将从客户端发送的第二个CONNECT数据包视为协议违规,并断开与客户端的连接[MQTT-3.1.0-2]。有关处理错误的信息,请参阅第4.8节。

这是一个相当常见的问题(在项目自述文件的常见问题部分中首先提到)。

英文:

As per the comments - the issue was due to another client using the same client id. The easiest way to check for this is to read the broker logs (from the clients perspective the connection is just dropped without warning).

The MQTT spec states:

> The Server MUST process a second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client [MQTT-3.1.0-2]. See section 4.8 for information about handling errors.

This is a fairly common issue (and is the first thing mentioned in the common problems section of the project readme).

huangapple
  • 本文由 发表于 2021年7月29日 23:30:00
  • 转载请务必保留本文链接:https://go.coder-hub.com/68578819.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定