如何在Golang Kafka 10中获取分区的消费者组偏移量

huangapple go评论85阅读模式
英文:

How to get consumer group offsets for partition in Golang Kafka 10

问题

现在Golang Kafka库(sarama)提供了消费者组功能,无需使用kafka 10的任何外部库。我如何在任何给定时间获取由消费者组处理的当前消息偏移量?

以前,我使用kazoo-go(https://github.com/wvanbergen/kazoo-go)来获取我的消费者组消息偏移量,因为它存储在Zookeeper中。现在我使用sarama-cluster(https://github.com/bsm/sarama-cluster),我不确定要使用哪个API来获取我的消费者组消息偏移量。

英文:

Now that Golang Kafka library (sarama) is providing consumer group capability without any external library help with kafka 10. How can I get the current message offset being processed by a consumer group at any given time ?

Previously I used kazoo-go (https://github.com/wvanbergen/kazoo-go) to get my consumer group message offset as it is stored in Zookeeper. Now I use sarama-cluster (https://github.com/bsm/sarama-cluster), I am not sure which API to use to get my consumer group message offset.

答案1

得分: 1

在底层,consumerGroupSession 结构体使用 PartitionOffsetManager 来获取下一个偏移量:

	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

这是 pom.NextOffset() 的文档。

consumerGroupSession 通过 newConsumerGroupClaim() 方法构建一个 consumerGroupClaim 结构体时,它将 pom.NextOffset() 返回的偏移量作为 offset 参数传递。您可以稍后通过 claim.InitialOffset() 访问它。在开始消费消息后,您可以使用当前处理消息的 message.Offset

不幸的是,consumerGroupSession.offsets.findPOM() 无法从 ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) 方法中访问,因为它将会话作为 ConsumerGroupSession 接口而不是 consumerGroupSession 结构体接收。因此,偏移量变量是私有的,无法访问。

因此,我们无法真正访问 NextOffset() 方法,该方法正是 OP 所需的。

英文:

Under the hood the consumerGroupSession struct is using PartitionOffsetManager to get next offset:

	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

Here is the documentation of pom.NextOffset().

When a consumerGroupSession constructs a consumerGroupClaim struct via newConsumerGroupClaim() method, it passes offset, returned by pom.NextOffset(), as offset argument. You can access it later via claim.InitialOffset(). After you started consuming messages, you can use message.Offset of the currently processed message.

Unfortunately, consumerGroupSession.offsets.findPOM() can't be accessed from ConsumerGroupHandler.ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) method, because it receives session as a ConsumerGroupSession interface, not as consumerGroupSession struct. So the offsets variable is private and not accessible.

Thus we can't really access NextOffset() method, which does precisely what the OP wants.

答案2

得分: 0

我也在使用Sarama和Kafka来获取一个主题的偏移量。

你可以使用以下代码来获取偏移量。

package main

import (
	"gopkg.in/Shopify/sarama"
	"fmt"
)

func main(){
  client , err := sarama.Client([]string{"localhost:9092"},nil) // 我没有提供任何配置
  if err != nil {
	  panic(err)
  }
  lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest)
  if err != nil {
	  panic(err)
  }
  fmt.Println("最新提交的偏移量:",lastoffset)
}

如果这是你要找的答案,并且对你有帮助,请告诉我。

英文:

I am also working with Sarama and Kafka to get offset of a topic.

You can get offset with following code.

	package main

	import (
	 "gopkg.in/Shopify/sarama"
	 "fmt"
	)

	func main(){
	  client , err := sarama.Client([]string{"localhost:9092"},nil) // I am not giving any configuration
	  if err != nil {
		  panic(err)
	  }
	  lastoffset, err := client.GetOffset("topic-test",0,sarama.OffsetNewest)
	  if err != nil {
		  panic(err)
	  }
      fmt.Println("Last Commited Offset ",lastoffset)
	}

Let me know if this is the answer you are looking for and if it is helpful.

答案3

得分: 0

以下是获取消费者组偏移量的示例代码(即消费者组将开始的偏移量):

package main
    
import (
    "context"
    "log"
    "strings"

    "github.com/Shopify/sarama"
)

func main() {
    groupName := "testgrp"
    topic := "topic_name"
    offset, e := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
    if e != nil {
        log.Fatal(e)
    }
    log.Printf("消费者组 %s 在主题 %s 上的偏移量为:%d", groupName, topic, offset)
}

type gcInfo struct {
    offset int64
}

func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
    return nil
}

func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    g.offset = claim.InitialOffset()
    return nil
}

func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
    config := sarama.NewConfig()
    config.Consumer.Offsets.AutoCommit.Enable = false // 我们不会更新消费者组的偏移量
    client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
    if err != nil {
        return 0, err
    }
    info := gcInfo{}
    if err := client.Consume(ctx, []string{topic}, &info); err != nil {
        return 0, err
    }
    return info.offset, nil
}
英文:

Here's a sample code to get the consumer group offset (i.e. the offset where the consumer group will start):

package main
    
    import (
        "context"
        "log"
        "strings"
    
        "github.com/Shopify/sarama"
    )
    
    func main() {
        groupName := "testgrp"
        topic := "topic_name"
        offset, e := GetCGOffset(context.Background(), "localhost:9092", groupName, topic)
        if e != nil {
            log.Fatal(e)
        }
        log.Printf("Consumer group %s offset for topic %s is: %d", groupName, topic, offset)
    }
    
    type gcInfo struct {
        offset int64
    }
    
    func (g *gcInfo) Setup(sarama.ConsumerGroupSession) error {
        return nil
    }
    
    func (g *gcInfo) Cleanup(sarama.ConsumerGroupSession) error {
        return nil
    }
    
    func (g *gcInfo) ConsumeClaim(_ sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
        g.offset = claim.InitialOffset()
        return nil
    }
    
    func GetCGOffset(ctx context.Context, brokers, groupName, topic string) (int64, error) {
        config := sarama.NewConfig()
        config.Consumer.Offsets.AutoCommit.Enable = false // we're not going to update the consumer group offsets
        client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), groupName, config)
        if err != nil {
            return 0, err
        }
        info := gcInfo{}
        if err := client.Consume(ctx, []string{topic}, &info); err != nil {
            return 0, err
        }
        return info.offset, nil
    }

答案4

得分: 0

我刚刚自己在做这个工作。正如@boris-burkov所提到的,你没有访问getPOM方法的权限,但是你可以自己创建一个POM并调用NextOffset()方法来获取当前消费者的实际偏移量:

offsetManager, _ := sarama.NewOffsetManagerFromClient(clientName, cl.Client)
offsetPartitionManager, _ := offsetManager.ManagePartition("test-topic", 0)
offsetPartitionManager.NextOffset()
英文:

I've just been doing work on this myself. As @boris-burkov mentioned you don't have access to the getPOM method, however, you can create a POM yourself and called NextOffset() to get the current consumer's actual offset:

offsetManager, _ := sarama.NewOffsetManagerFromClient(clientName, cl.Client)
offsetPartitionManager, _ := offsetManager.ManagePartition("test-topic", 0)
offsetPartitionManager.NextOffset()

huangapple
  • 本文由 发表于 2016年11月17日 05:29:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/40642689.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定