英文:
Eclipse Paho MQTT Golang publish subscribe to AWS IOT returns EOF error when running as separate process on same machine
问题
我正在使用Go v1.17.2在Linux上连接到AWS MQTT,使用paho.mqtt.golang库v1.4.1。我基于EMQX的示例代码,使用AWS IOT Core提供的证书来编写我的代码。
当我在与上述EMQX示例相同的Go程序中运行发布和订阅代码时,一切正常,我可以看到以下输出:
2022/08/11 19:47:42 已连接
已订阅主题:topic_1
2022/08/12 13:47:42 收到消息:来自主题 topic_1 的消息 0
2022/08/11 19:47:43 收到消息:来自主题 topic_1 的消息 1
...
2022/08/11 19:47:51 收到消息:来自主题 topic_1 的消息 9
然而,如果我在一个Go程序中运行发布代码(使用go run),并在同一台机器上同时运行一个独立的订阅程序,那么每条消息的订阅都会失败,并显示EOF错误:
2022/08/11 19:54:50 已连接
2022/08/11 19:54:54 连接丢失:EOF
...
2022/08/11 19:54:59 已连接
2022/08/11 19:54:59 连接丢失:EOF
这是发布代码:
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalln(token.Error())
}
publish(client, *topic)
client.Disconnect(250)
...
func publish(client mqtt.Client, topic string) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("消息 %d", i)
token := client.Publish(topic, 0, false, text)
token.Wait()
time.Sleep(time.Second)
}
}
以及订阅代码:
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalln(token.Error())
}
sub(client, *topic)
time.Sleep(11 * time.Second)
client.Disconnect(250)
...
func sub(client mqtt.Client, topic string) {
token := client.Subscribe(topic, 1, nil)
token.Wait()
log.Printf("已订阅主题:%s", topic)
}
如果我只运行订阅代码,并使用AWS IOT控制台发布一些消息,也是可以正常工作的:
2022/08/11 20:25:27 已连接
2022/08/11 20:25:27 已订阅主题:topic_1
2022/08/11 20:25:29 收到消息:{
"message": "来自AWS IoT控制台的问候"
},来自主题:topic_1
2022/08/11 20:25:30 收到消息:{
"message": "来自AWS IoT控制台的问候"
},来自主题:topic_1
看起来问题与同时打开两个连接到AWS IOT/MQTT有关。
这是我的MQTT客户端选项配置:
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tls://%s:%d", *host, 8883))
opts.SetClientID("basicPubSub")
opts.SetTLSConfig(tlsConfig)
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
英文:
I am using Go v1.17.2 on Linux to connect to AWS MQTT using the paho.mqtt.golang library v1.4.1. I am basing my code on this sample from EMQX using TLS with the certificates provided by AWS IOT Core.
When I run the publish and subscribe code in the same go program as per the above example from EMQX everything works and I can see the following output:
2022/08/11 19:47:42 Connected
Subscribed to topic: topic_1
2022/08/12 13:47:42 Received message: Message 0 from topic: topic_1
2022/08/11 19:47:43 Received message: Message 1 from topic: topic_1
...
2022/08/11 19:47:51 Received message: Message 9 from topic: topic_1
However, if I run the publish code in one go program (using go run) and run the subscribe in a separate program on the same machine at the same time, then the subscribe fails on each message with an EOF error:
2022/08/11 19:54:50 Connected
2022/08/11 19:54:54 Connect lost: EOF
...
2022/08/11 19:54:59 Connected
2022/08/11 19:54:59 Connect lost: EOF
Here is the publish code
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalln(token.Error())
}
publish(client, *topic)
client.Disconnect(250)
...
func publish(client mqtt.Client, topic string) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish(topic, 0, false, text)
token.Wait()
time.Sleep(time.Second)
}
}
and the subscribe code
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
log.Fatalln(token.Error())
}
sub(client, *topic)
time.Sleep(11 * time.Second)
client.Disconnect(250)
...
func sub(client mqtt.Client, topic string) {
token := client.Subscribe(topic, 1, nil)
token.Wait()
log.Printf("Subscribed to topic: %s", topic)
}
If I run the subscribe code only, and use the AWS IOT console to publish some message that also works
2022/08/11 20:25:27 Connected
2022/08/11 20:25:27 Subscribed to topic: topic_1
2022/08/11 20:25:29 Received message: {
"message": "Hello from AWS IoT console"
} from topic: topic_1
2022/08/11 20:25:30 Received message: {
"message": "Hello from AWS IoT console"
} from topic: topic_1
It looks like the issue is to do with having two connections open to AWS IOT/MQTT at the same time.
Here is my MQTT client options config
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tls://%s:%d", *host, 8883))
opts.SetClientID("basicPubSub")
opts.SetTLSConfig(tlsConfig)
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
答案1
得分: 1
这是因为我在发布和订阅代码中使用了相同的客户端ID。
Eclipse Paho文档通过一个链接解释了这一点,链接指向MQTT规范。
> 如果验证成功,服务器执行以下步骤。
>
>
> 1.如果ClientId表示已连接到服务器的客户端,则服务器必须断开现有的客户端连接
> [MQTT-3.1.4-2]。
我为每个程序使用了不同的客户端ID,它可以正常工作。
注意: AWS IOT策略必须允许每个客户端ID连接(允许iot:Connect操作)。
英文:
This was due to me using the same client ID for the publish and subscribe code.
The Eclipse Paho docs explain this via a link to the MQTT specifications
> If validation is successful the Server performs the following steps.
>
>
> 1.If the ClientId represents a Client already connected to the Server then the Server MUST disconnect the existing Client
> [MQTT-3.1.4-2].
I used a different client ID for each program and it works.
Note: The AWS IOT policy must allow for each client ID to connect (allow iot:Connect action)
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论