英文:
Spring Gateway AsyncPredicate not working with reactor and flux
问题
我们为 Spring Gateway 编写了一个自定义的 Predicate 工厂,用于路由请求。我们正在解析 XML 请求的主体,然后根据主体中特定的方法来派生路由。在这样做的过程中,我们编写了以下代码来创建 ServerRequest。
@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
return exchange -> {
Class<String> inClass = String.class;
Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
if (cachedBody != null) {
try {
boolean test = config.pattern.matcher((String) cachedBody).matches();
exchange.getAttributes().put(TEST_ATTRIBUTE, test);
return Mono.just(test);
} catch (ClassCastException e) {
LOG.error("Predicate test failed because String.class does not match the cached body object", e);
}
return Mono.just(false);
} else {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
.bodyToMono(inClass).doOnNext(objectValue -> {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
}).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
});
}
};
}
在旧版本的 Spring-Boot-Parent (2.1.7.RELEASE)
和 spring-cloud-dependencies (Greenwich.RELEASE)
下,这个解决方案完美运行。但在最新版本的 Spring-Boot-Parent (2.3.1.RELEASE)
和 spring-cloud-dependencies (Hoxton.SR6)
下,我遇到了以下异常。网关应用程序正常启动,没有任何错误。
Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
... 84 more
是否有其他人也遇到了同样的问题,并知道如何解决?
英文:
We have written a custom Predicate factory for the Spring-Gateway to route the requests. We are parsing the body of an XML request and then the route is being derived, based on particular Method existing in the Body. While doing this we have written the following code to create the ServerRquest.
@Override
public AsyncPredicate<ServerWebExchange> applyAsync(Config config) {
return exchange -> {
Class<String> inClass = String.class;
Object cachedBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY);
if (cachedBody != null) {
try {
boolean test = config.pattern.matcher((String) cachedBody).matches();
exchange.getAttributes().put(TEST_ATTRIBUTE, test);
return Mono.just(test);
} catch (ClassCastException e) {
LOG.error("Predicate test failed because String.class does not match the cached body object", e);
}
return Mono.just(false);
} else {
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
.bodyToMono(inClass).doOnNext(objectValue -> {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachedFlux);
}).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
});
}
};
}
With an older Version of Spring-Boot-Parent (2.1.7.RELEASE) and spring-cloud-dependencies (Greenwich.RELEASE)
working this solution perfectly. But with the latest version of Spring-Boot-Parent (2.3.1.RELEASE) and spring-cloud-dependencies (Hoxton.SR6)
I am getting the following exception. The gateway application starts normally, without any error.
Caused by: java.lang.ClassCastException: class reactor.core.publisher.FluxDefer cannot be cast to class org.springframework.core.io.buffer.PooledDataBuffer (reactor.core.publisher.FluxDefer and org.springframework.core.io.buffer.PooledDataBuffer are in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @306a30c7)
at org.springframework.cloud.gateway.filter.RemoveCachedBodyFilter.lambda$filter$0(RemoveCachedBodyFilter.java:37) ~[spring-cloud-gateway-core-2.2.3.RELEASE.jar!/:2.2.3.RELEASE]
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.runFinally(FluxDoFinally.java:156) ~[reactor-core-3.3.6.RELEASE.jar!/:3.3.6.RELEASE]
... 84 more
Had anyone else also the same problem and know how to solve this?
答案1
得分: 3
问题是,这些API的格林威治版本是beta版。现在在 CACHED_REQUEST_BODY_ATTR
中期望的对象必须是一个 PooledDataBuffer
。所以我现在已经相应地更改了我的代码。现在看起来如下:
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
.bodyToMono(inClass).doOnNext(objectValue -> {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
}).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
});
在更新了这个类之后,它现在按预期工作。
英文:
The problem was, the greenwich version of those apis was beta. Now the object expected in CACHED_REQUEST_BODY_ATTR
is required to be a PooledDataBuffer
. So I changed my code accordinly now. Which looks like as following now:
return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap(dataBuffer -> {
DataBufferUtils.retain(dataBuffer);
Flux<DataBuffer> cachedFlux = Flux
.defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())));
PooledDataBuffer cachePool = (PooledDataBuffer) dataBuffer.slice(0, dataBuffer.readableByteCount());
ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public Flux<DataBuffer> getBody() {
return cachedFlux;
}
};
return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders)
.bodyToMono(inClass).doOnNext(objectValue -> {
exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, objectValue);
exchange.getAttributes().put(CACHED_REQUEST_BODY_ATTR, cachePool);
}).map(objectValue -> config.pattern.matcher((String) objectValue).matches());
});
After updating the class, it is working as expected now.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论