英文:
Paho GO client unable to connect to Broker
问题
我正在尝试连接到一个使用基于证书的身份验证的Mosquitto代理。
下面是Mosquitto的配置片段:
listener 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true
这个配置是有效的,因为我可以在远程机器上使用以下命令进行发布/订阅:
mosquitto_pub -t "/test" -m "test" --cafile ca/ca.crt --cert certs/client.crt --key certs/client.key -p 8883 -h server.com
mosquitto_sub -t "/test" --cafile ca/ca.crt --cert certs/client.crt --key certs/client.key -p 8883 -h server.com
或者通过以下方式打开SSL套接字:
openssl s_client -connect server.com:8883 -CAfile ca/ca.crt -cert certs/client.crt -key certs/client.key
但是,当我尝试使用GO Paho客户端时,它不起作用:
配置加载失败,因为在configureMqttConnection()
中无法加载证书(这一行tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile)
),并且在connect(backOff int)
中失败,因为客户端无法在握手中发送证书。
我认为问题可能是LoadX509KeyPair期望的证书不是我生成的证书。我的证书上写着“BEGIN TRUSTED CERTIFICATE”,并且没有被信任和认证。如果是这样,我不确定如何创建正确的证书。
我正在使用以下GO代码(代码以GO start()
开头):
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"strings"
"time"
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"linksmart.eu/lc/core/catalog"
"linksmart.eu/lc/core/catalog/service"
)
// MQTTConnector提供MQTT协议连接
type MQTTConnector struct {
config *MqttProtocol
clientID string
client *MQTT.Client
pubCh chan AgentResponse
subCh chan<- DataRequest
pubTopics map[string]string
subTopicsRvsd map[string]string // 存储SUB主题的“反向”以优化消息处理中的查找
}
const defaultQoS = 1
func (c *MQTTConnector) start() {
logger.Println("MQTTConnector.start()")
if c.config.Discover && c.config.URL == "" {
err := c.discoverBrokerEndpoint()
if err != nil {
logger.Println("MQTTConnector.start() failed to start publisher:", err.Error())
return
}
}
// 配置mqtt客户端
c.configureMqttConnection()
// 启动连接例程
logger.Printf("MQTTConnector.start() 将连接到代理 %v\n", c.config.URL)
go c.connect(0)
// 启动发布者例程
go c.publisher()
}
// 从pubCh读取传出的消息并将其发布到代理
func (c *MQTTConnector) publisher() {
for resp := range c.pubCh {
if !c.client.IsConnected() {
logger.Println("MQTTConnector.publisher() 在未连接到代理时收到数据。**已丢弃**")
continue
}
if resp.IsError {
logger.Println("MQTTConnector.publisher() 来自代理管理器的数据错误:", string(resp.Payload))
continue
}
topic := c.pubTopics[resp.ResourceId]
c.client.Publish(topic, byte(defaultQoS), false, resp.Payload)
// 我们不等待代理的确认(避免在此处阻塞!)
//<-r
logger.Println("MQTTConnector.publisher() 发布到", topic)
}
}
func (c *MQTTConnector) stop() {
logger.Println("MQTTConnector.stop()")
if c.client != nil && c.client.IsConnected() {
c.client.Disconnect(500)
}
}
func (c *MQTTConnector) connect(backOff int) {
if c.client == nil {
logger.Printf("MQTTConnector.connect() 未配置客户端")
return
}
for {
logger.Printf("MQTTConnector.connect() 连接到代理 %v,backOff: %v 秒\n", c.config.URL, backOff)
time.Sleep(time.Duration(backOff) * time.Second)
if c.client.IsConnected() {
break
}
token := c.client.Connect()
token.Wait()
if token.Error() == nil {
break
}
logger.Printf("MQTTConnector.connect() 连接失败: %v\n", token.Error().Error())
if backOff == 0 {
backOff = 10
} else if backOff <= 600 {
backOff *= 2
}
}
logger.Printf("MQTTConnector.connect() 连接到代理 %v", c.config.URL)
return
}
func (c *MQTTConnector) onConnected(client *MQTT.Client) {
// 如果至少有一个资源在MQTT协议中配置了SUB,则订阅
if len(c.subTopicsRvsd) > 0 {
logger.Println("MQTTPulbisher.onConnected() 将(重新)订阅所有配置的SUB主题")
topicFilters := make(map[string]byte)
for topic, _ := range c.subTopicsRvsd {
logger.Printf("MQTTPulbisher.onConnected() 将订阅主题 %s", topic)
topicFilters[topic] = defaultQoS
}
client.SubscribeMultiple(topicFilters, c.messageHandler)
} else {
logger.Println("MQTTPulbisher.onConnected() 没有配置SUB的资源")
}
}
func (c *MQTTConnector) onConnectionLost(client *MQTT.Client, reason error) {
logger.Println("MQTTPulbisher.onConnectionLost() 与代理的连接丢失:", reason.Error())
// 初始化一个新的客户端并重新连接
c.configureMqttConnection()
go c.connect(0)
}
func (c *MQTTConnector) configureMqttConnection() {
connOpts := MQTT.NewClientOptions().
AddBroker(c.config.URL).
SetClientID(c.clientID).
SetCleanSession(true).
SetConnectionLostHandler(c.onConnectionLost).
SetOnConnectHandler(c.onConnected).
SetAutoReconnect(false) // 我们自己处理重新连接
// 用户名/密码身份验证
if c.config.Username != "" && c.config.Password != "" {
connOpts.SetUsername(c.config.Username)
connOpts.SetPassword(c.config.Password)
}
// SSL/TLS
if strings.HasPrefix(c.config.URL, "ssl") {
tlsConfig := &tls.Config{}
// 使用自签名证书对代理进行身份验证的自定义CA
if c.config.CaFile != "" {
caFile, err := ioutil.ReadFile(c.config.CaFile)
if err != nil {
logger.Printf("MQTTConnector.configureMqttConnection() 错误:无法读取CA文件 %s:%s\n", c.config.CaFile, err.Error())
} else {
tlsConfig.RootCAs = x509.NewCertPool()
ok := tlsConfig.RootCAs.AppendCertsFromPEM(caFile)
if !ok {
logger.Printf("MQTTConnector.configureMqttConnection() 错误:无法解析CA证书 %s\n", c.config.CaFile)
}
}
}
// 基于证书的客户端身份验证
if c.config.CertFile != "" && c.config.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile)
if err != nil {
logger.Printf("MQTTConnector.configureMqttConnection() 错误:无法加载客户端TLS凭证:%s\n",
err.Error())
} else {
tlsConfig.Certificates = []tls.Certificate{cert}
}
}
connOpts.SetTLSConfig(tlsConfig)
}
c.client = MQTT.NewClient(connOpts)
}
我认为问题可能是证书不正确。你可以尝试使用正确的证书来解决这个问题。
英文:
I'm trying to connect to a mosquitto broker which is using certificate based authentication.
The mosquitto snipped configuration below:
listener 8883
cafile /etc/mosquitto/ca_certificates/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
require_certificate true
The configuration it works because I'm able to make a pub/sub in a remote machine using following commands:
mosquitto_pub -t "/test" -m "test" --cafile ca/ca.crt --cert certs/client.crt --key certs/client.key -p 8883 -h server.com
mosquitto_sub -t "/test" --cafile ca/ca.crt --cert certs/client.crt --key certs/client.key -p 8883 -h server.com
or open a SSL socket by:
openssl s_client -connect server.com:8883 -CAfile ca/ca.crt -cert certs/client.crt -key certs/client.key
but when I trying to use GO Paho client doesn't work:
The the configuration loading doesn't work because in configureMqttConnection()
is unable to load the Certificates (this line tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile)
) and in connect(backOff int)
fails because the client is unable to send the certificate in the handshake.
I think maybe the problem is that the certificates that LoadX509KeyPair is expecting are not the ones I'm generating. My certificates says 'BEGIN TRUSTED CERTIFICATE' and not being trusted and certificated or something like this. If this is I not sure how I can create the correct certificate.
I'm using this GO code below (the code starts with GO start()
:
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"strings"
"time"
MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
"linksmart.eu/lc/core/catalog"
"linksmart.eu/lc/core/catalog/service"
)
// MQTTConnector provides MQTT protocol connectivity
type MQTTConnector struct {
config *MqttProtocol
clientID string
client *MQTT.Client
pubCh chan AgentResponse
subCh chan<- DataRequest
pubTopics map[string]string
subTopicsRvsd map[string]string // store SUB topics "reversed" to optimize lookup in messageHandler
}
const defaultQoS = 1
func (c *MQTTConnector) start() {
logger.Println("MQTTConnector.start()")
if c.config.Discover && c.config.URL == "" {
err := c.discoverBrokerEndpoint()
if err != nil {
logger.Println("MQTTConnector.start() failed to start publisher:", err.Error())
return
}
}
// configure the mqtt client
c.configureMqttConnection()
// start the connection routine
logger.Printf("MQTTConnector.start() Will connect to the broker %v\n", c.config.URL)
go c.connect(0)
// start the publisher routine
go c.publisher()
}
// reads outgoing messages from the pubCh und publishes them to the broker
func (c *MQTTConnector) publisher() {
for resp := range c.pubCh {
if !c.client.IsConnected() {
logger.Println("MQTTConnector.publisher() got data while not connected to the broker. **discarded**")
continue
}
if resp.IsError {
logger.Println("MQTTConnector.publisher() data ERROR from agent manager:", string(resp.Payload))
continue
}
topic := c.pubTopics[resp.ResourceId]
c.client.Publish(topic, byte(defaultQoS), false, resp.Payload)
// We dont' wait for confirmation from broker (avoid blocking here!)
//<-r
logger.Println("MQTTConnector.publisher() published to", topic)
}
}
func (c *MQTTConnector) stop() {
logger.Println("MQTTConnector.stop()")
if c.client != nil && c.client.IsConnected() {
c.client.Disconnect(500)
}
}
func (c *MQTTConnector) connect(backOff int) {
if c.client == nil {
logger.Printf("MQTTConnector.connect() client is not configured")
return
}
for {
logger.Printf("MQTTConnector.connect() connecting to the broker %v, backOff: %v sec\n", c.config.URL, backOff)
time.Sleep(time.Duration(backOff) * time.Second)
if c.client.IsConnected() {
break
}
token := c.client.Connect()
token.Wait()
if token.Error() == nil {
break
}
logger.Printf("MQTTConnector.connect() failed to connect: %v\n", token.Error().Error())
if backOff == 0 {
backOff = 10
} else if backOff <= 600 {
backOff *= 2
}
}
logger.Printf("MQTTConnector.connect() connected to the broker %v", c.config.URL)
return
}
func (c *MQTTConnector) onConnected(client *MQTT.Client) {
// subscribe if there is at least one resource with SUB in MQTT protocol is configured
if len(c.subTopicsRvsd) > 0 {
logger.Println("MQTTPulbisher.onConnected() will (re-)subscribe to all configured SUB topics")
topicFilters := make(map[string]byte)
for topic, _ := range c.subTopicsRvsd {
logger.Printf("MQTTPulbisher.onConnected() will subscribe to topic %s", topic)
topicFilters[topic] = defaultQoS
}
client.SubscribeMultiple(topicFilters, c.messageHandler)
} else {
logger.Println("MQTTPulbisher.onConnected() no resources with SUB configured")
}
}
func (c *MQTTConnector) onConnectionLost(client *MQTT.Client, reason error) {
logger.Println("MQTTPulbisher.onConnectionLost() lost connection to the broker: ", reason.Error())
// Initialize a new client and reconnect
c.configureMqttConnection()
go c.connect(0)
}
func (c *MQTTConnector) configureMqttConnection() {
connOpts := MQTT.NewClientOptions().
AddBroker(c.config.URL).
SetClientID(c.clientID).
SetCleanSession(true).
SetConnectionLostHandler(c.onConnectionLost).
SetOnConnectHandler(c.onConnected).
SetAutoReconnect(false) // we take care of re-connect ourselves
// Username/password authentication
if c.config.Username != "" && c.config.Password != "" {
connOpts.SetUsername(c.config.Username)
connOpts.SetPassword(c.config.Password)
}
// SSL/TLS
if strings.HasPrefix(c.config.URL, "ssl") {
tlsConfig := &tls.Config{}
// Custom CA to auth broker with a self-signed certificate
if c.config.CaFile != "" {
caFile, err := ioutil.ReadFile(c.config.CaFile)
if err != nil {
logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to read CA file %s:%s\n", c.config.CaFile, err.Error())
} else {
tlsConfig.RootCAs = x509.NewCertPool()
ok := tlsConfig.RootCAs.AppendCertsFromPEM(caFile)
if !ok {
logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to parse CA certificate %s\n", c.config.CaFile)
}
}
}
// Certificate-based client authentication
if c.config.CertFile != "" && c.config.KeyFile != "" {
cert, err := tls.LoadX509KeyPair(c.config.CertFile, c.config.KeyFile)
if err != nil {
logger.Printf("MQTTConnector.configureMqttConnection() ERROR: failed to load client TLS credentials: %s\n",
err.Error())
} else {
tlsConfig.Certificates = []tls.Certificate{cert}
}
}
connOpts.SetTLSConfig(tlsConfig)
}
c.client = MQTT.NewClient(connOpts)
}
I think the problem i
答案1
得分: 2
以BEGIN TRUSTED CERTIFICATE
开头的证书是OpenSSL的“受信任证书”文件,这种格式无法被Go加密库理解或解析。在生成证书时,您是否使用了x509手册页面下的“TRUST SETTINGS”选项之一?Go TLS库只是在文件开头寻找-----BEGIN CERTIFICATE-----
,其他任何内容都会引发错误。
英文:
Certificates that start BEGIN TRUSTED CERTIFICATE
is an OpenSSL "trusted certificate" file which is a format not understood/parsable by the Go crypto libraries. When you generated the cert did you use any of the options under "TRUST SETTINGS" on the x509 man page?
The Go TLS library is just looking for -----BEGIN CERTIFICATE-----
at the start of the file, anything else will throw an error.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论