英文:
How can I read and decode AVRO messages from Kafka along with their associated kafka key using Benthos?
问题
我正在使用Benthos从Kafka读取AVRO编码的消息,其中kafka_key
元数据字段也包含一个AVRO编码的有效负载。这些AVRO编码有效负载的模式存储在模式注册表中,Benthos有一个schema_registry_decode
处理器用于解码它们。我希望为每个Kafka消息生成一个输出JSON消息,其中包含两个字段,一个称为content
,包含解码的AVRO消息,另一个称为metadata
,包含Benthos收集的各种元数据字段,包括解码的kafka_key
有效负载。
英文:
I am using Benthos to read AVRO-encoded messages from Kafka which have the kafka_key
metadata field set to also contain an AVRO-encoded payload. The schemas of these AVRO-encoded payloads are stored in Schema Registry and Benthos has a schema_registry_decode
processor for decoding them. I'm looking to produce an output JSON message for each Kafka message containing two fields, one called content
containing the decoded AVRO message and the other one called metadata
containing the various metadata fields collected by Benthos including the decoded kafka_key
payload.
答案1
得分: 2
原文翻译如下:
原来可以使用branch
处理器来实现这一点:
input:
kafka:
addresses:
- localhost:9092
consumer_group: benthos_consumer_group
topics:
- benthos_input
pipeline:
processors:
# 解码消息
- schema_registry_decode:
url: http://localhost:8081
# 填充输出内容字段
- bloblang: |
root.content = this
# 解码kafka_key元数据负载并填充输出元数据字段
- branch:
request_map: |
root = meta("kafka_key")
processors:
- schema_registry_decode:
url: http://localhost:8081
result_map: |
root.metadata = meta()
root.metadata.kafka_key = this
output:
stdout: {}
英文:
It turns out that one can achieve this using a branch
processor like so:
input:
kafka:
addresses:
- localhost:9092
consumer_group: benthos_consumer_group
topics:
- benthos_input
pipeline:
processors:
# Decode the message
- schema_registry_decode:
url: http://localhost:8081
# Populate output content field
- bloblang: |
root.content = this
# Decode kafka_key metadata payload and populate output metadata field
- branch:
request_map: |
root = meta("kafka_key")
processors:
- schema_registry_decode:
url: http://localhost:8081
result_map: |
root.metadata = meta()
root.metadata.kafka_key = this
output:
stdout: {}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论