MQTT paho - no error when failed to publish message

Question

I am using the paho.mqtt.golang library to connect to a broker and publish messages.

It works fine, except that I get no error when a publish fails.

The test I'm doing is as follows:

  • I start the broker
  • I run my code to connect to the broker. Once connected, the code waits for input before publishing
  • I kill the broker
  • I press enter to proceed to publish the message

I would expect the token returned by the publish function to carry an error (if token.Error() != nil {...}), but I get none.

Here is the code of the publish function:

func (handle Handler) Pub(ctx context.Context, topic, payload string, qos int, retained bool) error {
	token := handle.client.Publish(topic, byte(qos), retained, payload)
	go func(ctx context.Context) {
		log := logger.NewLogFromCtx(ctx)
		log.Debug("waiting for transaction to complete...")
		_ = token.Done()
		log.Debug("transaction Done!", zap.Any("token.Error()", token.Error()))
		if token.Error() != nil {
			log.Error("failed to publish MQTT message", zap.Error(token.Error()))
		}
	}(ctx)
	log := logger.NewLogFromCtx(ctx)
	log.Debug("Msg sent !")
	return nil
}

And here is the log:

Thu 27 May 17:40:25 CEST        INFO    logger/logging.go:32    logger initialized in development mode
[DEBUG][client]   Connect()
[DEBUG][store]    memorystore initialized
[DEBUG][client]   about to write new connect msg
[DEBUG][client]   socket connected to broker
[DEBUG][client]   Using MQTT 3.1.1 protocol
[DEBUG][net]      connect started
[DEBUG][net]      received connack
[DEBUG][client]   startCommsWorkers called
[DEBUG][client]   client is connected/reconnected
[DEBUG][net]      incoming started
[DEBUG][net]      startIncomingComms started
[DEBUG][net]      outgoing started
[DEBUG][net]      startComms started
[DEBUG][client]   startCommsWorkers done
[WARN][store]    memorystore wiped
[DEBUG][client]   exit startClient
Thu 27 May 17:40:25 CEST        INFO    mqtt_/client.go:68      successfully connected to MQTT broker   {"url": "tcp://127.0.0.1:1883", "in": "41.843622ms"}
press enter to publish...

At this point I'm connected to the broker and the code is waiting for input; I kill the broker:

[ERROR][client]   Connect comms goroutine - error triggered EOF
[DEBUG][client]   internalConnLost called
[DEBUG][client]   stopCommsWorkers called
[DEBUG][router]   matchAndDispatch exiting
[DEBUG][pinger]   keepalive stopped
[DEBUG][client]   startCommsWorkers output redirector finished
[DEBUG][net]      outgoing waiting for an outbound message
[DEBUG][net]      outgoing waiting for an outbound message
[DEBUG][net]      outgoing comms stopping
[DEBUG][net]      startComms closing outError
[DEBUG][client]   incoming comms goroutine done
[DEBUG][client]   stopCommsWorkers waiting for workers
[DEBUG][client]   stopCommsWorkers waiting for comms
[DEBUG][client]   stopCommsWorkers done
[DEBUG][client]   internalConnLost waiting on workers
[DEBUG][client]   internalConnLost workers stopped
[DEBUG][client]   internalConnLost complete
[DEBUG]Connection lost: EOF
[DEBUG][client]   enter reconnect
[DEBUG][client]   about to write new connect msg
[DEBUG][client]   socket connected to broker
[DEBUG][client]   Using MQTT 3.1.1 protocol
[DEBUG][net]      connect started
[ERROR][net]      connect got error EOF
[ERROR][client]   Connecting to tcp://127.0.0.1:1883 CONNACK was not CONN_ACCEPTED, but rather Connection Error
[DEBUG][client]   Reconnect failed, sleeping for 1 seconds: network Error : EOF

The connection is indeed lost. I press enter to proceed to publish:

[DEBUG][client]   enter Publish
[DEBUG][client]   storing publish message (reconnecting), topic: just/for/test
Thu 27 May 17:40:42 CEST        DEBUG   mqtt_/client.go:84      Msg sent !
Thu 27 May 17:40:42 CEST        DEBUG   mqtt_/client.go:76      waiting for transaction to complete...
Thu 27 May 17:40:42 CEST        DEBUG   mqtt_/client.go:78      transaction Done!       {"token.Error()": null}

token.Error() returns nothing. How can I check whether the publish was successful?


Here is my full code if you need more details.

Connect and publish to the broker:

type Handler struct {
	client MQTT.Client
	conf   config.Configuration
}

func InitMQTT() {
	MQTT.DEBUG = lg.New(os.Stdout, "[DEBUG]", 0)
	MQTT.WARN = lg.New(os.Stdout, "[WARN]", 0)
	MQTT.CRITICAL = lg.New(os.Stdout, "[CRIT]", 0)
	MQTT.ERROR = lg.New(os.Stdout, "[ERROR]", 0)
}

func NewClient(ctx context.Context, conf config.Configuration) (Handler, error) {
	start := time.Now()
	log := logger.NewLogFromCtx(ctx)
	brokerUrl := fmt.Sprintf("tcp://%s:%s", conf.GW_MQTT_BROKER_HOST_IP, conf.GW_MQTT_BROKER_PORT)
	hostname, _ := os.Hostname()
	clientId := hostname + strconv.Itoa(time.Now().Second())
	connOpts := MQTT.NewClientOptions()
	connOpts.AddBroker(brokerUrl)
	connOpts.SetClientID(clientId)
	connOpts.SetCleanSession(true)
	handler := Handler{conf: conf}
	handler.client = MQTT.NewClient(connOpts)
	if token := handler.client.Connect(); token.Wait() && token.Error() != nil {
		log.Error("failed to connect to MQTT broker", zap.Error(token.Error()))
		return Handler{}, token.Error()
	}
	log.Info("successfully connected to MQTT broker", zap.String("url", brokerUrl), zap.Duration("in", time.Since(start)))
	return handler, nil
}

func (handle Handler) Pub(ctx context.Context, topic, payload string, qos int, retained bool) error {
	token := handle.client.Publish(topic, byte(qos), retained, payload)
	go func(ctx context.Context) {
		log := logger.NewLogFromCtx(ctx)
		log.Debug("waiting for transaction to complete...")
		_ = token.Done()
		log.Debug("transaction Done!", zap.Any("token.Error", token.Error()))
		if token.Error() != nil {
			log.Error("failed to publish MQTT message", zap.Error(token.Error()))
		}
	}(ctx)
	log := logger.NewLogFromCtx(ctx)
	log.Debug("Msg sent !")
	return nil
}

And here is the main:

func main() {
	conf := config.GetConfig()
	err := logger.SetupLogging(conf.IS_DEV_ENV)
	if err != nil {
		panic(err)
	}
	ctx := context.Background()
	log := logger.NewLogFromCtx(ctx)

	mqtt.InitMQTT()
	mqttClient, _ := mqtt.NewClient(ctx, conf)

	reader := bufio.NewReader(os.Stdin)
	fmt.Print("press enter to publish...")
	text, _ := reader.ReadString('\n')
	mqttClient.Pub(ctx, "just/for/test", "test", 2, false)
}

Answer 1

Score: 1

From the docs:

// Wait will wait indefinitely for the Token to complete, ie the Publish
// to be sent and confirmed receipt from the broker.
Wait() bool
// Done is provided for use in select statements. Simple use cases may
// use Wait or WaitTimeout.
Done() <-chan struct{}

So _ = token.Done() really does nothing: it fetches the channel and discards it without ever waiting on it. The simplest way to wait is to use token.Wait(). If you want to use token.Done(), you need to receive from the channel it returns, e.g. <-token.Done(). The reason Done() exists is to make things simpler when you are waiting on more than one event (e.g. waiting for a context OR an MQTT operation to complete).

huangapple
  • Posted on May 27, 2021, 23:59:32
  • When reposting, please keep the link to this article: https://go.coder-hub.com/67725842.html