How to create a kafka consumer group in Golang?

huangapple go评论90阅读模式
英文:

How to create a kafka consumer group in Golang?

问题

一个可用的库是sarama(或其扩展sarama-cluster),然而在saramasarama-cluster中都没有提供消费者组的示例。

我不理解这个API。我可以有一个创建主题的消费者组的示例吗?

英文:

An available library is sarama (or its expansion sarama-cluster) however no consumer group example are provided, not in sarama nor in sarama-cluster.

I do not understand the API. May I have an example of creating a consumer group for a topic?

答案1

得分: 20

不需要使用sarama-cluster库。它已经被弃用,不再用于与Apache Kafka集成。Sarama原始库本身提供了一种连接到Kafka集群的方式,使用消费者组。

我们需要创建客户端,然后初始化消费者组,在其中创建claims并等待消息通道接收消息。

初始化客户端:

kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion是Kafka服务器的版本,如0.11.0.2
if err != nil {
    log.Println(err)
}

config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true

// 从客户端开始
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
    log.Println(err)
}
defer func() { _ = client.Close() }()

连接到消费者组

// 启动一个新的消费者组
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
    log.Println(err)
}
defer func() { _ = group.Close() }()

从主题分区开始消费消息

// 遍历消费者会话
ctx := context.Background()
for {
    topics := []string{topicName}
    handler := &Message{}
    err := group.Consume(ctx, topics, handler)
    if err != nil {
        log.Println(err)
    }
}

最后一部分是等待消息通道消费消息。我们需要实现所有的函数(三个函数)来实现ConsumerGroupHandler接口。

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "")
    }
    return nil
}

有关使用Golang的Kafka的更多信息,请查看sarama库。

英文:

There is no need to use sarama-cluster library. It is DEPRECATED for apache kafka integration. Sarama original library itself provide a way to connect to kafka cluster using consumer group.

We need to create client and then we initialize consumer group where we create claims and wait for message channel to receive message.

Initializing client :-

kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion is the version of kafka server like 0.11.0.2
if err != nil {
	log.Println(err)
}

config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true

// Start with a client
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
	log.Println(err)
}
defer func() { _ = client.Close() }()

Connection to consumer group :-

// Start a new consumer group
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
	log.Println(err)
}
defer func() { _ = group.Close() }()

Start consuming messages from topic partition :-

// Iterate over consumer sessions.
ctx := context.Background()
for {
	topics := []string{topicName}
	handler := &Message{}
	err := group.Consume(ctx, topics, handler)
	if err != nil {
		log.Println(err)
	}
}

The last part is to wait for message channel to consume messages. We need to implement all of the functions (three) to implement ConsumerGroupHandler interface.

func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "")
    }
    return nil
}

For more information on kafka using golang check sarama library.

答案2

得分: 2

消费者组由集群消费者的第二个参数指定“构造函数”。这是一个非常基本的示例:

import (
    "github.com/Shopify/sarama"
    "github.com/bsm/sarama-cluster"
)

conf := cluster.NewConfig()
// 添加配置值

brokers := []string{"kafka-1:9092", "kafka-2:9092"}
group := "Your-Consumer-Group"
topics := []string{"topicName"}
consumer := cluster.NewConsumer(brokers, group, topics, conf)

这样,你将拥有一个属于指定消费者组的消费者。

英文:

The consumer group is specified by the second argument of the cluster consumer "constructor". Here's a very basic sketch:

import (
    "github.com/Shopify/sarama"
	"github.com/bsm/sarama-cluster"
)

conf := cluster.NewConfig()
// add config values

brokers := []string{"kafka-1:9092", "kafka-2:9092"}
group := "Your-Consumer-Group"
topics := []string{"topicName"}
consumer := cluster.NewConsumer(broker, group, topics, conf)

And so you'll have a consumer belonging to the specified consumer group.

huangapple
  • 本文由 发表于 2017年2月2日 02:07:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/41986674.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定