Eclipse Paho MQTT Golang publish subscribe to AWS IOT returns EOF error when running as separate process on same machine

huangapple go评论88阅读模式
英文:

Eclipse Paho MQTT Golang publish subscribe to AWS IOT returns EOF error when running as separate process on same machine

问题

我正在使用Go v1.17.2在Linux上连接到AWS MQTT,使用paho.mqtt.golang库v1.4.1。我基于EMQX的示例代码,使用AWS IOT Core提供的证书来编写我的代码。

当我在与上述EMQX示例相同的Go程序中运行发布和订阅代码时,一切正常,我可以看到以下输出:

2022/08/11 19:47:42 已连接
已订阅主题:topic_1
2022/08/12 13:47:42 收到消息:来自主题 topic_1 的消息 0
2022/08/11 19:47:43 收到消息:来自主题 topic_1 的消息 1
...
2022/08/11 19:47:51 收到消息:来自主题 topic_1 的消息 9

然而,如果我在一个Go程序中运行发布代码(使用go run),并在同一台机器上同时运行一个独立的订阅程序,那么每条消息的订阅都会失败,并显示EOF错误:

2022/08/11 19:54:50 已连接
2022/08/11 19:54:54 连接丢失:EOF
...
2022/08/11 19:54:59 已连接
2022/08/11 19:54:59 连接丢失:EOF

这是发布代码:

client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
    log.Fatalln(token.Error())
}

publish(client, *topic)
client.Disconnect(250)
...

func publish(client mqtt.Client, topic string) {
    num := 10
    for i := 0; i < num; i++ {
       text := fmt.Sprintf("消息 %d", i)
       token := client.Publish(topic, 0, false, text)
       token.Wait()
       time.Sleep(time.Second)
	}
}

以及订阅代码:

client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
   log.Fatalln(token.Error())
}

sub(client, *topic)
time.Sleep(11 * time.Second)
client.Disconnect(250)
...

func sub(client mqtt.Client, topic string) {
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	log.Printf("已订阅主题:%s", topic)
}

如果我只运行订阅代码,并使用AWS IOT控制台发布一些消息,也是可以正常工作的:

2022/08/11 20:25:27 已连接
2022/08/11 20:25:27 已订阅主题:topic_1
2022/08/11 20:25:29 收到消息:{
  "message": "来自AWS IoT控制台的问候"
},来自主题:topic_1
2022/08/11 20:25:30 收到消息:{
  "message": "来自AWS IoT控制台的问候"
},来自主题:topic_1

看起来问题与同时打开两个连接到AWS IOT/MQTT有关。

这是我的MQTT客户端选项配置:

opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tls://%s:%d", *host, 8883))
opts.SetClientID("basicPubSub")
opts.SetTLSConfig(tlsConfig)
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
英文:

I am using Go v1.17.2 on Linux to connect to AWS MQTT using the paho.mqtt.golang library v1.4.1. I am basing my code on this sample from EMQX using TLS with the certificates provided by AWS IOT Core.

When I run the publish and subscribe code in the same go program as per the above example from EMQX everything works and I can see the following output:

2022/08/11 19:47:42 Connected
Subscribed to topic: topic_1
2022/08/12 13:47:42 Received message: Message 0 from topic: topic_1
2022/08/11 19:47:43 Received message: Message 1 from topic: topic_1
...
2022/08/11 19:47:51 Received message: Message 9 from topic: topic_1

However, if I run the publish code in one go program (using go run) and run the subscribe in a separate program on the same machine at the same time, then the subscribe fails on each message with an EOF error:

2022/08/11 19:54:50 Connected
2022/08/11 19:54:54 Connect lost: EOF
...
2022/08/11 19:54:59 Connected
2022/08/11 19:54:59 Connect lost: EOF

Here is the publish code

client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() &amp;&amp; token.Error() != nil {
    log.Fatalln(token.Error())
}

publish(client, *topic)
client.Disconnect(250)
...

func publish(client mqtt.Client, topic string) {
    num := 10
    for i := 0; i &lt; num; i++ {
       text := fmt.Sprintf(&quot;Message %d&quot;, i)
       token := client.Publish(topic, 0, false, text)
       token.Wait()
       time.Sleep(time.Second)
	}
}

and the subscribe code

client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() &amp;&amp; token.Error() != nil {
   log.Fatalln(token.Error())
}

sub(client, *topic)
time.Sleep(11 * time.Second)
client.Disconnect(250)

...

func sub(client mqtt.Client, topic string) {
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	log.Printf(&quot;Subscribed to topic: %s&quot;, topic)
}

If I run the subscribe code only, and use the AWS IOT console to publish some message that also works

2022/08/11 20:25:27 Connected
2022/08/11 20:25:27 Subscribed to topic: topic_1
2022/08/11 20:25:29 Received message: {
  &quot;message&quot;: &quot;Hello from AWS IoT console&quot;
} from topic: topic_1
2022/08/11 20:25:30 Received message: {
  &quot;message&quot;: &quot;Hello from AWS IoT console&quot;
} from topic: topic_1

It looks like the issue is to do with having two connections open to AWS IOT/MQTT at the same time.

Here is my MQTT client options config

opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf(&quot;tls://%s:%d&quot;, *host, 8883))
opts.SetClientID(&quot;basicPubSub&quot;)
opts.SetTLSConfig(tlsConfig)
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler

答案1

得分: 1

这是因为我在发布和订阅代码中使用了相同的客户端ID。

Eclipse Paho文档通过一个链接解释了这一点,链接指向MQTT规范

> 如果验证成功,服务器执行以下步骤。
>
>
> 1.如果ClientId表示已连接到服务器的客户端,则服务器必须断开现有的客户端连接
> [MQTT-3.1.4-2]。

我为每个程序使用了不同的客户端ID,它可以正常工作。
注意: AWS IOT策略必须允许每个客户端ID连接(允许iot:Connect操作)。

英文:

This was due to me using the same client ID for the publish and subscribe code.

The Eclipse Paho docs explain this via a link to the MQTT specifications

> If validation is successful the Server performs the following steps.
>
>
> 1.If the ClientId represents a Client already connected to the Server then the Server MUST disconnect the existing Client
> [MQTT-3.1.4-2].

I used a different client ID for each program and it works.
Note: The AWS IOT policy must allow for each client ID to connect (allow iot:Connect action)

huangapple
  • 本文由 发表于 2022年8月12日 21:22:18
  • 转载请务必保留本文链接:https://go.coder-hub.com/73334868.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定