英文:
Serializing custom Spring Integration MessageSources that produce lists of messages
问题
Java 11 和 Spring Integration 5.x 在这里。我正在尝试创建一个自定义的 MessageSource<Widget>
,可以应用于集成流程,如下所示:
@Bean
public IntegrationFlow myFlow(WidgetMessageSource widgetMessageSource) {
return IntegrationFlows.from(widgetMessageSource)
.get();
}
这个 WidgetMessageSource
查询一个远程系统并获取 Widget
实例,然后我需要将它们生成为 Message<Widget>
并将它们放在我的流程中:
public class WidgetMessageSource implements MessageSource<Widget> {
@Override
public Message<Widget> receive() {
List<Widget> widgets = fetchStagedWidgets();
// 如何生成 List<Message<Widget>> 而不仅仅是一个 Message<Widget>???
}
private List<Widget> fetchStagedWidgets() {
// 连接到远程系统并拉取0+个 widgets
return where_the_magic_happens(); // TODO
}
}
我的问题是,当调用 fetchStagedWidgets
时,会返回0+个 Widget
实例。如果没有获取到 Widget
实例,则什么都不应该发生(不会在流程中放置任何消息)。
但是,如果获取到 Widget
实例,我需要每个 Widget
都以自己的 Message<Widget>
形式出现。因此,在 fetchStagedWidgets
被调用时,我在将要发生的事情和 MessageSource
API 期望的事情之间存在基数不匹配(每次调用1个消息)。
在这里有哪些选项?我唯一能想到的是序列化 fetchStagedWidgets
调用背后的内容,以便它只会返回0个或1个 Widget
实例,但实际上需要很多复杂性(服务+基础设施)才能使 fetchStagedWidgets
以这种方式运行。在这里还有其他选项吗,可以使用 MessageSource
API 或 Spring Integration 中的其他设施?提前感谢您的帮助!
英文:
Java 11 and Spring Integration 5.x here. I am trying to create a custom MessageSource<Widget>
that can be applied to an integration flow like so:
@Bean
public IntegrationFlow myFlow(WidgetMessageSource widgetMessageSource) {
return IntegrationFlows.from(widgetMessageSource)
.get();
}
This WidgetMessageSource
queries a remote system and fetches Widget
instances that I then need to produce Message<Widget>
with, and place them on my flow:
public class WidgetMessageSource implements MessageSource<Widget> {
@Override
public Message<Widget> receive() {
List<Widget> widgets = fetchStagedWidgets();
// How to produce a List<Message<Widget>> and not just a Message<Widget>???
}
private List<Widget> fetchStagedWidgets() {
// connect to remote system and pull down 0+ widgets
return where_the_magic_happens(); // TODO
}
}
My problem is, when fetchStagedWidgets
is called, 0+ Widget
instances will be returned. If no Widget
instances are fetched, then nothing should happen (no messages are placed on the flow).
But if Widget
instances are fetched, I need each Widget
to appear in its own Message<Widget>
. So I have a cardinality mismatch between what will happen when fetchStagedWidgets
is called and what the MessageSource
API expects (1 message per call).
What are my options here? The only thing I can think of is serializing what is sitting behind the fetchStagedWidgets
call so that it will only ever return 0 or 1 Widget
instances, but there's actually quite a lot of complexity (services + infrastructure) required to get fetchStagedWidgets
to behave that way. Any other options here, using the MessageSource
API or some other facility within Spring Integration? Thanks in advance!
答案1
得分: 1
For Spring Integration(以及任何消息传递),消息中的有效载荷是无关紧要的:分派消息的基础设施可能会查看标头,但在大多数情况下不关心有效载荷。
因此,您可以自由地将单个消息的有效载荷设置为List
以进行返回。您可以生成类似于Message<List<Message<Widget>>>
的东西,但这是否真的有意义并提供了比Message<List<Widget>>
更多的优势呢?为什么要将列表中的每个项都包装成其各自的消息?
我们在框架中有一个用于特定于数据库的消息源的示例。例如,JdbcPollingChannelAdapter
。它返回一个映射记录的List
。R2dbcMessageSource
也是如此,它生成一个映射记录的Flux
。
您可能需要查看.split()
来处理下游的List<Widget>
有效载荷。
从技术上讲,我也看不出为什么需要自定义MessageSource
。普通的Supplier<List<Widget>>
将适用于您当前调用fetchStagedWidgets()
方法的要求。
英文:
For Spring Integration (and messaging at all), it doesn't matter what payload you have in the message: the infrastructure which dispatches messages might look into headers, but in most cases doesn't care about the payload.
So, you are free to have a single message with a List
as payload to return over there. You can produce something like Message<List<Message<Widget>>>
, but does it really make sense and give any advantages over Message<List<Widget>>
? Why would one wrap every item in the list into its individual message?
We have a sample in the framework for DB-specific message sources. For example, JdbcPollingChannelAdapter
. It does return a List
mapped records. Same happens with a R2dbcMessageSource
, which produces a Flux
of mapped record.
You probably need to look into a .split()
to deal with a List<Widget>
payload downstream.
Technically I also don't see a reason in the custom MessageSource
. The plain Supplier<List<Widget>>
will fit into your current requirements to call that fetchStagedWidgets()
method.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论