Reading Server-Sent Events using reactive Spring WebClient and feeding them to a slow/restricted consumer produces an OverflowException

huangapple go评论86阅读模式
英文:

Reading Server-Sent Events using reactive Spring WebClient and feeding them to a slow/restricted consumer produces an OverflowException

问题

我有以下测试

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Schedulers;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;

class MainTest {
    private TestServer server;

    @BeforeEach
    void init() throws Exception {
        server = new TestServer();
    }

    @AfterEach
    void cleanup() throws Exception {
        server.shutdown();
    }

    @Test
    void test() {
        WebClient client = WebClient.builder().build();

        client.get()
                .uri("http://localhost:9999/")
                .retrieve()
                .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<String>>() {
                })
                .map(ServerSentEvent::data)
                .publishOn(Schedulers.single())
                .blockLast();
    }

    private static class TestServer {
        private final Server server;

        private TestServer() throws Exception {
            server = new Server(9999);
            server.setHandler(new AbstractHandler() {
                @Override
                public void handle(String target, Request request, HttpServletRequest httpServletRequest,
                                   HttpServletResponse httpServletResponse) throws IOException, ServletException {
                    PrintWriter writer = httpServletResponse.getWriter();
                    for (int i = 0; i < 1_000_000; i++) {
                        writer.println("id:" + i);
                        writer.println("data:" + i);
                        writer.println();
                    }

                    request.setHandled(true);
                }
            });
            server.start();
        }

        void shutdown() throws Exception {
            server.stop();
        }
    }
}

它启动一个发出一百万个SSE服务器推送事件消息的服务器并尝试使用Spring的响应式`WebClient`来读取它们唯一的非平凡之处在于在消耗消息之前我插入了`.publishOn(Schedulers.single())`,因为在消费者较慢的情况下推荐使用在我的实际项目中我实际上有一个相对较慢的消费者`KafkaSender`,它已经默认使用`Schedulers.single()`)。

POM文件中相应的片段如下

<dependencies>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webflux</artifactId>
        <version>5.2.3.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>0.9.4.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter-api</artifactId>
        <version>5.6.2</version>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.eclipse.jetty</groupId>
        <artifactId>jetty-server</artifactId>
        <version>9.4.31.v20200723</version>
        <scope>test</scope>
    </dependency>
</dependencies>

在我的机器上这个测试立即失败

reactor.core.Exceptions$OverflowException: 由于缺少请求而无法发出缓冲区

    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
    // 后续的堆栈跟踪省略...

如果我删除`publishOn(...)`它可以正常工作

一方面我理解快速的生产者可能会压垮一个慢/受限制的消费者另一方面我正在使用响应式流因此我期望Web客户端在没有需求时不会读取任何内容

我在这里做错了什么吗还是这是错误行为由库引起的)?

如果我做错了什么如何修复`.publishOn(...)`之前插入`.onBackpressureBuffer()`似乎可以让程序无错误运行但这是正确的解决方案吗如果内存用完怎么办有没有办法只是减缓读取速度

完整项目代码在这里https://github.com/rpuch/webclient-sse-with-slow-consumer
英文:

I have the following test:

import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Schedulers;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
class MainTest {
private TestServer server;
@BeforeEach
void init() throws Exception {
server = new TestServer();
}
@AfterEach
void cleanup() throws Exception {
server.shutdown();
}
@Test
void test() {
WebClient client = WebClient.builder().build();
client.get()
.uri(&quot;http://localhost:9999/&quot;)
.retrieve()
.bodyToFlux(new ParameterizedTypeReference&lt;ServerSentEvent&lt;String&gt;&gt;() {
})
.map(ServerSentEvent::data)
.publishOn(Schedulers.single())
.blockLast();
}
private static class TestServer {
private final Server server;
private TestServer() throws Exception {
server = new Server(9999);
server.setHandler(new AbstractHandler() {
@Override
public void handle(String target, Request request, HttpServletRequest httpServletRequest,
HttpServletResponse httpServletResponse) throws IOException, ServletException {
PrintWriter writer = httpServletResponse.getWriter();
for (int i = 0; i &lt; 1_000_000; i++) {
writer.println(&quot;id:&quot; + i);
writer.println(&quot;data:&quot; + i);
writer.println();
}
request.setHandled(true);
}
});
server.start();
}
void shutdown() throws Exception {
server.stop();
}
}
}

It starts a server that emits one million SSE (server-sent event) messages, and tries to read them all using Spring's reactive WebClient. The only non-trivial thing is that before consuming the messages, I inserted .publishOn(Schedulers.single()) as it is recommended for slow consumers (and in my real project I actually have a relatively slow consumer, namely KafkaSender, that already uses Schedulers.single() by default).

The corresponding snippet of the POM follows:

&lt;dependencies&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.springframework&lt;/groupId&gt;
&lt;artifactId&gt;spring-webflux&lt;/artifactId&gt;
&lt;version&gt;5.2.3.RELEASE&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;io.projectreactor.netty&lt;/groupId&gt;
&lt;artifactId&gt;reactor-netty&lt;/artifactId&gt;
&lt;version&gt;0.9.4.RELEASE&lt;/version&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.junit.jupiter&lt;/groupId&gt;
&lt;artifactId&gt;junit-jupiter-api&lt;/artifactId&gt;
&lt;version&gt;5.6.2&lt;/version&gt;
&lt;scope&gt;test&lt;/scope&gt;
&lt;/dependency&gt;
&lt;dependency&gt;
&lt;groupId&gt;org.eclipse.jetty&lt;/groupId&gt;
&lt;artifactId&gt;jetty-server&lt;/artifactId&gt;
&lt;version&gt;9.4.31.v20200723&lt;/version&gt;
&lt;scope&gt;test&lt;/scope&gt;
&lt;/dependency&gt;
&lt;/dependencies&gt;

On my machine, this test fails immediately:

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
|_ checkpoint ⇢ Body from GET http://localhost:9999/ [DefaultClientResponse]
Stack trace:
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:231)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:304)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:260)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:214)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:186)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:845)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:213)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.emit(FluxBufferPredicate.java:300)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNextNewBuffer(FluxBufferPredicate.java:260)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.tryOnNext(FluxBufferPredicate.java:214)
at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onNext(FluxBufferPredicate.java:186)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:405)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:649)
at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:249)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:192)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:218)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:351)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:348)
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:588)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:316)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:303)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:417)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:271)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2482)
at MainTest.test(MainTest.java:42)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:686)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:212)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:208)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:137)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:71)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:135)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:125)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:135)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:122)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:80)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:248)
at org.junit.platform.launcher.core.DefaultLauncher.lambda$execute$5(DefaultLauncher.java:211)
at org.junit.platform.launcher.core.DefaultLauncher.withInterceptedStreams(DefaultLauncher.java:226)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:199)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:132)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:128)

If I remove the publishOn(...) line, it works fine.

On the one hand, I understand that a fast producer can overwhelm a slow/restricted consumer. On the other hand, I'm using a reactive flow, so I expect that the web client does not read anything if there is no demand.

Is it that I do something wrong here? Or it is an erroneous behavior (by the libraries)?

If I'm doing something wrong, how can this be fixed? .onBackpressureBuffer() just before the .publishOn(...) seems to allow the program run without errors, but is it a correct solution? What if it runs out of memory? Is there any way to just read slower?

The full project code is here: https://github.com/rpuch/webclient-sse-with-slow-consumer

答案1

得分: 2

你对于.onBackpressureBuffer()的理解是正确的,它可以处理这种情况,但同时也会带来内存方面的问题。

默认情况下,Spring WebFlux/WebClient依赖于TCP反压机制。如果你需要在网络边界上实现响应式流的语义,你可以尝试使用RSocket以及它与Spring Framework的集成。

英文:

You are right about .onBackpressureBuffer(), it can handle the situation, but also at the same time you have the problem with the memory.

By default Spring WebFlux/WebClient relies on the TCP back pressure. If you need Reactive Streams semantics across network boundary you can try RSocket and its integration with Spring Framework.

答案2

得分: 0

当我将 reactor-core 版本从 3.3.2 更改为 3.3.3 时,测试通过了:它会让处理器工作约 2 分钟(嗯,你知道,一百万条消息),但最终成功完成。

因此看起来问题可能是由于 reactor-core 中的一个 bug 导致的,可能是这个问题 https://github.com/reactor/reactor-core/issues/1937 (在 3.3.3 中已关闭)。

英文:

When I change reactor-core version from 3.3.2 to 3.3.3, the test passes: it heats the processor for about 2 minutes (well, a million messages, you know), but it finishes successfully.

So it looks like the issue was caused by a bug in reactor-core, probably this one https://github.com/reactor/reactor-core/issues/1937 (closed in 3.3.3).

huangapple
  • 本文由 发表于 2020年8月27日 02:23:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/63603603.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定