Filter a Mono<T> with a String Id matching one of a list of Flux<String>
Question
I'm fetching a Mono<T> (with T containing id and orgId fields). I also have a Flux<String> of orgIds. I need to return the Mono<T> only if T.getOrgId() is contained in the Flux.
I'm not entirely sure I understand the exact Mono/Flux flow, but so far I have tried two approaches: blocking and non-blocking.
Blocking
public Mono<T> getItemBlocking(final String orgId, final String id) {
    final Flux<String> orgs = getOrgs(orgId);
    final T t = repository.findById(id).block();
    final Boolean tInOrg = orgs.hasElement(t.getOrgId()).subscribe().isDisposed();
    return tInOrg ? repository.findById(id) : Mono.error(someError.error(id)); // Ideally would not make the request again
}
This code does not work. I receive the following exception:
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)
Non-blocking
public Mono<T> getItemNonBlocking(final String orgId, final String id) {
    final Flux<String> orgs = getOrgs(orgId);
    return (Mono<T>) repository.findById(id)
        .filter(t -> orgs.hasElement(t.getOrgId()).block()) // Needs to be non-blocking, but I can't see how: filter needs a Boolean, not a Mono<Boolean>
        .switchIfEmpty(Mono.error(someError.error(id)))
        .subscribe();
}
This code does not work either. I receive the following exception:
java.lang.ClassCastException: reactor.core.publisher.LambdaMonoSubscriber incompatible with reactor.core.publisher.Mono
    at service.getItemNonBlocking
Any clue on what I'm doing wrong?
Answer 1
Score: 0
- You should avoid blocking. Blocking is meant for cases where you have to interact with non-reactive components.
- If you return a Mono, you should not subscribe to it yourself; that is the job of the caller/consumer. Note that subscribe() returns a Disposable rather than a Mono, which is why the cast in your second example fails with a ClassCastException.
Now, you can achieve what you want by modifying your second example:
- Replace filter with filterWhen, which takes a predicate returning a reactive Publisher<Boolean> instead of a plain boolean, so no blocking call is needed.
- Return the resulting reactive pipeline directly, instead of subscribing to it.
return repository.findById(id)
    .filterWhen(t -> getOrgs(orgId).hasElement(t.getOrgId()))
    .switchIfEmpty(Mono.error(someError.error(id)));
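To try the pipeline above in isolation, here is a minimal sketch. It assumes reactor-core is on the classpath and Java 16+ (for the record). The Item type, the in-memory findById and getOrgs stand-ins, and the NoSuchElementException are all hypothetical placeholders for the question's T, repository, getOrgs and someError, which are not shown in the original.

```java
import java.util.NoSuchElementException;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class FilterWhenSketch {

    // Hypothetical item type standing in for T from the question.
    record Item(String id, String orgId) {}

    // Hypothetical in-memory stand-in for repository.findById(id).
    static Mono<Item> findById(String id) {
        return "item-1".equals(id)
                ? Mono.just(new Item("item-1", "org-a"))
                : Mono.empty();
    }

    // Hypothetical stand-in for getOrgs(orgId): the org ids the caller may access.
    static Flux<String> getOrgs(String orgId) {
        return "org-a".equals(orgId)
                ? Flux.just("org-a", "org-b")
                : Flux.just(orgId);
    }

    // Same pipeline shape as the answer: no block(), no subscribe().
    static Mono<Item> getItem(String orgId, String id) {
        return findById(id)
                // Keep the item only if the Mono<Boolean> from hasElement emits true.
                .filterWhen(item -> getOrgs(orgId).hasElement(item.orgId()))
                // An empty Mono here means "not found" or "not in an allowed org".
                .switchIfEmpty(Mono.defer(() -> Mono.error(
                        new NoSuchElementException("No item " + id + " for org " + orgId))));
    }

    public static void main(String[] args) {
        // The caller subscribes; the service method only builds the pipeline.
        getItem("org-a", "item-1").subscribe(
                item -> System.out.println("found: " + item),
                error -> System.out.println("failed: " + error.getMessage()));

        getItem("org-x", "item-1").subscribe(
                item -> System.out.println("found: " + item),
                error -> System.out.println("failed: " + error.getMessage()));
    }
}
```

In a WebFlux controller you would simply return the Mono from getItem and let the framework subscribe. Wrapping the error in Mono.defer is an optional choice in this sketch: it delays creating the exception until the pipeline actually turns out to be empty.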