英文:
Why do I have to use .fluxTransform(f -> f) on an inbound webflux gateways when using Java DSL?
问题
我在使用Spring Integration中的webflux网关Java DSL时遇到了失败的响应。它仅对前几个请求起作用(具体说是<8个请求),在此之后我会收到响应错误:
```java
org.springframework.integration.MessageTimeoutException:在5000毫秒的超时内未能收到JMS响应
at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
Suppressed:reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
当我在入站网关上使用.fluxTransform(f -> f)
时,或者当我使用非响应式的http出站网关时,我不会收到错误,即使在进行数千个请求的jmeter基准测试中也不会出错。
- 为什么我必须在第一个流程中调用
fluxTransform(f -> f)
才能使其正常工作? - 当我在第二个流程中使用
Http.outboundGateway
时为什么不需要fluxTransform(f -> f)
?
情境
我使用了四个网关创建了一个路由,用于在远程机器上进行网络请求,但是我遇到了问题
集成流程1:
> 入站webflux网关 -> 出站jms网关
@Bean
public IntegrationFlow step1() {
// 使用jms出站网关的请求-响应模式
var gateway = Jms.outboundGateway(jmsConnectionFactory)
.requestDestination("inboundWebfluxQueue")
.replyDestination("outboundWebfluxQueue")
.correlationKey("JMSCorrelationID");
// 发送请求到jms,等待响应并将消息有效负载作为响应返回
return IntegrationFlows.from(webfluxServer("/example/webflux"))
// 没有下一行通常不会正常工作
.fluxTransform(f -> f)
.handle(gateway).get();
}
集成流程2:
> 入站jms网关 -> 出站webflux网关
@Bean
public IntegrationFlow step2_using_webflux() {
var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class)
// 忽略头信息
.mappedResponseHeaders();
return IntegrationFlows.from(jmsInboundGateway())
// 使用webflux出站网关将请求发送到TEST_URL
.handle(gateway).get();
}
完整的路由如下所示:
> 客户端网络请求 -> 流程1 -> (消息代理) -> 流程2 -> 服务器网络请求
<details>
<summary>英文:</summary>
I've run into failed replies when using the webflux gateway Java DSL in Spring Integration. It only works for the first few requests (<8 to be specific), I'm getting reply errors afterwards:
```java
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
When I use .fluxTransform(f -> f)
on the inbound gateway OR when I use the non-reactive http outbound gateway, I don't get the errors, even on a jmeter benchmark with thousands of requests.
- Why do I have to call
fluxTransform(f -> f)
in the first flow to make it work? - Why does it work without
fluxTransform(f -> f)
when I useHttp.outboundGateway
in the second flow?
Scenario
I've created a route using four gateways for a rather complex setup to make a web request on a remote machine, but I'm
Integration Flow 1:
> inbound webflux gateway -> outbound jms gateway
@Bean
public IntegrationFlow step1() {
// request-reply pattern using the jms outbound gateway
var gateway = Jms.outboundGateway(jmsConnectionFactory)
.requestDestination("inboundWebfluxQueue")
.replyDestination("outboundWebfluxQueue")
.correlationKey("JMSCorrelationID");
// send a request to jms, wait for the reply and return message payload as response
return IntegrationFlows.from(webfluxServer("/example/webflux"))
// won't work consistently without the next line
.fluxTransform(f -> f)
.handle(gateway).get();
}
Integration Flow 2:
> inbound jms gateway -> outbound webflux gateway
@Bean
public IntegrationFlow step2_using_webflux() {
var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class)
// ignore headers
.mappedResponseHeaders();
return IntegrationFlows.from(jmsInboundGateway())
// use webflux outbound gateway to send the request to the TEST_URL
.handle(gateway).get();
}
The complete route looks like this:
> client web request -> flow 1 -> (message broker) -> flow 2 -> server web request
答案1
得分: 1
另一种方法是使用 .channel(MessageChannels.flux())
替代 .fluxTransform(f -> f)
。这样我们就真的为 WebFlux 容器带来了反压力,使其等待请求事件循环中的可用插槽。
通过这种方式,我们只是发送到 JMS 队列,而不遵循反压力,你那边的 JMS 消费者跟不上。此外,我们还会向同一 Netty 服务器发送一个请求,再次为这些内部请求获取事件循环插槽。
如果你感兴趣,我编写了一个类似这样的单元测试来查看发生了什么:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {
@Autowired
private TestRestTemplate template;
@Test
void testSpringIntegrationWebFlux() {
var executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.afterPropertiesSet();
var numberOfExecutions = new AtomicInteger();
for (var i = 0; i < 100; i++) {
executor.execute(() -> {
var responseEntity = this.template.getForEntity("/example/webflux", String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
numberOfExecutions.getAndIncrement();
}
});
}
executor.shutdown();
assertThat(numberOfExecutions.get()).isEqualTo(100);
}
}
英文:
Another way is to use a .channel(MessageChannels.flux())
instead of that .fluxTransform(f -> f)
. This way we really bring a back-pressure to the the WebFlux container making it waiting for available slot in the request event loop.
With that we just send to JMS queue not-honoring back-pressure and and your JMS consumer on the other side can't keep up. Plus we send a request to the same Netty server internally acquiring an event loop slot again for those internal requests.
If you are interested I wrote a unit test like this to see what is going on:
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {
@Autowired
private TestRestTemplate template;
@Test
void testSpringIntegrationWebFlux() {
var executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100);
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(10);
executor.afterPropertiesSet();
var numberOfExecutions = new AtomicInteger();
for (var i = 0; i < 100; i++) {
executor.execute(() -> {
var responseEntity = this.template.getForEntity("/example/webflux", String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
numberOfExecutions.getAndIncrement();
}
});
}
executor.shutdown();
assertThat(numberOfExecutions.get()).isEqualTo(100);
}
}
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论