序列化自定义的Spring集成MessageSources,生成消息列表。

huangapple go评论46阅读模式
英文:

Serializing custom Spring Integration MessageSources that produce lists of messages

问题

Java 11 和 Spring Integration 5.x 在这里。我正在尝试创建一个自定义的 MessageSource<Widget>,可以应用于集成流程,如下所示:

@Bean
public IntegrationFlow myFlow(WidgetMessageSource widgetMessageSource) {
    return IntegrationFlows.from(widgetMessageSource)                 
        .get();
}

这个 WidgetMessageSource 查询一个远程系统并获取 Widget 实例,然后我需要将它们生成为 Message<Widget> 并将它们放在我的流程中:

public class WidgetMessageSource implements MessageSource<Widget> {
    
    @Override
    public Message<Widget> receive() {

        List<Widget> widgets = fetchStagedWidgets();
        // 如何生成 List<Message<Widget>> 而不仅仅是一个 Message<Widget>???

    }

    private List<Widget> fetchStagedWidgets() {
        // 连接到远程系统并拉取0+个 widgets
        return where_the_magic_happens(); // TODO
    }

}

我的问题是,当调用 fetchStagedWidgets 时,会返回0+个 Widget 实例。如果没有获取到 Widget 实例,则什么都不应该发生(不会在流程中放置任何消息)。

但是,如果获取到 Widget 实例,我需要每个 Widget 都以自己的 Message<Widget> 形式出现。因此,在 fetchStagedWidgets 被调用时,我在将要发生的事情和 MessageSource API 期望的事情之间存在基数不匹配(每次调用1个消息)。

在这里有哪些选项?我唯一能想到的是序列化 fetchStagedWidgets 调用背后的内容,以便它只会返回0个或1个 Widget 实例,但实际上需要很多复杂性(服务+基础设施)才能使 fetchStagedWidgets 以这种方式运行。在这里还有其他选项吗,可以使用 MessageSource API 或 Spring Integration 中的其他设施?提前感谢您的帮助!

英文:

Java 11 and Spring Integration 5.x here. I am trying to create a custom MessageSource<Widget> that can be applied to an integration flow like so:

@Bean
public IntegrationFlow myFlow(WidgetMessageSource widgetMessageSource) {
    return IntegrationFlows.from(widgetMessageSource)                 
        .get();
}

This WidgetMessageSource queries a remote system and fetches Widget instances that I then need to produce Message<Widget> with, and place them on my flow:

public class WidgetMessageSource implements MessageSource<Widget> {
    
    @Override
    public Message<Widget> receive() {

        List<Widget> widgets = fetchStagedWidgets();
        // How to produce a List<Message<Widget>> and not just a Message<Widget>???

    }

    private List<Widget> fetchStagedWidgets() {
        // connect to remote system and pull down 0+ widgets
        return where_the_magic_happens(); // TODO
    }

}

My problem is, when fetchStagedWidgets is called, 0+ Widget instances will be returned. If no Widget instances are fetched, then nothing should happen (no messages are placed on the flow).

But if Widget instances are fetched, I need each Widget to appear in its own Message<Widget>. So I have a cardinality mismatch between what will happen when fetchStagedWidgets is called and what the MessageSource API expects (1 message per call).

What are my options here? The only thing I can think of is serializing what is sitting behind the fetchStagedWidgets call so that it will only ever return 0 or 1 Widget instances, but there's actually quite a lot of complexity (services + infrastructure) required to get fetchStagedWidgets to behave that way. Any other options here, using the MessageSource API or some other facility within Spring Integration? Thanks in advance!

答案1

得分: 1

For Spring Integration(以及任何消息传递),消息中的有效载荷是无关紧要的:分派消息的基础设施可能会查看标头,但在大多数情况下不关心有效载荷。

因此,您可以自由地将单个消息的有效载荷设置为List以进行返回。您可以生成类似于Message<List<Message<Widget>>>的东西,但这是否真的有意义并提供了比Message<List<Widget>>更多的优势呢?为什么要将列表中的每个项都包装成其各自的消息?

我们在框架中有一个用于特定于数据库的消息源的示例。例如,JdbcPollingChannelAdapter。它返回一个映射记录的ListR2dbcMessageSource也是如此,它生成一个映射记录的Flux

您可能需要查看.split()来处理下游的List<Widget>有效载荷。

从技术上讲,我也看不出为什么需要自定义MessageSource。普通的Supplier<List<Widget>>将适用于您当前调用fetchStagedWidgets()方法的要求。

英文:

For Spring Integration (and messaging at all), it doesn't matter what payload you have in the message: the infrastructure which dispatches messages might look into headers, but in most cases doesn't care about the payload.

So, you are free to have a single message with a List as payload to return over there. You can produce something like Message&lt;List&lt;Message&lt;Widget&gt;&gt;&gt;, but does it really make sense and give any advantages over Message&lt;List&lt;Widget&gt;&gt;? Why would one wrap every item in the list into its individual message?

We have a sample in the framework for DB-specific message sources. For example, JdbcPollingChannelAdapter. It does return a List mapped records. Same happens with a R2dbcMessageSource, which produces a Flux of mapped record.

You probably need to look into a .split() to deal with a List&lt;Widget&gt; payload downstream.

Technically I also don't see a reason in the custom MessageSource. The plain Supplier&lt;List&lt;Widget&gt;&gt; will fit into your current requirements to call that fetchStagedWidgets() method.

huangapple
  • 本文由 发表于 2023年5月21日 23:26:29
  • 转载请务必保留本文链接:https://go.coder-hub.com/76300638.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定