Polymorphic Json Marshalling with Apache Camel

Question

I'm refactoring a Camel route to make it more generic. (I'm also using Spring Boot, in case that helps with any bean-injection solutions.)

    // Batch Kafka messages, wrap them in one JSON document, then enrich via REST.
    from(fromKafka)
        .routeId("Rest Models")
        .removeHeaders("*")
        .aggregate(new GroupedBodyAggregationStrategy())
            .constant(true)                      // single correlation group
            .completionTimeout(batchingInterval) // flush the batch after this idle period
        .process(new ListOfJsonToJsonArray())
        .unmarshal().json(JsonLibrary.Jackson, InputArrayPojo.class)
        .enrich("seda:rest", mergeRestResult);

The processor ListOfJsonToJsonArray() takes the JSON string representations of the Kafka messages and joins them, comma-separated, with a {[ ]} on the outside.
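
To illustrate what such a joining step might look like, here is a minimal plain-Java sketch. It assumes the grouped body is a list of already-serialized JSON objects and that the wrapper uses a "data" key (chosen to match the data property of InputArrayPojo). The class and method names are hypothetical; this is not the original ListOfJsonToJsonArray code.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not the original ListOfJsonToJsonArray processor.
class JsonArrayJoiner {

    // Joins already-serialized JSON objects into {"data":[obj1,obj2,...]}.
    // The "data" key is an assumption based on InputArrayPojo's property name.
    static String join(List<String> jsonObjects) {
        return "{\"data\":[" + String.join(",", jsonObjects) + "]}";
    }
}

class JsonArrayJoinerDemo {
    public static void main(String[] args) {
        List<String> messages = Arrays.asList("{\"id\":1}", "{\"id\":2}");
        System.out.println(JsonArrayJoiner.join(messages));
        // prints {"data":[{"id":1},{"id":2}]}
    }
}
```

Plain string concatenation only works if every incoming message is itself valid JSON; the sketch does not validate its input.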

InputArrayPojo.class is thus a wrapper for the array of objects coming in from Kafka. I need to bundle the objects in order to mini-batch them to the REST interface in the enrichment step. The contained objects are of type InputPojo.class (effectively just a schema, but it also performs some basic data quality checks).

I need a way to make InputPojo.class generic, so that for new jobs we can run the same route but supply a different InputPojo.class.

I've tried to apply polymorphism by creating an interface for InputPojo, but this runs into an error when trying to construct an instance of the interface.

    @JsonSubTypes({
        @JsonSubTypes.Type(value = InputPojo.class, name = "online")
    })
    public interface InputPojoInterface {
    }

I also tried some parameterisation, but had no luck there either: the bean's constructor was not applied, so none of its methods existed.

I've also included the error message and the InputArrayPojo wrapper class:

    com.fasterxml.jackson.databind.exc.InvalidDefinitionException - Cannot construct instance of `InputPojoInterface` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
     at [Source: (ByteArrayInputStream); line: 1, column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]

    import com.fasterxml.jackson.annotation.JsonAnyGetter;
    import com.fasterxml.jackson.annotation.JsonAnySetter;
    import com.fasterxml.jackson.annotation.JsonIgnore;
    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.annotation.JsonPropertyOrder;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Wrapper for the batched array of InputPojo objects.
    @JsonInclude(JsonInclude.Include.NON_NULL)
    @JsonPropertyOrder({ "data" })
    public class InputArrayPojo {

        @JsonProperty("data")
        private List<InputPojo> data = null;

        // Collects any JSON properties that are not mapped explicitly.
        @JsonIgnore
        private Map<String, Object> additionalProperties = new HashMap<>();

        @JsonProperty("data")
        public List<InputPojo> getData() {
            return data;
        }

        @JsonProperty("data")
        public void setData(List<InputPojo> data) {
            this.data = data;
        }

        @JsonAnyGetter
        public Map<String, Object> getAdditionalProperties() {
            return this.additionalProperties;
        }

        @JsonAnySetter
        public void setAdditionalProperty(String name, Object value) {
            this.additionalProperties.put(name, value);
        }
    }

The enrichment also needs some kind of generic logic:

    @Override
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
        List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
        List<ModelResultPojo> outputList = new ArrayList<>();
        // Original messages and predictions are paired by position.
        for (int i = 0; i < originalMessages.size(); ++i) {
            ModelResultPojo output = new ModelResultPojo();
            IngestionOutPojo raw = originalMessages.get(i);
            PredictionPojo enrich = enrichmentMessages.get(i);
            /*
              enrichment logic to populate output from raw and enrich
            */
            outputList.add(output);
        }
        newExchange.getIn().setBody(outputList);
        return newExchange;
    }
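
One way the position-wise pairing in this method could be made type-agnostic is with a generic helper that takes the combining step as a function. The following is only a sketch in plain Java: PairwiseMerger and its merge method are hypothetical names, and the size check is an added assumption that the REST call returns exactly one prediction per input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical helper: pairs element i of one list with element i of the other.
class PairwiseMerger {

    static <A, B, R> List<R> merge(List<A> originals, List<B> enrichments,
                                   BiFunction<A, B, R> combiner) {
        // Assumption: both lists must line up one-to-one.
        if (originals.size() != enrichments.size()) {
            throw new IllegalArgumentException("Lists differ in size: "
                    + originals.size() + " vs " + enrichments.size());
        }
        List<R> out = new ArrayList<>(originals.size());
        for (int i = 0; i < originals.size(); i++) {
            out.add(combiner.apply(originals.get(i), enrichments.get(i)));
        }
        return out;
    }
}

class PairwiseMergerDemo {
    public static void main(String[] args) {
        List<String> raw = Arrays.asList("a", "b");
        List<Double> scores = Arrays.asList(0.9, 0.1);
        List<String> merged = PairwiseMerger.merge(raw, scores, (r, s) -> r + "=" + s);
        System.out.println(merged); // prints [a=0.9, b=0.1]
    }
}
```

With a helper like this, each job would only need to supply its own combiner, while the loop and the exchange handling stay shared.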

Answer 1

Score: 1

I ended up with a solution by doing the following:

I unmarshalled to the default type, Map<String,Object> (when no class is specified, Camel unmarshals to a Map<String,Object>).

After that I wrote an abstract class that implements a Processor. In this processor I take the Map and apply an abstract editFields() function to it.

Thus I now have polymorphic handling of the business logic through a Map instead of through a POJO.
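
The shape of that abstract class might look roughly like the sketch below. It is written in plain Java so it runs without Camel: in the real route the class would implement org.apache.camel.Processor and read the Map from the exchange body, and that wiring is omitted here. Apart from editFields(), every name (MapEditor, handle, OnlineJobEditor, the field names) is hypothetical, and copying the Map before editing is a design choice of this sketch, not something the answer specifies.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for the abstract processor; the Camel Processor wiring is left out.
abstract class MapEditor {

    // Shared flow lives here; job-specific logic lives in editFields().
    final Map<String, Object> handle(Map<String, Object> body) {
        Map<String, Object> copy = new HashMap<>(body); // leave the input untouched
        editFields(copy);
        return copy;
    }

    // Each job supplies its own field-level business logic.
    protected abstract void editFields(Map<String, Object> fields);
}

// Hypothetical job-specific subclass.
class OnlineJobEditor extends MapEditor {
    @Override
    protected void editFields(Map<String, Object> fields) {
        fields.putIfAbsent("source", "online");
        fields.remove("debug");
    }
}

class MapEditorDemo {
    public static void main(String[] args) {
        Map<String, Object> message = new HashMap<>();
        message.put("id", 42);
        message.put("debug", true);
        Map<String, Object> edited = new OnlineJobEditor().handle(message);
        System.out.println(edited.get("source"));        // prints online
        System.out.println(edited.containsKey("debug")); // prints false
    }
}
```

A new job then only needs another subclass with its own editFields(), while the route itself stays unchanged.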

huangapple
  • Posted on 2020-10-16 17:58:05
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64386965.html