有没有办法在Spring RSocket中查看REQUEST_N交换?

huangapple go评论69阅读模式

Is there a way to see the REQUEST_N exchanges in Spring RSocket



Flux<SubscriptionMessage> stream(final SubscriptionMessage request, @AuthenticationPrincipal UserDetails user) {






I'm using the Spring support for RSocket, specifically the request-stream model. I.e.:

Flux&lt;SubscriptionMessage&gt; stream(final SubscriptionMessage request, @AuthenticationPrincipal UserDetails user) {

If I understand RSocket correctly, the Flux response is delivered back to the client as a series of
Payload messages under a request(n) convention. E.g. n payload messages at a time. After each series of messages, the client credits the server to send an additional set using a REQUEST_N message, and this provides the basis for back-pressure relief.

In the API of the java library (org.springframework.messaging.rsocket, which is based on io.rsocket.RSocket), is there any way to handle/visit the REQUEST_N messages as they arrive, or to set the value of N explicitly via the Strategy (and see the value that a requestor has passed to a server)?

Reason/Why: I'm implementing an rsocket facade over Kafka, and am attempting to provide a request-stream mechanism for subscriptions that will support automatic offset commit as the messages are consumed.
For the request-stream convention, I think the occasional REQUEST_N interaction by the requestor
to be the ideal point where the commit offset of the topic can be advanced, since it's transmission by
the requestor means that the preceding Payload messages sent by the responder have been received.

The only other option I've seen is to use the request-channel model, so that the requestor can send
and initial subscription request, and beging receiving data, but also send
period messages to specifically control the commit offset over the same channel. I'm considering providing that anyway,
but wanted to know if there was a way to inject logic into the periodic request(n) cycle of a stream.


得分: 1

默认情况下,反应堆的默认设置是无限的或取消的。 您可以使用内置运算符,如take或limitRate,来自定义请求和行为。


或者您可以实现自己的运算符,或使用自定义的订阅/订阅逻辑来精确控制。 这需要更多的工作量,所以请先尝试,并展示您尝试做的示例。



The default of reactor used to be infinite or cancel. You can use the built in operators like take or limitRate to customise the request n behaviours.


Or implement your own operator, or use custom subscribe/subscription logic to control this precisely. This is a lot more involved so try it out first and show an example of what you are trying to do.

See docs like https://projectreactor.io/docs/core/release/reference/#_on_backpressure_and_ways_to_reshape_requests

  • 本文由 发表于 2020年9月11日 05:41:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/63838053.html



:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
