英文:
Problem rebounding a failed message in a splitter-aggregator flow
问题
我有一个包含分离器的SI流程,它创建了blockMessages,并且在流程下游有一个聚合器,用于重新将它们合并。在这之间,对由分离器创建的每个blockMessage进行了一些处理,包括下载和保存文件,然后发送远程通知。不幸的是,这个流程对一些blockMessages失败了,所以我考虑创建一个备用错误流程,目前只将错误保存到数据库中,然后应该返回控制权给聚合器以完成最后的工作。
这是否是正确的方法(返回到主流程),以及如何欺骗聚合器将通过errorFlow路由的消息解释为正常消息(因为目前的问题是,当我从errorFlow返回后,来自error flow的消息与正常流程后续的消息不同,聚合器无法正确处理它们,也不知道如何完成)?还有没有其他处理这些失败消息的方法?
@Bean
public IntegrationFlow inFlow() {
return IntegrationFlows
.from(inbound)
.split(notificationSplitter)
.channel(inChannel)
.get();
}
@Bean
@DependsOn("downloadFlowError") // download error
public IntegrationFlow downloadFlow() {
return IntegrationFlows
.from(inChannel)
.enrichHeaders(s -> s.headerExpressions(c -> c.put("block", "payload")))
.handle(saveToSmb)
.channel(notifyChannel)
.get();
}
@Bean
public IntegrationFlow notifyFlow() {
return IntegrationFlows
.from(notifyChannel)
.handle(blockHandler)
.headerFilter("contentType")
.enrichHeaders(Collections.singletonMap("contentType", APPLICATION_JSON_VALUE))
.handle(outboundHttpNotifier)
.channel(resultsChannel)
.get();
}
@Bean
public IntegrationFlow resultsFlow() {
return IntegrationFlows
.from(resultsChannel)
.aggregate(aggregator)
.get();
}
@Bean
public IntegrationFlow downloadFlowError() {
return IntegrationFlows
.from(errorChannel)
.handle(errorHandler)
.channel(resultsChannel)
.get();
}
异常是在saveToSmb
组件中抛出的。
聚合器应该像这样,但现在还需要一个ReleaseStrategy:
@Component
public class BlockAggregator implements CorrelationStrategy, MessageGroupProcessor {
@Override
public Object getCorrelationKey(Message<?> message) {
return message.getHeaders().get("correlationId");
}
@Override
@Transactional
public String processMessageGroup(MessageGroup group) {
Message message = group.getMessages().iterator().next();
BlockMessage m = Objects.requireNonNull(message.getHeaders().get(BLOCK_MESSAGE_KEY, BlockMessage.class));
// ...
log.info("finished processing event");
return "OK";
}
}
还添加了一个ErrorHandler,它的作用类似,但是由于常规流程较长且执行更多处理,所以我认为错误也应该进行调整:
@Component
public class ErrorHandler implements GenericHandler<MessageHandlingException> {
@Override
public Object handle(MessageHandlingException payload, MessageHeaders headers) {
BlockMessage blockMessage = (BlockMessage) payload.getFailedMessage().getPayload();
// ...
return payload.getFailedMessage();
}
}
请注意,上述代码片段只是你提供的一部分,并不包括完整的配置和逻辑。如果需要进一步的帮助或解释,请提供更多上下文或具体问题。
英文:
I have a SI flow with a splitter which creates blockMessages and an aggregator down the flow which rejoins them. In between there is some processing for every blockMessage which is created by the splitter which involves downloading & saving a file & then sending a remote notification. Unfortunately this flow fails for some blockMessages so I thought I create an fallback error flow which for the moment only saves the error into the database and should return control to the aggregator to finish the job at the end.
Is this a correct approach (to return to the main flow) and how to trick Aggregator to interpret messages routed through the errorFlow as normal messages (cause for the moment the problem is that after I return from the errorFlow, the messages from the error flow differ from the messages which follow the normal flow and the aggregator doesnt handle them properly and doesnt know to finish) ? Or can you recommend another approach for dealing with these failing messages?
@Bean
public IntegrationFlow inFlow() {
return IntegrationFlows
.from(inbound)
.split(notificationSplitter)
.channel(inChannel)
.get();
}
@Bean
@DependsOn("downloadFlowError") //download error
public IntegrationFlow downloadFlow() {
return IntegrationFlows
.from(inChannel)
.enrichHeaders(s -> s.headerExpressions(c -> c.put("block", "payload")))
.handle(saveToSmb)
.channel(notifyChannel)
.get();
}
@Bean
public IntegrationFlow notifyFlow() {
return IntegrationFlows
.from(notifyChannel)
.handle(blockHandler)
.headerFilter("contentType")
.enrichHeaders(Collections.singletonMap("contentType", APPLICATION_JSON_VALUE))
.handle(outboundHttpNotifier)
.channel(resultsChannel)
.get();
}
@Bean
public IntegrationFlow resultsFlow() {
return IntegrationFlows
.from(resultsChannel)
.aggregate(aggregator)
.get();
}
@Bean
public IntegrationFlow downloadFlowError() {
return IntegrationFlows
.from(errorChannel)
.handle(errorHandler)
.channel(resultsChannel)
.get();
}
The exception is thrown in saveToSmb component.
Aggregator is like this but now needs also a ReleaseStrategy:
@Component
public class BlockAggregator implements CorrelationStrategy, MessageGroupProcessor {
@Override
public Object getCorrelationKey(Message<?> message) {
return message.getHeaders().get("correlationId");
}
@Override
@Transactional
public String processMessageGroup(MessageGroup group) {
Message message = group.getMessages().iterator().next();
BlockMessage m = Objects.requireNonNull(message.getHeaders().get(BLOCK_MESSAGE_KEY, BlockMessage.class));
...
log.info ("finished processing event");
return "OK";
}
PS added also ErrorHandler which is like this, but its return sends a different type of message to the aggregator because the regular flow is longer and does more processing, so I guess I should adapt errors also:
@Component
public class ErrorHandler implements GenericHandler<MessageHandlingException> {
@Override
public Object handle(MessageHandlingException payload, MessageHeaders headers) {
BlockMessage blockMessage = (BlockMessage)payload.getFailedMessage().getPayload();
....
return payload.getFailedMessage();
}
答案1
得分: 1
你关于将错误处理结果发送到聚合器的逻辑是正确的。唯一你所忽略的是错误通道接收到一个 ErrorMessage
。不清楚你在你的 errorHandler
中做了什么,但建议将该 ErrorMessage
的有效载荷转换为 MessagingException
并获取其 failedMessage
以进行进一步处理。其中一个重要部分是 headers
,其中包含了有关关联和/或 replyChannel
的所有所需信息。你肯定可以在错误处理程序中构建一个新的消息作为补偿,但请确保保留 failedMessage
的标头。
更多信息请参见此处:https://github.com/spring-projects/spring-integration/issues/3984
英文:
Your logic about sending an error handler result to an aggregator is correct. Only what you are missing that error channel receives an ErrorMessage
. Not clear what you do in your errorHandler
, but recommendation is to cast a payload of that ErrorMessage
into a MessagingException
and take its failedMessage
for further processing. An important part there is headers
where you got all the required information about correlation and/or replyChannel
. You definitely can build a new message from error handler as a compensation, but be sure to preserve headers of that failedMessage
.
More info is here: https://github.com/spring-projects/spring-integration/issues/3984
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论