MQTT paho - no error when failed to publish message
Question
I am using the paho.mqtt.golang library to connect to a broker and publish messages.
It works fine, except that I get no error when a publish fails.
The test I'm doing is as follows:
- I start the broker
- I run my code to connect to the broker. After connection the code waits for an input to proceed to publish
- I kill the broker
- I press enter to proceed to publish the message
I would expect an error on the token returned by the publish function (if token.Error() != nil {...}), but I get none.
Here is the code of the publish function:
func (handle Handler) Pub(ctx context.Context, topic, payload string, qos int, retained bool) error {
    token := handle.client.Publish(topic, byte(qos), retained, payload)
    go func(ctx context.Context) {
        log := logger.NewLogFromCtx(ctx)
        log.Debug("waiting for transaction to complete...")
        _ = token.Done()
        log.Debug("transaction Done!", zap.Any("token.Error()", token.Error()))
        if token.Error() != nil {
            log.Error("failed to publish MQTT message", zap.Error(token.Error()))
        }
    }(ctx)
    log := logger.NewLogFromCtx(ctx)
    log.Debug("Msg sent !")
    return nil
}
And here is the log:
Thu 27 May 17:40:25 CEST INFO logger/logging.go:32 logger initialized in development mode
[DEBUG][client] Connect()
[DEBUG][store] memorystore initialized
[DEBUG][client] about to write new connect msg
[DEBUG][client] socket connected to broker
[DEBUG][client] Using MQTT 3.1.1 protocol
[DEBUG][net] connect started
[DEBUG][net] received connack
[DEBUG][client] startCommsWorkers called
[DEBUG][client] client is connected/reconnected
[DEBUG][net] incoming started
[DEBUG][net] startIncomingComms started
[DEBUG][net] outgoing started
[DEBUG][net] startComms started
[DEBUG][client] startCommsWorkers done
[WARN][store] memorystore wiped
[DEBUG][client] exit startClient
Thu 27 May 17:40:25 CEST INFO mqtt_/client.go:68 successfully connected to MQTT broker {"url": "tcp://127.0.0.1:1883", "in": "41.843622ms"}
press enter to publish...
At this point I'm connected to the broker and the code is waiting for input. I kill the broker:
[ERROR][client] Connect comms goroutine - error triggered EOF
[DEBUG][client] internalConnLost called
[DEBUG][client] stopCommsWorkers called
[DEBUG][router] matchAndDispatch exiting
[DEBUG][pinger] keepalive stopped
[DEBUG][client] startCommsWorkers output redirector finished
[DEBUG][net] outgoing waiting for an outbound message
[DEBUG][net] outgoing waiting for an outbound message
[DEBUG][net] outgoing comms stopping
[DEBUG][net] startComms closing outError
[DEBUG][client] incoming comms goroutine done
[DEBUG][client] stopCommsWorkers waiting for workers
[DEBUG][client] stopCommsWorkers waiting for comms
[DEBUG][client] stopCommsWorkers done
[DEBUG][client] internalConnLost waiting on workers
[DEBUG][client] internalConnLost workers stopped
[DEBUG][client] internalConnLost complete
[DEBUG]Connection lost: EOF
[DEBUG][client] enter reconnect
[DEBUG][client] about to write new connect msg
[DEBUG][client] socket connected to broker
[DEBUG][client] Using MQTT 3.1.1 protocol
[DEBUG][net] connect started
[ERROR][net] connect got error EOF
[ERROR][client] Connecting to tcp://127.0.0.1:1883 CONNACK was not CONN_ACCEPTED, but rather Connection Error
[DEBUG][client] Reconnect failed, sleeping for 1 seconds: network Error : EOF
The connection is indeed lost. I press enter to proceed to publish:
[DEBUG][client] enter Publish
[DEBUG][client] storing publish message (reconnecting), topic: just/for/test
Thu 27 May 17:40:42 CEST DEBUG mqtt_/client.go:84 Msg sent !
Thu 27 May 17:40:42 CEST DEBUG mqtt_/client.go:76 waiting for transaction to complete...
Thu 27 May 17:40:42 CEST DEBUG mqtt_/client.go:78 transaction Done! {"token.Error()": null}
token.Error() returns nothing. How can I check whether the publish was successful?
Here is my full code if you need more details.
Connect and publish to the broker:
type Handler struct {
    client MQTT.Client
    conf   config.Configuration
}

func InitMQTT() {
    MQTT.DEBUG = lg.New(os.Stdout, "[DEBUG]", 0)
    MQTT.WARN = lg.New(os.Stdout, "[WARN]", 0)
    MQTT.CRITICAL = lg.New(os.Stdout, "[CRIT]", 0)
    MQTT.ERROR = lg.New(os.Stdout, "[ERROR]", 0)
}

func NewClient(ctx context.Context, conf config.Configuration) (Handler, error) {
    start := time.Now()
    log := logger.NewLogFromCtx(ctx)
    brokerUrl := fmt.Sprintf("tcp://%s:%s", conf.GW_MQTT_BROKER_HOST_IP, conf.GW_MQTT_BROKER_PORT)
    hostname, _ := os.Hostname()
    clientId := hostname + strconv.Itoa(time.Now().Second())
    connOpts := MQTT.NewClientOptions()
    connOpts.AddBroker(brokerUrl)
    connOpts.SetClientID(clientId)
    connOpts.SetCleanSession(true)
    handler := Handler{conf: conf}
    handler.client = MQTT.NewClient(connOpts)
    if token := handler.client.Connect(); token.Wait() && token.Error() != nil {
        log.Error("failed to connect to MQTT broker", zap.Error(token.Error()))
        return Handler{}, token.Error()
    }
    log.Info("successfully connected to MQTT broker", zap.String("url", brokerUrl), zap.Duration("in", time.Since(start)))
    return handler, nil
}

func (handle Handler) Pub(ctx context.Context, topic, payload string, qos int, retained bool) error {
    token := handle.client.Publish(topic, byte(qos), retained, payload)
    go func(ctx context.Context) {
        log := logger.NewLogFromCtx(ctx)
        log.Debug("waiting for transaction to complete...")
        _ = token.Done()
        log.Debug("transaction Done!", zap.Any("token.Error", token.Error()))
        if token.Error() != nil {
            log.Error("failed to publish MQTT message", zap.Error(token.Error()))
        }
    }(ctx)
    log := logger.NewLogFromCtx(ctx)
    log.Debug("Msg sent !")
    return nil
}
And here is the main:
func main() {
    conf := config.GetConfig()
    err := logger.SetupLogging(conf.IS_DEV_ENV)
    if err != nil {
        panic(err)
    }
    ctx := context.Background()
    log := logger.NewLogFromCtx(ctx)
    mqtt.InitMQTT()
    mqttClient, _ := mqtt.NewClient(ctx, conf)
    reader := bufio.NewReader(os.Stdin)
    fmt.Print("press enter to publish...")
    text, _ := reader.ReadString('\n')
    mqttClient.Pub(ctx, "just/for/test", "test", 2, false)
}
Answer 1
Score: 1
From the docs:
// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker.
Wait() bool
// Done is provided for use in select statements. Simple use cases may
// use Wait or WaitTimeout.
Done() <-chan struct{}
So _ = token.Done() only fetches the channel and discards it; it does not wait for anything. The simplest way to wait is to use token.Wait(). If you want to use token.Done(), you need to receive from the channel it returns, e.g. <-token.Done(). The reason Done() exists is to make things simpler when you are waiting on more than one event (e.g. waiting for a context OR an MQTT operation to complete).