你为什么在使用Java DSL时必须使用`.fluxTransform(f -> f)`在入站的WebFlux网关上?

huangapple go评论78阅读模式
英文:

Why do I have to use .fluxTransform(f -> f) on an inbound webflux gateways when using Java DSL?

问题

我在使用Spring Integration中的webflux网关Java DSL时遇到了失败的响应它仅对前几个请求起作用具体说是<8个请求),在此之后我会收到响应错误

```java
org.springframework.integration.MessageTimeoutException在5000毫秒的超时内未能收到JMS响应
	at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
	Suppressedreactor.core.publisher.FluxOnAssembly$OnAssemblyException

当我在入站网关上使用.fluxTransform(f -&gt; f)时,或者当我使用非响应式的http出站网关时,我不会收到错误,即使在进行数千个请求的jmeter基准测试中也不会出错。

  • 为什么我必须在第一个流程中调用fluxTransform(f -&gt; f)才能使其正常工作?
  • 当我在第二个流程中使用Http.outboundGateway时为什么不需要fluxTransform(f -&gt; f)

情境
我使用了四个网关创建了一个路由,用于在远程机器上进行网络请求,但是我遇到了问题

集成流程1:
> 入站webflux网关 -> 出站jms网关

  @Bean
  public IntegrationFlow step1() {
    // 使用jms出站网关的请求-响应模式
    var gateway = Jms.outboundGateway(jmsConnectionFactory)
        .requestDestination("inboundWebfluxQueue")
        .replyDestination("outboundWebfluxQueue")
        .correlationKey("JMSCorrelationID");

    // 发送请求到jms,等待响应并将消息有效负载作为响应返回
    return IntegrationFlows.from(webfluxServer("/example/webflux"))
        // 没有下一行通常不会正常工作
        .fluxTransform(f -&gt; f)
        .handle(gateway).get();
  }

集成流程2:
> 入站jms网关 -> 出站webflux网关

  @Bean
  public IntegrationFlow step2_using_webflux() {
    var gateway = WebFlux.outboundGateway("http://localhost:8080/actuator/health")
        .httpMethod(HttpMethod.GET)
        .expectedResponseType(String.class)
        // 忽略头信息
        .mappedResponseHeaders();

    return IntegrationFlows.from(jmsInboundGateway())
        // 使用webflux出站网关将请求发送到TEST_URL
        .handle(gateway).get();
  }

完整的路由如下所示:
> 客户端网络请求 -> 流程1 -> (消息代理) -> 流程2 -> 服务器网络请求


<details>
<summary>英文:</summary>

I&#39;ve run into failed replies when using the webflux gateway Java DSL in Spring Integration. It only works for the first few requests (&lt;8 to be specific), I&#39;m getting reply errors afterwards:

```java
org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
	at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:741) ~[spring-integration-jms-5.3.2.RELEASE.jar:5.3.2.RELEASE]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:

When I use .fluxTransform(f -&gt; f) on the inbound gateway OR when I use the non-reactive http outbound gateway, I don't get the errors, even on a jmeter benchmark with thousands of requests.

  • Why do I have to call fluxTransform(f -&gt; f) in the first flow to make it work?
  • Why does it work without fluxTransform(f -&gt; f) when I use Http.outboundGatewayin the second flow?

Scenario
I've created a route using four gateways for a rather complex setup to make a web request on a remote machine, but I'm

Integration Flow 1:
> inbound webflux gateway -> outbound jms gateway

  @Bean
  public IntegrationFlow step1() {
    // request-reply pattern using the jms outbound gateway
    var gateway = Jms.outboundGateway(jmsConnectionFactory)
        .requestDestination(&quot;inboundWebfluxQueue&quot;)
        .replyDestination(&quot;outboundWebfluxQueue&quot;)
        .correlationKey(&quot;JMSCorrelationID&quot;);

    // send a request to jms, wait for the reply and return message payload as response
    return IntegrationFlows.from(webfluxServer(&quot;/example/webflux&quot;))
        // won&#39;t work consistently without the next line
        .fluxTransform(f -&gt; f)
        .handle(gateway).get();
  }

Integration Flow 2:
> inbound jms gateway -> outbound webflux gateway

  @Bean
  public IntegrationFlow step2_using_webflux() {
    var gateway = WebFlux.outboundGateway(&quot;http://localhost:8080/actuator/health&quot;)
        .httpMethod(HttpMethod.GET)
        .expectedResponseType(String.class)
        // ignore headers
        .mappedResponseHeaders();

    return IntegrationFlows.from(jmsInboundGateway())
        // use webflux outbound gateway to send the request to the TEST_URL
        .handle(gateway).get();
  }

The complete route looks like this:
> client web request -> flow 1 -> (message broker) -> flow 2 -> server web request

答案1

得分: 1

另一种方法是使用 .channel(MessageChannels.flux()) 替代 .fluxTransform(f -&gt; f)。这样我们就真的为 WebFlux 容器带来了反压力,使其等待请求事件循环中的可用插槽。

通过这种方式,我们只是发送到 JMS 队列,而不遵循反压力,你那边的 JMS 消费者跟不上。此外,我们还会向同一 Netty 服务器发送一个请求,再次为这些内部请求获取事件循环插槽。

如果你感兴趣,我编写了一个类似这样的单元测试来查看发生了什么:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {

	@Autowired
	private TestRestTemplate template;

	@Test
	void testSpringIntegrationWebFlux() {
		var executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(100);
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.setAwaitTerminationSeconds(10);
		executor.afterPropertiesSet();

		var numberOfExecutions = new AtomicInteger();

		for (var i = 0; i &lt; 100; i++) {
			executor.execute(() -&gt; {
				var responseEntity = this.template.getForEntity(&quot;/example/webflux&quot;, String.class);
				if (responseEntity.getStatusCode().is2xxSuccessful()) {
					numberOfExecutions.getAndIncrement();
				}
			});
		}

		executor.shutdown();

		assertThat(numberOfExecutions.get()).isEqualTo(100);
	}

}
英文:

Another way is to use a .channel(MessageChannels.flux()) instead of that .fluxTransform(f -&gt; f). This way we really bring a back-pressure to the the WebFlux container making it waiting for available slot in the request event loop.

With that we just send to JMS queue not-honoring back-pressure and and your JMS consumer on the other side can't keep up. Plus we send a request to the same Netty server internally acquiring an event loop slot again for those internal requests.

If you are interested I wrote a unit test like this to see what is going on:

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class IntegrationApplicationTests {


	@Autowired
	private TestRestTemplate template;

	@Test
	void testSpringIntegrationWebFlux() {
		var executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(100);
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.setAwaitTerminationSeconds(10);
		executor.afterPropertiesSet();

		var numberOfExecutions = new AtomicInteger();

		for (var i = 0; i &lt; 100; i++) {
			executor.execute(() -&gt; {
				var responseEntity = this.template.getForEntity(&quot;/example/webflux&quot;, String.class);
				if (responseEntity.getStatusCode().is2xxSuccessful()) {
					numberOfExecutions.getAndIncrement();
				}
			});
		}

		executor.shutdown();

		assertThat(numberOfExecutions.get()).isEqualTo(100);
	}

}

huangapple
  • 本文由 发表于 2020年8月17日 19:21:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/63449819.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定