Chain handlers returning Mono

Question

I am trying to build a flow that starts from a WebFlux.inboundChannelAdapter, uses a first handler that returns a Mono, and then a second handler that is imperative.
Eventually, I would like the REST call to return only once the flow has completed.

From what I understood from the Reactive Streams Support documentation, I should not have to modify my imperative handler to take a Mono as a parameter. Unfortunately, I keep hitting this exception:

org.springframework.messaging.MessageHandlingException: error occurred in message handler [ServiceActivator for [org.springframework.integration.handler.LambdaMessageProcessor@7f08caf] (simple.org.springframework.integration.config.ConsumerEndpointFactoryBean#1)]; nested exception is java.lang.ClassCastException: reactor.core.publisher.MonoJust cannot be cast to java.lang.String, failedMessage=GenericMessage [payload=MonoJust, headers={id=13de5ed2-0100-9b5c-323b-4d213a05a582, timestamp=1689592273907}]
	at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:64)
	at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:87)
	at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:36)
	at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:295)
	at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:276)
	at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:490)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:281)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
	at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
	at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
	at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
	at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
	at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:456)
	at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:324)
	at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:267)
	at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:231)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
	at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:87)
	at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:36)
	at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:295)
	at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:276)
	at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
	at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:490)
	at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:281)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:537)
	at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:343)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
	at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
	at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
	at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$6(FluxMessageChannel.java:147)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassCastException: reactor.core.publisher.MonoJust cannot be cast to java.lang.String
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.integration.handler.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:105)
	at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:105)
	at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:55)
	... 54 more

Spring Integration version: 5.5.18
Java: 1.8

I reproduced the issue with a small demo application:

@Bean
IntegrationFlow flow() {

	return IntegrationFlows
			.from(WebFlux.inboundChannelAdapter("/start")
					.requestMapping(m -> m.methods(HttpMethod.GET))
			)
			.handle((p,h) -> Mono.just("foo")) // In reality there is a WebClient involved here
			.handle(toUpperCase())
			.get();
}

GenericHandler<String> toUpperCase() {
	return (p,h) -> p.toUpperCase();
}

I have tried combinations of .channel(c -> c.flux()) and .reactive() on the handler's ConsumerEndpointSpec, but without success.

Could you please help me figure out a solution?

Answer 1

Score: 1

So, you do this:

 .handle((p,h) -> Mono.just("foo"))
 .handle(toUpperCase())

The framework doesn't know (and really doesn't care) that you return a Mono from the first handler. The second handler expects a String as its input payload type, but the payload it actually receives is a Mono.
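
To make the failure mode concrete, here is a minimal plain-Java sketch of the same situation, with no Spring involved. It is only an illustration under stated assumptions: `PayloadCastDemo`, `dispatch`, and `tryDispatch` are hypothetical names, and a JDK `CompletableFuture` stands in for the `Mono` so the example needs nothing beyond the standard library. A dispatcher that only sees `Object` payloads hands the wrapper to a handler typed for `String`, and the cast fails at call time.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class PayloadCastDemo {

    // Stand-in for the typed second handler from the question.
    static final Function<String, String> TO_UPPER_CASE = p -> p.toUpperCase();

    // Hypothetical dispatcher: like a message endpoint, it only sees Object payloads,
    // so nothing checks the payload type until the handler is actually invoked.
    @SuppressWarnings("unchecked")
    static Object dispatch(Function<?, ?> handler, Object payload) {
        return ((Function<Object, Object>) handler).apply(payload);
    }

    // Returns the handler result, or the exception name if the payload type does not match.
    static String tryDispatch(Object payload) {
        try {
            return String.valueOf(dispatch(TO_UPPER_CASE, payload));
        } catch (ClassCastException ex) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A plain String payload works.
        System.out.println(tryDispatch("foo"));
        // A wrapper around the String (the Mono in the question) does not.
        System.out.println(tryDispatch(CompletableFuture.completedFuture("foo")));
    }
}
```

The second call fails for the same reason as in the stack trace: the handler receives the wrapper object itself, not the value it will eventually produce.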

For that older version, I would suggest fixing it like this:

.handle((p,h) -> Mono.just("foo"), e -> e.async(true))

This covers handling of both Future and Reactive Streams Publisher replies.
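
As a rough mental model of what an async-reply option changes, the sketch below uses only the JDK. It is not Spring Integration's implementation: `AsyncReplyModel`, `handle`, and `received` are hypothetical names, and `CompletableFuture` again stands in for the `Mono`. With the flag off, the endpoint forwards the reply object as-is; with it on, the endpoint forwards the value once the deferred reply completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

public class AsyncReplyModel {

    // Hypothetical endpoint: runs a handler and forwards its reply downstream.
    static void handle(Function<Object, Object> handler, Object payload,
                       boolean async, Consumer<Object> downstream) {
        Object reply = handler.apply(payload);
        if (async && reply instanceof CompletableFuture) {
            // Async mode: forward the eventual value, not the wrapper.
            ((CompletableFuture<?>) reply).thenAccept(downstream);
        } else {
            // Default mode: forward whatever the handler returned, wrapper included.
            downstream.accept(reply);
        }
    }

    // Collects what a downstream consumer would receive for the given mode.
    static List<Object> received(boolean async) {
        List<Object> seen = new ArrayList<>();
        Function<Object, Object> first = p -> CompletableFuture.completedFuture("foo");
        handle(first, "request", async, seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(received(true).get(0) instanceof String);   // true
        System.out.println(received(false).get(0) instanceof String);  // false
    }
}
```

In this model the downstream consumer sees the plain "foo" only when the async flag is set, which is the effect the second, String-typed handler in the question relies on.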

  • Posted by huangapple on July 17, 2023, 19:17:25
  • When reposting, please keep the link to this article: https://go.coder-hub.com/76703922.html