英文:
Reading Server-Sent Events using reactive Spring WebClient and feeding them to a slow/restricted consumer produces an OverflowException
问题
我有以下测试:
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Schedulers;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
class MainTest {
private TestServer server;
@BeforeEach
void init() throws Exception {
server = new TestServer();
}
@AfterEach
void cleanup() throws Exception {
server.shutdown();
}
@Test
void test() {
WebClient client = WebClient.builder().build();
client.get()
.uri("http://localhost:9999/")
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
})
.map(ServerSentEvent::data)
.publishOn(Schedulers.single())
.blockLast();
}
private static class TestServer {
private final Server server;
private TestServer() throws Exception {
server = new Server(9999);
server.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request request, HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws IOException, ServletException {
PrintWriter writer = httpServletResponse.getWriter();
for (int i = 0; i < 1_000_000; i++) {
writer.println("id:" + i);
writer.println("data:" + i);
writer.println();
}
request.setHandled(true);
}
});
server.start();
}
void shutdown() throws Exception {
server.stop();
}
}
}
它启动一个发出一百万个SSE(服务器推送事件)消息的服务器,并尝试使用Spring的响应式`WebClient`来读取它们。唯一的非平凡之处在于,在消耗消息之前,我插入了`.publishOn(Schedulers.single())`,因为在消费者较慢的情况下推荐使用(在我的实际项目中,我实际上有一个相对较慢的消费者,即`KafkaSender`,它已经默认使用`Schedulers.single()`)。
POM文件中相应的片段如下:
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.31.v20200723</version>
<scope>test</scope>
</dependency>
</dependencies>
在我的机器上,这个测试立即失败:
reactor.core.Exceptions$OverflowException: 由于缺少请求而无法发出缓冲区
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
// 后续的堆栈跟踪省略...
如果我删除`publishOn(...)`行,它可以正常工作。
一方面,我理解快速的生产者可能会压垮一个慢/受限制的消费者。另一方面,我正在使用响应式流,因此我期望Web客户端在没有需求时不会读取任何内容。
我在这里做错了什么吗?还是这是错误行为(由库引起的)?
如果我做错了什么,如何修复?在`.publishOn(...)`之前插入`.onBackpressureBuffer()`似乎可以让程序无错误运行,但这是正确的解决方案吗?如果内存用完怎么办?有没有办法只是减缓读取速度?
完整项目代码在这里:https://github.com/rpuch/webclient-sse-with-slow-consumer
英文:
I have the following test:
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Schedulers;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
class MainTest {
private TestServer server;
@BeforeEach
void init() throws Exception {
server = new TestServer();
}
@AfterEach
void cleanup() throws Exception {
server.shutdown();
}
@Test
void test() {
WebClient client = WebClient.builder().build();
client.get()
.uri("http://localhost:9999/")
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
})
.map(ServerSentEvent::data)
.publishOn(Schedulers.single())
.blockLast();
}
private static class TestServer {
private final Server server;
private TestServer() throws Exception {
server = new Server(9999);
server.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request request, HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws IOException, ServletException {
PrintWriter writer = httpServletResponse.getWriter();
for (int i = 0; i < 1_000_000; i++) {
writer.println("id:" + i);
writer.println("data:" + i);
writer.println();
}
request.setHandled(true);
}
});
server.start();
}
void shutdown() throws Exception {
server.stop();
}
}
}
It starts a server that emits one million SSE (server-sent event) messages, and tries to read them all using Spring's reactive WebClient
. The only non-trivial thing is that before consuming the messages, I inserted .publishOn(Schedulers.single())
as it is recommended for slow consumers (and in my real project I actually have a relatively slow consumer, namely KafkaSender
, that already uses Schedulers.single()
by default).
The corresponding snippet of the POM follows:
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webflux</artifactId>
<version>5.2.3.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>0.9.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.6.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>9.4.31.v20200723</version>
<scope>test</scope>
</dependency>
</dependencies>
On my machine, this test fails immediately:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Body from GET http://localhost:9999/ [DefaultClientResponse]
Stack trace:
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:304)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:260)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:214)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:186)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:300)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:260)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:214)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:186)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:405)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:649)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:249)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:588)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:316)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:303)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:417)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:271)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2482)
at MainTest.test(MainTest.java:42)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:128)
If I remove the publishOn(...)
line, it works fine.
On the one hand, I understand that a fast producer can overwhelm a slow/restricted consumer. On the other hand, I'm using a reactive flow, so I expect that the web client does not read anything if there is no demand.
Is it that I do something wrong here? Or it is an erroneous behavior (by the libraries)?
If I'm doing something wrong, how can this be fixed? .onBackpressureBuffer()
just before the .publishOn(...)
seems to allow the program run without errors, but is it a correct solution? What if it runs out of memory? Is there any way to just read slower?
The full project code is here: https://github.com/rpuch/webclient-sse-with-slow-consumer
答案1
得分: 2
你对于.onBackpressureBuffer()
的理解是正确的,它可以处理这种情况,但同时也会带来内存方面的问题。
默认情况下,Spring WebFlux/WebClient依赖于TCP反压机制。如果你需要在网络边界上实现响应式流的语义,你可以尝试使用RSocket以及它与Spring Framework的集成。
英文:
You are right about .onBackpressureBuffer()
, it can handle the situation, but also at the same time you have the problem with the memory.
By default Spring WebFlux/WebClient relies on the TCP back pressure. If you need Reactive Streams semantics across network boundary you can try RSocket and its integration with Spring Framework.
答案2
得分: 0
当我将 reactor-core
版本从 3.3.2
更改为 3.3.3
时,测试通过了:它会让处理器工作约 2 分钟(嗯,你知道,一百万条消息),但最终成功完成。
因此看起来问题可能是由于 reactor-core
中的一个 bug 导致的,可能是这个问题 https://github.com/reactor/reactor-core/issues/1937 (在 3.3.3
中已关闭)。
英文:
When I change reactor-core
version from 3.3.2
to 3.3.3
, the test passes: it heats the processor for about 2 minutes (well, a million messages, you know), but it finishes successfully.
So it looks like the issue was caused by a bug in reactor-core
, probably this one https://github.com/reactor/reactor-core/issues/1937 (closed in 3.3.3
).
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论