英文:
Chain handlers returning Mono
问题
我正在尝试创建一个流程,从WebFlux.inboundChannelAdapter开始,使用一个返回Mono的第一个处理程序,然后是一个命令式的第二个处理程序。最终,我希望只有在流程完成后才返回REST调用。
根据我从Reactive Streams Support文档了解的情况,我不应该修改我的命令式处理程序以接受Mono作为参数。不幸的是,我一直遇到异常:
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@7f08caf] (simple.org.springframework.integration.config.ConsumerEndpointFactoryBean#1)]; nested exception is java.lang.ClassCastException: reactor.core.publisher.MonoJust cannot be cast to java.lang.String, failedMessage=GenericMessage [payload=MonoJust, headers={id=13de5ed2-0100-9b5c-323b-4d213a05a582, timestamp=1689592273907}]
...
Caused by: java.lang.ClassCastException: reactor.core.publisher.MonoJust cannot be cast to java.lang.String
...
Spring Integration版本:5.5.18
Java版本:1.8
我使用一个小的演示应用程序重现了这个问题:
@Bean
IntegrationFlow flow() {
return IntegrationFlows
.from(WebFlux.inboundChannelAdapter("/start")
.requestMapping(m -> m.methods(HttpMethod.GET))
)
.handle((p,h) -> Mono.just("foo")) //实际上,这里涉及到了一个WebClient
.handle(toUpperCase())
.get();
}
GenericHandler<String> toUpperCase() {
return (p,h) -> p.toUpperCase();
}
我尝试了在处理程序的ConsumerEndpointSpec上使用.channel(c -> c.flux())
和.reactive()
的组合,但没有成功。
请问您能帮助我找到解决方案吗?
英文:
I am trying to make a flow that starts from a WebFlux.inboundChannelAdapter, use a first handler that returns a Mono, and then a second one that is imperative.
Eventually I would like the REST call to return only once the flow has completed.
From what I understood from the Reactive Streams Support documentation I should not have to modify my imperative handler to take a Mono as parameter. Unfortunately I keep facing the exception:
org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@7f08caf] (simple.org.springframework.integration.config.ConsumerEndpointFactoryBean#1)]; nested exception is java.lang.ClassCastException: reactor.core.publisher.MonoJust cannot be cast to java.lang.String, failedMessage=GenericMessage [payload=MonoJust, headers={id=13de5ed2-0100-9b5c-323b-4d213a05a582, timestamp=1689592273907}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:64)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:87)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:36)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:295)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:276)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:490)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:281)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:87)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:36)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:295)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:276)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:490)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:281)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$6(FluxMessageChannel.java:147)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: reactor.core.publisher.MonoJust cannot be cast to java.lang.String
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:105)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:105)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
... 54 more
Spring integration version: 5.5.18
Java: 1.8
I reproduce the issue with a small demo application:
@Bean
IntegrationFlow flow() {
return IntegrationFlows
.from(WebFlux.inboundChannelAdapter("/start")
.requestMapping(m -> m.methods(HttpMethod.GET))
)
.handle((p,h) -> Mono.just("foo")) //In reality the is a WebClient involved here
.handle(toUpperCase())
.get();
}
GenericHandler<String> toUpperCase() {
return (p,h) -> p.toUpperCase();
}
I have tried a combination of .channel(c -> c.flux())
and .reactive()
on handler's ConsumerEndpointSpec
but without success.
Could you please help me to figure out a solution ?
答案1
得分: 1
.handle((p,h) -> Mono.just("foo"))
.handle(toUpperCase())
框架不知道(而且实际上也不关心)你从第一个处理程序返回了一个 Mono
。第二个处理程序期望我们以 String
作为输入载荷类型,但你的是一个 Mono
。
对于那个旧版本,我建议像这样修复:
.handle((p,h) -> Mono.just("foo"), e -> e.async(true))。
它涵盖了对 Future
和 Reactive Streams Publisher
回复消息的处理。
英文:
So, you do this:
.handle((p,h) -> Mono.just("foo"))
.handle(toUpperCase())
The Framework doesn't know (and really doesn't care) that you return a Mono
from the first handler. The second handler expects from us a String
as an input payload type, but your one is a Mono
.
For that old version I would suggest to fix it like this:
.handle((p,h) -> Mono.just("foo"), e -> e.async(true)).
It covers both Future
and Reactive Streams Publisher
reply message handling.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论