英文:
Polymorphic Json Marshalling with Apache Camel
问题
我正在重构一个Camel路由,希望能使其更加通用化。(如果需要任何可能的Bean注入解决方案,我也在使用Spring Boot)
from(fromKafka)
.routeId("Rest Models")
.removeHeaders("*")
.aggregate(new GroupedBodyAggregationStrategy())
.constant(true)
.completionTimeout(batchingInterval)
.process(new ListOfJsonToJsonArray())
.unmarshal().json(JsonLibrary.Jackson, InputArrayPojo.class)
.enrich("seda:rest", mergeRestResult)
处理器 ListOfJsonToJsonArray()
接受Kafka消息的JSON字符串表示,并使用 {[ ]}
在外部用逗号分隔的方式将所有内容连接起来。
因此,InputArrayPojo.class
是一个包装器,用于从Kafka接收的对象数组。我需要将这些对象捆绑在一起,以便在增强处理中对REST接口进行小批量处理。所包含的对象格式为 InputPojo.class
(实际上只是一个模式,但还执行了一些基本的数据质量检查)。
我需要找到一种方法来使 InputPojo.class
通用化,以便对于新的作业,我们可以运行相同的路由,但提供一个不同的 InputPojo.class
。
我尝试过应用多态性并为 InputPojo
创建一个接口,但在尝试构建接口时遇到了错误。
@JsonSubTypes({
@JsonSubTypes.Type(value = InputPojo.class, name = "online")
})
public interface InputPojoInterface {
}
我还尝试过一些参数化,但是在这方面我没有成功,因为它不会应用bean的构造函数,因此方法也不存在。
我还包括了以下内容
com.fasterxml.jackson.databind.exc.InvalidDefinitionException - 无法构造`InputPojoInterface`的实例(没有构造函数,如默认构造函数):抽象类型需要映射到具体类型,具有自定义反序列化器,或包含附加类型信息
在 [Source: (ByteArrayInputStream); line: 1, column: 10](通过引用链:InputArrayPojo["data"]->java.util.ArrayList[0])中
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"data"
})
public class InputArrayPojo {
@JsonProperty("data")
private List<InputPojo> data = null;
@JsonIgnore
private Map<String, Object> additionalProperties = new HashMap<String, Object>();
@JsonProperty("data")
public List<InputPojo> getData() {
return data;
}
@JsonProperty("data")
public void setData(List<InputPojo> data) {
this.data = data;
}
@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties;
}
@JsonAnySetter
public void setAdditionalProperty(String name, Object value) {
this.additionalProperties.put(name, value);
}
}
增强处理还需要实现某种类型的通用逻辑:
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
List<ModelResultPojo> outputList = new ArrayList<>();
for (int i = 0; i < originalMessages.size(); ++i) {
ModelResultPojo output = new ModelResultPojo();
IngestionOutPojo raw = originalMessages.get(i);
PredictionPojo enrich = enrichmentMessages.get(i);
/*
enrichment logic to create modelResult
*/
outputList.add(modelResult);
}
newExchange.getIn().setBody(outputList);
return newExchange;
}
英文:
I'm refactoring a camel route to hopefully be a little more generic. (I'm also using spring boot, if that helps for any possible bean injection solutions)
from(fromKafka)
.routeId("Rest Models")
.removeHeaders("*")
.aggregate(new GroupedBodyAggregationStrategy())
.constant(true)
.completionTimeout(batchingInterval)
.process(new ListOfJsonToJsonArray())
.unmarshal().json(JsonLibrary.Jackson, InputArrayPojo.class)
.enrich("seda:rest", mergeRestResult)
the processor ListOfJsonToJsonArray()
takes the json string representation of the kafka message, and joins everything, comma separated, with a {[ ]}
on the outside.
The InputArrayPojo.class is thus a wrapper for the array of objects that are coming in from kafka. I need to bundle the objects in order to mini-batch to the REST interface in the enrichment. The objects contained are of format InputPojo.class (effectively just a schema, but also performs some basic data quality checks)
I need a way to generify InputPojo.class such that for our new jobs, we can run the same route, but supply a different InputPojo.class.
I've tried to apply polymorphism and create an interface for InputPojo, however this runs into an error when trying to construct the interface.
@JsonSubTypes({
@JsonSubTypes.Type(value=InputPojo.class, name = "online")
})
public interface InputPojoInterface {
}
I also tried some parameterisation, but I had no luck there either because it would not apply the constructor of the bean, none of the methods then existed.
I've also included
com.fasterxml.jackson.databind.exc.InvalidDefinitionException - Cannot construct instance of `InputPojoInterface` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (ByteArrayInputStream); line: 1, column: 10] (through reference chain: InputArrayPojo["data"]->java.util.ArrayList[0])]
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"data"
})
public class InputArrayPojo{
@JsonProperty("data")
private List<InputPojo> data = null;
@JsonIgnore
private Map<String, Object> additionalProperties = new HashMap<String, Object>();
@JsonProperty("data")
public List<InputPojo> getData() {
return data;
}
@JsonProperty("data")
public void setData(List<InputPojo> data) {
this.data = data;
}
@JsonAnyGetter
public Map<String, Object> getAdditionalProperties() {
return this.additionalProperties;
}
@JsonAnySetter
public void setAdditionalProperty(String name, Object value) {
this.additionalProperties.put(name, value);
}
}
The enrichment also needs to implement some type of generifying logic
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
List<IngestionOutPojo> originalMessages = oldExchange.getIn().getBody(IngestionOutArrayPojo.class).getData();
List<PredictionPojo> enrichmentMessages = newExchange.getIn().getBody(PredictionArrayPojo.class).getData();
List<ModelResultPojo> outputList = new ArrayList<>();
for (int i = 0; i < originalMessages.size(); ++i) {
ModelResultPojo output = new ModelResultPojo();
IngestionOutPojo raw = originalMessages.get(i);
PredictionPojo enrich = enrichmentMessages.get(i);
/*
enrichment logic to create modelResult
*/
outputList.add(modelResult)
}
newExchange.getIn().setBody(outputList);
return newExchange
}
答案1
得分: 1
我最终通过以下方式得出解决方案:
解组为默认类型:Map<String,Object>(未指定类时,它会解组为Map<String,Object>)
之后,我编写了一个实现处理器的抽象类。在这个处理器中,我使用该Map,并对该Map应用一个抽象的editFields()函数。
因此,我现在通过Map实现了业务逻辑的多态处理,而不是通过POJO实现。
英文:
I ended up coming up with a solution by doing the following:
unmarshalled to the default type: Map<String,Object> (without specifying a class, it camel unmarshalls to a Map<String,Object>)
After that I wrote an abstract class that implements a processor. In this processor I take the Map, and apply an abstract editFields() function to the Map.
thus I now have polymorphic handling of business logic through a Map instead of through a POJO.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论