使用Shopify Sarama处理Kafka错误

huangapple go评论104阅读模式
英文:

Kafka Error Handling using Shopify Sarama

问题

所以我正在尝试在我的应用程序中使用Kafka,该应用程序具有将操作记录到Kafka消息队列的生产者和读取消息的消费者。由于我的应用程序是用Go编写的,所以我正在使用Shopify Sarama来实现这一点。

目前,我能够从消息队列中读取并使用fmt.Printf打印消息内容。

然而,我希望错误处理比控制台打印更好,我愿意付出额外的努力。

目前的消费者连接代码如下:

mqCfg := sarama.NewConfig()

master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
    panic(err) // 不想在出现错误时引发panic,而是处理它
}

消息处理代码如下:

go func() {
    defer wg.Done()
    for message := range consumer.Messages() {
        var msgContent Message
        _ = json.Unmarshal(message.Value, &msgContent)
        fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) // 不想打印它
    }
}()

我的问题(我对测试Kafka和Kafka都不熟悉):

  1. 在上述程序中可能发生错误的地方是哪里,以便我可以处理它们?对我来说,任何示例代码都很好。我能想到的错误情况是当JSON中的msgContent字段不包含任何TypeContentId字段时。

  2. 在Kafka中,是否存在这样的情况,即消费者尝试读取当前偏移量,但由于某种原因无法读取(即使JSON格式正确)?我的消费者是否可以回溯到失败的偏移量读取之前的x个步骤,并重新处理偏移量?或者有更好的方法来处理这个问题吗?这些情况可能是什么?

我愿意阅读并尝试一些方法。

英文:

So I am trying to use Kafka for my application which has a producer logging actions into the Kafka MQ and the consumer which reads it off the MQ.Since my application is in Go, I am using the Shopify Sarama to make this possible.

Right now, I'm able to read off the MQ and print the message contents using a

fmt.Printf

Howeveer, I would really like the error handling to be better than console printing and I am willing to go the extra mile.

Code right now for consumer connection:

mqCfg := sarama.NewConfig()

master, err := sarama.NewConsumer([]string{brokerConnect}, mqCfg)
if err != nil {
    panic(err) // Don't want to panic when error occurs, instead handle it
}

And the processing of messages:

    go func() {
    defer wg.Done()
    for message := range consumer.Messages() {
        var msgContent Message
        _ = json.Unmarshal(message.Value, &msgContent)
        fmt.Printf("Reading message of type %s with id : %d\n", msgContent.Type, msgContent.ContentId) //Don't want to print it
    }
}()

My questions (I am new to testing Kafka and new to kafka in general):

  1. Where could errors occur in the above program, so that I can handle them? Any sample code will be great for me to start with. The error conditions I could think of are when the msgContent doesn't really contain any Type of ContentId fields in the JSON.

  2. In kafka, are there situations when the consumer is trying to read at the current offset, but for some reason was not able to (even when the JSON is well formed)? Is it possible for my consumer to backtrack to say x steps above the failed offset read and re-process the offsets? Or is there a better way to do this? again, what could these situations be?

I'm open to reading and trying things.

答案1

得分: 2

关于1)请检查我在下面记录错误消息的位置。这基本上是我会做的。

关于2)我不知道如何在一个主题中向后遍历。只需重复创建一个消费者,每次其起始偏移减一即可实现。但我不建议这样做,因为很可能会重复播放相同的消息。我建议经常保存偏移量,以便在出现问题时可以恢复。

以下是我认为解决了大部分问题的代码。我没有尝试编译这段代码。而且sarama API最近一直在变化,所以API可能有所不同。

func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan<- *Message) (error) {
    wg.Add(1)
    go func(){
        defer wg.Done()
        //用于跟踪我们处理的最后一个已知的好的偏移量,每次成功处理事件后更新。
        saveprogress := func(off int64){
            //将偏移量保存在某个地方...一个文件...
            //我也使用kafka将进度保存在一个特殊的主题中作为WAL
        }
        defer saveprogress(lastgoodoffset)

        client, err := sarama.NewClient("clientId", brokers, sarama.NewClientConfig())
        if err != nil {
            log.Error(err)
            return
        }
        defer client.Close()
        sarama.NewConsumerConfig()
        consumerConfig.OffsetMethod = sarama.OffsetMethodManual
        consumerConfig.OffsetValue = int64(lastgoodoff)
        consumer, err := sarama.NewConsumer(client, topic, partition, "consumerId", consumerConfig)
        if err != nil {
            log.Error(err)
            return
        }
        defer consumer.Close()
        for {
            select {
            case event := <-consumer.Events():
                if event.Err != nil {
                    log.Error(event.Err)
                    return
                }
                msgContent := &Message{}
                err = json.Unmarshal(message.Value, msgContent)
                if err != nil {
                    log.Error(err)
                    continue //继续跳过此消息或返回以停止而不更新偏移量。
                }
                //将消息发送到处理程序。
                out <- msgContent 

                lastgoodoff = event.Offset
            }
        }
    }()
}

希望对你有所帮助!如果有任何问题,请随时提问。

英文:

Regarding 1) Check where I log error messages below. This is more or less what I would do.

Regarding 2) I don't know about trying to walk backwards in a topic. Its very much possible by just creating a consumer over and over, with its starting offset minus one each time. But I wouldn't advise it, as most likely you'll end up replaying the same message over and over. I do advice saving your offset often so you can recover if things go south.

Below is code that I believe addresses most of your questions. I haven't tried compiling this. And sarama api has been changing lately, so the api may currently differ a bit.

func StartKafkaReader(wg *sync.WaitGroup, lastgoodoff int64, out chan&lt;- *Message) (error) {
wg.Add(1)
go func(){
defer wg.Done()
//to track the last known good offset we processed, which is 
// updated after each successfully processed event. 
saveprogress := func(off int64){
//Save the offset somewhere...a file... 
//Ive also used kafka to store progress 
//using a special topic as a WAL
}
defer saveprogress(lastgoodoffset)
client, err := sarama.NewClient(&quot;clientId&quot;, brokers, sarama.NewClientConfig())
if err != nil {
log.Error(err)
return
}
defer client.Close()
sarama.NewConsumerConfig()
consumerConfig.OffsetMethod = sarama.OffsetMethodManual
consumerConfig.OffsetValue = int64(lastgoodoff)
consumer, err := sarama.NewConsumer(client, topic, partition, &quot;consumerId&quot;, consumerConfig)
if err != nil {
log.Error(err)
return
}
defer consumer.Close()
for {
select {
case event := &lt;-consumer.Events():
if event.Err != nil {
log.Error(event.Err)
return
}
msgContent := &amp;Message{}
err = json.Unmarshal(message.Value, msgContent)
if err != nil {
log.Error(err)
continue //continue to skip this message or return to stop without updating the offset.
}
// Send the message on to be processed.
out &lt;- msgContent 
lastgoodoff = event.Offset
}
}
}()
}

huangapple
  • 本文由 发表于 2015年4月2日 02:24:30
  • 转载请务必保留本文链接:https://go.coder-hub.com/29398051.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定