Paho GO客户端无法连接到代理服务器。

huangapple go评论78阅读模式
英文:

Paho GO client unable to connect to Broker

问题

我正在尝试连接到一个使用基于证书的身份验证的Mosquitto代理。

下面是Mosquitto的配置片段:

listener 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key

require_certificate true

这个配置是有效的,因为我可以在远程机器上使用以下命令进行发布/订阅:

mosquitto_pub -t "/test" -m "test" --cafile ca/ca.crt --cert certs/client.crt --key certs/client.key -p 8883 -h server.com

mosquitto_sub -t "/test" --cafile ca/ca.crt --cert certs/client.crt --key certs/client.key -p 8883 -h server.com

或者通过以下方式打开SSL套接字:

openssl s_client -connect server.com:8883 -CAfile ca/ca.crt -cert certs/client.crt -key certs/client.key

但是,当我尝试使用GO Paho客户端时,它不起作用:

配置加载失败,因为在configureMqttConnection()中无法加载证书(这一行tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile)),并且在connect(backOff int)中失败,因为客户端无法在握手中发送证书。

我认为问题可能是LoadX509KeyPair期望的证书不是我生成的证书。我的证书上写着“BEGIN TRUSTED CERTIFICATE”,并且没有被信任和认证。如果是这样,我不确定如何创建正确的证书。

我正在使用以下GO代码(代码以GO start()开头):

package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io/ioutil"
	"strings"
	"time"

	MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
	"linksmart.eu/lc/core/catalog"
	"linksmart.eu/lc/core/catalog/service"
)

// MQTTConnector provides MQTT protocol connectivity.
type MQTTConnector struct {
	// config holds the broker settings read by this connector (URL, Discover,
	// Username/Password, CaFile, CertFile, KeyFile).
	config        *MqttProtocol
	// clientID is passed to the broker as the MQTT client ID.
	clientID      string
	// client is (re)created by configureMqttConnection.
	client        *MQTT.Client
	// pubCh is drained by publisher(); each response is published to the broker.
	pubCh         chan AgentResponse
	// subCh is send-only; it is not used by the methods shown here
	// (presumably fed by messageHandler — TODO confirm).
	subCh         chan<- DataRequest
	// pubTopics is indexed by AgentResponse.ResourceId to find the publish topic.
	pubTopics     map[string]string
	subTopicsRvsd map[string]string // stores SUB topics "reversed" to optimize lookup during message handling
}

// defaultQoS is the MQTT QoS level this connector uses for every publish and
// for every subscription it registers.
const defaultQoS = 1

// start prepares the connector and launches its background goroutines.
//
// When discovery is enabled and no broker URL is set, the endpoint is
// discovered first; a discovery error is logged and aborts the start. It then
// builds the MQTT client and starts the connect and publisher routines.
func (c *MQTTConnector) start() {
	logger.Println("MQTTConnector.start()")

	needsDiscovery := c.config.Discover && c.config.URL == ""
	if needsDiscovery {
		if err := c.discoverBrokerEndpoint(); err != nil {
			logger.Println("MQTTConnector.start() failed to start publisher:", err.Error())
			return
		}
	}

	// Build the MQTT client before any goroutine touches c.client.
	c.configureMqttConnection()

	logger.Printf("MQTTConnector.start() 将连接到代理 %v\n", c.config.URL)

	// Connection routine (no initial back-off) and publisher routine.
	go c.connect(0)
	go c.publisher()
}

// publisher reads outgoing messages from pubCh and publishes them to the
// broker at defaultQoS. It runs until pubCh is closed.
//
// A message is discarded (with a log line) when the client is not connected,
// when the response is flagged as an error, or when no publish topic is
// configured for the response's resource.
func (c *MQTTConnector) publisher() {
	for resp := range c.pubCh {
		if !c.client.IsConnected() {
			logger.Println("MQTTConnector.publisher() 在未连接到代理时收到数据。**已丢弃**")
			continue
		}
		if resp.IsError {
			logger.Println("MQTTConnector.publisher() 来自代理管理器的数据错误:", string(resp.Payload))
			continue
		}
		// A missing map entry yields "", and publishing to an empty topic is
		// not a valid MQTT publish, so skip instead of sending it.
		topic, ok := c.pubTopics[resp.ResourceId]
		if !ok || topic == "" {
			logger.Println("MQTTConnector.publisher() no PUB topic configured for resource", resp.ResourceId, "**discarded**")
			continue
		}
		// The result of Publish is deliberately not waited on, so that a slow
		// broker acknowledgement cannot block this loop.
		c.client.Publish(topic, byte(defaultQoS), false, resp.Payload)
		logger.Println("MQTTConnector.publisher() 发布到", topic)
	}
}


// stop disconnects from the broker if a client exists and is currently
// connected; otherwise it only logs the call. 500 is the quiesce argument
// handed to Disconnect.
func (c *MQTTConnector) stop() {
	logger.Println("MQTTConnector.stop()")

	if c.client == nil || !c.client.IsConnected() {
		return
	}
	c.client.Disconnect(500)
}

// connect blocks until the client is connected to the broker, retrying with a
// growing delay.
//
// backOff is the number of seconds to sleep before the first attempt. After a
// failed attempt a zero back-off becomes 10 seconds; otherwise it is doubled
// as long as it is still <= 600. The function returns immediately (after
// logging) when no client has been configured.
func (c *MQTTConnector) connect(backOff int) {
	if c.client == nil {
		logger.Printf("MQTTConnector.connect() 未配置客户端")
		return
	}

	for {
		logger.Printf("MQTTConnector.connect() 连接到代理 %v,backOff: %v 秒\n", c.config.URL, backOff)
		time.Sleep(time.Duration(backOff) * time.Second)

		// Another path may have established the connection while we slept.
		if c.client.IsConnected() {
			break
		}

		token := c.client.Connect()
		token.Wait()
		err := token.Error()
		if err == nil {
			break
		}
		logger.Printf("MQTTConnector.connect() 连接失败: %v\n", err.Error())

		switch {
		case backOff == 0:
			backOff = 10
		case backOff <= 600:
			backOff *= 2
		}
	}

	logger.Printf("MQTTConnector.connect() 连接到代理 %v", c.config.URL)
}

// onConnected is the on-connect handler registered in
// configureMqttConnection. It subscribes, at defaultQoS, to every topic that
// is a key of subTopicsRvsd and routes incoming messages to c.messageHandler.
//
// client is the client instance the handler was invoked with; the
// subscription is issued on it rather than on c.client.
func (c *MQTTConnector) onConnected(client *MQTT.Client) {
	// Subscribe only if at least one resource has SUB configured.
	if len(c.subTopicsRvsd) == 0 {
		logger.Println("MQTTConnector.onConnected() 没有配置SUB的资源")
		return
	}
	logger.Println("MQTTConnector.onConnected() 将(重新)订阅所有配置的SUB主题")

	topicFilters := make(map[string]byte, len(c.subTopicsRvsd))
	for topic := range c.subTopicsRvsd {
		logger.Printf("MQTTConnector.onConnected() 将订阅主题 %s", topic)
		topicFilters[topic] = defaultQoS
	}
	client.SubscribeMultiple(topicFilters, c.messageHandler)
}

// onConnectionLost is the connection-lost handler registered in
// configureMqttConnection. It logs the reason, replaces c.client with a
// freshly configured client and starts a new connect routine with no initial
// back-off.
func (c *MQTTConnector) onConnectionLost(client *MQTT.Client, reason error) {
	// Println formats the error itself, so a nil reason cannot cause a panic.
	logger.Println("MQTTConnector.onConnectionLost() 与代理的连接丢失:", reason)

	// Initialize a new client and reconnect.
	c.configureMqttConnection()
	go c.connect(0)
}

// configureMqttConnection builds a new MQTT client from c.config and stores
// it in c.client.
//
// Username/password are applied only when both are non-empty. TLS is set up
// when the broker URL starts with "ssl": an optional custom CA file populates
// RootCAs and an optional certificate/key pair enables certificate-based
// client authentication. Failures to read or parse those files are only
// logged; the client is still created without the corresponding setting.
func (c *MQTTConnector) configureMqttConnection() {
	cfg := c.config

	opts := MQTT.NewClientOptions()
	opts.AddBroker(cfg.URL)
	opts.SetClientID(c.clientID)
	opts.SetCleanSession(true)
	opts.SetConnectionLostHandler(c.onConnectionLost)
	opts.SetOnConnectHandler(c.onConnected)
	opts.SetAutoReconnect(false) // reconnection is handled by connect()

	// Username/password authentication.
	if cfg.Username != "" && cfg.Password != "" {
		opts.SetUsername(cfg.Username)
		opts.SetPassword(cfg.Password)
	}

	// SSL/TLS.
	if strings.HasPrefix(cfg.URL, "ssl") {
		tlsCfg := new(tls.Config)

		// Custom CA used to authenticate a broker with a self-signed certificate.
		if cfg.CaFile != "" {
			pemBytes, err := ioutil.ReadFile(cfg.CaFile)
			if err == nil {
				pool := x509.NewCertPool()
				tlsCfg.RootCAs = pool
				if !pool.AppendCertsFromPEM(pemBytes) {
					logger.Printf("MQTTConnector.configureMqttConnection() 错误:无法解析CA证书 %s\n", cfg.CaFile)
				}
			} else {
				logger.Printf("MQTTConnector.configureMqttConnection() 错误:无法读取CA文件 %s:%s\n", cfg.CaFile, err.Error())
			}
		}

		// Certificate-based client authentication.
		if cfg.CertFile != "" && cfg.KeyFile != "" {
			keyPair, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
			if err == nil {
				tlsCfg.Certificates = []tls.Certificate{keyPair}
			} else {
				logger.Printf("MQTTConnector.configureMqttConnection() 错误:无法加载客户端TLS凭证:%s\n",
					err.Error())
			}
		}

		opts.SetTLSConfig(tlsCfg)
	}

	c.client = MQTT.NewClient(opts)
}

我认为问题可能是证书不正确。你可以尝试使用正确的证书来解决这个问题。

英文:

I'm trying to connect to a mosquitto broker which is using certificate based authentication.

The relevant snippet of the mosquitto configuration is below:

listener 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key

require_certificate true

The configuration works, because I'm able to publish/subscribe from a remote machine using the following commands:

mosquitto_pub -t "/test" -m "test" --cafile ca/ca.crt --cert certs/client.crt --key certs/client.key -p 8883 -h server.com
mosquitto_sub -t "/test" --cafile ca/ca.crt --cert certs/client.crt --key certs/client.key -p 8883 -h server.com

or open an SSL socket with:

openssl s_client -connect server.com:8883 -CAfile ca/ca.crt -cert certs/client.crt -key certs/client.key

But when I try to use the Go Paho client, it doesn't work:

The configuration loading doesn't work: configureMqttConnection() is unable to load the certificates (the line tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile) fails), and connect(backOff int) then fails because the client is unable to send the certificate in the handshake.

I think the problem may be that the certificates LoadX509KeyPair expects are not the kind I'm generating. My certificates say 'BEGIN TRUSTED CERTIFICATE', and perhaps that is not the certificate format it accepts, or something like that. If this is the cause, I'm not sure how to create the correct certificate.

I'm using the Go code below (execution starts at start()):

package main
import (
&quot;crypto/tls&quot;
&quot;crypto/x509&quot;
&quot;fmt&quot;
&quot;io/ioutil&quot;
&quot;strings&quot;
&quot;time&quot;
MQTT &quot;git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git&quot;
&quot;linksmart.eu/lc/core/catalog&quot;
&quot;linksmart.eu/lc/core/catalog/service&quot;
)
// MQTTConnector provides MQTT protocol connectivity
type MQTTConnector struct {
config        *MqttProtocol
clientID      string
client        *MQTT.Client
pubCh         chan AgentResponse
subCh         chan&lt;- DataRequest
pubTopics     map[string]string
subTopicsRvsd map[string]string // store SUB topics &quot;reversed&quot; to optimize lookup in messageHandler
}
// defaultQoS is the MQTT QoS level this connector uses for every publish and
// for every subscription it registers.
const defaultQoS = 1
func (c *MQTTConnector) start() {
logger.Println(&quot;MQTTConnector.start()&quot;)
if c.config.Discover &amp;&amp; c.config.URL == &quot;&quot; {
err := c.discoverBrokerEndpoint()
if err != nil {
logger.Println(&quot;MQTTConnector.start() failed to start publisher:&quot;, err.Error())
return
}
}
// configure the mqtt client
c.configureMqttConnection()
// start the connection routine
logger.Printf(&quot;MQTTConnector.start() Will connect to the broker %v\n&quot;, c.config.URL)
go c.connect(0)
// start the publisher routine
go c.publisher()
}
// reads outgoing messages from the pubCh und publishes them to the broker
func (c *MQTTConnector) publisher() {
for resp := range c.pubCh {
if !c.client.IsConnected() {
logger.Println(&quot;MQTTConnector.publisher() got data while not connected to the broker. **discarded**&quot;)
continue
}
if resp.IsError {
logger.Println(&quot;MQTTConnector.publisher() data ERROR from agent manager:&quot;, string(resp.Payload))
continue
}
topic := c.pubTopics[resp.ResourceId]
c.client.Publish(topic, byte(defaultQoS), false, resp.Payload)
// We dont&#39; wait for confirmation from broker (avoid blocking here!)
//&lt;-r
logger.Println(&quot;MQTTConnector.publisher() published to&quot;, topic)
}
}
func (c *MQTTConnector) stop() {
logger.Println(&quot;MQTTConnector.stop()&quot;)
if c.client != nil &amp;&amp; c.client.IsConnected() {
c.client.Disconnect(500)
}
}
func (c *MQTTConnector) connect(backOff int) {
if c.client == nil {
logger.Printf(&quot;MQTTConnector.connect() client is not configured&quot;)
return
}
for {
logger.Printf(&quot;MQTTConnector.connect() connecting to the broker %v, backOff: %v sec\n&quot;, c.config.URL, backOff)
time.Sleep(time.Duration(backOff) * time.Second)
if c.client.IsConnected() {
break
}
token := c.client.Connect()
token.Wait()
if token.Error() == nil {
break
}
logger.Printf(&quot;MQTTConnector.connect() failed to connect: %v\n&quot;, token.Error().Error())
if backOff == 0 {
backOff = 10
} else if backOff &lt;= 600 {
backOff *= 2
}
}
logger.Printf(&quot;MQTTConnector.connect() connected to the broker %v&quot;, c.config.URL)
return
}
func (c *MQTTConnector) onConnected(client *MQTT.Client) {
// subscribe if there is at least one resource with SUB in MQTT protocol is configured
if len(c.subTopicsRvsd) &gt; 0 {
logger.Println(&quot;MQTTPulbisher.onConnected() will (re-)subscribe to all configured SUB topics&quot;)
topicFilters := make(map[string]byte)
for topic, _ := range c.subTopicsRvsd {
logger.Printf(&quot;MQTTPulbisher.onConnected() will subscribe to topic %s&quot;, topic)
topicFilters[topic] = defaultQoS
}
client.SubscribeMultiple(topicFilters, c.messageHandler)
} else {
logger.Println(&quot;MQTTPulbisher.onConnected() no resources with SUB configured&quot;)
}
}
func (c *MQTTConnector) onConnectionLost(client *MQTT.Client, reason error) {
logger.Println(&quot;MQTTPulbisher.onConnectionLost() lost connection to the broker: &quot;, reason.Error())
// Initialize a new client and reconnect
c.configureMqttConnection()
go c.connect(0)
}
func (c *MQTTConnector) configureMqttConnection() {
connOpts := MQTT.NewClientOptions().
AddBroker(c.config.URL).
SetClientID(c.clientID).
SetCleanSession(true).
SetConnectionLostHandler(c.onConnectionLost).
SetOnConnectHandler(c.onConnected).
SetAutoReconnect(false) // we take care of re-connect ourselves
// Username/password authentication
if c.config.Username != &quot;&quot; &amp;&amp; c.config.Password != &quot;&quot; {
connOpts.SetUsername(c.config.Username)
connOpts.SetPassword(c.config.Password)
}
// SSL/TLS
if strings.HasPrefix(c.config.URL, &quot;ssl&quot;) {
tlsConfig := &amp;tls.Config{}
// Custom CA to auth broker with a self-signed certificate
if c.config.CaFile != &quot;&quot; {
caFile, err := ioutil.ReadFile(c.config.CaFile)
if err != nil {
logger.Printf(&quot;MQTTConnector.configureMqttConnection() ERROR: failed to read CA file %s:%s\n&quot;, c.config.CaFile, err.Error())
} else {
tlsConfig.RootCAs = x509.NewCertPool()
ok := tlsConfig.RootCAs.AppendCertsFromPEM(caFile)
if !ok {
logger.Printf(&quot;MQTTConnector.configureMqttConnection() ERROR: failed to parse CA certificate %s\n&quot;, c.config.CaFile)
}
}
}
// Certificate-based client authentication
if c.config.CertFile != &quot;&quot; &amp;&amp; c.config.KeyFile != &quot;&quot; {
cert, err := tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile)
if err != nil {
logger.Printf(&quot;MQTTConnector.configureMqttConnection() ERROR: failed to load client TLS credentials: %s\n&quot;,
err.Error())
} else {
tlsConfig.Certificates = []tls.Certificate{cert}
}
}
connOpts.SetTLSConfig(tlsConfig)
}
c.client = MQTT.NewClient(connOpts)
}

I think the problem i

答案1

得分: 2

BEGIN TRUSTED CERTIFICATE开头的证书是OpenSSL的“受信任证书”文件,这种格式无法被Go加密库理解或解析。在生成证书时,您是否使用了x509手册页面下的“TRUST SETTINGS”选项之一?Go TLS库只是在文件开头寻找-----BEGIN CERTIFICATE-----,其他任何内容都会引发错误。

英文:

A certificate that starts with BEGIN TRUSTED CERTIFICATE is an OpenSSL "trusted certificate" file, a format that the Go crypto libraries cannot parse. When you generated the cert, did you use any of the options under "TRUST SETTINGS" on the x509 man page?
The Go TLS library just looks for -----BEGIN CERTIFICATE----- at the start of the file; anything else will produce an error.

huangapple
  • 本文由 发表于 2017年3月13日 18:55:48
  • 转载请务必保留本文链接:https://go.coder-hub.com/42761866.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定