英文:
IntegrationFlow + 2 conditional transformers + Outbound gateway
问题
我有一个集成流程,根据某些条件需要运行一个转换器,然后使用出站网关发布一个 HTTP 请求。
@Bean
public IntegrationFlow messageFromKafka() {
    return flow -> flow
            .publishSubscribeChannel(s -> s
                    .subscribe(f1 -> f1
                            .<AttachmentEvent>filter(validator::isCondition1)
                            .transform(transformer1)
                    )
                    .subscribe(fl -> fl
                            .<AttachmentEvent>filter(validator::isCondition2)
                            .transform(transformer2)
                            .split()
                    )
            )
            .publishSubscribeChannel(s -> s
                    .subscribe(fl1 -> fl1
                            .transform(httpTransformer)
                            .<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
                                    .subFlowMapping("operation1", sf -> sf
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
                                    )
                                    .subFlowMapping("operation2", sf -> sf
                                            .<String>filter(message -> isVendorStatusDescNotCancelled(message))
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
                                    )
                                    .subFlowMapping("operation3", sf -> sf
                                            .handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
                                    )
                            )
                    )
                    .subscribe(fl2 -> fl2
                            .handle(getKafkaHandler())
                    )
            );
}
这是我尝试的代码,但是我得到了错误消息“没有可用的 output-channel 或 replyChannel 头部”,我认为我理解了其中的原因,但不确定如何实现我所需的功能。
谢谢。
英文:
I have an integration flow which needs to run one transformer or the other based on some condition and then post an http request with an outbound gateway.
@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
.subFlowMapping("operation1", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
)
.subFlowMapping("operation2", sf -> sf
.<String>filter(message -> isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
)
.subFlowMapping("operation3", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
}  
This is my attempt, however I am getting this error message "no output-channel or replyChannel header available" which I think I understand the why, but not sure how to achieve what I need.
Thanks.
答案1
得分: 0
在集成中,条件流程使用 router 模式进行处理:链接
虽然看起来你的问题与条件解析完全无关。
我认为你的每个 handle(getOAuth2Handler(...)) 都会返回一些值,在这些子流程中你没有将其作为回复进行处理。如果你对这个回复不感兴趣,可以在 handle() 之后为这些子流程配置一个 nullChannel。
英文:
In the integration a conditional flows are handled using router pattern: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter
Although looks like your problem is fully not related to condition resolution.
I think each of your handle(getOAuth2Handler(...)) returns some value which you don't handle as a reply in those sub-flows. If you are not interested in that reply, consider to configure for those sub-flow a nullChannel after handle().
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。


评论