Removing a file from ConcurrentMetadataStore if file processing fails in spring-integration-sftp

huangapple go评论79阅读模式
英文:

Removing a file from ConcurrentMetadataStore if file processing fails in spring-integration-sftp

问题

我在使用spring-integration-sftp从SFTP服务器拉取文件并进行一些处理时遇到了问题。我正在使用RedisMetadataStore来支持我的SftpPersistentAcceptOnceFileListFilter。我在我的用例中使用了Streaming Inbound Channel Adapter

问题是,一旦新的文件事件传递到我的处理程序,该文件就会在Redis元数据存储中被标记为已处理,因此如果下游处理失败,该文件将永远丢失。我已经设置了重试,但我想要做的是在Redis中将该文件标记为未处理,并在下一个轮询周期中再次获取它。

我发现RedisMetadataStore类上有一个remove方法,可以在处理过程中发生任何异常时删除一个键,但我想知道是否建议直接调用remove方法?因为据我所理解,这些方法应该由过滤器调用,而不是直接调用,但我找不到处理这种情况的任何信息。

是否有任何构造可以在处理过程中从SftpPersistentAcceptOnceFileListFilter中删除文件,如果处理失败?

英文:

I am facing a problem while using spring-integration-sftp to pull files from SFTP servers and do some processing. I'm using RedisMetadataStore to power my SftpPersistentAcceptOnceFileListFilter. I'm using a Streaming Inbound Channel Adapter for my use case.

The problem is as soon as a new file event is delivered to my handler that file is marked as processed in the Redis metadata store, so now if my downstream processing fails, that file is lost forever. I have retries in place, but what I would like to do is mark that file as un-processed in redis and have it picked up again in next poll cycle.

I found that there is a remove method on RedisMetadataStore class that can remove a key if there is any exception while processing, but wanted to know is it advisable to call the remove method directly ? Because as far as I can understand, these methods are supposed to be called by the filter and not directly but I couldn't find any thing to deal with this situation.

Are there any constructs to remove a file from SftpPersistentAcceptOnceFileListFilter if it fails during processing ?

答案1

得分: 1

提到的SftpPersistentAcceptOnceFileListFilter是一个ResettableFileListFilter

/**
 * 一个可以通过从其状态中删除特定文件来重置的{@link FileListFilter}。
 * @param <F> 将被过滤的类型。
 *
 * 作者:Gary Russell
 *
 * 自4.1.7以来
 */
public interface ResettableFileListFilter<F> extends FileListFilter<F> {

所以,您确实可以将其作为顶级 bean 并在文件处理失败时调用其remove()方法。

该框架在多个地方都这样做,但实际上是在消息传递到目标通道之前:

private void resetFilterIfNecessary(AbstractFileInfo<F> file) {
    if (this.filter instanceof ResettableFileListFilter) {
        this.logger.info(
            LogMessage.format("Removing the remote file '%s' from the filter for a subsequent transfer attempt",
                    file.getFilename()));
        ((ResettableFileListFilter<F>) this.filter).remove(file.getFileInfo());
    }
}

您可以在下游处理的消息中使用FileHeaders.REMOTE_FILE_INFO头来访问FileInfo

英文:

The mentioned SftpPersistentAcceptOnceFileListFilter is a ResettableFileListFilter:

/**
 * A {@link FileListFilter} that can be reset by removing a specific file from its
 * state.
 * @param &lt;F&gt; The type that will be filtered.
 *
 * @author Gary Russell
 *
 * @since 4.1.7
 */
public interface ResettableFileListFilter&lt;F&gt; extends FileListFilter&lt;F&gt; {

So, you indeed can just have it as a top-level bean and call its remove() whenever your file processing has failed.

The framework does that in several places, but really before the message is delivered to target channel:

private void resetFilterIfNecessary(AbstractFileInfo&lt;F&gt; file) {
	if (this.filter instanceof ResettableFileListFilter) {
		this.logger.info(
				LogMessage.format(&quot;Removing the remote file &#39;%s&#39; from the filter for a subsequent transfer attempt&quot;,
						file.getFilename()));
		((ResettableFileListFilter&lt;F&gt;) this.filter).remove(file.getFileInfo());
	}
}

That FileInfo is available for your as a FileHeaders.REMOTE_FILE_INFO header in the message you process downstream.

huangapple
  • 本文由 发表于 2023年6月15日 20:58:58
  • 转载请务必保留本文链接:https://go.coder-hub.com/76482739.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定