在Go中消费Kafka Avro消息

huangapple go评论77阅读模式
英文:

Consume Kafka Avro messages in go

问题

我正在尝试在Go中消费Avro格式的Kafka消息,但无法将Avro消息解码为JSON。

我正在使用Confluent平台(3.0.1)。例如,我使用以下命令生成Avro消息:

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}

现在,我使用Go的Kafka库sarama来消费消息。纯文本消息可以正常工作,但Avro消息需要解码。我找到了不同的库:github.com/linkedin/goavro,github.com/elodina/go-avro

但是,在解码后,我得到的JSON没有值(两个库都是如此):

{"f1":""}

goavro:

avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
    log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
log.Println(fmt.Sprintf("%s", decoded))

go-avro:

schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
log.Println(decodedRecord.String())

msg = sarama.ConsumerMessage

英文:

I'm trying to consume Kafka messages in avro format but I'm not able to decode the messages from avro to json in Go.

I'm using the Confluent platform (3.0.1). For example I produce avro messages like:

kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}

Now I consume messages with the go Kafka libary: sarama. Plain text message are working fine. Avro message have to be decoded. I found different libs: github.com/linkedin/goavro, github.com/elodina/go-avro

But after decoding I get a json without values (both libs):

{"f1":""}

goavro:

avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
	log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
log.Println(fmt.Sprintf("%s", decoded))

go-avro:

schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
decodedRecord := avro.NewGenericRecord(schema)
log.Println(decodedRecord.String())

msg = sarama.ConsumerMessage

答案1

得分: 17

第一个字节是魔术字节(0)。接下来的4个字节是Avro模式ID。只有在使用Confluent模式注册表时才真正有用。

英文:

The first byte is a magic byte (0). The following 4 bytes are the avro schema ID

Which is only really useful if you use the Confluent schema registry.

答案2

得分: 11

刚刚发现(通过比较二进制的 Avro 消息),我需要移除消息字节数组的前 5 个元素 - 现在一切都正常了 在Go中消费Kafka Avro消息

message = msg.Value[5:]

也许有人可以解释为什么要这样做。

英文:

Just found out (by comparing binary avro messages) that I had to remove the first 5 elements of the message byte array - now everything works 在Go中消费Kafka Avro消息

message = msg.Value[5:]

Maybe someone can explain why

huangapple
  • 本文由 发表于 2016年11月11日 21:20:23
  • 转载请务必保留本文链接:https://go.coder-hub.com/40548909.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定