Why does the aggregator app encode messages in base64?

Question

I'm trying to use the aggregator processor 2021.1.x with SCDF 2.9.6 and Kafka.

It aggregates the messages, except that I get a list of base64-encoded strings instead of JSON messages.
Something like:

[
  0: "base64encodedString",
  1: "base64encodedString"
]

Instead of:

[
  {id: 1, bar: "foo"},
  {id: 2, bar: "foo"}
]

I only set the Redis store properties and left the default settings for aggregation, correlation, and release.
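
For context, the Redis store setup amounts to properties along these lines (illustrative values only; aggregator.message-store-type is a documented aggregator property, while the connection values here are placeholders, not my actual deployment):

app.aggregator.aggregator.message-store-type=redis
app.aggregator.spring.redis.host=&lt;redis-host&gt;
app.aggregator.spring.redis.port=6379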

The split messages all have the contentType header set to "application/json".

Why is this happening, and how can I fix it?

EDIT:
Here is an example:
The DSL: test-aggregator = http | splitter | aggregator | log
Deployed with these properties:

version.http=3.2.1
version.splitter=3.2.1
version.aggregator=2021.1.x
version.log=3.2.1

app.http.server.port=8181
app.splitter.splitter.expression=#jsonPath(payload, '$.store.book')
app.aggregator.spring.cloud.stream.kafka.default.consumer.standard-headers=both

Then I post this JSON file to the http source:

{
  "store": {
    "book": [
      {
        "author": "Nigel Rees",
        "title": "Sayings of the Century"
      },
      {
        "author": "Evelyn Waugh",
        "title": "Sword of Honour"
      },
      {
        "author": "Herman Melville",
        "title": "Moby Dick"
      },
      {
        "author": "J. R. R. Tolkien",
        "title": "The Lord of the Rings"
      }
    ]
  }
}

To do that, I use the SCDF shell like this:

http post --target http://<ip-http-source>:8181 --file data/test.json --contentType "application/json; charset=utf-8"
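
Equivalently, outside the SCDF shell, the same request can be made with plain curl, assuming the same address and file:

curl -X POST -H "Content-Type: application/json; charset=utf-8" --data-binary @data/test.json http://<ip-http-source>:8181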

When I check the Kafka messages with Kowl after the splitter, I see the 4 book messages as JSON with the correct contentType header. But after the aggregator, here are the results in the log sink and also in Kowl:

[
  "eyJhdXRob3IiOiJOaWdlbCBSZWVzIiwidGl0bGUiOiJTYXlpbmdzIG9mIHRoZSBDZW50dXJ5In0=",
  "eyJhdXRob3IiOiJFdmVseW4gV2F1Z2giLCJ0aXRsZSI6IlN3b3JkIG9mIEhvbm91ciJ9",
  "eyJhdXRob3IiOiJIZXJtYW4gTWVsdmlsbGUiLCJ0aXRsZSI6Ik1vYnkgRGljayJ9",
  "eyJhdXRob3IiOiJKLiBSLiBSLiBUb2xraWVuIiwidGl0bGUiOiJUaGUgTG9yZCBvZiB0aGUgUmluZ3MifQ=="
]
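
Decoding any of these strings shows they are just the UTF-8 bytes of the original JSON payloads. A minimal Java check, using the first string above:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DecodeCheck {
    public static void main(String[] args) {
        byte[] decoded = Base64.getDecoder()
                .decode("eyJhdXRob3IiOiJOaWdlbCBSZWVzIiwidGl0bGUiOiJTYXlpbmdzIG9mIHRoZSBDZW50dXJ5In0=");
        // Prints: {"author":"Nigel Rees","title":"Sayings of the Century"}
        System.out.println(new String(decoded, StandardCharsets.UTF_8));
    }
}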

Answer 1

Score: 0

So, here is an answer for the current aggregator-processor version:

Configure this property:

aggregation=#this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]
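
When deploying through SCDF, this has to be scoped to the aggregator app. Assuming the processor exposes the expression under its aggregator prefix (by analogy with app.splitter.splitter.expression above), the deploy property would look like:

app.aggregator.aggregator.aggregation=#this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]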

This deserializes the byte[] payload of each grouped message into a Map, which is essentially a good Java representation of any JSON.

This way, the aggregator produces its reply as a list of maps, which in the end is serialized as JSON by the binder producer.

We are thinking about providing out-of-the-box deserialization from byte array to map on the aggregator input, but it is doubtful whether the input is always JSON. So we still need to come up with a robust, common solution.
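
For readers less familiar with SpEL, #this.![...] is a collection projection over the grouped messages. A rough Java equivalent of what the expression computes (a sketch of the idea, not the aggregator's actual internals):

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class AggregationSketch {
    // Mirrors: #this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]
    static List<Map<String, Object>> aggregate(List<byte[]> payloads) throws IOException {
        ObjectMapper mapper = new ObjectMapper(); // stands in for the @jacksonObjectMapper bean
        List<Map<String, Object>> result = new ArrayList<>();
        for (byte[] payload : payloads) {
            // Turn each grouped message's raw JSON bytes back into a Map
            result.add(mapper.readValue(payload, Map.class));
        }
        return result;
    }
}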
