英文:
How to handle bulk operations in Spring Integration
问题
我们正在使用Java DSL开发Spring Integration Flow。该应用程序从远程文件读取数据并将数据插入MongoDB。我们正在流式传输文件行,需要将数据批量插入MongoDB。根据我对Spring Integration文档和示例的理解,似乎没有针对此操作的批量选项,我无法弄清楚如何实现所期望的行为。我们尝试使用Aggregation
,但我们没有找到适合固定批量大小的解决方案。
涉及的bean示例:
@Configuration
public class SampleConfiguration {
...
@Bean
MessagingTemplate messagingTemplate(ApplicationContext context) {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setBeanFactory(context);
return messagingTemplate;
}
@Bean
IntegrationFlow sftpSource() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost("localhost");
factory.setPort(22);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
return IntegrationFlow
.from(Sftp.inboundStreamingAdapter(template, Comparator.comparing(DirEntry::getFilename))
.remoteDirectory("upload")
.patternFilter("*.csv")
.maxFetchSize(1),
spec -> spec.poller(Pollers.fixedRate(Duration.ofMillis(1000)))
.autoStartup(true))
.split(Files
.splitter()
.markers()
.charset(StandardCharsets.UTF_8)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker))
.enrichHeaders(h -> h.errorChannel("errorChannel"))
.handle((String payload, MessageHeaders headers) -> {
String header = headers.get("fileHeader", String.class);
String rowWithHeader = header + "\n" + payload;
try (StringReader reader = new StringReader(rowWithHeader)) {
CsvToBean<MyPojo> beanReader = new CsvToBeanBuilder<MyPojo>(reader)
.withType(MyPojo.class)
.withSeparator(';')
.build();
return beanReader.iterator().next();
}
})
.handle(MongoDb
.outboundGateway(mongoTemplate)
.entityClass(MyPojo.class)
.collectionNameFunction(m -> "mypojo")
.collectionCallback(
(collection, message) -> {
MyPojo myPojo = (MyPojo) message.getPayload();
Document document = new Document();
mongoTemplate.getConverter().write(myPojo, document);
return collection.insertOne(document);
}))
.channel("logChannel")
.get();
}
@Bean
IntegrationFlow logFiles() {
return IntegrationFlow
.from("logChannel")
.handle(message -> log.info("message received: {}", message))
.get();
}
@Bean
IntegrationFlow logErrors() {
return IntegrationFlow
.from("errorChannel")
.handle(message -> {
MessagingException exception = (MessagingException) message.getPayload();
log.error("error message received: {} for message {}",
exception.getMessage(),
exception.getFailedMessage());
})
.get();
}
...
}
更新 - 聚合步骤
.aggregate(aggregatorSpec -> aggregatorSpec
.correlationStrategy(message -> message
.getHeaders()
.get(FileHeaders.REMOTE_FILE))
.releaseStrategy(group -> group.size() >= 5)
.groupTimeout(200L)
.sendPartialResultOnExpiry(true))
.handle(MongoDb...)
请注意,上述示例是一段Java代码,描述了Spring Integration Flow的配置和处理步骤。如果您需要进一步的信息或有其他问题,请告诉我。
英文:
we are developing a Spring Integration Flow using Java DSL. This application reads from a remote File and inserts data in MongoDB. We are streaming the file lines and we need to bulk-insert data in MongoDB. From my understanding of the Spring Integration documentation and samples, there's no bulk option for this and I can't figure out how to implement the expected behaviour. We tried using Aggregation
but we didn't find a suitable solution for fixed batch size.
Sample of the involved beans
@Configuration
public class SampleConfiguration {
...
@Bean
MessagingTemplate messagingTemplate(ApplicationContext context) {
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setBeanFactory(context);
return messagingTemplate;
}
@Bean
IntegrationFlow sftpSource() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost("localhost");
factory.setPort(22);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
return IntegrationFlow
.from(Sftp.inboundStreamingAdapter(template, Comparator.comparing(DirEntry::getFilename))
.remoteDirectory("upload")
.patternFilter("*.csv")
.maxFetchSize(1),
spec -> spec.poller(Pollers.fixedRate(Duration.ofMillis(1000)))
.autoStartup(true))
.split(Files
.splitter()
.markers()
.charset(StandardCharsets.UTF_8)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker))
.enrichHeaders(h -> h.errorChannel("errorChannel"))
.handle((String payload, MessageHeaders headers) -> {
String header = headers.get("fileHeader", String.class);
String rowWithHeader = header + "\n" + payload;
try (StringReader reader = new StringReader(rowWithHeader)) {
CsvToBean<MyPojo> beanReader = new CsvToBeanBuilder<MyPojo>(reader)
.withType(MyPojo.class)
.withSeparator(';')
.build();
return beanReader.iterator().next();
}
})
.handle(MongoDb
.outboundGateway(mongoTemplate)
.entityClass(MyPojo.class)
.collectionNameFunction(m -> "mypojo")
.collectionCallback(
(collection, message) -> {
MyPojo myPojo = (MyPojo) message.getPayload();
Document document = new Document();
mongoTemplate.getConverter().write(myPojo, document);
return collection.insertOne(document);
}))
.channel("logChannel")
.get();
}
@Bean
IntegrationFlow logFiles() {
return IntegrationFlow
.from("logChannel")
.handle(message -> log.info("message received: {}", message))
.get();
}
@Bean
IntegrationFlow logErrors() {
return IntegrationFlow
.from("errorChannel")
.handle(message -> {
MessagingException exception = (MessagingException) message.getPayload();
log.error("error message received: {} for message {}", exception.getMessage(),
exception.getFailedMessage());
})
.get();
}
...
}
Update - aggregation step
.aggregate(aggregatorSpec -> aggregatorSpec
.correlationStrategy(message -> message
.getHeaders()
.get(FileHeaders.REMOTE_FILE))
.releaseStrategy(group -> group.size() >= 5)
.groupTimeout(200L)
.sendPartialResultOnExpiry(true))
.handle(MongoDb...)
答案1
得分: 1
我认为你需要查看一个FileAggregator
,它能够根据FileMarkerReleaseStrategy
将行收集到一个列表中:https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-aggregator。
然后,是的,你可以在你的collectionCallback()
中将载荷强制转换为List
,并批量插入到MongoDB。
更新
为了能够从聚合器中使用相同的关联键(文件名)来分块,我们需要从存储中删除已释放的组。请查看AggregatorSpec.expireGroupsUponCompletion()
选项和相应的文档:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#releasestrategy。
英文:
I think you need to look into a FileAggregator
which is able to collect lines into a list according to the FileMarkerReleaseStrategy
: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-aggregator.
Then yes, you can cast a payload into a List
in your collectionCallback()
and perform batch insert into a MongoDB.
UPDATE
To be able to chunk from the aggregator using the same correlation key (file name), we need to remove already released group from the store. See AggregatorSpec.expireGroupsUponCompletion()
option and respective docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#releasestrategy
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论