Filter a Mono<T> with a String Id matching one of a list of Flux<String>

Question

I'm fetching a Mono<T> (where T has id and orgId fields). I also have a Flux<String> of orgIds. I need to return the Mono<T> only if T.getOrgId() is contained in the Flux.

I'm not entirely sure I understand the exact Mono/Flux flow, but so far I have tried two approaches: blocking and non-blocking.

Blocking

public Mono<T> getItemBlocking(final String orgId, final String id) {
    final Flux<String> orgs = getOrgs(orgId);
    final T t = repository.findById(id).block();
    final Boolean tInOrg = orgs.hasElement(t.getOrgId()).subscribe().isDisposed();
    return tInOrg ? repository.findById(id) : Mono.error(someError.error(id)); // Ideally would not make the request again
}

This code does not work. I'm receiving the following exception:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-4
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83)

Non-blocking

public Mono<T> getItemNonBlocking(final String orgId, final String id) {
    final Flux<String> orgs = getOrgs(orgId);
    return (Mono<T>) repository.findById(id)
            .filter(t -> orgs.hasElement(t.getOrgId()).block()) // Need to make this non-blocking but I'm not able to, but need the Mono<Boolean> to be a Boolean
            .switchIfEmpty(Mono.error(someError.error(id)))
            .subscribe();
}

This code does not work either. I'm receiving the following exception:

java.lang.ClassCastException: reactor.core.publisher.LambdaMonoSubscriber incompatible with reactor.core.publisher.Mono
    at service.getItemNonBlocking

Any clue on what I'm doing wrong?

Answer 1

Score: 0

  1. You should avoid blocking. Blocking is meant for cases where you have to interact with non-reactive components.
  2. If you return a Mono, you should not subscribe to it yourself. The caller/consumer is the one that should subscribe.
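
To illustrate the second point, here is a minimal sketch (the class and method names are hypothetical, and it assumes reactor-core is on the classpath). Calling subscribe() yields a Disposable handle for cancelling the subscription, not a Mono, which is why casting its result to Mono<T> in the question fails with a ClassCastException.

```java
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class SubscribeSketch {

    // Hypothetical helper: the pipeline itself, which is what a method
    // declared to return a Mono should hand back to its caller.
    static Mono<Boolean> pipeline() {
        return Flux.just("org-1", "org-2").hasElement("org-1");
    }

    // Hypothetical helper: what you get back if you subscribe yourself.
    // Typed as Object so the runtime type can be inspected below.
    static Object subscribeHandle() {
        return pipeline().subscribe();
    }

    public static void main(String[] args) {
        Object handle = subscribeHandle();
        System.out.println("is a Disposable: " + (handle instanceof Disposable));
        System.out.println("is a Mono: " + (handle instanceof Mono));
        // block() is used here only because this demo runs on the main thread.
        System.out.println("value from the pipeline: " + pipeline().block());
    }
}
```

Likewise, isDisposed() on that handle only reports the state of the subscription. It does not expose the Boolean that hasElement emits, so it cannot be used as the membership check.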

Now, you can achieve what you want by modifying your second example:

  1. Replace filter with filterWhen, which accepts an asynchronous predicate (a function returning a Publisher<Boolean>) instead of a blocking one.
  2. Return the resulting reactive pipeline directly, instead of subscribing to it.

return repository.findById(id)
        .filterWhen(t -> getOrgs(orgId).hasElement(t.getOrgId()))
        .switchIfEmpty(Mono.error(someError.error(id)));
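
For readers who want to try this in isolation, the following is a self-contained sketch of the same pipeline. Everything outside the Reactor API is an assumption made for illustration: the Item record stands in for T, the in-memory map replaces repository.findById, getOrgs returns a fixed Flux, and IllegalArgumentException replaces someError.error(id). It assumes reactor-core is on the classpath and Java 16+ for records.

```java
import java.util.Map;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FilterWhenSketch {

    // Hypothetical item type standing in for T from the question.
    record Item(String id, String orgId) {}

    // Hypothetical in-memory stand-in for the repository.
    static final Map<String, Item> ITEMS = Map.of(
            "a", new Item("a", "org-1"),
            "b", new Item("b", "org-9"));

    static Mono<Item> findById(String id) {
        // Completes empty when the id is unknown.
        return Mono.justOrEmpty(ITEMS.get(id));
    }

    // Hypothetical stand-in for getOrgs(orgId): the orgs the caller may access.
    static Flux<String> getOrgs(String orgId) {
        return Flux.just(orgId, orgId + "-child");
    }

    static Mono<Item> getItem(String orgId, String id) {
        return findById(id)
                // Keep the item only if the Mono<Boolean> from hasElement emits true.
                .filterWhen(item -> getOrgs(orgId).hasElement(item.orgId()))
                // Reached when the item is missing or was filtered out.
                .switchIfEmpty(Mono.error(
                        () -> new IllegalArgumentException("No accessible item " + id)));
    }

    public static void main(String[] args) {
        // block() is acceptable here only because this demo runs on the main thread.
        System.out.println(getItem("org-1", "a").block());
        System.out.println(getItem("org-1", "b")
                .map(Item::id)
                .onErrorReturn("rejected")
                .block());
    }
}
```

Because getOrgs(orgId) is called inside the lambda, the Flux is only subscribed to once an item has actually been found, and the item is fetched a single time rather than twice as in the blocking attempt.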

Posted by huangapple on 2023-06-29 22:01:31
When reposting, please keep the link to this article: https://go.coder-hub.com/76581775.html