How do I define good error handling in camel-ftp component?
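Question
Some exceptions in your code are causing files to be processed more than once. The main problem appears to be a GenericFileOperationFailedException thrown during the file rename step. You want each file to be delivered to the JMS component only once, and you want the message sent to a dead-letter queue (DLQ) whenever a retry or error occurs.
Possible approaches to your problem:
- Handle the rename failure: handle the case where the file rename fails so that the file is not processed again. When GenericFileOperationFailedException occurs, you can mark the message as processed to prevent redelivery.
- Configure an error handling strategy: in the Camel route, use errorHandler to define how errors are handled. You can send the message to the DLQ when an exception occurs so that no message is lost.
- Limit the number of retries: set a maximum number of redeliveries in the route configuration to avoid endless retries. Once the maximum is reached, the message should be sent to the DLQ.
- Monitoring and logging: add detailed logging so that you can trace how each message is processed, including errors and retries.
Depending on your requirements, you can combine one or more of these approaches so that a message is delivered only once and is sent to the DLQ for later handling when an error occurs.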
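The retry-limit idea (a maximum number of redeliveries, then the DLQ) can be sketched in plain Java, independent of Camel. This is only an illustration under assumptions: the class `BoundedRedelivery`, the method `deliver`, and the list standing in for the DLQ are hypothetical names, not Camel APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BoundedRedelivery {

    /**
     * Tries to hand the message to the target at most (1 + maxRedeliveries) times.
     * If every attempt fails, the message is added to the dead-letter list.
     * Returns true when the target accepted the message.
     */
    static boolean deliver(String message, Consumer<String> target,
                           int maxRedeliveries, List<String> deadLetters) {
        for (int attempt = 0; attempt <= maxRedeliveries; attempt++) {
            try {
                target.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Attempt failed; fall through to the next attempt, if any is left.
            }
        }
        deadLetters.add(message);
        return false;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        // Hypothetical target that is always unavailable.
        Consumer<String> brokenTarget = m -> {
            throw new IllegalStateException("target unavailable");
        };
        boolean delivered = deliver("file-1", brokenTarget, 2, deadLetters);
        System.out.println(delivered + " " + deadLetters);
    }
}
```

With `maxRedeliveries` set to 0, as in the route shown in the question, the first failure sends the message straight to the dead-letter list.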
Original question:
One of our applications is a Camel Spring Boot application.
The route is fairly simple: it gets a file from an FTP server using the camel-ftp component and pushes it to a JMS queue. We use Apache Artemis as the broker.
Camel version: 2.20.2
Spring Boot version: 2.7.8
@Component
@Slf4j
public class FTPListenerRoute extends RouteBuilder {

    @Value("${ems.activemq.queue.name}")
    private String activeMqTargetQueueName;

    @Value("${ems.error-uri:error}")
    private String errorUri;

    @Value("${ems.max-redeliveries}")
    private int maximumRetries;

    private final FTPConfiguration ftpConfiguration;

    @Autowired
    public FTPListenerRoute(FTPConfiguration ftpConfiguration) {
        this.ftpConfiguration = ftpConfiguration;
    }

    @Override
    public void configure() throws Exception {
        // Global error handler: transactional, no redeliveries
        errorHandler(
            springTransactionErrorHandler()
                .loggingLevel(LoggingLevel.DEBUG)
                .maximumRedeliveries(0)
                .log(log).logHandled(true));

        String uri = ftpConfiguration.buildUri();

        // Main route: poll the FTP server and hand the file to the transacted route
        from(uri)
            .routeId("listener-main-route")
            //.transacted()
            .doTry()
                .to("direct:msgInTransaction")
            .doCatch(RuntimeCamelException.class)
                // The log DSL has no "{}" placeholder support, so the URI is concatenated
                .log(LoggingLevel.ERROR, log, "caught exception, rerouting to: " + errorUri)
                .to(errorUri)
            .endDoTry();

        // Transacted route: send the file to the JMS queue
        from("direct:msgInTransaction")
            .routeId("ftplistener-transacted-route")
            .log(LoggingLevel.INFO, "Processing ${id} with headers: ${headers}")
            .transacted()
            .to("jms:queue:" + activeMqTargetQueueName);
    }

    @Bean
    public PlatformTransactionManager jmsTransactionManager(@Qualifier("jmsConnectionFactory") ConnectionFactory connectionFactory) {
        return new JmsTransactionManager(connectionFactory);
    }
}
package be.fgov.minfin.esbsoa.ems.ftp.listener.configuration;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.net.URISyntaxException;
import java.util.Optional;

@Component
public class FTPConfiguration {

    // FTP Properties
    @Value("${ems.ftp.username}")
    private String ftpUsername;
    @Value("${ems.ftp.password}")
    private String ftpPassword;
    @Value("${ems.ftp.host}")
    private String ftpHost;
    @Value("${ems.ftp.port:}")
    private Optional<Integer> ftpPort;
    @Value("${ems.ftp.path}")
    private String ftpPath;
    @Value("${ems.ftp.path.error:error}")
    private String ftpErrorPath;
    @Value("${ems.ftp.path.completed:completed}")
    private String ftpCompletedPath;
    @Value("${ems.ftp.delay:30000}")
    private String ftpDelay;
    @Value("${ems.ftp.filter.file.name:}")
    private String fileNameFilter;
    @Value("${ems.ftp.deleteFile:false}")
    private boolean isFilesDeletionAfterCompletion;
    @Value("${ems.ftp.filter.file.size:50000000}")
    private int maxFileSize;
    @Value("${ems.ftp.protocol:ftp}")
    private String protocol;
    @Value("${ems.ftp.passiveMode:true}")
    private String passiveMode;

    public String buildUri() throws URISyntaxException {
        URIBuilder ftpUriBuilder = getUri(ftpPath);
        ftpUriBuilder.addParameter("moveFailed", ftpErrorPath)
            .addParameter("delay", ftpDelay)
            .addParameter("binary", "true")
            .addParameter("initialDelay", "5")
            .addParameter("filterFile", "${file:size} <= " + maxFileSize)
            .addParameter("readLock", "changed");
        if (this.isFilesDeletionAfterCompletion) {
            ftpUriBuilder.addParameter("delete", "true");
        } else {
            ftpUriBuilder.addParameter("move", ftpCompletedPath);
        }
        if (StringUtils.isNotBlank(fileNameFilter)) {
            ftpUriBuilder.addParameter("include", fileNameFilter);
        }
        return ftpUriBuilder.build().toString();
    }

    private URIBuilder getUri(String path) {
        URIBuilder uriBuilder = new URIBuilder()
            .setScheme(protocol)
            .setHost(ftpHost)
            .setUserInfo(ftpUsername, ftpPassword)
            .setPath(path)
            .addParameter("passiveMode", passiveMode);
        ftpPort.ifPresent(uriBuilder::setPort);
        return uriBuilder;
    }
}
During maintenance weekends, servers are often rebooted and this route fails with the following error:
2023-03-14 07:50:53.596 INFO 1 --- [tra/Coda_Edepo/] ftplistener-main-route : Processing 3DAABB587130ED0-000000000000026E with headers: {CamelFileAbsolute=false, CamelFileAbsolutePath=Coda_Edepo/FIL.EMPCFF.679200407454.20230313.DEP, CamelFileHost=ftphost, CamelFileLastModified=1678754100000, CamelFileLength=516516, CamelFileName=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileNameConsumed=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileNameOnly=FIL.EMPCFF.679200407454.20230313.DEP, CamelFileParent=Coda_Edepo, CamelFilePath=Coda_Edepo//FIL.EMPCFF.679200407454.20230313.DEP, CamelFileRelativePath=FIL.EMPCFF.679200407454.20230313.DEP, CamelFtpReplyCode=226, CamelFtpReplyString=226 Transfer complete.
, CamelMessageTimestamp=1678754100000}
2023-03-14 07:50:53.617 WARN 1 --- [tra/Coda_Edepo/] o.a.c.c.file.GenericFileOnCompletion : Error during commit. Exchange[3DAABB587130ED0-000000000000026E]. Caused by: [org.apache.camel.component.file.GenericFileOperationFailedException - Cannot rename file: RemoteFile[FIL.EMPCFF.679200407454.20230313.DEP] to: RemoteFile[/Coda_Edepo//completed/FIL.EMPCFF.679200407454.20230313.DEP]]
org.apache.camel.component.file.GenericFileOperationFailedException: Cannot rename file: RemoteFile[FIL.EMPCFF.679200407454.20230313.DEP] to: RemoteFile[/Coda_Edepo//completed/FIL.EMPCFF.679200407454.20230313.DEP]
at org.apache.camel.component.file.strategy.GenericFileProcessStrategySupport.renameFile(GenericFileProcessStrategySupport.java:147) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.strategy.GenericFileRenameProcessStrategy.commit(GenericFileRenameProcessStrategy.java:121) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileOnCompletion.processStrategyCommit(GenericFileOnCompletion.java:134) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileOnCompletion.onCompletion(GenericFileOnCompletion.java:86) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileOnCompletion.onComplete(GenericFileOnCompletion.java:60) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.support.UnitOfWorkHelper.doneSynchronization(UnitOfWorkHelper.java:99) ~[camel-support-3.12.0.jar:3.12.0]
at org.apache.camel.support.UnitOfWorkHelper.doneSynchronizations(UnitOfWorkHelper.java:88) ~[camel-support-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultUnitOfWork.done(DefaultUnitOfWork.java:238) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.support.UnitOfWorkHelper.doneUow(UnitOfWorkHelper.java:59) ~[camel-support-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:775) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor$UnitOfWorkProcessorAdvice.after(CamelInternalProcessor.java:710) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:263) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.AsyncCallback.run(AsyncCallback.java:44) ~[camel-api-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.schedule(DefaultReactiveExecutor.java:59) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor$AsyncAfterTask.done(CamelInternalProcessor.java:275) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.spring.spi.TransactionErrorHandler.process(TransactionErrorHandler.java:138) ~[camel-spring-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.processor.Pipeline$PipelineTask.run(Pipeline.java:109) ~[camel-core-processor-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:179) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:64) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:184) ~[camel-core-processor-3.12.0.jar:3.12.0]
at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:399) ~[camel-base-engine-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileConsumer.processExchange(GenericFileConsumer.java:492) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.remote.RemoteFileConsumer.processExchange(RemoteFileConsumer.java:156) ~[camel-ftp-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileConsumer.processBatch(GenericFileConsumer.java:245) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:206) ~[camel-file-3.12.0.jar:3.12.0]
at org.apache.camel.support.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:191) ~[camel-support-3.12.0.jar:3.12.0]
at org.apache.camel.support.ScheduledPollConsumer.run(ScheduledPollConsumer.java:108) ~[camel-support-3.12.0.jar:3.12.0]
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.FutureTask.runAndReset(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[na:na]
at java.base/java.lang.Thread.run(Unknown Source) ~[na:na]
After that, the file is reprocessed again and again by Camel until the rename operation succeeds.
This causes a big problem with duplicates in our database.
My understanding is the following:
- a file is put on the FTP server and is picked up by Camel
- a transaction is started
- the file is sent to the broker via JMS and is successful
- Camel tries to "rename" the file to move it to the completed folder
- renaming fails and GenericFileOperationFailedException is thrown
- the file is put back into the processing folder and reprocessed again from step 1.
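The sequence described above can be modelled in plain Java to show where the duplicates come from. This is a simplified sketch, not Camel code: the class `RenameFailureSimulation`, the in-memory inbox, and the list standing in for the JMS queue are hypothetical, and the model assumes the JMS send always commits before the rename is attempted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class RenameFailureSimulation {

    /**
     * Polls an inbox holding one file. Each poll sends the file to the queue and
     * then tries to move it away. The first failingRenames move attempts fail,
     * which leaves the file in the inbox for the next poll.
     * Returns every message that reached the queue.
     */
    static List<String> run(String fileName, int failingRenames, int maxPolls) {
        Deque<String> inbox = new ArrayDeque<>(List.of(fileName));
        List<String> queue = new ArrayList<>();
        int remainingFailures = failingRenames;
        for (int poll = 0; poll < maxPolls && !inbox.isEmpty(); poll++) {
            String file = inbox.peek();
            queue.add(file);            // the send to the broker succeeds and is committed
            if (remainingFailures > 0) {
                remainingFailures--;    // rename fails: the file stays where it was
            } else {
                inbox.poll();           // rename succeeds: the file leaves the inbox
            }
        }
        return queue;
    }

    public static void main(String[] args) {
        // Two failed renames followed by a successful one
        System.out.println(run("FIL.DEP", 2, 10).size()); // prints 3
    }
}
```

Each failed rename adds one more copy to the queue, because the send has already been committed by the time the move is attempted.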
I created an integration test using the Spock Framework and MockFtpServer library.
I have the following successful happy scenario:
def 'happy scenario'() {
    given: 'one original file destination, one completed file destination'
    fileSystem.add(new FileEntry("/upload/mockFile.txt", "test"))
    fileSystem.add(new FileEntry("/upload/completed/mockFile.txt", "test"))

    and: 'start the server'
    ftpServer.start()
    camelContext.start()

    when: 'the file is actually sent to the ftp server'
    remoteFile.storeFile("src/test-integration/resources/files/mockFile.txt", "/upload/mockFile.txt")

    then: 'assert that it was'
    // assert that the file is put on the ftp
    "test" == remoteFile.readFile("/upload/mockFile.txt")

    and: 'receive the file on the other end of the route'
    def receivedMessage = jmsMessagingTemplate.receive(jmsDestinationQueue)

    then: 'assert that it is the correct file'
    "test" == new String(receivedMessage.getPayload())
}
In the error scenario, I am stubbing the RNTO FTP command so that the rename fails and Camel throws GenericFileOperationFailedException:
def 'failed scenario - GenericFileOperationFailedException'() {
    given: 'an original file location on the FTP'
    fileSystem.add(new FileEntry("/upload/mockFile.txt", "test"))
    fileSystem.add(new FileEntry("/upload/completed/mockFile.txt"))

    and: 'a custom ERROR code for the RNTO command'
    ftpServer.setCommandHandler(CommandNames.RNTO, new ExceptionCommandHandler())

    and: 'start the server'
    ftpServer.start()
    camelContext.start()

    when: 'the file is actually sent to the ftp server'
    remoteFile.storeFile("src/test-integration/resources/files/mockFile.txt", "/upload/mockFile.txt")

    then: 'assert that it was'
    "test" == remoteFile.readFile("/upload/mockFile.txt")

    and: 'assert that message was received in the destination queue'
    def receivedFromDestinationQueue = jmsMessagingTemplate.receive(jmsQueueName)
    assertNotNull(receivedFromDestinationQueue.payload)

    and: 'try to poll DLQ, original message should be sent there'
    def dlq = StringUtils.substringAfterLast(errorUri, ":")
    def receivedFromDLQ = jmsMessagingTemplate.receive(dlq)
    assertNotNull(receivedFromDLQ)
}
This test fails: the message is never received from the DLQ where the message should have been sent because of the GenericFileOperationFailedException.
I am polling both the destinationQueue, then the errorQueue because I noticed that the file WAS sent to the destination queue and processed.
What I'm trying to achieve is to be certain that the file is only delivered once to the JMS broker, in all circumstances.
If there are retries or errors anywhere in the process, the messages should be sent to the DLQ.
Answer 1
Score: 1
Did you already try to define a custom onCompletionExceptionHandler in your route? It is intended to
> "handle any thrown exceptions that happens during the file on
> completion process where the consumer does either a commit or
> rollback"

E.g.:
from("ftp://...?onCompletionExceptionHandler=#MyHandler")
If you put file contents into a JMS queue, ensure the consumed files are not too large (some JMS brokers are not comfortable with large messages).
Answer 2
Score: 0
I found a suitable solution inspired by this unit test:
https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerCustomExceptionHandlerTest.java
The idea is to re-inject the exception into the route so that the error handler picks it up.
Following @TacheDeChoco's suggestion, I set up a custom onCompletionExceptionHandler on my FTP endpoint. The documentation describes this option as follows:
> To use a custom org.apache.camel.spi.ExceptionHandler to handle any thrown exceptions that happens during the file on completion process where the consumer does either a commit or rollback. The default implementation will log any exception at WARN level and ignore.

I also used the idempotent and idempotentKey options, as I know there will be only one file sent per day.
https://camel.apache.org/components/3.20.x/ftp-component.html#_endpoint_query_option_idempotent
This is the final result:
public String buildUri() throws URISyntaxException {
    URIBuilder ftpUriBuilder = getUri(ftpPath);
    ftpUriBuilder
        // other connection params
        ...
        // DO NOT USE at the same time as onCompletionExceptionHandler
        //.addParameter("bridgeErrorHandler", "true")
        .addParameter("throwExceptionOnConnectFailed", "true")
        // refers to the RedirectExceptionHandler bean defined below
        .addParameter("onCompletionExceptionHandler", "#redirectExceptionHandler")
        .addParameter("idempotent", "true")
        // one key per file name and day
        .addParameter("idempotentKey", "${file:name}-${date:now:yyyyMMdd}")
        ;
    return ftpUriBuilder.build().toString();
}
@Slf4j
@Component
public class RedirectExceptionHandler implements ExceptionHandler {

    private final ProducerTemplate template;

    public RedirectExceptionHandler(ProducerTemplate template) {
        this.template = template;
    }

    @Override
    public void handleException(Throwable exception) {
        handleException(exception.getMessage(), null, exception);
    }

    @Override
    public void handleException(String message, Throwable exception) {
        handleException(message, null, exception);
    }

    @Override
    public void handleException(String message, Exchange exchange, Throwable exception) {
        // The two overloads above pass a null exchange; nothing can be redirected in that case
        if (exchange == null) {
            log.warn("No exchange available, cannot redirect: {}", message, exception);
            return;
        }
        exchange.setException(exception);
        exchange.setMessage(exchange.getIn());
        // unsure if these are necessary or reset by the errorHandler
        exchange.setRollbackOnly(false);
        exchange.adapt(ExtendedExchange.class).setRedeliveryExhausted(true);
        // use a producerTemplate to send the exchange to the errorHandling endpoint
        template.send("direct:errorHandling", exchange);
    }
}
@Override
public void configure() throws Exception {
    errorHandler(springTransactionErrorHandler().maximumRedeliveries(0));

    // configure onException behaviour for all routes, disable redeliveries
    onException(Exception.class)
        .handled(true)
        .maximumRedeliveries(0)
        .to(errorUri);

    from(ftpConfiguration.buildUri())
        .routeId("listener-main-route")
        .to("direct:msgInTransaction")
    ;

    // simple direct: endpoint to rethrow exceptions to the global error-handler
    from("direct:errorHandling")
        .throwException(new ErrorHandlingException())
    ;

    from("direct:msgInTransaction")
        .routeId("ftplistener-transacted-route")
        .transacted()
        .log(LoggingLevel.INFO, "Processing ${id} with headers: ${headers}")
        .to("jms:queue:" + activeMqTargetQueueName)
    ;
}
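To illustrate what the idempotentKey expression `${file:name}-${date:now:yyyyMMdd}` is meant to achieve, here is a minimal plain-Java sketch of a key built from the file name and the current day, checked against a set of keys already seen. The class `IdempotentKeySketch` and its methods are hypothetical and only model the idea; they are not how Camel implements its idempotent repository.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.HashSet;
import java.util.Set;

public class IdempotentKeySketch {

    private static final DateTimeFormatter DAY = DateTimeFormatter.ofPattern("yyyyMMdd");

    // Keys of files that were already accepted for processing
    private final Set<String> seenKeys = new HashSet<>();

    /** Builds a key of the form "<file name>-<yyyyMMdd>". */
    static String key(String fileName, LocalDate day) {
        return fileName + "-" + day.format(DAY);
    }

    /** Returns true the first time a file/day combination is seen, false afterwards. */
    boolean shouldProcess(String fileName, LocalDate day) {
        return seenKeys.add(key(fileName, day));
    }

    public static void main(String[] args) {
        IdempotentKeySketch repository = new IdempotentKeySketch();
        LocalDate day = LocalDate.of(2023, 3, 14);
        System.out.println(key("FIL.DEP", day));                      // prints FIL.DEP-20230314
        System.out.println(repository.shouldProcess("FIL.DEP", day)); // prints true
        System.out.println(repository.shouldProcess("FIL.DEP", day)); // prints false
    }
}
```

Note that the set in this sketch lives in memory only. As far as I know, Camel's default idempotent repository for file consumers is also memory based, so if the duplicates must be prevented across application restarts it is worth checking whether a persistent repository should be configured through the idempotentRepository option.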