如何使用Benthos从Kafka读取和解码AVRO消息以及它们的关联Kafka键?

huangapple go评论158阅读模式
英文:

How can I read and decode AVRO messages from Kafka along with their associated kafka key using Benthos?

问题

我正在使用Benthos从Kafka读取AVRO编码的消息,其中kafka_key元数据字段也包含一个AVRO编码的有效负载。这些AVRO编码有效负载的模式存储在模式注册表中,Benthos有一个schema_registry_decode处理器用于解码它们。我希望为每个Kafka消息生成一个输出JSON消息,其中包含两个字段,一个称为content,包含解码的AVRO消息,另一个称为metadata,包含Benthos收集的各种元数据字段,包括解码的kafka_key有效负载。

英文:

I am using Benthos to read AVRO-encoded messages from Kafka which have the kafka_key metadata field set to also contain an AVRO-encoded payload. The schemas of these AVRO-encoded payloads are stored in Schema Registry and Benthos has a schema_registry_decode processor for decoding them. I'm looking to produce an output JSON message for each Kafka message containing two fields, one called content containing the decoded AVRO message and the other one called metadata containing the various metadata fields collected by Benthos including the decoded kafka_key payload.

答案1

得分: 2

原文翻译如下:

原来可以使用branch处理器来实现这一点:

input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # 解码消息
    - schema_registry_decode:
        url: http://localhost:8081

    # 填充输出内容字段
    - bloblang: |
                root.content = this

    # 解码kafka_key元数据负载并填充输出元数据字段
    - branch:
        request_map: |
                    root = meta("kafka_key")

        processors:
          - schema_registry_decode:
              url: http://localhost:8081

        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this          

output:
  stdout: {}
英文:

It turns out that one can achieve this using a branch processor like so:

input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # Decode the message
    - schema_registry_decode:
        url: http://localhost:8081

    # Populate output content field
    - bloblang: |
                root.content = this

    # Decode kafka_key metadata payload and populate output metadata field
    - branch:
        request_map: |
                    root = meta("kafka_key")

        processors:
          - schema_registry_decode:
              url: http://localhost:8081

        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this          

output:
  stdout: {}

huangapple
  • 本文由 发表于 2022年2月12日 08:12:49
  • 转载请务必保留本文链接:https://go.coder-hub.com/71087902.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定