英文:
How to create a kafka consumer group in Golang?
问题
一个可用的库是sarama(或其扩展sarama-cluster),然而在sarama和sarama-cluster中都没有提供消费者组的示例。
我不理解这个API。我可以有一个创建主题的消费者组的示例吗?
英文:
An available library is sarama (or its expansion sarama-cluster) however no consumer group example are provided, not in sarama nor in sarama-cluster.
I do not understand the API. May I have an example of creating a consumer group for a topic?
答案1
得分: 20
不需要使用sarama-cluster库。它已经被弃用,不再用于与Apache Kafka集成。Sarama
原始库本身提供了一种连接到Kafka集群的方式,使用消费者组。
我们需要创建客户端,然后初始化消费者组,在其中创建claims并等待消息通道接收消息。
初始化客户端:
kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion是Kafka服务器的版本,如0.11.0.2
if err != nil {
log.Println(err)
}
config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true
// 从客户端开始
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
log.Println(err)
}
defer func() { _ = client.Close() }()
连接到消费者组:
// 启动一个新的消费者组
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
log.Println(err)
}
defer func() { _ = group.Close() }()
从主题分区开始消费消息:
// 遍历消费者会话
ctx := context.Background()
for {
topics := []string{topicName}
handler := &Message{}
err := group.Consume(ctx, topics, handler)
if err != nil {
log.Println(err)
}
}
最后一部分是等待消息通道消费消息。我们需要实现所有的函数(三个函数)来实现ConsumerGroupHandler
接口。
func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
sess.MarkMessage(msg, "")
}
return nil
}
有关使用Golang的Kafka的更多信息,请查看sarama库。
英文:
There is no need to use sarama-cluster library. It is DEPRECATED for apache kafka integration. Sarama
original library itself provide a way to connect to kafka cluster using consumer group.
We need to create client and then we initialize consumer group where we create claims and wait for message channel to receive message.
Initializing client :-
kfversion, err := sarama.ParseKafkaVersion(kafkaVersion) // kafkaVersion is the version of kafka server like 0.11.0.2
if err != nil {
log.Println(err)
}
config := sarama.NewConfig()
config.Version = kfversion
config.Consumer.Return.Errors = true
// Start with a client
client, err := sarama.NewClient([]string{brokerAddr}, config)
if err != nil {
log.Println(err)
}
defer func() { _ = client.Close() }()
Connection to consumer group :-
// Start a new consumer group
group, err := sarama.NewConsumerGroupFromClient(consumer_group, client)
if err != nil {
log.Println(err)
}
defer func() { _ = group.Close() }()
Start consuming messages from topic partition :-
// Iterate over consumer sessions.
ctx := context.Background()
for {
topics := []string{topicName}
handler := &Message{}
err := group.Consume(ctx, topics, handler)
if err != nil {
log.Println(err)
}
}
The last part is to wait for message channel to consume messages. We need to implement all of the functions (three) to implement ConsumerGroupHandler
interface.
func (exampleConsumerGroupHandler) Setup(_ ConsumerGroupSession) error { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ ConsumerGroupSession) error { return nil }
func (h exampleConsumerGroupHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
for msg := range claim.Messages() {
fmt.Printf("Message topic:%q partition:%d offset:%d\n", msg.Topic, msg.Partition, msg.Offset)
sess.MarkMessage(msg, "")
}
return nil
}
For more information on kafka using golang check sarama library.
答案2
得分: 2
消费者组由集群消费者的第二个参数指定“构造函数”。这是一个非常基本的示例:
import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
)
conf := cluster.NewConfig()
// 添加配置值
brokers := []string{"kafka-1:9092", "kafka-2:9092"}
group := "Your-Consumer-Group"
topics := []string{"topicName"}
consumer := cluster.NewConsumer(brokers, group, topics, conf)
这样,你将拥有一个属于指定消费者组的消费者。
英文:
The consumer group is specified by the second argument of the cluster consumer "constructor". Here's a very basic sketch:
import (
"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
)
conf := cluster.NewConfig()
// add config values
brokers := []string{"kafka-1:9092", "kafka-2:9092"}
group := "Your-Consumer-Group"
topics := []string{"topicName"}
consumer := cluster.NewConsumer(broker, group, topics, conf)
And so you'll have a consumer belonging to the specified consumer group.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论