英文:
Spring integration ftp adapter and batch processing
问题
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.maxFetchSize(ftpProperties.maxFetchSize)
.remoteDirectory(ftpProperties.remoteDirectory)
.localDirectory(File(ftpProperties.downloadDirectory))
.filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
.regexFilter(".*\\.zip$")
) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
.channel(MessageChannels.executor(Executors.newWorkStealingPool()))
.transform(unZipTransformer())
.handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
.get()
}
英文:
I need to poll FTP server and process new or changed files. I use Spring Integration 5.3.2 with inbound FTP adapter and poller with fixed rate of 5 seconds. All files downloaded immediately in local directory, but integration flow underlying handlers invokes after every 5 seconds for each file. I want to process downloaded list of files immediately in concurent threads, but poll ftp each 5 seconds after flow ends. How can i do it?
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.maxFetchSize(ftpProperties.maxFetchSize)
.remoteDirectory(ftpProperties.remoteDirectory)
.localDirectory(File(ftpProperties.downloadDirectory))
.filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
.regexFilter(".*\\.zip$")
) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
.channel(MessageChannels.executor(Executors.newWorkStealingPool()))
.transform(unZipTransformer())
.handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
.get()
}
答案1
得分: 0
设置 maxMessagesPerPoll
- 默认值为1;-1 表示无限。
Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)
英文:
set maxMessagesPerPoll
- it defaults to 1; -1 means infinity.
Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)
答案2
得分: 0
听起来更像是您想要从FtpOutboundGateway
使用MGET
命令。因此,您仍然可以使用一个空字符串负载的普通轮询器来发送,类似于 IntegrationFlows.from(() -> "")
,并带有所需的轮询器选项。
调用Ftp.outboundGateway()
与远程目录执行MGET
命令将返回一个List<File>
作为回复消息。因此,您可以批量处理文件。
有关更多信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway
英文:
Sounds more like you want to use an MGET
command from the FtpOutboundGateway
. So, you still can have a plain poller with empty string payload to send, something like IntegrationFlows.from(() -> "")
with a desired poller options.
The calling an Ftp.outboundGateway()
with an MGET
command against a remote dir will give you a List<File>
as a reply message. So, you are free to do anything with your files as a batch.
See docs for more info: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论