Why does the aggregator app encode messages in base64?

Question
I'm trying to use the aggregator processor 2021.1.x with SCDF 2.9.6 and Kafka.
It aggregates the messages, but I get a list of base64-encoded strings instead of JSON messages.
Something like:
[
0: "base64encodedString",
1: "base64encodedString"
]
Instead of:
[
{id: 1, bar: "foo"},
{id: 2, bar: "foo"}
]
I only set the Redis store properties and left the default settings for aggregation, correlation, and release.
The split messages all have the contentType header set to "application/json".
Why is this happening and how can I fix it?
EDIT:
Here is an example.
The DSL: test-aggregator = http | splitter | aggregator | log
Deployed with these properties:
version.http=3.2.1
version.splitter=3.2.1
version.aggregator=2021.1.x
version.log=3.2.1
app.http.server.port=8181
app.splitter.splitter.expression=#jsonPath(payload, '$.store.book')
app.aggregator.spring.cloud.stream.kafka.default.consumer.standard-headers=both
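For reference, a minimal sketch of how this stream could be created and deployed from the SCDF shell, assuming the properties above are saved in a file named deploy.properties (the file name is illustrative):

stream create --name test-aggregator --definition "http | splitter | aggregator | log"
stream deploy --name test-aggregator --propertiesFile deploy.properties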
Then I post this JSON file to the http source:
{ "store": {
"book": [
{
"author": "Nigel Rees",
"title": "Sayings of the Century"
},
{
"author": "Evelyn Waugh",
"title": "Sword of Honour"
},
{
"author": "Herman Melville",
"title": "Moby Dick"
},
{
"author": "J. R. R. Tolkien",
"title": "The Lord of the Rings"
}
]
}}
To do that, I use the SCDF shell like this:
http post --target http://<ip-http-source>:8181 --file data/test.json --contentType "application/json; charset=utf-8"
When I check the Kafka messages with Kowl after the splitter, I see the 4 book messages as JSON with the correct contentType
header. But after the aggregator, here are the results in the log sink and also in Kowl:
[
"eyJhdXRob3IiOiJOaWdlbCBSZWVzIiwidGl0bGUiOiJTYXlpbmdzIG9mIHRoZSBDZW50dXJ5In0=",
"eyJhdXRob3IiOiJFdmVseW4gV2F1Z2giLCJ0aXRsZSI6IlN3b3JkIG9mIEhvbm91ciJ9",
"eyJhdXRob3IiOiJIZXJtYW4gTWVsdmlsbGUiLCJ0aXRsZSI6Ik1vYnkgRGljayJ9",
"eyJhdXRob3IiOiJKLiBSLiBSLiBUb2xraWVuIiwidGl0bGUiOiJUaGUgTG9yZCBvZiB0aGUgUmluZ3MifQ=="
]
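Decoding any of these strings shows they are the original JSON payloads, which is what happens when a list of raw byte[] payloads is serialized to JSON (Jackson encodes byte arrays as Base64 strings). A quick standalone check in Java (the class name is illustrative):

import java.util.Base64;

public class DecodeCheck {
    public static void main(String[] args) {
        // First element of the aggregated list, copied from the log sink output above
        String encoded = "eyJhdXRob3IiOiJOaWdlbCBSZWVzIiwidGl0bGUiOiJTYXlpbmdzIG9mIHRoZSBDZW50dXJ5In0=";
        // Prints: {"author":"Nigel Rees","title":"Sayings of the Century"}
        System.out.println(new String(Base64.getDecoder().decode(encoded)));
    }
}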
Answer 1

Score: 0
So, here is an answer for the current aggregator-processor
version:
configure this property:
aggregation=#this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]
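In an SCDF stream deployment, this would presumably be passed with the same app.<app>.<prefix> pattern used for the splitter above; the aggregator. prefix here is my assumption based on the processor's property naming:

app.aggregator.aggregator.aggregation=#this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]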
It deserializes the byte[] payload of each grouped message into a Map, which is essentially a good Java representation of any JSON.
This way the aggregator produces a reply as a list of maps, which in the end is serialized as JSON by the binder producer.
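For illustration, here is a rough plain-Java sketch of what that projection does with a group of messages; the jacksonObjectMapper bean is modeled as a plain Jackson ObjectMapper, and the sample payloads mirror the books above:

import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class AggregationSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper jacksonObjectMapper = new ObjectMapper();
        // The grouped payloads as the aggregator sees them: raw JSON bytes
        List<byte[]> payloads = List.of(
                "{\"author\":\"Nigel Rees\",\"title\":\"Sayings of the Century\"}".getBytes(StandardCharsets.UTF_8),
                "{\"author\":\"Evelyn Waugh\",\"title\":\"Sword of Honour\"}".getBytes(StandardCharsets.UTF_8));
        // Equivalent of #this.![@jacksonObjectMapper.readValue(payload, T(java.util.Map))]
        List<Map<?, ?>> aggregated = new ArrayList<>();
        for (byte[] payload : payloads) {
            aggregated.add(jacksonObjectMapper.readValue(payload, Map.class));
        }
        // Serializing the list of maps now yields plain JSON instead of Base64 strings
        System.out.println(jacksonObjectMapper.writeValueAsString(aggregated));
    }
}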
We are thinking about providing something out of the box to deserialize the byte array into a map on the aggregator input, but there is a doubt whether the input is always JSON. So, we still need to come up with a robust, common solution.