有没有办法在Spring RSocket中查看REQUEST_N交换?

huangapple go评论69阅读模式
英文:

Is there a way to see the REQUEST_N exchanges in Spring RSocket

问题

我正在使用Spring支持的RSocket,具体来说是请求流模型。即:

@MessageMapping("stream")
Flux<SubscriptionMessage> stream(final SubscriptionMessage request, @AuthenticationPrincipal UserDetails user) {
    //....
}

如果我理解RSocket正确的话,Flux响应将作为一系列Payload消息以请求(n)的约定方式返回给客户端。例如,每次返回n个Payload消息。在每个消息系列之后,客户端会发送一个REQUEST_N消息来授信服务器发送额外的消息集,从而提供了反压缓解的基础。

在Java库的API中(org.springframework.messaging.rsocket,基于io.rsocket.RSocket),是否有一种方式来处理/访问到达的REQUEST_N消息,或者通过策略明确设置N的值(并查看请求者传递给服务器的值)?

原因/为什么:我正在实现一个RSocket的外观,用于Kafka,试图为订阅提供一个支持在消息被消耗时自动提交偏移量的请求流机制。对于请求流约定,我认为请求者偶尔进行REQUEST_N交互是提前提交主题的偏移量的理想时机,因为它是由请求者传输的,意味着响应者发送的前导Payload消息已经被接收。

我看到的唯一其他选择是使用请求通道模型,以便请求者可以发送初始订阅请求并开始接收数据,但也可以发送定期消息以明确控制相同通道上的提交偏移量。我考虑提供这个选项,但想知道是否有一种方式可以将逻辑注入到流的周期性请求(n)循环中。

英文:

I'm using the Spring support for RSocket, specifically the request-stream model. I.e.:

@MessageMapping(&quot;stream&quot;)
Flux&lt;SubscriptionMessage&gt; stream(final SubscriptionMessage request, @AuthenticationPrincipal UserDetails user) {
    //....
}

If I understand RSocket correctly, the Flux response is delivered back to the client as a series of
Payload messages under a request(n) convention. E.g. n payload messages at a time. After each series of messages, the client credits the server to send an additional set using a REQUEST_N message, and this provides the basis for back-pressure relief.

In the API of the java library (org.springframework.messaging.rsocket, which is based on io.rsocket.RSocket), is there any way to handle/visit the REQUEST_N messages as they arrive, or to set the value of N explicitly via the Strategy (and see the value that a requestor has passed to a server)?

Reason/Why: I'm implementing an rsocket facade over Kafka, and am attempting to provide a request-stream mechanism for subscriptions that will support automatic offset commit as the messages are consumed.
For the request-stream convention, I think the occasional REQUEST_N interaction by the requestor
to be the ideal point where the commit offset of the topic can be advanced, since it's transmission by
the requestor means that the preceding Payload messages sent by the responder have been received.

The only other option I've seen is to use the request-channel model, so that the requestor can send
and initial subscription request, and beging receiving data, but also send
period messages to specifically control the commit offset over the same channel. I'm considering providing that anyway,
but wanted to know if there was a way to inject logic into the periodic request(n) cycle of a stream.

答案1

得分: 1

默认情况下,反应堆的默认设置是无限的或取消的。 您可以使用内置运算符,如take或limitRate,来自定义请求和行为。

您可以在此链接中查看示例代码:https://github.com/making/rsc/blob/50d0c3dc43c60d3b3fe42e5586df49adb016a9cb/src/main/java/am/ik/rsocket/InteractionModel.java#L41-L52

或者您可以实现自己的运算符,或使用自定义的订阅/订阅逻辑来精确控制。 这需要更多的工作量,所以请先尝试,并展示您尝试做的示例。

有关更详细信息,请参阅文档:https://projectreactor.io/docs/core/release/reference/#_on_backpressure_and_ways_to_reshape_requests

英文:

The default of reactor used to be infinite or cancel. You can use the built in operators like take or limitRate to customise the request n behaviours.

https://github.com/making/rsc/blob/50d0c3dc43c60d3b3fe42e5586df49adb016a9cb/src/main/java/am/ik/rsocket/InteractionModel.java#L41-L52

Or implement your own operator, or use custom subscribe/subscription logic to control this precisely. This is a lot more involved so try it out first and show an example of what you are trying to do.

See docs like https://projectreactor.io/docs/core/release/reference/#_on_backpressure_and_ways_to_reshape_requests

huangapple
  • 本文由 发表于 2020年9月11日 05:41:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/63838053.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定