How to efficiently split a single input Flux into many output Flux based on a computed element property?
Question
We have some code that is given a `Flux<Event>` containing all events. Clients
then request a `Flux<Event>` for a subset of these events.
The code does something like:
```kotlin
// Note: Kotlin code, but this question is not Kotlin-specific

/**
 * All incoming events
 */
private val allEvents: Flux<Event> = ...

/**
 * Returns a flux of the events with the matching key.
 */
fun eventsForKey(key: String): Flux<Event> {
    return allEvents.filter { event ->
        event.key == key
    }
}
```
So we've got `allEvents`, which has all of the incoming events, and the
`eventsForKey` function is called (potentially many times) to create a
`Flux<Event>` of only the events with the key specified. There are potentially
a lot of these filtered `Flux` instances that are alive concurrently.
My concern is that this is effectively doing a linear search for which
"sub-Flux" to deliver each event to. That is, if there are n sub-Flux
instances alive at a given moment, and a single event arrives, the event will
be tested against all n filter predicates.
What I want is something that will let me specify an input Flux and a key
function, and then (repeatedly) obtain an output Flux for any given key
value. Each sub-Flux would behave just like the filtered ones above, but
instead of executing n predicate checks for each event, each event would
result in one key computation and a single dictionary lookup for the outgoing
Flux. Events that don't match an existing sub-Flux should be discarded,
just as they would be with a filter.
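To make the requested behavior concrete, here is a minimal plain-Java sketch of keyed dispatch, written with ordinary callbacks rather than Reactor types. `KeyedDispatcher`, `listen`, and `dispatch` are hypothetical names used only for illustration; this is not a Reactor API, and it ignores threading, backpressure, and unsubscription.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical keyed dispatcher: one key computation and one map lookup per event. */
class KeyedDispatcher<K, T> {
    private final Function<T, K> keyFn;
    private final Map<K, List<Consumer<T>>> listeners = new HashMap<>();

    KeyedDispatcher(Function<T, K> keyFn) {
        this.keyFn = keyFn;
    }

    /** Registers interest in a key; may be called before any matching event arrives. */
    void listen(K key, Consumer<T> listener) {
        listeners.computeIfAbsent(key, k -> new ArrayList<>()).add(listener);
    }

    /** Routes one event; events whose key nobody asked for are dropped. */
    void dispatch(T event) {
        List<Consumer<T>> targets = listeners.get(keyFn.apply(event));
        if (targets != null) {
            targets.forEach(l -> l.accept(event));
        }
    }
}
```

With n registered keys, each call to `dispatch` costs one hash lookup, whereas n independent filters would each test their predicate against every event.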
I found `Flux.groupBy` (which is also the accepted answer to this related
question) but:

- Its return type is the unwieldy `Flux<GroupedFlux<K,T>>`:
  - I don't want the sub-Flux for a group to come into existence when its
    first event appears. I need to be able to obtain a Flux for a given key on
    demand, which is potentially before any events matching that key have
    arrived.
  - I also don't want to have to deal with groups that no downstream consumer
    has asked for. Events that don't match a key downstream consumers have
    asked for should just be filtered out.
- Its documentation states:

  > Note that groupBy works best with a low cardinality of groups, so chose
  > your keyMapper function accordingly.

  I'm not sure if "a low cardinality of groups" means each "group" needs to be
  small, or if the number of groups needs to be small. (And I don't know what
  "small" means in this context.) I am specifically trying to deal with a
  situation where the number of sub-Flux instances may be large.
Does Reactor provide a way to efficiently demultiplex a Flux like this?
Answer 1
Score: 1
Your question sounded very interesting to me, so I played around with it. This solution might not be elegant, but I wanted to share it anyway.
Your requirement sounds like you need a stateful predicate that filters events before they are split into sub-fluxes, so that each subscriber does not have to do the filtering on its own.
In that case, we need to maintain a list/set somewhere that holds the allowed keys. [In my example, I assume a flux of strings in which the first character is the key, based on the other answer you linked in your question.]
```java
// map for char and the corresponding flux
private static final Map<Character, Flux<String>> CHAR_FLUX = new HashMap<>();

// allowed chars. empty initially
private static final List<Character> ALLOWED_CHARS = new ArrayList<>();

// stateful predicate
private static final Predicate<Character> IS_ALLOWED = c -> {
    System.out.println("IS_ALLOWED check : " + c);
    return ALLOWED_CHARS.contains(c);
};

Flux<GroupedFlux<Character, String>> groupedFluxFlux = Flux.just(
        "a1", "b1", "c1", "a2", "b2", "c2", "a3", "b3", "c3", "a4", "b4", "c4",
        "a1", "b1", "c1", "a2", "b2", "c2", "a3", "b3", "c3", "a4", "b4", "c4")
    .delayElements(Duration.ofMillis(1000))
    .filter(s -> IS_ALLOWED.test(s.charAt(0))) // check if it is allowed
    .groupBy(s -> s.charAt(0)) // group by starts only for the allowed keys
    .cache();
```
The `GroupedFlux` instances emitted by `groupBy` are unicast, so each one can be consumed by only one subscriber. In your case, if you expect more than one subscriber for the same key, then we need this map; otherwise it is not required.
Your `eventsForKey` method would add the key to the list/set and then return the corresponding flux from the map.
```java
// here the filter is just 1 filter for 1 subscriber. does not filter for every event
ALLOWED_CHARS.add('a');
return CHAR_FLUX.computeIfAbsent('a', k ->
    Flux.defer(() -> groupedFluxFlux
            .filter(gf -> gf.key() == 'a')
            .flatMap(Function.identity()))
        .cache());
```
Assumptions:
You have a limited set of keys (low cardinality). Otherwise the list/map might keep growing, and the grouped flux might also not perform very well.
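One detail of the approach above: the allowed keys live in an `ArrayList`, whose `contains` scans the list for every event, and an `ArrayList` is not safe to modify while another thread reads it. A minimal sketch of the same stateful predicate backed by a concurrent hash set follows. `AllowedKeys`, `allow`, and `predicate` are hypothetical names, not part of the answer or of Reactor.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

/** Hypothetical holder for the set of keys that some consumer has asked for. */
class AllowedKeys {
    // Concurrent hash set: hash-based lookups, safe to update from other threads.
    private final Set<Character> allowed = ConcurrentHashMap.newKeySet();

    /** Marks a key as requested; later events with this key pass the predicate. */
    void allow(char key) {
        allowed.add(key);
    }

    /** Stateful predicate: true if the string's first character is an allowed key. */
    Predicate<String> predicate() {
        return s -> !s.isEmpty() && allowed.contains(s.charAt(0));
    }
}
```

Under these assumptions, `.filter(keys.predicate())` could stand in for the `IS_ALLOWED` filter step, with `keys.allow('a')` replacing `ALLOWED_CHARS.add('a')`.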
Answer 2
Score: 1
<details>
<summary>English:</summary>
Doing this properly probably takes a better understanding of the core Reactor framework than I personally have, but it seems that you want a single `Subscriber` and multiple `Publisher`s driven by a `HashMap`. A decorated `Subscriber` should be easy enough in concept:
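```java
import java.util.HashMap;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class DeMuxedSubscriber<T> implements Subscriber<T> {
    // One publisher per requested key; the element itself is used as the key here.
    Map<T, SimplePublisher<T>> mapPublishers = new HashMap<>();

    @Override
    public void onSubscribe(Subscription s) {
        s.request(Long.MAX_VALUE); // unbounded demand: backpressure is ignored
    }

    @Override
    public void onNext(T s) {
        // Single map lookup; drop elements nobody asked for or subscribed to.
        SimplePublisher<T> sp = mapPublishers.get(s);
        if (sp != null && sp.subscriber != null)
            sp.subscriber.onNext(s);
    }

    @Override
    public void onError(Throwable t) {
        mapPublishers.values().forEach(sp -> {
            if (sp.subscriber != null) sp.subscriber.onError(t);
        });
    }

    @Override
    public void onComplete() {
        mapPublishers.values().forEach(sp -> {
            if (sp.subscriber != null) sp.subscriber.onComplete();
        });
    }

    public Publisher<T> getPublisher(T s) {
        return mapPublishers.computeIfAbsent(s, k -> new SimplePublisher<>());
    }
}
```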
There is probably an existing class somewhere that handles being a publisher properly, but this will suffice to illustrate:
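```java
class SimplePublisher<T> implements Publisher<T> {
    // The single downstream subscriber; null until someone subscribes.
    Subscriber<? super T> subscriber;

    @Override
    public void subscribe(Subscriber<? super T> s) {
        // Illustration only: never calls s.onSubscribe(...), so this is not a
        // spec-compliant Publisher, and a second subscriber replaces the first.
        subscriber = s;
    }
}
```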
And then you can make a simple example to use it. This all seems a bit awkward, and the example `DeMuxedSubscriber` shown here ignores backpressure, but hey, details:
```java
// largestPrimeFactor(int) is a helper that is not shown in this answer.
Flux<String> wordFlux = Flux.generate(() -> 0, (i, sink) -> {
    if (i >= 100) {
        sink.complete();
        return i; // stop here instead of emitting after completion
    }
    int next = i + 1;
    sink.next(Integer.toString(largestPrimeFactor(next)));
    return next;
});

DeMuxedSubscriber<String> deMuxedSubscriber = new DeMuxedSubscriber<>();
// Register the per-key subscribers first, then connect the source.
Flux.from(deMuxedSubscriber.getPublisher("3")).subscribe(System.out::println);
Flux.from(deMuxedSubscriber.getPublisher("5")).subscribe(System.out::println);
wordFlux.subscribe(deMuxedSubscriber);
```
</details>
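The same idea can be sketched with a separate key function and a per-key publisher that does signal `onSubscribe`. This version swaps in the JDK's `java.util.concurrent.Flow` interfaces in place of Reactor and `org.reactivestreams`, purely so that it runs without extra dependencies. `FlowDemux` and `publisherFor` are hypothetical names. The sketch supports one subscriber per key, ignores downstream demand, and does not handle a subscriber that cancels from inside `onSubscribe`, so treat it as an illustration rather than a compliant implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Flow;
import java.util.function.Function;

/** Hypothetical demultiplexer: one upstream subscriber, one publisher per key. */
class FlowDemux<K, T> implements Flow.Subscriber<T> {
    private final Function<T, K> keyFn;
    private final Map<K, Flow.Subscriber<? super T>> targets = new ConcurrentHashMap<>();

    FlowDemux(Function<T, K> keyFn) {
        this.keyFn = keyFn;
    }

    /** Publisher for one key; can be obtained before any matching item arrives. */
    Flow.Publisher<T> publisherFor(K key) {
        return subscriber -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* demand is ignored in this sketch */ }
                @Override public void cancel() { targets.remove(key, subscriber); }
            });
            targets.put(key, subscriber); // a later subscriber for the same key replaces this one
        };
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        s.request(Long.MAX_VALUE); // unbounded upstream demand
    }

    @Override public void onNext(T item) {
        // One key computation and one map lookup; unrequested keys are dropped.
        Flow.Subscriber<? super T> target = targets.get(keyFn.apply(item));
        if (target != null) {
            target.onNext(item);
        }
    }

    @Override public void onError(Throwable t) {
        targets.values().forEach(s -> s.onError(t));
    }

    @Override public void onComplete() {
        targets.values().forEach(Flow.Subscriber::onComplete);
    }
}
```

Unlike the `DeMuxedSubscriber` above, the lookup key here is computed from the item rather than being the item itself, which is closer to the `event.key` lookup described in the question.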