Paho MQTT golang for multiple modules?


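Question

I am writing a microservice in Go for an MQTT module. This module will be used by several different functions at the same time. I am using gRPC as the transport layer.

I have written a connect function, which looks like this: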

func Connect() {
    deviceID := flag.String("device", "handler-1", "GCP Device-Id")
    bridge := struct {
        host *string
        port *string
    }{
        flag.String("mqtt_host", "", "MQTT Bridge Host"),
        flag.String("mqtt_port", "", "MQTT Bridge Port"),
    }
    projectID := flag.String("project", "", "GCP Project ID")
    registryID := flag.String("registry", "", "Cloud IoT Registry ID (short form)")
    region := flag.String("region", "", "GCP Region")
    certsCA := flag.String("ca_certs", "", "Download https://pki.google.com/roots.pem")
    privateKey := flag.String("private_key", "", "Path to private key file")
    qos := flag.Int("qos", 0, "The QoS to subscribe to messages at")

    // Parse the flags before dereferencing any of them; until Parse runs,
    // every flag pointer still holds its default value.
    flag.Parse()

    server := fmt.Sprintf("ssl://%v:%v", *bridge.host, *bridge.port)
    topic := struct {
        config    string
        telemetry string
    }{
        config:    fmt.Sprintf("/devices/%v/config", *deviceID),
        telemetry: fmt.Sprintf("/devices/%v/events/topic", *deviceID),
    }
    clientid := fmt.Sprintf("projects/%v/locations/%v/registries/%v/devices/%v",
        *projectID,
        *region,
        *registryID,
        *deviceID,
    )
    log.Println("[main] Loading Google's roots")
    certpool := x509.NewCertPool()
    pemCerts, err := ioutil.ReadFile(*certsCA)
    if err == nil {
        certpool.AppendCertsFromPEM(pemCerts)
    }

    log.Println("[main] Creating TLS Config")
    config := &tls.Config{
        RootCAs:            certpool,
        ClientAuth:         tls.NoClientCert,
        ClientCAs:          nil,
        InsecureSkipVerify: true,
        Certificates:       []tls.Certificate{},
        MinVersion:         tls.VersionTLS12,
    }

    connOpts := MQTT.NewClientOptions().
        AddBroker(server).
        SetClientID(clientid).
        SetAutoReconnect(true).
        SetPingTimeout(10 * time.Second).
        SetKeepAlive(10 * time.Second).
        SetDefaultPublishHandler(onMessageReceived).
        SetConnectionLostHandler(connLostHandler).
        SetReconnectingHandler(reconnHandler).
        SetTLSConfig(config)
    connOpts.SetUsername("unused")
    ///JWT Generation Starts from Here
    token := jwt.New(jwt.SigningMethodES256)
    token.Claims = jwt.StandardClaims{
        Audience:  *projectID,
        IssuedAt:  time.Now().Unix(),
        ExpiresAt: time.Now().Add(24 * time.Hour).Unix(),
    }
    //Reading key file
    log.Println("[main] Load Private Key")
    keyBytes, err := ioutil.ReadFile(*privateKey)
    if err != nil {
        log.Fatal(err)
    }
    //Parsing key from file
    log.Println("[main] Parse Private Key")
    key, err := jwt.ParseECPrivateKeyFromPEM(keyBytes)
    if err != nil {
        log.Fatal(err)
    }
    //Signing JWT with private key
    log.Println("[main] Sign String")
    tokenString, err := token.SignedString(key)
    if err != nil {
        log.Fatal(err)
    }
    //JWT Generation Ends here

    connOpts.SetPassword(tokenString)
    connOpts.OnConnect = func(c MQTT.Client) {
        if token := c.Subscribe(topic.config, byte(*qos), nil); token.Wait() && token.Error() != nil {
            log.Fatal(token.Error())
        }
    }

    client := MQTT.NewClient(connOpts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        fmt.Printf("Not Connected..Retrying...  %s\n", server)
    } else {
        fmt.Printf("Connected to %s\n", server)
    }
}

I call this function in a goroutine in my main.go:

func main() {
    fmt.Println("Server started at port 5005")
    lis, err := net.Listen("tcp", "0.0.0.0:5005")
    if err != nil {
        log.Fatalf("Failed to listen: %v", err)
    }
    //Creating keepAlive channel for the MQTT subscription
    keepAlive := make(chan os.Signal)
    defer close(keepAlive)
    go func() {
        //checking for internet connection
        for !IsOnline() {
            fmt.Println("No Internet Connection..Retrying")
            //looking for internet connection after every 8 seconds
            time.Sleep(8 * time.Second)
        }
        fmt.Println("Internet connected...connecting to mqtt broker")
        repositories.Connect()
        //waiting for an interrupt (Ctrl+C)
        value := <-keepAlive
        //If Ctrl+C is pressed then exit the application
        if value == os.Interrupt {
            fmt.Printf("Exiting the application")
            os.Exit(3)
        }
    }()
    s := grpc.NewServer()
    MqttRepository := repositories.MqttRepository()
    // It creates a new gRPC server instance
    rpc.NewMqttServer(s, MqttRepository)
    if err := s.Serve(lis); err != nil {
        log.Fatalf("Failed to serve: %v", err)

    }
}

func IsOnline() bool {
    timeout := time.Duration(5000 * time.Millisecond)
    client := http.Client{
        Timeout: timeout,
    }
    //default URL used to check the connection is https://google.com
    _, err := client.Get("https://google.com")

    if err != nil {
        return false
    }

    return true
}

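I use the goroutine in my main function so that the application starts on every startup.

Now I want to use this MQTT Connect function to publish data from several other functions.

For example, function A could call it as Connect(payload1, topic1) and function B could call it as Connect(payload2, topic2), and the function would then take care of publishing the data to the cloud.

Should I just add the topic and payload to this Connect function and then call it from another function? Or is it possible to return the client, or export it as a global, and then use it in another function or goroutine? I am sorry if my question sounds stupid; I am not a Go expert.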


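Answer 1

Score: 2

> Now I want to use this MQTT Connect function to publish the data from other different functions.

I suspect I may be misunderstanding what you are trying to do here, but unless you have a specific reason for making multiple connections, it is best to connect once and then use that single connection to publish multiple messages. Establishing a connection each time you send a message has a few problems:

  • Establishing the connection takes time and generates some network traffic (TLS handshake, etc.).
  • There can only be one active connection for a given ClientID (if you establish a second connection, the broker will close the previous one).
  • The library will not disconnect automatically; you would need to call Disconnect after publishing.
  • Incoming messages are likely to be lost while the connection is down (note that CleanSession defaults to true).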
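The "connect once" idea from the list above can be sketched with sync.Once. This is only an illustration under stated assumptions: conn, dial and getConn are hypothetical names, and dial stands in for the real, expensive connection setup (TLS handshake plus MQTT CONNECT) so that the sketch runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a connected MQTT client.
type conn struct{ id int }

var (
	once    sync.Once
	shared  *conn
	dialled int // counts how often dial ran; only here for the demonstration
)

// dial stands in for the expensive part (TLS handshake, MQTT CONNECT).
func dial() *conn {
	dialled++
	return &conn{id: dialled}
}

// getConn returns the single shared connection, dialling on first use only.
func getConn() *conn {
	once.Do(func() { shared = dial() })
	return shared
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = getConn() // every goroutine receives the same *conn
		}()
	}
	wg.Wait()
	fmt.Println("dial calls:", dialled) // prints "dial calls: 1"
}
```

However many goroutines ask for the connection, the setup cost is paid once and the ClientID is never used by two connections at the same time.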

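> Should I just add the topic and payload in this Connect function and then call it from another function?

As mentioned above, the preferred approach is to connect once and then publish multiple messages over that one connection. The Client is designed to be thread-safe, so you can pass it around and call Publish from multiple goroutines. If you want the library to manage the connection, you can also use the AutoReconnect option (which you already enable via SetAutoReconnect), and there is a SetConnectRetry function as well. Bear in mind, though, that a QoS 0 message will not be retried if the link is down when you attempt to send it.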
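Because a failed publish is not retried for you at QoS 0, it can help to at least look at the result of each publish. The sketch below assumes a local token interface that mirrors two methods of the paho Token (WaitTimeout and Error); waitForPublish, errPublishTimeout and fakeToken are hypothetical names, and fakeToken exists only so the example runs without a broker.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// token mirrors two methods of the paho Token interface.
type token interface {
	WaitTimeout(time.Duration) bool
	Error() error
}

var errPublishTimeout = errors.New("publish not confirmed before timeout")

// waitForPublish blocks until the token completes or the timeout passes,
// then reports the token's error (nil on success).
func waitForPublish(t token, timeout time.Duration) error {
	if !t.WaitTimeout(timeout) {
		return errPublishTimeout
	}
	return t.Error()
}

// fakeToken is a stand-in so the sketch runs without a broker.
type fakeToken struct {
	done bool
	err  error
}

func (f fakeToken) WaitTimeout(time.Duration) bool { return f.done }
func (f fakeToken) Error() error                   { return f.err }

func main() {
	fmt.Println(waitForPublish(fakeToken{done: true}, time.Second))
	fmt.Println(waitForPublish(fakeToken{done: false}, time.Second))
}
```

With the real library, the value returned by Publish would be passed in place of fakeToken; what the caller does on error (log, queue, retry) is an application decision.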

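I would suggest that your connect function return the client (i.e. func Connect() mqtt.Client) and then use that client to publish messages (you can store it somewhere or just pass it around; I'd suggest adding it to your gRPC server struct).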
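A minimal sketch of that structure follows, under some assumptions: Publisher is a hypothetical, narrowed-down interface (a thin adapter around the paho client would be needed to satisfy it), mqttService stands in for the gRPC server struct, and recordingPublisher replaces the real client so the example runs without a broker.

```go
package main

import (
	"fmt"
	"sync"
)

// Publisher is a hypothetical, minimal view of an MQTT client.
type Publisher interface {
	Publish(topic string, qos byte, payload []byte) error
}

// recordingPublisher is a stand-in that just remembers what was published.
type recordingPublisher struct {
	mu   sync.Mutex
	sent map[string][]byte
}

func (r *recordingPublisher) Publish(topic string, qos byte, payload []byte) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.sent == nil {
		r.sent = make(map[string][]byte)
	}
	r.sent[topic] = payload
	return nil
}

// mqttService stands in for the gRPC server struct; it holds the one shared client.
type mqttService struct {
	pub Publisher
}

func newMqttService(pub Publisher) *mqttService { return &mqttService{pub: pub} }

// FunctionA and FunctionB publish to different topics over the same client.
func (s *mqttService) FunctionA(payload []byte) error { return s.pub.Publish("topic1", 0, payload) }
func (s *mqttService) FunctionB(payload []byte) error { return s.pub.Publish("topic2", 0, payload) }

func main() {
	rec := &recordingPublisher{}
	svc := newMqttService(rec) // connect once, then inject the client

	var wg sync.WaitGroup
	wg.Add(2)
	go func() { defer wg.Done(); _ = svc.FunctionA([]byte("payload1")) }()
	go func() { defer wg.Done(); _ = svc.FunctionB([]byte("payload2")) }()
	wg.Wait()

	fmt.Println("topics published:", len(rec.sent)) // prints "topics published: 2"
}
```

Passing the client in through the struct, rather than exporting a global, also makes the handlers easy to test with a stand-in like the one above.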

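I guess you might need multiple connections if you have to connect with a specific client ID in order to send to the desired topic (but generally you would give your server's connection access to a wide range of topics). That would take some work to ensure you never try to establish multiple connections with the same client ID simultaneously and, depending on your requirements, to handle receiving incoming messages.

A few additional notes:

  • If you use AutoReconnect and SetConnectRetry you can simplify your code (just use IsConnectionOpen() to check whether the connection is up, which removes the need for IsOnline()).
  • The spec states that "The Server MUST allow ClientIds which are between 1 and 23 UTF-8 encoded bytes in length"; yours looks longer than that (I have not used GCP, and it may well support or require a longer client ID).
  • You should not need InsecureSkipVerify in production.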
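On the last note, a TLS configuration that verifies the broker could look roughly like the sketch below. newTLSConfig is a hypothetical helper; it uses only the standard library and fails loudly when no CA certificate can be parsed, instead of continuing with an empty pool.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// newTLSConfig builds a client TLS config that verifies the server against
// the given PEM-encoded root certificates. InsecureSkipVerify is left at its
// default (false), so the certificate chain and host name are checked.
func newTLSConfig(pemCerts []byte) (*tls.Config, error) {
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(pemCerts) {
		return nil, errors.New("no CA certificates could be parsed from the PEM data")
	}
	return &tls.Config{
		RootCAs:    pool,
		MinVersion: tls.VersionTLS12,
	}, nil
}

func main() {
	// Invalid input is rejected rather than silently ignored.
	_, err := newTLSConfig([]byte("not a certificate"))
	fmt.Println(err)
}
```

In the question's code the PEM bytes would come from the file named by the ca_certs flag, and the returned config would be handed to SetTLSConfig.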

huangapple
  • Posted on July 22, 2021 at 22:50:52
  • When reposting, please keep the link to this article: https://go.coder-hub.com/68487000.html