为什么当定义为lambda时,我的处理程序方法未被触发?

huangapple go评论74阅读模式
英文:

Why is my handler method not triggered when defined as a lambda?

问题

我正在使用DSL语法定义一个IntegrationFlow,从SFTP流式传输到S3,方式如下:

return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
                        .remoteDirectory("remoteDirectory"),
                e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
                .transform(new StreamTransformer())
                .handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // 上传至S3
                .get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
        S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
        s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
        s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
        return s3MessageHandler;
    }

这样可以正常工作:文件会被成功上传到S3存储桶中。然而,我想避免使用SPEL语法,并且希望能够从消息中提取标头(header),传递给s3uploadMessageHandler方法,这样我就可以在s3UploadMessageHandler方法中使用简单的ValueExpression来设置keyExpression

为了实现这一点,我将代码从:

handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // 上传至S3

更改为:

handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // 上传至S3

但现在这个处理程序似乎不再被触发。日志中没有错误信息,从日志中我可以看出SFTP轮询仍然在工作。

我尝试找出导致这个问题的原因,我发现在进入IntegrationFlowdefinition.java中的handle方法时,当没有使用lambda表达式时,messageHandler的类类型是S3MessageHandler,而使用lambda表达式时类类型是MyCallingClass$lambda

我错过了什么导致我的情景无法正常工作?

英文:

I am defining an IntegrationFlow to stream from SFTP to S3 with the DSL syntax this way :

return IntegrationFlows.from(Sftp.inboundStreamingAdapter(remoteFileTemplate)
                        .remoteDirectory("remoteDirectory"),
                e -> e.poller(Pollers.fixedDelay(POLL, TimeUnit.SECONDS)))
                .transform(new StreamTransformer())
                .handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3
                .get();
private S3MessageHandler s3UploadMessageHandler(String folderPath, String spelFileName) {
        S3MessageHandler s3MessageHandler = new S3MessageHandler(amazonS3, s3ConfigProperties.getBuckets().getCardManagementData());
        s3MessageHandler.setKeyExpression(new SpelExpressionParser().parseExpression(String.format("'%s/'.concat(%s)", folderPath, spelFileName)));
        s3MessageHandler.setCommand(S3MessageHandler.Command.UPLOAD);
        return s3MessageHandler;
    }

And it works as intended : the file is well uploaded to my S3 bucket. However, I would like to avoid SPEL syntax, and inject headers from the message to the s3uploadMessageHandler method, this way I could use a simple ValueExpression to set the keyExpression in the s3UploadMessageHandler method.
To do this, I changed

handle(s3UploadMessageHandler(outputFolderPath, "headers['file_remoteFile']")) // Upload on S3

to

handle(m -> s3UploadMessageHandler(outputFolderPath, (String) m.getHeaders().get("file_remoteFile"))) // Upload on S3

But now this handler doesn't seem to be triggered anymore. There is no errors in the logs, and I know from the logs that the SFTP polling is still working.

I tried to find the reason behind this, and I saw that when entering the handle method in IntegrationFlowdefinition.java, the messageHandler class type is different : it's an S3MessageHandler when called without lambda, and a MyCallingClass$lambda when calling with a lambda expression.

What did I miss to make my scenario working ?

答案1

得分: 3

有两种处理消息的方式。一种是通过MessageHandler的实现来处理 - 这是最有效的方法,适用于通道适配器实现,比如S3MessageHandler。另一种方式是POJO方法调用 - 当你不需要担心任何框架接口时,这是最用户友好的方法。

因此,当你像这样使用.handle(s3UploadMessageHandler(...))时,你引用了一个MessageHandler,框架知道必须注册一个用于该MessageHandler的bean,因为你的s3UploadMessageHandler()不是一个@Bean

当你将其用作lambda表达式时,框架将其视为POJO方法调用,并且为MethodInvokingMessageHandler注册了一个bean,但不会为你的S3MessageHandler注册。

无论如何,即使你将S3UploadMessageHandler()更改为@Bean方法,也不会起作用,因为你不允许框架调用S3MessageHandler.handleMessage()。你在这里所做的只是在运行时调用那个private方法,以针对每个请求消息创建一个S3MessageHandler实例:MethodInvokingMessageHandler在其handleMessage()中调用你的lambda,仅此而已 - 不会发生任何关于S3的操作。

在这里,ValueExpression无法帮助你,因为你需要针对每个单独的请求消息评估目标文件。因此,你需要一个运行时表达式。实际上,使用new SpelExpressionParser().parseExpression()没有任何问题。只是因为我们别无选择,必须只有一个单态的S3MessageHandler,而且不能像你尝试通过那个可疑的lambda和ValueExpression来实现的那样在每个请求上重新创建它。

英文:

There are two ways to handle a message. One is via a MessageHandler implementation - this is the most efficient approach and that's done in the framework for channel adapter implementation, like that S3MessageHandler. Another way is a POJO method invocation - this is the most user-friendly approach when you don't need to worry about any framework interfaces.

So, when you use it like this .handle(s3UploadMessageHandler(...)) you refer to a MessageHandler and the framework knows that a bean for that MessageHandler has to be registered since your s3UploadMessageHandler() is not a @Bean.

When you use it as a lambda, the framework treats it as a POJO method invocation and there is a bean registered for the MethodInvokingMessageHandler, but not your S3MessageHandler.

Anyway, even if you change your s3UploadMessageHandler() to be a @Bean method it is not going to work because you don't let the framework to call the S3MessageHandler.handleMessage(). What you do here is just call that private method at runtime to create an S3MessageHandler instance against every request message: the MethodInvokingMessageHandler calls your lambda in its handleMessage() and that's all - nothing is going to happen with S3.

The ValueExpression cannot help you here because you need to evaluate a destination file against every single request message. Therefore you need a runtime expression. There is indeed nothing wrong with the new SpelExpressionParser().parseExpression(). Just because we don't have a choice and have to have only single stateless S3MessageHandler and don't recreate it at runtime on every request like you try to achieve with that suspicious lambda and ValueExpression.

huangapple
  • 本文由 发表于 2020年4月8日 15:16:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/61095211.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定