IntegrationFlow + 2 conditional transformers + Outbound gateway

huangapple go评论112阅读模式
英文:

IntegrationFlow + 2 conditional transformers + Outbound gateway

问题

我有一个集成流程根据某些条件需要运行一个转换器然后使用出站网关发布一个 HTTP 请求

@Bean
public IntegrationFlow messageFromKafka() {
    return flow -> flow
            .publishSubscribeChannel(s -> s
                    .subscribe(f1 -> f1
                            .<AttachmentEvent>filter(validator::isCondition1)
                            .transform(transformer1)
                    )
                    .subscribe(fl -> fl
                            .<AttachmentEvent>filter(validator::isCondition2)
                            .transform(transformer2)
                            .split()
                    )
            )
            .publishSubscribeChannel(s -> s
                    .subscribe(fl1 -> fl1
                            .transform(httpTransformer)
                            .<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
                                    .subFlowMapping("operation1", sf -> sf
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
                                    )
                                    .subFlowMapping("operation2", sf -> sf
                                            .<String>filter(message -> isVendorStatusDescNotCancelled(message))
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
                                    )
                                    .subFlowMapping("operation3", sf -> sf
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
                                    )
                            )
                    )
                    .subscribe(fl2 -> fl2
                            .handle(getKafkaHandler())
                    )
            );
}

这是我尝试的代码,但是我得到了错误消息“没有可用的 output-channel 或 replyChannel 头部”,我认为我理解了其中的原因,但不确定如何实现我所需的功能。

谢谢。

英文:

I have an integration flow which needs to run one transformer or the other based on some condition and then post an http request with an outbound gateway.

@Bean
public IntegrationFlow messageFromKafka() {
return flow -&gt; flow
.publishSubscribeChannel(s -&gt; s
.subscribe(f1 -&gt; f1
.&lt;AttachmentEvent&gt;filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -&gt; fl
.&lt;AttachmentEvent&gt;filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -&gt; s
.subscribe(fl1 -&gt; fl1
.transform(httpTransformer)
.&lt;String, String&gt;route(transformedMessage -&gt; getFlowType(transformedMessage), mapping -&gt; mapping
.subFlowMapping(&quot;operation1&quot;, sf -&gt; sf
.handle(getOAuth2Handler(HttpMethod.PUT, &quot;http://localhost:8080/test&quot;))
)
.subFlowMapping(&quot;operation2&quot;, sf -&gt; sf
.&lt;String&gt;filter(message -&gt; isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, &quot;http://localhost:8080/test2&quot;))
)
.subFlowMapping(&quot;operation3&quot;, sf -&gt; sf
.handle(getOAuth2Handler(HttpMethod.PUT, &quot;http://localhost:8080/test3&quot;))
)
)
)
.subscribe(fl2 -&gt; fl2
.handle(getKafkaHandler())
)
);
}  

This is my attempt, however I am getting this error message "no output-channel or replyChannel header available" which I think I understand the why, but not sure how to achieve what I need.

Thanks.

答案1

得分: 0

在集成中,条件流程使用 router 模式进行处理:链接

虽然看起来你的问题与条件解析完全无关。

我认为你的每个 handle(getOAuth2Handler(...)) 都会返回一些值,在这些子流程中你没有将其作为回复进行处理。如果你对这个回复不感兴趣,可以在 handle() 之后为这些子流程配置一个 nullChannel

英文:

In the integration a conditional flows are handled using router pattern: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter

Although looks like your problem is fully not related to condition resolution.

I think each of your handle(getOAuth2Handler(...)) returns some value which you don't handle as a reply in those sub-flows. If you are not interested in that reply, consider to configure for those sub-flow a nullChannel after handle().

huangapple
  • 本文由 发表于 2020年8月25日 03:44:36
  • 转载请务必保留本文链接:https://go.coder-hub.com/63567762.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定