Consume Kafka Avro messages in Go
Question
I'm trying to consume Kafka messages in Avro format, but I'm not able to decode the messages from Avro to JSON in Go.
I'm using the Confluent Platform (3.0.1). For example, I produce Avro messages like this:
kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'
{"f1":"message1"}
{"f1":"message2"}
Now I consume the messages with the Go Kafka library sarama. Plain-text messages work fine, but Avro messages have to be decoded. I found two libraries: github.com/linkedin/goavro and github.com/elodina/go-avro.
But after decoding, I get JSON without values (with both libraries):
{"f1":""}
goavro:
avroSchema := `
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
`
codec, err := goavro.NewCodec(avroSchema)
if err != nil {
	log.Fatal(err)
}
bb := bytes.NewBuffer(msg.Value)
decoded, err := codec.Decode(bb)
if err != nil {
	log.Fatal(err)
}
log.Printf("%s", decoded)
go-avro:
schema := avro.MustParseSchema(avroSchema)
reader := avro.NewGenericDatumReader()
reader.SetSchema(schema)
decoder := avro.NewBinaryDecoder(msg.Value)
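decodedRecord := avro.NewGenericRecord(schema)
if err := reader.Read(decodedRecord, decoder); err != nil {
	log.Fatal(err)
}
log.Println(decodedRecord.String())
(In both snippets, msg is a *sarama.ConsumerMessage.)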
Answer 1
Score: 17
The first byte is a magic byte (0). The following 4 bytes are the Avro schema ID (big-endian), which is only really useful if you use the Confluent Schema Registry. The actual Avro payload starts after these 5 bytes.
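A minimal sketch of reading that 5-byte header in Go with only the standard library, instead of slicing it off blindly. The function name splitConfluent and the sample bytes (schema ID 1, followed by the Avro encoding of {"f1":"hi"}) are hypothetical illustrations, not part of sarama, goavro, or any Confluent client.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// confluentHeaderLen is the size of the Confluent wire-format prefix:
// 1 magic byte + 4-byte big-endian schema ID.
const confluentHeaderLen = 5

// splitConfluent (hypothetical helper) separates a Confluent-framed Kafka
// value into its schema ID and the raw Avro binary payload after the header.
func splitConfluent(value []byte) (schemaID uint32, payload []byte, err error) {
	if len(value) < confluentHeaderLen {
		return 0, nil, errors.New("message too short for Confluent header")
	}
	if value[0] != 0 {
		return 0, nil, fmt.Errorf("unknown magic byte %d", value[0])
	}
	schemaID = binary.BigEndian.Uint32(value[1:confluentHeaderLen])
	return schemaID, value[confluentHeaderLen:], nil
}

func main() {
	// Hypothetical value: magic byte 0, schema ID 1, then the Avro encoding of
	// {"f1":"hi"} (zigzag length 2 -> 0x04, followed by the bytes "hi").
	value := []byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x04, 'h', 'i'}
	id, payload, err := splitConfluent(value)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("schema ID %d, payload % x\n", id, payload)
	// prints: schema ID 1, payload 04 68 69
}
```

The returned schema ID is what a Schema Registry client would use to look up the writer schema (the registry serves it at GET /schemas/ids/{id}). If every message on the topic uses one known schema, as in the question, the payload can be passed directly to the decoder, for example codec.Decode(bytes.NewReader(payload)) with goavro.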
Answer 2
Score: 11
Just found out (by comparing binary Avro messages) that I had to remove the first 5 bytes of the message byte array; now everything works:
message = msg.Value[5:]
Maybe someone can explain why.
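To see why the unstripped bytes decoded to {"f1":""} instead of failing: an Avro string is written as a zigzag-encoded varint length followed by the UTF-8 bytes, so a decoder that starts at the magic byte 0x00 reads a length of 0 and stops. The sketch below is a hand-written decoder for this one-field schema only (decodeF1 is a hypothetical helper, not a goavro or go-avro function), and the schema ID 1 in the sample value is assumed. It relies on Go's binary.Varint, which uses the same zigzag varint encoding as Avro longs.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeF1 (hypothetical helper) decodes the Avro binary encoding of
// {"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}.
// An Avro string is a zigzag varint length followed by that many UTF-8 bytes.
func decodeF1(data []byte) (string, error) {
	n, size := binary.Varint(data) // zigzag varint, same scheme as Avro longs
	if size <= 0 {
		return "", errors.New("invalid length prefix")
	}
	if n < 0 || int64(len(data)-size) < n {
		return "", errors.New("string length out of range")
	}
	return string(data[size : size+int(n)]), nil
}

func main() {
	// Hypothetical Confluent-framed value for {"f1":"message1"}: magic byte 0,
	// schema ID 1 (assumed), then Avro: length 8 -> zigzag 0x10, "message1".
	value := append([]byte{0x00, 0x00, 0x00, 0x00, 0x01, 0x10}, "message1"...)

	whole, _ := decodeF1(value)        // magic byte 0x00 is read as length 0
	stripped, _ := decodeF1(value[5:]) // skip magic byte + 4-byte schema ID
	fmt.Printf("whole: %q, stripped: %q\n", whole, stripped)
	// prints: whole: "", stripped: "message1"
}
```

A real consumer would still use goavro or go-avro for decoding; the point of the sketch is only that the 5-byte prefix shifts the decoder off the start of the Avro data, which is why msg.Value[5:] fixes it.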