Apache Camel, sFTP component and idempotent repository

Question

I want to build an application that runs in high-availability (HA) mode.
This means the number of instances can vary; for example, I may need 5 instances of the application.
The application should read data from FTP/SFTP and avoid duplicates (a file must not be processed twice).
To solve this, I decided to use clustered Camel routes in an active/active setup, which relies on an idempotent repository.

Below is my idempotent repository configuration (I'm using Spring Boot with the Camel Spring Boot starters for SQL and FTP):

@Configuration
public class IdempotentRepoConf {

    @Autowired
    private DataSource dataSource;

    @Bean
    public JdbcMessageIdRepository sftpProcesorName() {
        return new JdbcMessageIdRepository(dataSource, "sftpProcesorName");
    }

}

And my router:

@Component
public class FooSftpRouter extends RouteBuilder {

    @Autowired
    private IdempotentRepository idempotentRepository;

    @Override
    public void configure() throws Exception {
        from("sftp:localhost:2221/upload/files/foo?username=foo" +
                "&password=pass" +
                "&move=./.done" +
                "&moveFailed=.error" +
                "&idempotentRepository=#sftpProcesorName")
                .idempotentConsumer(header(Exchange.FILE_NAME),idempotentRepository)
                    .to("sftp:localhost:2221/upload/files/bar?username=foo&password=pass")
                .end();

    }
}

When I run just one instance, everything works without any problem, but when I run more than one instance I get this error:

2020-08-03 15:14:14.589  WARN 18071 --- [pload/files/foo] o.a.c.c.file.remote.SftpConsumer         : Error processing file RemoteFile[Foo 4 ] due to Cannot retrieve file: upload/files/foo/Foo 4 . Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot retrieve file: upload/files/foo/Foo 4 ]

org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 4 
	at org.apache.camel.component.file.remote.SftpOperations.retrieveFileToStreamInBody(SftpOperations.java:778) ~[camel-ftp-3.2.0.jar:3.2.0]
	at org.apache.camel.component.file.remote.SftpOperations.retrieveFile(SftpOperations.java:717) ~[camel-ftp-3.2.0.jar:3.2.0]
	at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:434) ~[camel-file-3.2.0.jar:3.2.0]
	at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:145) ~[camel-ftp-3.2.0.jar:3.2.0]
	at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:234) ~[camel-file-3.2.0.jar:3.2.0]
	at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:196) ~[camel-file-3.2.0.jar:3.2.0]
	at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:187) ~[camel-support-3.2.0.jar:3.2.0]
	at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:106) ~[camel-support-3.2.0.jar:3.2.0]
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.jcraft.jsch.SftpException: No such file
	at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2873) ~[jsch-0.1.55.jar:na]
	at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2225) ~[jsch-0.1.55.jar:na]
	at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1318) ~[jsch-0.1.55.jar:na]
	at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1290) ~[jsch-0.1.55.jar:na]
	at org.apache.camel.component.file.remote.SftpOperations.retrieveFileToStreamInBody(SftpOperations.java:759) ~[camel-ftp-3.2.0.jar:3.2.0]
	... 13 common frames omitted

To reproduce this warning, I do the following:

  1. Place several files on the SFTP server in /home/foo/upload/files/foo, for example:
for i in {1..10}; do  touch "Foo $i "; done;
  2. Check the database (PostgreSQL):
select * from camel_messageprocessed;

As expected, I have 10 records:

  processorname   | messageid |        createdat        
------------------+-----------+-------------------------
 sftpProcesorName | Foo 1     | 2020-08-03 15:14:13.392
 sftpProcesorName | Foo 10    | 2020-08-03 15:14:13.607
 sftpProcesorName | Foo 9     | 2020-08-03 15:14:14.409
 sftpProcesorName | Foo 6     | 2020-08-03 15:14:14.419
 sftpProcesorName | Foo 8     | 2020-08-03 15:14:14.427
 sftpProcesorName | Foo 2     | 2020-08-03 15:14:14.435
 sftpProcesorName | Foo 3     | 2020-08-03 15:14:14.447
 sftpProcesorName | Foo 5     | 2020-08-03 15:14:14.455
 sftpProcesorName | Foo 4     | 2020-08-03 15:14:14.462
 sftpProcesorName | Foo 7     | 2020-08-03 15:14:14.469
(10 rows)

but in the logs I see warnings and errors multiple times:

org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 3

and

Caused by: com.jcraft.jsch.SftpException: No such file

My build.gradle has these dependencies:

    compile 'org.apache.camel.springboot:camel-ftp-starter:3.2.0'
    compile 'org.apache.camel.springboot:camel-sql-starter:3.2.0'

I also tried the option readLock=idempotent, but the documentation marks it as "only for file component", so it probably doesn't work with the ftp/sftp component.

So my problem of avoiding duplicates and processing each file only once is still not resolved.

What am I doing wrong?

Answer 1

Score: 3

Clearly your processes are competing. I don't think this option is doing anything:

&idempotentRepository=#sftpProcesorName

and the idempotentConsumer() DSL only runs after the SFTP retrieval has already happened.

Some things to try, from memory:

  • readLock=fileLock # probably won't work
  • readLock=markerFile # might be all you need for "mostly ok" operation
  • inProgressRepository=#jdbcRepository # should be an iron-clad solution
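
To illustrate why claiming a file before retrieving it helps, here is a minimal plain-Java sketch of the idea. It is not Camel's API: `SharedClaims` and `poll` are hypothetical names, and the in-memory set only stands in for a repository shared by all instances (such as a JDBC-backed one). Each simulated instance sees the same directory listing, but only the one whose `add` succeeds goes on to "retrieve" the file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ClaimBeforeFetchDemo {

    /** Hypothetical stand-in for a shared repository: add() succeeds for exactly one caller per key. */
    static final class SharedClaims {
        private final Set<String> keys = ConcurrentHashMap.newKeySet();

        boolean add(String key) {
            return keys.add(key); // atomic: true only for the first caller
        }
    }

    /**
     * Simulates several application instances polling the same listing.
     * Returns, per file name, how many times the file was "retrieved".
     */
    static Map<String, Integer> poll(int instances, List<String> listing) {
        SharedClaims claims = new SharedClaims();
        Map<String, AtomicInteger> retrieved = new ConcurrentHashMap<>();
        for (String file : listing) {
            retrieved.put(file, new AtomicInteger());
        }

        ExecutorService pool = Executors.newFixedThreadPool(instances);
        CountDownLatch start = new CountDownLatch(1);
        for (int i = 0; i < instances; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // let all instances begin polling together
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (String file : listing) {
                    // Claim first; only the winner touches the file.
                    if (claims.add(file)) {
                        retrieved.get(file).incrementAndGet();
                    }
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        Map<String, Integer> result = new TreeMap<>();
        retrieved.forEach((name, count) -> result.put(name, count.get()));
        return result;
    }

    public static void main(String[] args) {
        List<String> listing = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            listing.add("Foo " + i);
        }
        // Every file should show a retrieval count of 1, regardless of instance count.
        System.out.println(poll(5, listing));
    }
}
```

In the route from the question the duplicate check happens after the consumer has already tried to download the file, so a second instance can still hit "No such file" once the first one has moved it to .done. Moving the claim in front of the retrieval, which is what a shared in-progress repository is meant to do, removes that window in this simplified model.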

huangapple
  • Posted on 2020-08-03 21:46:23
  • Source: https://go.coder-hub.com/63230658.html