英文:
IntegrationFlow + 2 conditional transformers + Outbound gateway
问题
我有一个集成流程,根据某些条件需要运行一个转换器,然后使用出站网关发布一个 HTTP 请求。
@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
.subFlowMapping("operation1", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
)
.subFlowMapping("operation2", sf -> sf
.<String>filter(message -> isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
)
.subFlowMapping("operation3", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
}
这是我尝试的代码,但是我得到了错误消息“没有可用的 output-channel 或 replyChannel 头部”,我认为我理解了其中的原因,但不确定如何实现我所需的功能。
谢谢。
英文:
I have an integration flow which needs to run one transformer or the other based on some condition and then post an http request with an outbound gateway.
@Bean
public IntegrationFlow messageFromKafka() {
return flow -> flow
.publishSubscribeChannel(s -> s
.subscribe(f1 -> f1
.<AttachmentEvent>filter(validator::isCondition1)
.transform(transformer1)
)
.subscribe(fl -> fl
.<AttachmentEvent>filter(validator::isCondition2)
.transform(transformer2)
.split()
)
)
.publishSubscribeChannel(s -> s
.subscribe(fl1 -> fl1
.transform(httpTransformer)
.<String, String>route(transformedMessage -> getFlowType(transformedMessage), mapping -> mapping
.subFlowMapping("operation1", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test"))
)
.subFlowMapping("operation2", sf -> sf
.<String>filter(message -> isVendorStatusDescNotCancelled(message))
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test2"))
)
.subFlowMapping("operation3", sf -> sf
.handle(getOAuth2Handler(HttpMethod.PUT, "http://localhost:8080/test3"))
)
)
)
.subscribe(fl2 -> fl2
.handle(getKafkaHandler())
)
);
}
This is my attempt, however I am getting this error message "no output-channel or replyChannel header available" which I think I understand the why, but not sure how to achieve what I need.
Thanks.
答案1
得分: 0
在集成中,条件流程使用 router
模式进行处理:链接
虽然看起来你的问题与条件解析完全无关。
我认为你的每个 handle(getOAuth2Handler(...))
都会返回一些值,在这些子流程中你没有将其作为回复进行处理。如果你对这个回复不感兴趣,可以在 handle()
之后为这些子流程配置一个 nullChannel
。
英文:
In the integration a conditional flows are handled using router
pattern: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/message-routing.html#messaging-routing-chapter
Although looks like your problem is fully not related to condition resolution.
I think each of your handle(getOAuth2Handler(...))
returns some value which you don't handle as a reply in those sub-flows. If you are not interested in that reply, consider to configure for those sub-flow a nullChannel
after handle()
.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论