Spring SFTP: fetch a file using an outbound gateway within an inbound adapter message handler

Question

I am using an inbound adapter (Java DSL) to poll PDF files from an SFTP server. After a PDF file is fetched, the application must fetch its config file, a CSV file with the same name on the SFTP server. It then processes the original PDF using the properties defined in the config file and uploads the result back to the SFTP server through the outbound adapter.

I am having trouble fetching the config file from within the handler, on the same thread, using the outbound gateway.

Here is my code:

Register Integration Flows:

  for (String client : clientsArr) {
      this.flowContext.registration(getInboundIntegrationFlow(client)).register();
  }

  this.flowContext.registration(getOutboundIntegrationFlow()).register();
  this.flowContext.registration(sftpGatewayGetIntegrationFlow()).register();

Inbound Adapter Integration Flow:


  @Autowired
  private SftpPdfMessageHandler messageHandler;

  private IntegrationFlow getInboundIntegrationFlow(String client) {

    String remoteDirectory = getRemoteDirectory(client);
    String localDirectory = getLocalDirectory(client);
    String inboundAdapterId = getInboundAdapterId(client);

    return IntegrationFlows
        .from(Sftp.inboundAdapter(sftpSessionFactory())
                .preserveTimestamp(true)
                .remoteDirectory(remoteDirectory)
                .autoCreateLocalDirectory(true)
                .localDirectory(new File(localDirectory))
                .maxFetchSize(Integer.parseInt(sftpProperties.getMaxFetchSize()))
                .filter(new SftpSimplePatternFileListFilter(sftpProperties.getRemoteFileFilter()))
                .deleteRemoteFiles(true),
            e -> e.id(inboundAdapterId)
                .autoStartup(true)
                .poller(Pollers
                    .fixedDelay(Long.parseLong(sftpProperties.getPollPeriodInSeconds()), TimeUnit.SECONDS)
                    .receiveTimeout(Long.parseLong(sftpProperties.getPollerTimeout()))
                    .maxMessagesPerPoll(Long.parseLong(sftpProperties.getMaxMessagesPerPoll()))
                ))
        .handle(inBoundHandler())
        .get();
  }

  public MessageHandler inBoundHandler() {
    return message -> {
      File file = (File) message.getPayload();
      messageHandler.handleMessage(file);
    };
  }

Outbound Adapter Integration Flow:

  private IntegrationFlow getOutboundIntegrationFlow() {

    return IntegrationFlows.from("sftpOutboundChannel")
        .handle(Sftp.outboundAdapter(sftpSessionFactory(), FileExistsMode.FAIL)
            .remoteDirectoryExpression(String.format("headers['%s']", FileHeaders.REMOTE_DIRECTORY))).get();
  }

  @Bean("sftpOutboundChannel")
  public MessageChannel sftpOutboundChannel() {
    return new DirectChannel();
  }

SFTP Message Handler:

  @Async("sftpHandlerAsyncExecutor")
  public void handleMessage(File originalFile) {

    File configFile = fetchConfigFile();

    /*
      process original file and store processed file in output file path on local directory
     */
      
    boolean success = uploadFileToSftpServer(outputFilePath, client, entity);

    if (success) {
      deleteFileFromLocal(originalFile);
    }
  }

Outbound Gateway GET Integration Flow:

  private IntegrationFlow sftpGatewayGetIntegrationFlow() {
    return IntegrationFlows.from("sftpGetInputChannel")
        .handle(Sftp.outboundGateway(sftpSessionFactory(),
            AbstractRemoteFileOutboundGateway.Command.GET, "payload")
            .options(AbstractRemoteFileOutboundGateway.Option.DELETE,
                AbstractRemoteFileOutboundGateway.Option.PRESERVE_TIMESTAMP)
            .localDirectoryExpression(String.format("headers['%s']", Constants.HEADER_LOCAL_DIRECTORY_NAME))
            .autoCreateLocalDirectory(true))
        .channel("nullChannel")
        .get();
  }

  @Bean("sftpGetInputChannel")
  public MessageChannel sftpGetInputChannel() {
    return new DirectChannel();
  }

messageHandler.handleMessage() is called asynchronously (with a ThreadPoolTaskExecutor) and internally fetches the config file using the outbound gateway. But I couldn't find a single channel on which I can send a message and receive the reply payload in the same thread. I found MessagingTemplate in the Spring docs but couldn't find a way to connect it to my outbound gateway integration flow.

sftpGetMessageTemplate.sendAndReceive(sftpGetInputChannel, new GenericMessage<>("/dir/file.csv", headers)) throws a "Dispatcher has no subscribers for channel" exception with DirectChannel().

I am looking for a solution that lets me fetch the required file from the server in any of the following ways:

  • Integrate MessagingTemplate with the IntegrationFlow (if possible) using an appropriate channel.
  • Chain message handlers in the inbound adapter flow so that, after polling the original file, the flow fetches the second file using the SFTP outbound gateway and then calls the final handler with both objects (original file and config file). I am trying to achieve something similar with the custom code above.
  • Any other way to use send and poller channels for the GET command in a multi-threaded environment.

> The application needs to decide the directory path at runtime when using the GET command.
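For illustration only, the runtime path decision described above could be sketched as follows. This assumes the CSV sits in the same remote directory as the PDF and shares its base name; the class name ConfigPathSketch, the method configFileRemotePath, and the sample paths are hypothetical and not part of the original code.

```java
import java.io.File;

// Hypothetical helper, not from the original post: derives the remote path of the
// config file from the polled PDF, assuming "same base name, .csv extension".
final class ConfigPathSketch {

    static String configFileRemotePath(String remoteDirectory, File pdfFile) {
        String name = pdfFile.getName();
        int dot = name.lastIndexOf('.');
        // Strip only the last extension; a name without a dot is used as-is.
        String baseName = dot > 0 ? name.substring(0, dot) : name;
        String dir = remoteDirectory.endsWith("/") ? remoteDirectory : remoteDirectory + "/";
        return dir + baseName + ".csv";
    }

    public static void main(String[] args) {
        // Prints /clients/acme/in/invoice-42.csv
        System.out.println(configFileRemotePath("/clients/acme/in", new File("invoice-42.pdf")));
    }
}
```

The resulting string would be the payload sent to the GET gateway, with the local directory passed as a header.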

Answer 1

Score: 1

You probably need to learn what a @MessagingGateway is and how to make it interact with the channels of your outbound gateway flow.

See docs for more info: https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/messaging-endpoints.html#gateway

If you really want to get the config file as a result, you should not use .channel("nullChannel"). With the gateway in hand, there will be a replyChannel header holding a TemporaryReplyChannel instance populated by the gateway. Then in your code you just use that functional interface as the API to call.
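A minimal sketch of such a gateway interface, under some assumptions: the channel name sftpGetInputChannel and the Constants.HEADER_LOCAL_DIRECTORY_NAME header come from the question, while the interface name ConfigFileGateway and the method name fetch are hypothetical. It also assumes that gateway interfaces are picked up by scanning (for example via @IntegrationComponentScan) and that the GET flow no longer ends with .channel("nullChannel").

```java
import java.io.File;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

// Hypothetical gateway interface; Spring Integration generates the implementation.
// Constants is the asker's own class holding the header name.
@MessagingGateway(defaultRequestChannel = "sftpGetInputChannel")
public interface ConfigFileGateway {

    // Sends the remote path as the payload and the local directory as a header,
    // then waits for the reply of the SFTP GET, which is the downloaded local File.
    File fetch(@Payload String remoteFilePath,
               @Header(Constants.HEADER_LOCAL_DIRECTORY_NAME) String localDirectory);
}
```

Inside handleMessage(), a call such as configFileGateway.fetch("/dir/file.csv", localDirectory) would then return the file on the calling thread, so it also fits the @Async handler.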

In fact, that messaging gateway uses the MessagingTemplate.sendAndReceive() you mentioned.
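To see why the reply is lost when the flow ends in nullChannel, here is a plain-Java model of the request/reply idea. It is only a sketch: it does not use Spring Integration, and ReplyChannelModel, Request, Flow and the fake "/tmp/local" prefix are made-up names standing in for the message, the GET flow and the downloaded file.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Plain-Java model of request/reply messaging. These are NOT Spring Integration classes.
final class ReplyChannelModel {

    // A request that carries its own one-shot reply channel, like the replyChannel header.
    static final class Request {
        final String payload;
        final BlockingQueue<String> replyChannel;

        Request(String payload, BlockingQueue<String> replyChannel) {
            this.payload = payload;
            this.replyChannel = replyChannel;
        }
    }

    // Stand-in for the GET flow: produces a reply and either routes it back or drops it.
    static final class Flow {
        private final Function<String, String> handler;
        private final boolean discardReply; // true models ending the flow with "nullChannel"

        Flow(Function<String, String> handler, boolean discardReply) {
            this.handler = handler;
            this.discardReply = discardReply;
        }

        void handle(Request request) {
            String reply = handler.apply(request.payload);
            if (!discardReply) {
                request.replyChannel.offer(reply);
            }
        }
    }

    // Models sendAndReceive: create a temporary reply channel, send, then wait for the reply.
    static String sendAndReceive(Flow flow, String payload, long timeoutMillis) {
        BlockingQueue<String> temporaryReplyChannel = new ArrayBlockingQueue<>(1);
        flow.handle(new Request(payload, temporaryReplyChannel)); // runs on the caller's thread
        try {
            return temporaryReplyChannel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        Function<String, String> fakeGet = remotePath -> "/tmp/local" + remotePath;
        System.out.println(sendAndReceive(new Flow(fakeGet, false), "/dir/file.csv", 100)); // /tmp/local/dir/file.csv
        System.out.println(sendAndReceive(new Flow(fakeGet, true), "/dir/file.csv", 100));  // null
    }
}
```

In this model the discarding flow simply yields null after the timeout; how long the real gateway waits for a missing reply depends on its configured reply timeout.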

huangapple
  • Posted on August 21, 2020 at 23:19:14
  • When reposting, please keep the link to this article: https://go.coder-hub.com/63525578.html