Spring Integration:并行处理来自Sftp.inboundStreamingAdapter的消息

huangapple go评论111阅读模式
英文:

Spring Integration: Handle messages from Sftp.inboundStreamingAdapter in parallel

问题

所以,

如果有一个使用Spring Integration 5.5 / Spring Boot 2.7的流程,从SFTP源读取XML文件,处理并存储它们,然后删除。事情确实可以工作,但是按顺序进行,也就是说每个文件在另一个文件之后处理。

我想要异步地进行处理和转换(不一定是并行的,顺序不重要),以增加吞吐量。但是我无法弄清楚如何进行配置。

代码如下:

IntegrationFlows
  .from(Sftp.inboundStreamingAdapter(sftpTemplate))
  .publishSubscribeChannel(spec -> spec
    .subscribe(
      flow -> 
        flow
         .transform( /* 将文件内容转换为XML */ )
         .handle( /* 以XML格式持久化 */ )
     )
    .subscribe(flow -> flow.handle( /* 删除文件 */ ))
   )
  .get()

我猜异步/并行处理必须在源和发布/订阅通道之间定义。但是如何配置呢?

英文:

So,

if have a Spring Integration 5.5 / Spring Boot 2.7 flow that reads XML-files from a SFTP source, processes and stores them, then deletes. Things do work, but sequentially, meaning each file is processed after the other.

I'd like to do the processing and transforming asynchronously (not necessary in parallel, order is no no importance) to increase the througput. But I can't figure out, how to configure it.

The code:

IntegrationFlows
  .from(Sftp.inboundStreamingAdapter(sftpTemplate))
  .publishSubscribeChannel(spec -> spec
    .subscribe(
      flow -> 
        flow
         .transform( /* file content to xml */ )
         .handle( /* persist as xml */ )
     )
    .subscribe(flow -> flow.handle( /* remove file */ ))
   )
  .get()

I guess the async / parallel handling must be defined between the source and the pub/sub-channel. But how?

答案1

得分: 1

你的publishSubscribeChannel()配置是正确的,不要尝试并行处理,因为在Sftp.inboundStreamingAdapter()之后,你处理的是一个InputStream类型的负载,无法删除(甚至关闭)远程文件。如果要从源头的角度进行并行处理,你需要查看IntegrationFlows.from()的第二个参数:

static IntegrationFlowBuilder from(MessageSource<?> messageSource,
		@Nullable Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {

Consumer<SourcePollingChannelAdapterSpec>可以让你访问源头轮询通道适配器的选项。其中之一是poller(Function<PollerFactory, PollerSpec> pollers),可以进行如下配置:

e -> e.poller(p -> p.fixedDelay(1000).taskExecutor())

taskExecutor允许将定时任务转移到不同的线程中。请注意maxMessagesPerPoll(对于SourcePollingChannelAdapter默认为1):如果大于1,则会在同一个线程中发出相应数量的消息。

你还可以在from()之后简单地添加一个channel(c -> c.executor())

英文:

Your publishSubscribeChannel() configuration is correct and don't try to make it parallel since you cannot remove (or even close) a remote file because you deal with an InputStream in a payload after Sftp.inboundStreamingAdapter(). To make it parallel from that source perspective, you need to look into a second argument of the IntegrationFlows.from():

static IntegrationFlowBuilder from(MessageSource&lt;?&gt; messageSource,
		@Nullable Consumer&lt;SourcePollingChannelAdapterSpec&gt; endpointConfigurer) {

That Consumer&lt;SourcePollingChannelAdapterSpec&gt; gives you an access to source polling channel adapter options. One of them is poller(Function&lt;PollerFactory, PollerSpec&gt; pollers) and can be configured like:

e -&gt; e.poller(p -&gt; p.fixedDelay(1000).taskExecutor())

That taskExecutor allows a scheduled task to be shifted to different thread. Be careful with maxMessagesPerPoll (1 by default for SourcePollingChannelAdapter): if it is more than 1, then that number of messages are going to be emitted in the same thread.

You also can simply have a channel(c -&gt; c.executor()) just after this from().

huangapple
  • 本文由 发表于 2023年8月8日 21:40:19
  • 转载请务必保留本文链接:https://go.coder-hub.com/76860135.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定