security.protocol error when setting up basic Kafka consumer and producer in Go?

huangapple go评论120阅读模式
英文:

security.protocol error when setting up basic Kafka consumer and producer in Go?

问题

我正在尝试在Go中设置一个基本的Kafka客户端,按照这里详细说明的示例进行操作:https://docs.confluent.io/clients-confluent-kafka-go/current/overview.html#go-example-code 和 https://github.com/confluentinc/confluent-kafka-go

我按照给出的方式编写了消费者和生产者示例,如下所示:

  1. func Produce() {
  2. p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "my-broker-name"})
  3. if err != nil {
  4. panic(err)
  5. }
  6. defer p.Close()
  7. go func() {
  8. for e := range p.Events() {
  9. switch ev := e.(type) {
  10. case *kafka.Message:
  11. if ev.TopicPartition.Error != nil {
  12. fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
  13. } else {
  14. fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
  15. }
  16. }
  17. }
  18. }()
  19. topic := "myTopic"
  20. for _, word := range []string{"Welcome", "to", "the", "Confluent", "Kafka", "Golang", "client"} {
  21. p.Produce(&kafka.Message{
  22. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  23. Value: []byte(word),
  24. }, nil)
  25. }
  26. p.Flush(15 * 1000)
  27. }
  28. func Consume() {
  29. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  30. "bootstrap.servers": "my-broker-name",
  31. "group.id": "myGroup",
  32. "auto.offset.reset": "earliest",
  33. })
  34. if err != nil {
  35. panic(err)
  36. }
  37. c.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)
  38. for {
  39. msg, err := c.ReadMessage(-1)
  40. if err == nil {
  41. fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
  42. } else {
  43. fmt.Printf("Consumer error: %v (%v)\n", err, msg)
  44. }
  45. }
  46. c.Close()
  47. }

(my-broker-name是我的主机名+端口的替代,我不想在这里包含)

然而,当运行produce函数时,它返回一个错误,说:

  1. Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 31ms in state APIVERSION_QUERY)

当运行consume函数时,我收到相同的错误,但还有一些其他的东西,说:

  1. Consumer error: 1/1 brokers are down (<nil>)

我确定这不是真的。

不幸的是,我找不到任何关于这些错误的文档,也不知道如何解决它们。我该如何解决这个错误,以便能够向我的Broker生产和消费消息?

更新:

我获取了我的证书并将其转换为.pem文件,并将ConfigMap更改为以下内容:

  1. p, err := kafka.NewProducer(&kafka.ConfigMap{
  2. "bootstrap.servers": "my-broker:32500",
  3. "security.protocol": "SSL",
  4. "ssl.certificate.location": "mycert.pem",
  5. "ssl.ca.location": "ca-chain.pem"})
  6. if err != nil {
  7. panic(err)
  8. }

然而,现在返回的是:

  1. client SSL authentication might be required (see ssl.key.location and ssl.certificate.location and consult the broker logs for more information)

这是否意味着证书有问题?还是我在某个地方漏掉了一步?

英文:

I am attempting to set up a basic Kafka client in Go - following the examples detailed here https://docs.confluent.io/clients-confluent-kafka-go/current/overview.html#go-example-code and https://github.com/confluentinc/confluent-kafka-go.

I wrote the consumer and producer examples the same way they were given, like so

  1. func Produce() {
  2. p, err := kafka.NewProducer(&amp;kafka.ConfigMap{&quot;bootstrap.servers&quot;: &quot;my-broker-name&quot;})
  3. if err != nil {
  4. panic(err)
  5. }
  6. defer p.Close()
  7. go func() {
  8. for e := range p.Events() {
  9. switch ev := e.(type) {
  10. case *kafka.Message:
  11. if ev.TopicPartition.Error != nil {
  12. fmt.Printf(&quot;Delivery failed: %v\n&quot;, ev.TopicPartition)
  13. } else {
  14. fmt.Printf(&quot;Delivered message to %v\n&quot;, ev.TopicPartition)
  15. }
  16. }
  17. }
  18. }()
  19. topic := &quot;myTopic&quot;
  20. for _, word := range []string{&quot;Welcome&quot;, &quot;to&quot;, &quot;the&quot;, &quot;Confluent&quot;, &quot;Kafka&quot;, &quot;Golang&quot;, &quot;client&quot;} {
  21. p.Produce(&amp;kafka.Message{
  22. TopicPartition: kafka.TopicPartition{Topic: &amp;topic, Partition: kafka.PartitionAny},
  23. Value: []byte(word),
  24. }, nil)
  25. }
  26. p.Flush(15 * 1000)
  27. }
  28. func Consume() {
  29. c, err := kafka.NewConsumer(&amp;kafka.ConfigMap{
  30. &quot;bootstrap.servers&quot;: &quot;my-broker-name&quot;,
  31. &quot;group.id&quot;: &quot;myGroup&quot;,
  32. &quot;auto.offset.reset&quot;: &quot;earliest&quot;,
  33. })
  34. if err != nil {
  35. panic(err)
  36. }
  37. c.SubscribeTopics([]string{&quot;myTopic&quot;, &quot;^aRegex.*[Tt]opic&quot;}, nil)
  38. for {
  39. msg, err := c.ReadMessage(-1)
  40. if err == nil {
  41. fmt.Printf(&quot;Message on %s: %s\n&quot;, msg.TopicPartition, string(msg.Value))
  42. } else {
  43. fmt.Printf(&quot;Consumer error: %v (%v)\n&quot;, err, msg)
  44. }
  45. }
  46. c.Close()
  47. }

(my-broker-name is a substitute for my hostname + port, which I didn't want to include here)

However when running the produce function it returns an error saying

  1. Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is &lt; 0.10 (see api.version.request) (after 31ms in state APIVERSION_QUERY)

and when running the consume function I receive the same error, but also something that says

  1. Consumer error: 1/1 brokers are down (&lt;nil&gt;)

which I am certain is not the case.

I'm unfortunately unable to find any documentation on what these errors mean, or how to approach fixing them. How do I resolve the error so that I'm able to produce and consume to my Broker?

UPDATE:

I obtained my certificate and converted it to a .pem file, and changed the ConfigMap to the following:

  1. p, err := kafka.NewProducer(&amp;kafka.ConfigMap{
  2. &quot;bootstrap.servers&quot;: &quot;my-broker:32500&quot;,
  3. &quot;security.protocol&quot;: &quot;SSL&quot;,
  4. &quot;ssl.certificate.location&quot;: &quot;mycert.pem&quot;,
  5. &quot;ssl.ca.location&quot;: &quot;ca-chain.pem&quot;})
  6. if err != nil {
  7. panic(err)
  8. }

However, this is now returning

  1. client SSL authentication might be required (see ssl.key.location and ssl.certificate.location and consult the broker logs for more information)

Does this mean that there is a problem with the Certificate? Or is there a step that I am missing somewhere?

答案1

得分: 1

您需要提供主机名和端口作为引导服务器

"bootstrap.servers": "host1:9092"

要连接到Kafka中的安全端口,您需要提供包含CA文件的信任存储配置,或者对于任何应用程序来说,都需要提供用于安全连接的配置。

https://www.google.com/amp/s/www.process-one.net/blog/using-tls-authentication-for-your-go-kafka-client/%3famp

https://github.com/FluuxIO/kafka/blob/master/examples/base-client/base-client.go#L6

kafka.ConfigMap{
"bootstrap.servers": "..",
"security.protocol": "SSL",
// 如果您使用SSL身份验证,请在此处提供客户端的密钥
"ssl.key.location": "path-to-private-key.pem",
"ssl.certificate.location": "path-to-public-key.pem",
"ssl.key.password": "如果有的话..",
}


对于您的新错误,请查看以下链接

https://stackoverflow.com/questions/9380403/what-does-ssl-ctx-use-privatekey-file-problems-getting-password-error-indica

英文:

You need to provide hostname and port as your bootstrap servers

  1. &quot;bootstrap.servers&quot;: &quot;host1:9092&quot;

To connect to secured port in kafka you need to provide truststore configuration that contains your ca file, or any application for secured connection for that matter

https://www.google.com/amp/s/www.process-one.net/blog/using-tls-authentication-for-your-go-kafka-client/%3famp

https://github.com/FluuxIO/kafka/blob/master/examples/base-client/base-client.go#L6

  1. kafka.ConfigMap{
  2. &quot;bootstrap.servers&quot;̇: &quot;..&quot;,
  3. &quot;security.protocol&quot;: &quot;SSL&quot;,
  4. // If you&#39;re using SSL authentication, provide the client&#39;s key here
  5. &quot;ssl.key.location&quot;: &quot;path-to-private-key.pem&quot;,
  6. &quot;ssl.certificate.location&quot;: &quot;path-to-public-key.pem&quot;,
  7. &quot;ssl.key.password&quot;: &quot;if any..&quot;,
  8. }

For you new error look there

https://stackoverflow.com/questions/9380403/what-does-ssl-ctx-use-privatekey-file-problems-getting-password-error-indica

答案2

得分: 0

这里的解决方案是我缺少了ssl.key.location。我不得不向管理员请求密钥。一旦我包含了密钥,一切都正常工作了。我最终的配置如下所示:

  1. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  2. "bootstrap.servers": "hostname:port-number",
  3. "security.protocol": "SSL",
  4. "ssl.ca.location": "ca-chain.pem",
  5. "ssl.key.location": "key-location",
  6. "ssl.certificate.location": "mycert.pem",
  7. })
  8. if err != nil {
  9. panic(err)
  10. }
英文:

The solution here was that I was missing ssl.key.location. I had to ask my administrator for the key. Once I included the key everything worked. The final configuration I had looked like the following:

  1. c, err := kafka.NewConsumer(&amp;kafka.ConfigMap{
  2. &quot;bootstrap.servers&quot;: &quot;hostname:port-number&quot;,
  3. &quot;security.protocol&quot;: &quot;SSL&quot;,
  4. &quot;ssl.ca.location&quot;: &quot;ca-chain.pem&quot;,
  5. &quot;ssl.key.location&quot;: &quot;key-location&quot;,
  6. &quot;ssl.certificate.location&quot;: &quot;mycert.pem&quot;})
  7. if err != nil {
  8. panic(err)
  9. }

huangapple
  • 本文由 发表于 2021年7月22日 05:13:09
  • 转载请务必保留本文链接:https://go.coder-hub.com/68476527.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定