英文:
Why is my handler method not triggered when defined as a lambda?
问题
我正在使用DSL语法定义一个IntegrationFlow
,从SFTP流式传输到S3,方式如下:
return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
.remoteDirectory("remoteDirectory"),
e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
.transform(new StreamTransformer())
.handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // 上传至S3
.get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
return s3MessageHandler;
}
这样可以正常工作:文件会被成功上传到S3存储桶中。然而,我想避免使用SPEL
语法,并且希望能够从消息中提取标头(header),传递给s3uploadMessageHandler
方法,这样我就可以在s3UploadMessageHandler
方法中使用简单的ValueExpression
来设置keyExpression
。
为了实现这一点,我将代码从:
handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // 上传至S3
更改为:
handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // 上传至S3
但现在这个处理程序似乎不再被触发。日志中没有错误信息,从日志中我可以看出SFTP轮询仍然在工作。
我尝试找出导致这个问题的原因,我发现在进入IntegrationFlowdefinition.java
中的handle
方法时,当没有使用lambda表达式时,messageHandler
的类类型是S3MessageHandler
,而使用lambda表达式时类类型是MyCallingClass$lambda
。
我错过了什么导致我的情景无法正常工作?
英文:
I am defining an IntegrationFlow
to stream from SFTP to S3 with the DSL syntax this way :
return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
.remoteDirectory("remoteDirectory"),
e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
.transform(new StreamTransformer())
.handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
.get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
return s3MessageHandler;
}
And it works as intended : the file is well uploaded to my S3 bucket. However, I would like to avoid SPEL
syntax, and inject headers from the message to the s3uploadMessageHandler
method, this way I could use a simple ValueExpression
to set the keyExpression
in the s3UploadMessageHandler
method.
To do this, I changed
handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
to
handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3
But now this handler doesn't seem to be triggered anymore. There is no errors in the logs, and I know from the logs that the SFTP polling is still working.
I tried to find the reason behind this, and I saw that when entering the handle method in IntegrationFlowdefinition.java
, the messageHandler
class type is different : it's an S3MessageHandler
when called without lambda, and a MyCallingClass$lambda
when calling with a lambda expression.
What did I miss to make my scenario working ?
答案1
得分: 3
有两种处理消息的方式。一种是通过MessageHandler
的实现来处理 - 这是最有效的方法,适用于通道适配器实现,比如S3MessageHandler
。另一种方式是POJO方法调用 - 当你不需要担心任何框架接口时,这是最用户友好的方法。
因此,当你像这样使用.handle(s3UploadMessageHandler(...))
时,你引用了一个MessageHandler
,框架知道必须注册一个用于该MessageHandler
的bean,因为你的s3UploadMessageHandler()
不是一个@Bean
。
当你将其用作lambda表达式时,框架将其视为POJO方法调用,并且为MethodInvokingMessageHandler
注册了一个bean,但不会为你的S3MessageHandler
注册。
无论如何,即使你将S3UploadMessageHandler()
更改为@Bean
方法,也不会起作用,因为你不允许框架调用S3MessageHandler.handleMessage()
。你在这里所做的只是在运行时调用那个private
方法,以针对每个请求消息创建一个S3MessageHandler
实例:MethodInvokingMessageHandler
在其handleMessage()
中调用你的lambda,仅此而已 - 不会发生任何关于S3的操作。
在这里,ValueExpression
无法帮助你,因为你需要针对每个单独的请求消息评估目标文件。因此,你需要一个运行时表达式。实际上,使用new SpelExpressionParser().parseExpression()
没有任何问题。只是因为我们别无选择,必须只有一个单态的S3MessageHandler
,而且不能像你尝试通过那个可疑的lambda和ValueExpression
来实现的那样在每个请求上重新创建它。
英文:
There are two ways to handle a message. One is via a MessageHandler
implementation - this is the most efficient approach and that's done in the framework for channel adapter implementation, like that S3MessageHandler
. Another way is a POJO method invocation - this is the most user-friendly approach when you don't need to worry about any framework interfaces.
So, when you use it like this .handle(s3UploadMessageHandler(...))
you refer to a MessageHandler
and the framework knows that a bean for that MessageHandler
has to be registered since your s3UploadMessageHandler()
is not a @Bean
.
When you use it as a lambda, the framework treats it as a POJO method invocation and there is a bean registered for the MethodInvokingMessageHandler
, but not your S3MessageHandler
.
Anyway, even if you change your s3UploadMessageHandler()
to be a @Bean
method it is not going to work because you don't let the framework to call the S3MessageHandler.handleMessage()
. What you do here is just call that private
method at runtime to create an S3MessageHandler
instance against every request message: the MethodInvokingMessageHandler
calls your lambda in its handleMessage()
and that's all - nothing is going to happen with S3.
The ValueExpression
cannot help you here because you need to evaluate a destination file against every single request message. Therefore you need a runtime expression. There is indeed nothing wrong with the new SpelExpressionParser().parseExpression()
. Just because we don't have a choice and have to have only single stateless S3MessageHandler
and don't recreate it at runtime on every request like you try to achieve with that suspicious lambda and ValueExpression
.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论