春季集成 FTP 适配器和批处理

huangapple go评论112阅读模式
英文:

Spring integration ftp adapter and batch processing

问题

@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
            .preserveTimestamp(true)
            .maxFetchSize(ftpProperties.maxFetchSize)
            .remoteDirectory(ftpProperties.remoteDirectory)
            .localDirectory(File(ftpProperties.downloadDirectory))
            .filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
            .regexFilter(".*\\.zip$")
    ) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
            .channel(MessageChannels.executor(Executors.newWorkStealingPool()))
            .transform(unZipTransformer())
            .handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
            .get()
}
英文:

I need to poll FTP server and process new or changed files. I use Spring Integration 5.3.2 with inbound FTP adapter and poller with fixed rate of 5 seconds. All files downloaded immediately in local directory, but integration flow underlying handlers invokes after every 5 seconds for each file. I want to process downloaded list of files immediately in concurent threads, but poll ftp each 5 seconds after flow ends. How can i do it?

@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
            .preserveTimestamp(true)
            .maxFetchSize(ftpProperties.maxFetchSize)
            .remoteDirectory(ftpProperties.remoteDirectory)
            .localDirectory(File(ftpProperties.downloadDirectory))
            .filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
            .regexFilter(".*\\.zip$")
    ) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
            .channel(MessageChannels.executor(Executors.newWorkStealingPool()))
            .transform(unZipTransformer())
            .handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
            .get()
}

答案1

得分: 0

设置 maxMessagesPerPoll - 默认值为1;-1 表示无限。

Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)
英文:

set maxMessagesPerPoll - it defaults to 1; -1 means infinity.

Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)

答案2

得分: 0

听起来更像是您想要从FtpOutboundGateway使用MGET命令。因此,您仍然可以使用一个空字符串负载的普通轮询器来发送,类似于 IntegrationFlows.from(() -> ""),并带有所需的轮询器选项。

调用Ftp.outboundGateway()与远程目录执行MGET命令将返回一个List<File>作为回复消息。因此,您可以批量处理文件。

有关更多信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway

英文:

Sounds more like you want to use an MGET command from the FtpOutboundGateway. So, you still can have a plain poller with empty string payload to send, something like IntegrationFlows.from(() -> "") with a desired poller options.

The calling an Ftp.outboundGateway() with an MGET command against a remote dir will give you a List<File> as a reply message. So, you are free to do anything with your files as a batch.

See docs for more info: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway

huangapple
  • 本文由 发表于 2020年8月26日 23:10:13
  • 转载请务必保留本文链接:https://go.coder-hub.com/63600559.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定