从Camel路由中拆分后获取原始消息

huangapple go评论82阅读模式
英文:

Getting original message from Camel route after splitting

问题

我有一个camel路由,从队列中批量读取消息,处理消息,然后逐个将它们发送到API,并等待API响应。

当出现500响应时,我无法获取开始时接收到的原始消息。

我认为分裂器返回的是原始消息?

以下是我的路由:

from(gatewayRouteConfig.getInputQueueEndpoint())
    .process(process1)
    .process(process2)
    .setExchangePattern(ExchangePattern.InOnly)
    .choice()
    .when().jsonpath("Message", true)
    .setBody(MESSAGE_WRAPPER_EXTRACTOR)
    .end()
    .split().method(messageSplitter, "splitMessages")
    .log(INFO, "Received Message : ${body}")
    .process(process3)
    .process(validate)
    .process(identity)
    .process(requestProcessor) //抛出异常的地方,异常处理程序中使用的原始消息正在被捕获
    .process(someService::gatewayResponseTimeStop)
    .process(someService::endToEndResponseStop)
    .process(someService::markGatewayDeliveryAsSuccess)
    .log("Completed processing...");

以下是同一类中的异常处理程序:

private void configureExceptionHandlers() {
    onException(ProviderException.class) //当发生500错误时抛出
        .useOriginalMessage() //使用process(requestProcessor)方法接收到的消息,而不是路由开始时的消息形式
        .handled(true)
        .log(ERROR, LOGGER, EXCEPTION_MESSAGE_WITH_STACKTRACE)
        .to(DLQ);
}
英文:

I have a camel route which reads in messages in batches from a queue, processes the message and proceeds to send them one by one to an api and awaits the api response.

I am having issue with retrieving the original message recieved at the start when a 500 response is thrown.

I thought that the splitter returns the original message?

Here is my route:

    from(gatewayRouteConfig.getInputQueueEndpoint())
            .process(process1)
            .process(process2)
            .setExchangePattern(ExchangePattern.InOnly)
            .choice()
            .when().jsonpath("Message", true)
            .setBody(MESSAGE_WRAPPER_EXTRACTOR)
            .end()
            .split().method(messageSplitter, "splitMessages")
            .log(INFO, "Received Message : ${body}")
            .process(process3)
            .process(validate)
            .process(identity)
            .process(requestProcessor) //where the exception is thrown and the original message that is used in the exception handler is picking up
            .process(someService::gatewayResponseTimeStop)
            .process(someService::endToEndResponseStop)
            .process(someService::markGatewayDeliveryAsSuccess)
            .log("Completed processing...");

and here is my exception handler in the same class:

    private void configureExceptionHandlers() {

        onException(ProviderException.class) //thrown when 500 error occurs
            .useOriginalMessage() //picks message in the form the process(requestProcessor) method recieves it instead of start of route
            .handled(true)
            .log(ERROR, LOGGER, EXCEPTION_MESSAGE_WITH_STACKTRACE)
            .to(DLQ);

答案1

得分: 1

通过添加 shareUnitOfWork 解决:

.split().method(activityMessageSplitter, "splitActivityMessages").shareUnitOfWork()
英文:

Solved by adding shareUnitOfWork

.split().method(activityMessageSplitter, "splitActivityMessages").shareUnitOfWork()

huangapple
  • 本文由 发表于 2020年5月29日 23:07:07
  • 转载请务必保留本文链接:https://go.coder-hub.com/62089040.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定