英文:
Apache Camel, sFTP component and idempotent repository
问题
Here is the translated content you requested:
我想要在高可用(HA)模式下构建应用程序。
这意味着我可以有可变数量的实例,即我需要有5个应用程序实例。
该应用程序应从ftp/sftp读取数据,避免重复(一个文件不能被处理两次)。
为了解决这个问题,我决定使用active/active
设置中的集群化camel路由。该设置使用幂等存储库。
以下是我的幂等存储库配置(我使用spring boot,camel spring boot starters,sql和ftp)
@Configuration
public class IdempotentRepoConf {
@Autowired
private DataSource dataSource;
@Bean
public JdbcMessageIdRepository sftpProcesorName() {
return new JdbcMessageIdRepository(dataSource, "sftpProcesorName");
}
}
以及我的路由器
@Component
public class FooSftpRouter extends RouteBuilder {
@Autowired
private IdempotentRepository idempotentRepository;
@Override
public void configure() throws Exception {
from("sftp:localhost:2221/upload/files/foo?username=foo" +
"&password=pass" +
"&move=./.done" +
"&moveFailed=.error" +
"&idempotentRepository=#sftpProcesorName")
.idempotentConsumer(header(Exchange.FILE_NAME),idempotentRepository)
.to("sftp:localhost:2221/upload/files/bar?username=foo&password=pass")
.end();
}
}
当我只运行一个实例时,一切都正常工作,但当我运行多个实例时,出现错误
2020-08-03 15:14:14.589 WARN 18071 --- [pload/files/foo] o.a.c.c.file.remote.SftpConsumer : Error processing file RemoteFile[Foo 4 ] due to Cannot retrieve file: upload/files/foo/Foo 4 . Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot retrieve file: upload/files/foo/Foo 4 ]
...
为了收到此警告,我执行以下步骤:
- 将几个文件放在sftp的目录
/home/foo/upload/files/foo
中(例如:
for i in {1..10}; do touch "Foo $i "; done;
- 检查数据库(PostgreSQL)
select * from camel_messageprocessed;
如我所预期,我有10条记录
但在日志中,我多次看到警告和错误
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 3
和
Caused by: com.jcraft.jsch.SftpException: No such file
我的build.gradle文件中有以下依赖项
compile 'org.apache.camel.springboot:camel-ftp-starter:3.2.0'
compile 'org.apache.camel.springboot:camel-sql-starter:3.2.0'
我还尝试了选项readLock=idempotent
,但是根据文档中的说明,此选项仅适用于文件组件,可能在ftp/sftp组件中不起作用。
所以,我如何解决避免重复处理并只处理一次的问题?
英文:
I want to build application in high availability (HA) mode.
This means I can have changeable number of instances i.e. I need to have 5 instances of application.
The application should read data from ftp/sftp, avoiding duplicates (one file can't be process 2 times).
To resolve this problem I decide to use clustered camel routes in active/active
setup. This setup is using Idempotent repository.
Below is my Idempotent Repository Configuration (I'm using spring boot, and camel spring boot starters, sql and ftp)
@Configuration
public class IdempotentRepoConf {
@Autowired
private DataSource dataSource;
@Bean
public JdbcMessageIdRepository sftpProcesorName() {
return new JdbcMessageIdRepository(dataSource, "sftpProcesorName");
}
}
And my Router
@Component
public class FooSftpRouter extends RouteBuilder {
@Autowired
private IdempotentRepository idempotentRepository;
@Override
public void configure() throws Exception {
from("sftp:localhost:2221/upload/files/foo?username=foo" +
"&password=pass" +
"&move=./.done" +
"&moveFailed=.error" +
"&idempotentRepository=#sftpProcesorName")
.idempotentConsumer(header(Exchange.FILE_NAME),idempotentRepository)
.to("sftp:localhost:2221/upload/files/bar?username=foo&password=pass")
.end();
}
}
When I run just one instance everything is working without any problem, but when I run more than one instance I have problem with error
2020-08-03 15:14:14.589 WARN 18071 --- [pload/files/foo] o.a.c.c.file.remote.SftpConsumer : Error processing file RemoteFile[Foo 4 ] due to Cannot retrieve file: upload/files/foo/Foo 4 . Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot retrieve file: upload/files/foo/Foo 4 ]
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 4
at org.apache.camel.component.file.remote.SftpOperations.retrieveFileToStreamInBody(SftpOperations.java:778) ~[camel-ftp-3.2.0.jar:3.2.0]
at org.apache.camel.component.file.remote.SftpOperations.retrieveFile(SftpOperations.java:717) ~[camel-ftp-3.2.0.jar:3.2.0]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:434) ~[camel-file-3.2.0.jar:3.2.0]
at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:145) ~[camel-ftp-3.2.0.jar:3.2.0]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:234) ~[camel-file-3.2.0.jar:3.2.0]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:196) ~[camel-file-3.2.0.jar:3.2.0]
at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:187) ~[camel-support-3.2.0.jar:3.2.0]
at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:106) ~[camel-support-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.jcraft.jsch.SftpException: No such file
at com.jcraft.jsch.ChannelSftp.throwStatusError(ChannelSftp.java:2873) ~[jsch-0.1.55.jar:na]
at com.jcraft.jsch.ChannelSftp._stat(ChannelSftp.java:2225) ~[jsch-0.1.55.jar:na]
at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1318) ~[jsch-0.1.55.jar:na]
at com.jcraft.jsch.ChannelSftp.get(ChannelSftp.java:1290) ~[jsch-0.1.55.jar:na]
at org.apache.camel.component.file.remote.SftpOperations.retrieveFileToStreamInBody(SftpOperations.java:759) ~[camel-ftp-3.2.0.jar:3.2.0]
... 13 common frames omitted
To receive this warrning I do these steps
- Place several files in sftp in localization
/home/foo/upload/files/foo
(i.e.
for i in {1..10}; do touch "Foo $i "; done;
- Check database (postgress)
select * from camel_messageprocessed;
and as I expected I have 10 records
processorname | messageid | createdat
------------------+-----------+-------------------------
sftpProcesorName | Foo 1 | 2020-08-03 15:14:13.392
sftpProcesorName | Foo 10 | 2020-08-03 15:14:13.607
sftpProcesorName | Foo 9 | 2020-08-03 15:14:14.409
sftpProcesorName | Foo 6 | 2020-08-03 15:14:14.419
sftpProcesorName | Foo 8 | 2020-08-03 15:14:14.427
sftpProcesorName | Foo 2 | 2020-08-03 15:14:14.435
sftpProcesorName | Foo 3 | 2020-08-03 15:14:14.447
sftpProcesorName | Foo 5 | 2020-08-03 15:14:14.455
sftpProcesorName | Foo 4 | 2020-08-03 15:14:14.462
sftpProcesorName | Foo 7 | 2020-08-03 15:14:14.469
(10 rows)
but in logs I see Warn and errors multiple time
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot retrieve file: upload/files/foo/Foo 3
and
Caused by: com.jcraft.jsch.SftpException: No such file
My build.gradle have dependency
compile 'org.apache.camel.springboot:camel-ftp-starter:3.2.0'
compile 'org.apache.camel.springboot:camel-sql-starter:3.2.0'
I also tried with option readLock=idempotent
but this option in documentation is (only for file component)
and probably don't work in ftp/sftp component
So my problem to avoid duplication and process only one time is still not resolved.
what I'm doing wrong?
答案1
得分: 3
显然你的流程在竞争。我认为这没有起到作用。
&idempotentRepository=#sftpProcesorName
而且idempotentConsumer()
DSL 仅在SFTP完成后运行。
从记忆中尝试一些方法:
readLock=fileLock
# 可能不起作用readlock=markerFile
# 或许对于“大多数情况正常”的操作已经足够inProgressRepository=#jdbcRepository
应该是一个坚固的解决方案。
英文:
Clearly your processes are competing. I don't think this is doing anything
&idempotentRepository=#sftpProcesorName
and the idempotentConsumer()
DSL is operating only after SFTP has occurred.
Some items to try from memory:
readLock=fileLock
# probably wont workreadlock=markerFile
# might be all you need for "mostly ok" operationinProgressRepository=#jdbcRepository
Should be iron-clad solution.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论