How to handle bulk operations in Spring Integration

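Question

We are developing a Spring Integration flow with the Java DSL. The application reads a remote file and inserts its data into MongoDB. We stream the file line by line and need to bulk-insert the rows into MongoDB. As far as I can tell from the Spring Integration documentation and samples, there is no bulk option for this, and I can't figure out how to implement the expected behaviour. We tried an aggregator, but we did not find a suitable solution for a fixed batch size.

Sample of the involved beans: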

@Configuration
public class SampleConfiguration {

  ...

  @Bean
  MessagingTemplate messagingTemplate(ApplicationContext context) {
    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setBeanFactory(context);
    return messagingTemplate;
  }

  @Bean
  IntegrationFlow sftpSource() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
    factory.setHost("localhost");
    factory.setPort(22);
    factory.setUser("foo");
    factory.setPassword("foo");
    factory.setAllowUnknownKeys(true);
    SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
    return IntegrationFlow
        .from(Sftp.inboundStreamingAdapter(template, Comparator.comparing(DirEntry::getFilename))
                .remoteDirectory("upload")
                .patternFilter("*.csv")
                .maxFetchSize(1),
            spec -> spec.poller(Pollers.fixedRate(Duration.ofMillis(1000)))
                .autoStartup(true))
        .split(Files
            .splitter()
            .markers()
            .charset(StandardCharsets.UTF_8)
            .firstLineAsHeader("fileHeader")
            .applySequence(true))
        .filter(payload -> !(payload instanceof FileSplitter.FileMarker))
        .enrichHeaders(h -> h.errorChannel("errorChannel"))
        .handle((String payload, MessageHeaders headers) -> {
          String header = headers.get("fileHeader", String.class);
          String rowWithHeader = header + "\n" + payload;
          try (StringReader reader = new StringReader(rowWithHeader)) {
            CsvToBean<MyPojo> beanReader = new CsvToBeanBuilder<MyPojo>(reader)
                .withType(MyPojo.class)
                .withSeparator(';')
                .build();
            return beanReader.iterator().next();
          }
        })
        .handle(MongoDb
            .outboundGateway(mongoTemplate)
            .entityClass(MyPojo.class)
            .collectionNameFunction(m -> "mypojo")
            .collectionCallback(
                (collection, message) -> {
                  MyPojo myPojo = (MyPojo) message.getPayload();
                  Document document = new Document();
                  mongoTemplate.getConverter().write(myPojo, document);
                  return collection.insertOne(document);
                }))
        .channel("logChannel")
        .get();
  }

  @Bean
  IntegrationFlow logFiles() {
    return IntegrationFlow
        .from("logChannel")
        .handle(message -> log.info("message received: {}", message))
        .get();
  }

  @Bean
  IntegrationFlow logErrors() {
    return IntegrationFlow
        .from("errorChannel")
        .handle(message -> {
          MessagingException exception = (MessagingException) message.getPayload();
          log.error("error message received: {} for message {}",
              exception.getMessage(),
              exception.getFailedMessage());
        })
        .get();
  }
  ...
}

Update - aggregation step

.aggregate(aggregatorSpec -> aggregatorSpec
            .correlationStrategy(message -> message
                .getHeaders()
                .get(FileHeaders.REMOTE_FILE))
            .releaseStrategy(group -> group.size() >= 5)
            .groupTimeout(200L)
            .sendPartialResultOnExpiry(true))
.handle(MongoDb...)

Answer 1

Score: 1

I think you need to look into the FileAggregator, which collects the lines of a file into a list according to the FileMarkerReleaseStrategy: https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-aggregator.

Then yes, you can cast the payload to a List in your collectionCallback() and perform a batch insert into MongoDB.
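
As a rough sketch of those two steps (not a tested drop-in), the tail of the flow could look like the configuration below. The class name `BulkInsertConfiguration` and the channel name `rowsWithMarkersChannel` are hypothetical. The sketch assumes the rows arriving on that channel are already converted to `MyPojo`, and that the `FileSplitter.FileMarker` messages are not filtered out beforehand, since the FileAggregator needs the END marker to release the group.

```java
import java.util.List;

import org.bson.Document;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.aggregator.FileAggregator;
import org.springframework.integration.mongodb.dsl.MongoDb;

@Configuration
public class BulkInsertConfiguration { // hypothetical class name

  @Bean
  IntegrationFlow bulkInsertFlow(MongoTemplate mongoTemplate) {
    return IntegrationFlow
        // Hypothetical channel: converted rows plus the START/END file markers.
        .from("rowsWithMarkersChannel")
        // Collects all rows of one file into a single List payload.
        .aggregate(new FileAggregator())
        .handle(MongoDb
            .outboundGateway(mongoTemplate)
            .collectionNameFunction(m -> "mypojo")
            .collectionCallback(
                (collection, message) -> {
                  // The aggregator output is a List of the row payloads.
                  List<?> rows = (List<?>) message.getPayload();
                  List<Document> documents = rows.stream()
                      .map(row -> {
                        Document document = new Document();
                        mongoTemplate.getConverter().write(row, document);
                        return document;
                      })
                      .toList();
                  // One round trip for the whole list instead of one per row.
                  return collection.insertMany(documents);
                }))
        .channel("logChannel")
        .get();
  }
}
```

With this shape each file ends up in one `insertMany` call, so it gives a per-file bulk insert rather than a fixed batch size. Note that `insertMany` rejects an empty list, so a file without data rows would need separate handling.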

UPDATE

To chunk from the aggregator using the same correlation key (the file name), the already released group has to be removed from the store. See the AggregatorSpec.expireGroupsUponCompletion() option and the respective docs: https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#releasestrategy
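
For the aggregation step in the question, this would presumably mean adding `.expireGroupsUponCompletion(true)` to the aggregator spec. To illustrate why that matters, here is a small plain-Java model of the behaviour. It is a simplified sketch, not Spring Integration code: the class `ChunkingAggregatorModel` and its methods are hypothetical, and it only assumes what the linked docs describe, namely that messages for an already completed group are discarded unless the group is expired on completion.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model of a size-based aggregator. Not Spring Integration code. */
final class ChunkingAggregatorModel {

  private final int batchSize;
  private final boolean expireGroupsUponCompletion;
  private final Map<String, List<String>> openGroups = new HashMap<>();
  private final Set<String> completedKeys = new HashSet<>();
  private final List<List<String>> released = new ArrayList<>();
  private final List<String> discarded = new ArrayList<>();

  ChunkingAggregatorModel(int batchSize, boolean expireGroupsUponCompletion) {
    this.batchSize = batchSize;
    this.expireGroupsUponCompletion = expireGroupsUponCompletion;
  }

  /** Adds one line to the group for its correlation key and releases the group when full. */
  void onMessage(String correlationKey, String payload) {
    if (completedKeys.contains(correlationKey)) {
      // The completed group is still in the store, so late messages are discarded.
      discarded.add(payload);
      return;
    }
    List<String> group = openGroups.computeIfAbsent(correlationKey, key -> new ArrayList<>());
    group.add(payload);
    if (group.size() >= batchSize) {
      released.add(openGroups.remove(correlationKey));
      if (!expireGroupsUponCompletion) {
        // Without expiry the key stays marked as completed.
        completedKeys.add(correlationKey);
      }
    }
  }

  /** Models groupTimeout with sendPartialResultOnExpiry(true): flush the partial group. */
  void onTimeout(String correlationKey) {
    List<String> partial = openGroups.remove(correlationKey);
    if (partial != null && !partial.isEmpty()) {
      released.add(partial);
    }
  }

  List<List<String>> released() {
    return released;
  }

  List<String> discarded() {
    return discarded;
  }

  public static void main(String[] args) {
    for (boolean expire : new boolean[] {true, false}) {
      ChunkingAggregatorModel model = new ChunkingAggregatorModel(5, expire);
      for (int i = 1; i <= 12; i++) {
        model.onMessage("data.csv", "line-" + i);
      }
      model.onTimeout("data.csv");
      System.out.println("expire=" + expire + " batches=" + model.released().size()
          + " discarded=" + model.discarded().size());
    }
  }
}
```

In this model, 12 lines with a batch size of 5 yield three batches (5, 5 and a final partial batch of 2) when groups are expired on completion. Without expiry only the first batch is released and the remaining 7 lines are discarded, which matches the symptom of a fixed-size release strategy that appears to work only once per file.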

Posted by huangapple on May 24, 2023 at 20:31:40
Source: https://go.coder-hub.com/76323585.html