英文:
How to get consumer group offsets for partition in Golang Kafka 10
问题
现在Golang Kafka库(sarama)提供了消费者组功能,无需使用kafka 10的任何外部库。我如何在任何给定时间获取由消费者组处理的当前消息偏移量?
以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo-go)来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在我使用sarama-cluster(https://github.com/bsm/sarama-cluster),我不确定要使用哪个API来获取我的消费者组消息偏移量。
英文:
Now that Golang Kafka library (sarama) is providing consumer group capability without any external library help with kafka 10. How can I get the current message offset being processed by a consumer group at any given time ?
Previously I used kazoo-go (https://github.com/wvanbergen/kazoo-go) to get my consumer group message offset as it is stored in Zookeeper. Now I use sarama-cluster (https://github.com/bsm/sarama-cluster), I am not sure which API to use to get my consumer group message offset.
答案1
得分: 1
在底层,consumerGroupSession
结构体使用 PartitionOffsetManager
来获取下一个偏移量:
if pom := s.offsets.findPOM(topic, partition); pom != nil {
offset, _ = pom.NextOffset()
}
这是 pom.NextOffset()
的文档。
当 consumerGroupSession
通过 newConsumerGroupClaim()
方法构建一个 consumerGroupClaim
结构体时,它将 pom.NextOffset()
返回的偏移量作为 offset
参数传递。您可以稍后通过 claim.InitialOffset()
访问它。在开始消费消息后,您可以使用当前处理消息的 message.Offset
。
不幸的是,consumerGroupSession.offsets.findPOM()
无法从 ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)
方法中访问,因为它将会话作为 ConsumerGroupSession
接口而不是 consumerGroupSession
结构体接收。因此,偏移量变量是私有的,无法访问。
因此,我们无法真正访问 NextOffset()
方法,该方法正是 OP 所需的。
英文:
Under the hood the consumerGroupSession
struct is using PartitionOffsetManager
to get next offset:
if pom := s.offsets.findPOM(topic, partition); pom != nil {
offset, _ = pom.NextOffset()
}
Here is the documentation of pom.NextOffset().
When a consumerGroupSession
constructs a consumerGroupClaim
struct via newConsumerGroupClaim()
method, it passes offset, returned by pom.NextOffset()
, as offset
argument. You can access it later via claim.InitialOffset()
. After you started consuming messages, you can use message.Offset
of the currently processed message.
Unfortunately, consumerGroupSession.offsets.findPOM()
can't be accessed from ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim)
method, because it receives session as a ConsumerGroupSession
interface, not as consumerGroupSession
struct. So the offsets variable is private and not accessible.
Thus we can't really access NextOffset()
method, which does precisely what the OP wants.
答案2
得分: 0
我也在使用Sarama和Kafka来获取一个主题的偏移量。
你可以使用以下代码来获取偏移量。
package main
import (
"gopkg.in/Shopify/sarama"
"fmt"
)
func main(){
client , err := sarama.Client([]string{"localhost:9092"},nil) // 我没有提供任何配置
if err != nil {
panic(err)
}
lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest)
if err != nil {
panic(err)
}
fmt.Println("最新提交的偏移量:",lastoffset)
}
如果这是你要找的答案,并且对你有帮助,请告诉我。
英文:
I am also working with Sarama and Kafka to get offset of a topic.
You can get offset with following code.
package main
import (
"gopkg.in/Shopify/sarama"
"fmt"
)
func main(){
client , err := sarama.Client([]string{"localhost:9092"},nil) // I am not giving any configuration
if err != nil {
panic(err)
}
lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest)
if err != nil {
panic(err)
}
fmt.Println("Last Commited Offset ",lastoffset)
}
Let me know if this is the answer you are looking for and if it is helpful.
答案3
得分: 0
以下是获取消费者组偏移量的示例代码(即消费者组将开始的偏移量):
package main
import (
"context"
"log"
"strings"
"github.com/Shopify/sarama"
)
func main() {
groupName := "testgrp"
topic := "topic_name"
offset, e := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
if e != nil {
log.Fatal(e)
}
log.Printf("消费者组 %s 在主题 %s 上的偏移量为:%d", groupName, topic, offset)
}
type gcInfo struct {
offset int64
}
func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
return nil
}
func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
g.offset = claim.InitialOffset()
return nil
}
func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = false // 我们不会更新消费者组的偏移量
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
if err != nil {
return 0, err
}
info := gcInfo{}
if err := client.Consume(ctx, []string{topic}, &info); err != nil {
return 0, err
}
return info.offset, nil
}
英文:
Here's a sample code to get the consumer group offset (i.e. the offset where the consumer group will start):
package main
import (
"context"
"log"
"strings"
"github.com/Shopify/sarama"
)
func main() {
groupName := "testgrp"
topic := "topic_name"
offset, e := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
if e != nil {
log.Fatal(e)
}
log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
}
type gcInfo struct {
offset int64
}
func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
return nil
}
func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
g.offset = claim.InitialOffset()
return nil
}
func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
config := sarama.NewConfig()
config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
if err != nil {
return 0, err
}
info := gcInfo{}
if err := client.Consume(ctx, []string{topic}, &info); err != nil {
return 0, err
}
return info.offset, nil
}
答案4
得分: 0
我刚刚自己在做这个工作。正如@boris-burkov所提到的,你没有访问getPOM方法的权限,但是你可以自己创建一个POM并调用NextOffset()方法来获取当前消费者的实际偏移量:
offsetManager, _ := sarama.NewOffsetManagerFromClient(clientName, cl.Client)
offsetPartitionManager, _ := offsetManager.ManagePartition("test-topic", 0)
offsetPartitionManager.NextOffset()
英文:
I've just been doing work on this myself. As @boris-burkov mentioned you don't have access to the getPOM method, however, you can create a POM yourself and called NextOffset() to get the current consumer's actual offset:
offsetManager, _ := sarama.NewOffsetManagerFromClient(clientName, cl.Client)
offsetPartitionManager, _ := offsetManager.ManagePartition("test-topic", 0)
offsetPartitionManager.NextOffset()
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论