英文:
Handling exception that might occur in an integration flow
问题
我有一个REST控制器,它调用一个使用@MessagingGateway(errorChannel = ERROR_CHANNEL)
注解的网关。
这样,无论在网关启动的集成流程下游发生什么错误,都会流入一个错误通道,然后由另一个集成流程处理,这一切都按预期工作。
现在,还有另一种情况,一个集成流程从Kafka读取消息,将这些消息路由到另一个通道,另一个集成流程处理这些消息,另一个流程发送HTTP请求到远程服务。
public IntegrationFlowBuilder attachmentEventTenantRouter(String tenantId) {
return attachmentEventBaseFlow(".*")
.filter(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY) != null &&
m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY, String.class).equalsIgnoreCase(tenantId));
}
private IntegrationFlowBuilder attachmentEventBaseFlow(String eventRegex) {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory.createContainer(topic)).errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME))
.log(LoggingHandler.Level.DEBUG, "Inside Kafka Consumer")
.filter(Message.class, m -> filter(m, eventRegex))
.transform(messageToEventTransformer);
}
@Bean
public IntegrationFlow kafkaConsumerFlow() {
return fromKafkaFlowHelper.attachmentEventTenantRouter(TENANT_ID)
.route(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.EVENT_TYPE_KEY, String.class), p ->
p.resolutionRequired(false)
.channelMapping("eventType", "transformMessagesFromKafkaAndPublishAnotherEvent")
.defaultOutputChannel("nullChannel"))
.get();
}
@Bean
public IntegrationFlow transformMessagesFromKafkaAndPublishAnotherEvent() {
return flow -> flow
.transform(transformer)
.handle(getKafkaHandler());
}
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(gatewayCall, e -> e.advice(expressionAdvice()));
}
如何处理上述流程中可能发生的异常?
正如你所看到的,我正在使用ExpressionEvaluatingRequestHandlerAdvice来处理handle方法的工作,但不确定如何处理可能在转换器中发生的异常?
当通过Kafka消费者启动流程时,配置了带有错误通道的消息网关可以解决问题,但我不知道如何实现这一点。
谢谢。
在Artem的回答后进行了编辑以澄清:
这是一个配置,用于向远程服务发送请求的集成流程,似乎无法捕获并将异常路由到错误通道,没有ExpressionEvaluatingRequestHandlerAdvice:
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(getOAuth2Handler(HttpMethod.PUT, "remote url"), e -> e.advice(expressionEvaluatingRequestHandlerAdvice));
}
private OAuth2RequestHandler getOAuth2Handler(HttpMethod httpMethod, String url) {
return new OAuth2RequestHandler(oAuth2RestTemplate, httpMethod, url);
}
OAuth2RequestHandler类实现了MessageHandler接口:
@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
String requestBody = (String) message.getPayload();
ResponseEntity<String> response = oAuth2RestTemplate.exchange(url, httpMethod, new HttpEntity<>(requestBody), String.class);
}
如何处理此集成流程中可能发生的异常?
英文:
I have a REST controller which calls a gateway annotated with @MessagingGateway(errorChannel = ERROR_CHANNEL)
This way, whatever error occurs downstream the integration flow initiated by the gateway will flow into an error channel which will be handled by another integration flow, this is working as expected.
Now, there is another scenario where an integration flow reads messages from Kafka, routes those messages to another channel, one more integration flow processes those messages and another flow sends an HTTP request to a remote service.
public IntegrationFlowBuilder attachmentEventTenantRouter(String tenantId) {
return attachmentEventBaseFlow(".*")
.filter(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY) != null && m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.TENANT_ID_KEY, String.class).equalsIgnoreCase(tenantId));
}
private IntegrationFlowBuilder attachmentEventBaseFlow(String eventRegex) {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaListenerContainerFactory.createContainer(topic)).errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME))
.log(LoggingHandler.Level.DEBUG, "Inside Kafka Consumer")
.filter(Message.class, m -> filter(m, eventRegex))
.transform(messageToEventTransformer);
}
@Bean
public IntegrationFlow kafkaConsumerFlow() {
return fromKafkaFlowHelper.attachmentEventTenantRouter(TENANT_ID)
.route(Message.class, m -> m.getHeaders().get(KafkaConstants.HEADER_PREFIX + MessageHeader.EVENT_TYPE_KEY, String.class), p -> p
.resolutionRequired(false)
.channelMapping("eventType", "transformMessagesFromKafkaAndPublishAnotherEvent")
.defaultOutputChannel("nullChannel"))
.get();
}
@Bean
public IntegrationFlow transformMessagesFromKafkaAndPublishAnotherEvent() {
return flow -> flow
.transform(transformer)
.handle( getKafkaHandler() );
}
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(gatewayCall, e -> e.advice(expressionAdvice()));
}
How can I do to handle the exceptions that might occur in the flows above?
As you can see, I am using a ExpressionEvaluatingRequestHandlerAdvice which does the work for the handle method, but not sure how to handle exceptions that might occur in the transformers?
The massage gateway with an error channel configured does the trick when the gateway is called by a rest controller, but when the flow is initiated by the Kafka consumer, I'm lost how to achieve this.
Thanks.
EDITED AFTER Artem's RESPONSE TO ADD CLARIFICATION:
This is the configuration of the integration flow that posts a request to a remote service and whose exceptions does not seem to be caught and routed to the errorChannel without a ExpressionEvaluatingRequestHandlerAdvice:
@Bean
public IntegrationFlow sendHttpRequestToRemoteServiceFromKafkaEvent() {
return flow -> flow
.transform(transformer)
.handle(getOAuth2Handler(HttpMethod.PUT, "remote url"), e -> e.advice(expressionEvaluatingRequestHandlerAdvice));
}
private OAuth2RequestHandler getOAuth2Handler(HttpMethod httpMethod, String url) {
return new OAuth2RequestHandler(oAuth2RestTemplate, httpMethod, url);
}
And class OAuth2RequestHandler which implements a MessageHandler
@Override
public void handleMessage(org.springframework.messaging.Message<?> message) throws MessagingException {
String requestBody = (String) message.getPayload();
ResponseEntity<String> response = oAuth2RestTemplate.exchange(url, httpMethod, new HttpEntity<>(requestBody), String.class);
}
答案1
得分: 0
我看到你已经在Kafka消息驱动的通道适配器上使用了errorChannel()
。那么,问题是什么?
你流程中的这部分与提到的带有errorChannel
配置的@MessagingGateway
完全等效。
英文:
I see you use already an errorChannel()
on the Kafka message-driven channel adapter. So, what is the question?
This part of your flow is exact equivalent to mentioned @MesaagingGateway
with its errorChannel
configuration.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论