Golang Kafka无法消费所有最新偏移量的消息。

huangapple go评论84阅读模式
英文:

Golang Kafka not consuming all messages offsetnewest

问题

首先批次:
我正在尝试从100个平面文件中提取数据,将其加载到一个数组中,并将它们逐个作为字节数组插入到Kafka生产者中。

第二批次:
我正在从Kafka消费者中消费数据,然后将其插入到NoSQL数据库中。

我在shopify sarama golang包的配置文件中使用了Offsetnewset来配置Kafka。

我可以接收并将消息插入到Kafka中,但在消费时只能获取到第一条消息。因为我在sarama配置中设置了Offset newest。请问如何获取所有的数据。

英文:

First Batch:-
I am trying to pull data from 100 flat file and loading up into an array and inserting them to kafka producer one by one as byte array.

Second Batch:-
I am consuming from kafka consumer and then inserting them to NoSQL database.

I use Offsetnewset in the config file of shopify sarama golang package for Kafka.

I can receive and insert messages to kafka but while consuming I am getting only the first message. Since I gave Offset newest in the sarama config.
how can I get all the data here.

答案1

得分: 2

很难在没有任何代码或更深入的解释的情况下提供准确的答案,关于Kafka的配置(例如:主题、分区等),我能想到一些快速检查的方法:

  1. 假设你在开始生产之前使用OffsetNewest设置开始消费,可能的一个原因是你没有从该主题的所有分区进行消费。根据sarama文档的说明,你需要通过创建PartitionConsumers来显式地消费每个分区。参考以下示例:

     partitionConsumer, err := consumer.ConsumePartition("my_topic", 0, OffsetNewest)
     if err != nil {
         panic(err)
     }
    
     ...
    
     consumed := 0
     ConsumerLoop:
     for {
         select {
         case msg := <-partitionConsumer.Messages():
             log.Printf("Consumed message offset %d\n", msg.Offset)
             consumed++
         case <-signals:
             break ConsumerLoop
         }
     }
    
  2. 实际上,你是在生产所有事件之后开始消费的,因此,读取它们的指针应该是OffsetOldest而不是OffsetNewest。

很抱歉无法给你一个更有用的答案,但如果你提供一些代码或更多细节,我们可能可以提供更多帮助。

英文:

It is difficult to be able to tell something without any code or more in depth explanation about how kafka is configured (i.e.: topics, partitions, ...), so few quick checks come to my mind:

  1. Assuming you start consuming with the OffsetNewest set before you start producing, one thing that maybe happening is that you are not consuming from all partitions on that topic, regarding to sarama docs, you have to consume each partition explicitly by creating PartitionConsumers. From the example in https://godoc.org/github.com/Shopify/sarama#Consumer:

     partitionConsumer, err := consumer.ConsumePartition(&quot;my_topic&quot;, 0, OffsetNewest)
     if err != nil {
         panic(err)
     }
    
     ...
    
     consumed := 0
     ConsumerLoop:
     for {
         select {
         case msg := &lt;-partitionConsumer.Messages():
             log.Printf(&quot;Consumed message offset %d\n&quot;, msg.Offset)
             consumed++
         case &lt;-signals:
             break ConsumerLoop
         }
     }
    
  2. You, in fact, are starting the consuming after producing all the events, and so, the pointer to read them all is not OffsetNewest but OffsetOldest instead.

I'm sorry to not be able to give you a more useful answer, but maybe if you paste some code or give more details we can help a but more.

huangapple
  • 本文由 发表于 2016年12月5日 22:09:31
  • 转载请务必保留本文链接:https://go.coder-hub.com/40976189.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定