英文:
SSE with Reactor Core Sinks
问题
Sure, here are the translations for the code-related parts of your text:
我有一个使用Spring Boot编写的后端应用程序,我必须实现向前端应用程序发送SSE。我的后端必须支持多个订阅者,并能够向前端的多个用户发送多个事件。此外,我必须以编程方式触发这些事件,因此我发现可以使用Project Reactor的Sinks类来完成:
private final Map<Long, Many<MyNotification>> sinkMap = new ConcurrentHashMap<>();
public Flux<ServerSentEvent<MyNotification>> createSubscription(Long subscriptionId) {
Sinks.Many<MyNotification> sink;
if (sinkMap.get(subscriptionId) != null) {
sink = sinkMap.get(subscriptionId);
} else {
sink = Sinks.many().multicast().onBackpressureBuffer();
}
sinkMap.put(subscriptionId, sink);
return sink.asFlux().map(event -> ServerSentEvent.builder(event).build());
}
Please note that the translation above only includes the code portion and not the questions you've asked. If you need translations for the questions as well, please let me know.
英文:
I have a backend application written in Spring Boot, and I have to implement sending of SSE to frontend application. My backend have to support multiple subscribers and to be able to send multiple events to multiple users in the front. Additionally, I have to programmatically trigger the events, so I found it that it can be done with project Reactor Sinks class:
private final Map<Long, Many<MyNotification>> sinkMap = new ConcurrentHashMap<>();
public Flux<ServerSentEvent<MyNotification>> createSubscription(Long subscriptionId) {
Sinks.Many<MyNotification> sink;
if (sinkMap.get(subscriptionId) != null) {
sink = sinkMap.get(subscriptionId);
} else {
sink = Sinks.many().multicast().onBackpressureBuffer();
}
sinkMap.put(subscriptionId, sink);
return sink.asFlux().map(event -> ServerSentEvent.builder(event).build());
}
I have the following questions:
- Is it OK to hold sinks in the map, since I'm figuring out that the application itself will be horizontally scaled? How will it behave in cases of more applications in parallel?
- Is there a way to somehow "store those sinks" (i.e. Redis or something), so the subscriptions can be centralized?
- Is there a way to know if subscribers are still subscribed, so that map can be cleared if necessary?
- Do you have maybe other suggestions for implementation of SSE in Spring Boot, that does not include Reactor Sinks?
答案1
得分: 2
首先,你的示例虽然使用了ConcurrentHashMap
,但并不是线程安全的,因为你分别调用了get
和put
。你应该使用computeIfAbsent
或compute
来保证在关键点上对映射的原子更新。
另外,目前还不清楚你计划将sink保留在映射中多长时间 - 是否会重复使用单个sink并交给新的消费者,还是每个消费者都有唯一的sink?
至于在集群中向多个客户端发布消息的问题,这取决于你的具体需求。
-
是的和否。只要你愿意,你可以在映射中保留对它的引用。一旦订阅者计数降至零,sink 就会失效,必须将其移除,除非你禁用了
autoCancel
,在这种情况下可以重复使用。 -
Sink 是一个代表发布者和订阅者之间契约关系的运行时数据结构句柄。也许你想存储通过 sink 交付给订阅者的最近数据?存储一个包含发布逻辑的运行时数据结构并不太合理。
-
是的。有多种方法可以实现这一点。最简单的选择是忽略订阅者计数,将 sink 作为永久的单例保留。
-
Reactor 将是最简单的方法。
英文:
First off, your example is not thread safe even though you are using ConcurrentHashMap
, because you are calling get
and put
separately. You should have used computeIfAbsent
or compute
to guarantee atomic update of the map in regard to the key.
It is also not clear how long do you plan to keep the sink in the map - whether a single sink will be reused and handed over to new consumers or not. Is the sink unique per consumer?
As far as publishing messages to multiple clients in a cluster is concerned, it depends on what you are trying to do.
-
Yes and no. You can keep the reference in the map as long as you want. Once the subscriber count drops to zero, the sink is dead and must be removed that is unless you disable autoCancel in which case it can be reused.
-
Sink is a handle to a runtime datastructure representing a contract between publisher and subscriber. Do you perhaps want to store recent data that were handed over to the subscriber via the sink? Storing a runtime datastructure encompassing a publishing logic does not make much sense.
-
Yes. This can be done in multitude of ways. The simplest option is to ignore the subscriber count and keep the sink as a singleton forever.
-
Reactor would be the easiest.
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论