使用SASL用户名和密码的Confluent Cloud的Kafka消费者的Go客户端示例。

huangapple go评论93阅读模式
英文:

go client example of a kafka consumer for confluent cloud with sasl.username and sasl.password

问题

有人有一个使用sasl.username和sasl.password的Kafka消费者的confluent cloud的go客户端示例吗?

我在尝试从confluent cloud消费消息时遇到了一个错误。

连接到Kafka代理失败:kafka:客户端已经用完了可用的代理来进行通信:EOF
英文:

Does anyone have a go client example of a Kafka consumer for the confluent cloud with sasl.username and sasl.password?

I am running into an error while trying to consume a message from the confluent cloud.

Failed to connect to Kafka broker: kafka: client has run out of available brokers to talk to: EOF

答案1

得分: 0

Confluent有自己的示例代码存储库。

https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/confluent_cloud_example/confluent_cloud_example.go

提取:

	bootstrapServers          = "<BOOTSTRAP_SERVERS>"
	ccloudAPIKey              = "<CCLOUD_API_KEY>"
	ccloudAPISecret           = "<CCLOUD_API_SECRET>"
	schemaRegistryAPIEndpoint = "<CCLOUD_SR_ENDPOINT>"
	schemaRegistryAPIKey      = "<CCLOUD_SR_API_KEY>"
	schemaRegistryAPISecret   = "<CCLOUD_SR_API_SECRET>"
)

func main() {

	topic := "go-test-topic"
	createTopic(topic)

	// Produce a new record to the topic...
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrapServers,
		"sasl.mechanisms":   "PLAIN",
		"security.protocol": "SASL_SSL",
		"sasl.username":     ccloudAPIKey,
		"sasl.password":     ccloudAPISecret})

	if err != nil {
		panic(fmt.Sprintf("Failed to create producer: %s", err))
	}

	client, err := schemaregistry.NewClient(schemaregistry.NewConfigWithAuthentication(
		schemaRegistryAPIEndpoint,
		schemaRegistryAPIKey,
		schemaRegistryAPISecret))
英文:

Confluent has their own repo for examples

https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/confluent_cloud_example/confluent_cloud_example.go

Extract

	bootstrapServers          = &quot;&lt;BOOTSTRAP_SERVERS&gt;&quot;
	ccloudAPIKey              = &quot;&lt;CCLOUD_API_KEY&gt;&quot;
	ccloudAPISecret           = &quot;&lt;CCLOUD_API_SECRET&gt;&quot;
	schemaRegistryAPIEndpoint = &quot;&lt;CCLOUD_SR_ENDPOINT&gt;&quot;
	schemaRegistryAPIKey      = &quot;&lt;CCLOUD_SR_API_KEY&gt;&quot;
	schemaRegistryAPISecret   = &quot;&lt;CCLOUD_SR_API_SECRET&gt;&quot;
)

func main() {

	topic := &quot;go-test-topic&quot;
	createTopic(topic)

	// Produce a new record to the topic...
	producer, err := kafka.NewProducer(&amp;kafka.ConfigMap{
		&quot;bootstrap.servers&quot;: bootstrapServers,
		&quot;sasl.mechanisms&quot;:   &quot;PLAIN&quot;,
		&quot;security.protocol&quot;: &quot;SASL_SSL&quot;,
		&quot;sasl.username&quot;:     ccloudAPIKey,
		&quot;sasl.password&quot;:     ccloudAPISecret})

	if err != nil {
		panic(fmt.Sprintf(&quot;Failed to create producer: %s&quot;, err))
	}

	client, err := schemaregistry.NewClient(schemaregistry.NewConfigWithAuthentication(
		schemaRegistryAPIEndpoint,
		schemaRegistryAPIKey,
		schemaRegistryAPISecret))

huangapple
  • 本文由 发表于 2023年2月22日 01:58:43
  • 转载请务必保留本文链接:https://go.coder-hub.com/75524211.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定