Spring网关AsyncPredicate在与Reactor和Flux一起使用时不起作用

huangapple go评论172阅读模式
英文:

Spring Gateway AsyncPredicate not working with reactor and flux

问题

我们为 Spring Gateway 编写了一个自定义的 Predicate 工厂,用于路由请求。我们正在解析 XML 请求的主体,然后根据主体中特定的方法来派生路由。在这样做的过程中,我们编写了以下代码来创建 ServerRequest。

@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
    return exchange -> {
        Class<String> inClass = String.class;

        Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);

        if (cachedBody != null) {
            try {
                boolean test = config.pattern.matcher((String) cachedBody).matches();
                exchange.getAttributes().put(TEST_ATTRIBUTE, test);
                return Mono.just(test);
            } catch (ClassCastException e) {
                LOG.error("Predicate test failed because String.class does not match the cached body object", e);
            }
            return Mono.just(false);
        } else {
            return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

                DataBufferUtils.retain(dataBuffer);

                Flux<DataBuffer> cachedFlux = Flux
                        .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

                ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

                    @Override
                    public Flux<DataBuffer> getBody() {
                        return cachedFlux;
                    }
                };
                return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
                        .bodyToMono(inClass).doOnNext(objectValue -> {
                            exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                            exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
                        }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
            });

        }
    };
}

在旧版本的 Spring-Boot-Parent (2.1.7.RELEASE)spring-cloud-dependencies (Greenwich.RELEASE) 下,这个解决方案完美运行。但在最新版本的 Spring-Boot-Parent (2.3.1.RELEASE)spring-cloud-dependencies (Hoxton.SR6) 下,我遇到了以下异常。网关应用程序正常启动,没有任何错误。

Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
        at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
        ... 84 more

是否有其他人也遇到了同样的问题,并知道如何解决?

英文:

We have written a custom Predicate factory for the Spring-Gateway to route the requests. We are parsing the body of an XML request and then the route is being derived, based on particular Method existing in the Body. While doing this we have written the following code to create the ServerRquest.

@Override
public AsyncPredicate&lt;ServerWebExchange&gt; applyAsync(Config config) {
return exchange -&gt; {
Class&lt;String&gt; inClass = String.class;
Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
if (cachedBody != null) {
try {
boolean test = config.pattern.matcher((String) cachedBody).matches();
exchange.getAttributes().put(TEST_ATTRIBUTE, test);
return Mono.just(test);
} catch (ClassCastException e) {
LOG.error(&quot;Predicate test failed because String.class does not match the cached body object&quot;, e);
}
return Mono.just(false);
} else {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -&gt; {
DataBufferUtils.retain(dataBuffer);
Flux&lt;DataBuffer&gt; cachedFlux = Flux
.defer(() -&gt; Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux&lt;DataBuffer&gt; getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
.bodyToMono(inClass).doOnNext(objectValue -&gt; {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
}).map(objectValue -&gt; config.pattern.matcher((String) objectValue).matches());
});
}
};
}

With an older Version of Spring-Boot-Parent (2.1.7.RELEASE) and spring-cloud-dependencies (Greenwich.RELEASE) working this solution perfectly. But with the latest version of Spring-Boot-Parent (2.3.1.RELEASE) and spring-cloud-dependencies (Hoxton.SR6) I am getting the following exception. The gateway application starts normally, without any error.

Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
... 84 more

Had anyone else also the same problem and know how to solve this?

答案1

得分: 3

问题是,这些API的格林威治版本是beta版。现在在 CACHED_REQUEST_BODY_ATTR 中期望的对象必须是一个 PooledDataBuffer。所以我现在已经相应地更改了我的代码。现在看起来如下:

return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {

    DataBufferUtils.retain(dataBuffer);

    Flux<DataBuffer> cachedFlux = Flux
            .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));

    PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());

    ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {

        @Override
        public Flux<DataBuffer> getBody() {
            return cachedFlux;
        }
    };
    return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
            .bodyToMono(inClass).doOnNext(objectValue -> {
                exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
                exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
            }).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
});

在更新了这个类之后,它现在按预期工作。

英文:

The problem was, the greenwich version of those apis was beta. Now the object expected in CACHED_REQUEST_BODY_ATTR is required to be a PooledDataBuffer. So I changed my code accordinly now. Which looks like as following now:

return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -&gt; {
DataBufferUtils.retain(dataBuffer);
Flux&lt;DataBuffer&gt; cachedFlux = Flux
.defer(() -&gt; Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux&lt;DataBuffer&gt; getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
.bodyToMono(inClass).doOnNext(objectValue -&gt; {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
}).map(objectValue -&gt; config.pattern.matcher((String) objectValue).matches());
});

After updating the class, it is working as expected now.

huangapple
  • 本文由 发表于 2020年7月27日 19:18:54
  • 转载请务必保留本文链接:https://go.coder-hub.com/63114188.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定