将InputStream转换为对象,然后将对象转换为Json。

huangapple go评论53阅读模式
英文:

Transform InputStream into Object, then Object into Json

问题

I am reading in an InputStream from an Sftp.inboundStreamingAdapter that stream is passed to a message channel that this flow reads from and tries to convert the stream (which is a .csv) into an object, and then for the secondary transformation it should convert that object into json. The problem is the header file (I think). The csv looks like this the screenshot below. My object uses the headers to map to my object, but since I am splitting the file up by each line in the file I believe that causes the error. I'm assuming this isn't the correct approach to this problem, any help would be much appreciated.

[![Sample Csv][1]][1]

@Bean
public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                       QueueChannel kafkaPojoMessageChannel) {

    return IntegrationFlow.from(inboundFilesMessageChannel)
                          .split(Files.splitter())
                          .transform(new StreamToMyObject) // TODO: Turn InputStream to MyObject Object
                          .transform(new ObjectToJsonTransformer())
                          .log(LoggingHandler.Level.DEBUG,
                               "AcousticEngageDataSftpToKafkaIntegrationFlow",
                               m -> "Payload: " + m.getPayload())
                          .channel(kafkaPojoMessageChannel)
                          .get();
}

I have tried creating my own custom InputStreamToObject Transformers, but I have came up short, my latest attempt I used an ObjectInputStream and have tried to use the ois.readFrom(inputStream) method to convert it into my ObjectClass using something like this, and adding the StreamToObject class in the first transformer.

public class StreamToMyObjectConverter implements GenericTransformer<InputStream, MyObject> {
    @Override
    public MyObject transform(InputStream inputStream) {
        try (ObjectInputStream ois = new ObjectInputStream(inputStream)){
            return (MyObject) ois.readObject();
        }
        catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

    }
}

Object: Which is generated by jsonschema2pojo.

I think the issue is I can't map to the object from the stream because of the headers. I'm not sure if this is the best approach. Any suggestions would be much appreciated.

@JsonInclude(Include.NON_NULL)
@JsonPropertyOrder({"email", "RECIPIENT_ID", "ENCODED_RECIPIENT_ID", "contactId", "code", "messageId", "userAgent", "messageName", "mailingTemplateId", "subjectLine", "docType", "reportId", "sendType", "bounceType", "urlDescription", "clickUrl", "optOutDetails", "messageGroupId", "programId", "timestamp", "originatedFrom", "eventId", "externalSystemName", "externalSystemReferenceId", "trackingCode"})
public class MyObject {
    @JsonProperty("email")
    private String email;
    @JsonProperty("RECIPIENT_ID")
    private String recipientId;
    @JsonProperty("ENCODED_RECIPIENT_ID")
    private String encodedRecipientId;
    @JsonProperty("contactId")
    private String contactId;
    @JsonProperty("code")
    private String code;
    @JsonProperty("messageId")
    private String messageId;
    @JsonProperty("userAgent")
    private String userAgent;
    @JsonProperty("messageName")
    private String messageName;
    @JsonProperty("mailingTemplateId")
    private String mailingTemplateId;
    @JsonProperty("subjectLine")
    private String subjectLine;
    @JsonProperty("docType")
    private String docType;
    @JsonProperty("reportId")
    private String reportId;
    @JsonProperty("sendType")
    private String sendType;
    @JsonProperty("bounceType")
    private String bounceType;
    @JsonProperty("urlDescription")
    private String urlDescription;
    @JsonProperty("clickUrl")
    private String clickUrl;
    @JsonProperty("optOutDetails")
    private String optOutDetails;
    @JsonProperty("messageGroupId")
    private String messageGroupId;
    @JsonProperty("programId")
    private String programId;
    @JsonProperty("timestamp")
    private String timestamp;
    @JsonProperty("originatedFrom")
    private String originatedFrom;
    @JsonProperty("eventId")
    private String eventId;
    @JsonProperty("externalSystemName")
    private String externalSystemName;
    @JsonProperty("externalSystemReferenceId")
    private String externalSystemReferenceId;
    @JsonProperty("trackingCode")
    private String trackingCode;
}

StackTrace:

Caused by: org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@5f2f9a4d], failedMessage=GenericMessage [payload=FileMarker [filePath=/downloaddummy_acoustic.csv, mark=START], headers={file_remoteHostPort=transfer-campaign-us-2.goacoustic.com:22, file_remoteFileInfo={"directory":false,"filename":"dummy_acoustic.csv","link":false,"modified":1686691008000,"permissions":"rw-r-----","remoteDirectory":"/download","size":1250}, file_remoteDirectory=/download, id=ca7bd14f-6967-0040-4f56-e63c715ae1c5, file_marker=START, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@4e9ca373, file_remoteFile=dummy_acoustic.csv, timestamp=1686758001364}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:117)
    at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:115)
    at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
    ... 42 more
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1004E: Method call: Method transform(org.springframework.integration.file.splitter.FileSplitter$FileMarker) cannot be found on type com.thrivent.enterprisemarketingchannelactivation.engage.converter.StreamToEmailInteractionConverter
    at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:225)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:135)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:380)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:93)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:119)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:376)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:154)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:611)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:604)
    at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:590)


<details>
<summary>英文:</summary>

I am reading in an InputStream from an `Sftp.inboundStreamingAdapter` that stream is passed to a message channel that this flow reads from and tries to convert the stream (which is a .csv) into an object, and then for the secondary transformation it should convert that object into json. The problem is the header file (I think). The csv looks like this the screenshot below. My object uses the headers to map to my object, but since I am splitting the file up by each line in the file I believe that causes the error. I&#39;m assuming this isn&#39;t the correct approach to this problem, any help would be much appreciated.


[![Sample Csv][1]][1]

    @Bean
    public IntegrationFlow readCsvFileFlow(MessageChannel inboundFilesMessageChannel,
                                           QueueChannel kafkaPojoMessageChannel) {

        return IntegrationFlow.from(inboundFilesMessageChannel)
                              .split(Files.splitter())
                              .transform(new StreamToMyObject) // TODO: Turn InputStream to MyObject Object
                              .transform(new ObjectToJsonTransformer())
                              .log(LoggingHandler.Level.DEBUG,
                                   &quot;AcousticEngageDataSftpToKafkaIntegrationFlow&quot;,
                                   m -&gt; &quot;Payload: &quot; + m.getPayload())
                              .channel(kafkaPojoMessageChannel)
                              .get();
    }

I have tried creating my own custom InputStreamToObject Transformers, but I have came up short, my latest attempt I used an `ObjectInputStream` and have tried to use the ois.readFrom(inputStream) method to convert it into my ObjectClass using something like this, and adding the StreamToObject class in the first transformer. 

public class StreamToMyObjectConverter implements GenericTransformer<InputStream, MyObject> {
@Override
public MyObject transform(InputStream inputStream) {
try (ObjectInputStream ois = new ObjectInputStream(inputStream)){
return (MyObject) ois.readObject();
}
catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}

}
Object: Which is generated by jsonschema2pojo.
I think the issue is I can&#39;t map to the object from the stream because of the headers. I&#39;m not sure if this is the best approach. Any suggestions would be much appreciated.

@JsonInclude(Include.NON_NULL)
@JsonPropertyOrder({"email", "RECIPIENT_ID", "ENCODED_RECIPIENT_ID", "contactId", "code", "messageId", "userAgent", "messageName", "mailingTemplateId", "subjectLine", "docType", "reportId", "sendType", "bounceType", "urlDescription", "clickUrl", "optOutDetails", "messageGroupId", "programId", "timestamp", "originatedFrom", "eventId", "externalSystemName", "externalSystemReferenceId", "trackingCode"})
public class MyObject {
@JsonProperty("email")
private String email;
@JsonProperty("RECIPIENT_ID")
private String recipientId;
@JsonProperty("ENCODED_RECIPIENT_ID")
private String encodedRecipientId;
@JsonProperty("contactId")
private String contactId;
@JsonProperty("code")
private String code;
@JsonProperty("messageId")
private String messageId;
@JsonProperty("userAgent")
private String userAgent;
@JsonProperty("messageName")
private String messageName;
@JsonProperty("mailingTemplateId")
private String mailingTemplateId;
@JsonProperty("subjectLine")
private String subjectLine;
@JsonProperty("docType")
private String docType;
@JsonProperty("reportId")
private String reportId;
@JsonProperty("sendType")
private String sendType;
@JsonProperty("bounceType")
private String bounceType;
@JsonProperty("urlDescription")
private String urlDescription;
@JsonProperty("clickUrl")
private String clickUrl;
@JsonProperty("optOutDetails")
private String optOutDetails;
@JsonProperty("messageGroupId")
private String messageGroupId;
@JsonProperty("programId")
private String programId;
@JsonProperty("timestamp")
private String timestamp;
@JsonProperty("originatedFrom")
private String originatedFrom;
@JsonProperty("eventId")
private String eventId;
@JsonProperty("externalSystemName")
private String externalSystemName;
@JsonProperty("externalSystemReferenceId")
private String externalSystemReferenceId;
@JsonProperty("trackingCode")
private String trackingCode;
}

StackTrace:

Caused by: org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@5f2f9a4d], failedMessage=GenericMessage [payload=FileMarker [filePath=/downloaddummy_acoustic.csv, mark=START], headers={file_remoteHostPort=transfer-campaign-us-2.goacoustic.com:22, file_remoteFileInfo={"directory":false,"filename":"dummy_acoustic.csv","link":false,"modified":1686691008000,"permissions":"rw-r-----","remoteDirectory":"/download","size":1250}, file_remoteDirectory=/download, id=ca7bd14f-6967-0040-4f56-e63c715ae1c5, file_marker=START, closeableResource=org.springframework.integration.file.remote.session.CachingSessionFactory$CachedSession@4e9ca373, file_remoteFile=dummy_acoustic.csv, timestamp=1686758001364}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:117)
at org.springframework.integration.transformer.AbstractMessageProcessingTransformer.transform(AbstractMessageProcessingTransformer.java:115)
at org.springframework.integration.transformer.MessageTransformingHandler.handleRequestMessage(MessageTransformingHandler.java:119)
... 42 more
Caused by: org.springframework.expression.spel.SpelEvaluationException: EL1004E: Method call: Method transform(org.springframework.integration.file.splitter.FileSplitter$FileMarker) cannot be found on type com.thrivent.enterprisemarketingchannelactivation.engage.converter.StreamToEmailInteractionConverter
at org.springframework.expression.spel.ast.MethodReference.findAccessorForMethod(MethodReference.java:225)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:135)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:380)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:93)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:119)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:376)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:169)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:154)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:611)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:604)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:590)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:561)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:476)
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:354)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:114)
... 44 more


[1]: https://i.stack.imgur.com/r3peq.png
</details>
# 答案1
**得分**: 1
看起来你在`Files.splitter()`上缺少了这个选项:
```java
/**
* 指定要作为消息中的标头在剩余行中发送的第一行的标头名称。
* @param firstLineHeaderName 要发送第一行的标头名称。
* @return FileSplitterSpec
*/
public FileSplitterSpec firstLineAsHeader(String firstLineHeaderName) {

如果你不使用FileSplittermarkers = true选项,就不会出现org.springframework.integration.file.splitter.FileSplitter$FileMarker错误。

更多信息请参考文档:https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter

英文:

It looks like you are missing this option on that Files.splitter():

/**
* Specify the header name for the first line to be carried as a header in the
* messages emitted for the remaining lines.
* @param firstLineHeaderName the header name to carry first line.
* @return the FileSplitterSpec
*/
public FileSplitterSpec firstLineAsHeader(String firstLineHeaderName) {

The org.springframework.integration.file.splitter.FileSplitter$FileMarker error is not possible if you don't use markers = true for that FileSplitter.

See more in docs: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter

huangapple
  • 本文由 发表于 2023年6月14日 23:54:28
  • 转载请务必保留本文链接:https://go.coder-hub.com/76475435.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定