预先创建内部Mono作为一个变量

huangapple go评论77阅读模式
英文:

Pre-create inner Mono as a variable

问题

假设简单的响应式流如下所示:

        Flux.just(1, 2, 3, 4, 5)
                .flatMap(i -> 
                        Mono.just(i)
                                .map( ... 映射一些值)
                                .flatMap( ... 异步调用)
                                .map(... 更多映射)
                )
                .doOnNext(log::info)
                .blockLast();

假设父流中的.flatMap()内部的Mono是一个带有复杂逻辑的复杂响应式流。是否有办法将该Mono保存为变量,然后可以传递给父流?换句话说,类似于这样:

final Function<Integer, Publisher<String>> monoPublisher = Mono.something()
                .map( ... 映射一些值)
                .flatMap( ... 异步调用)
                .map(... 更多映射);

Flux.just(1, 2, 3, 4, 5)
                .flatMap(monoPublisher)
                .doOnNext(log::info)
                .blockLast();

天真的答案可能是:

final Function<Integer, Publisher<String>> monoPublisher = i -> Mono.just(i) ...

然而,这仍然会将Mono的初始化推迟到实际订阅时,并且每个父流中的元素都会创建一个新的Mono。

我熟悉Mono.create(),但我不知道如何在这里使用它,而不需要创建自己的Publisher实现,该实现会包装在Mono.create()返回的sink周围。

编辑:
感谢ESala的建议。我只是修改了他的示例,使用ThreadLocal代替Queue,虽然在使用Flux.just()的这个示例中不需要,但在使用多线程生产者时可以防止竞争条件。

final ThreadLocal<Integer> threadLocalStorage = new ThreadLocal<>();

final Mono<String> mono = Mono.fromSupplier(threadLocalStorage::get)
                .map(Objects::toString);

Flux.just(1, 2, 3, 4, 5)
                .flatMap(i -> {
                    threadLocalStorage.set(i);
                    return mono;
                })
                .doOnNext(System.out::println)
                .blockLast();
英文:

Assume simple reactive stream line this:

        Flux.just(1, 2, 3, 4, 5)
                .flatMap(i -&gt; 
                        Mono.just(i)
                                .map( ... map some value)
                                .flatMap( ... async call)
                                .map(... more mappings)
                )
                .doOnNext(log::info)
                .blockLast()
        ;

Assume Mono inside parent stream .flatMap() is complicated reactive stream with complex logic. Is there a way to save that Mono as a variable that could be passed to the parent stream? In other words, something like this:

final Function&lt;Integer, Publisher&lt;String&gt;&gt; monoPublisher = Mono.something()
                .map( ... map some value)
                .flatMap( ... async call)
                .map(... more mappings);

        Flux.just(1, 2, 3, 4, 5)
                .flatMap(monoPublisher)
                .doOnNext(log::info)
                .blockLast()
        ;

The naive answer would be:

final Function&lt;Integer, Publisher&lt;String&gt;&gt; monoPublisher = i -&gt; Mono.just(i) ...

However, that would still defer initialization of Mono until actual subscription, and new Mono would be created for each element in parent stream.

I am familiar with Mono.create() but I don't see how to make it work here, without creating my own Publisher implementation that wraps around sink that is returned by Mono.create()

EDIT:
Thanks for ESala for the suggestion. I just modified his example to use ThreadLocal instead of Queue, which is not needed for this example using Flux.just() but would prevent race conditions when using multi-threaded producer

final ThreadLocal&lt;Integer&gt; threadLocalStorage = new ThreadLocal&lt;&gt;();

        final Mono&lt;String&gt; mono = Mono.fromSupplier(threadLocalStorage::get)
                .map(Objects::toString);

        Flux.just(1, 2, 3, 4, 5)
                .flatMap(i -&gt; {
                    threadLocalStorage.set(i);
                    return mono;
                })
                .doOnNext(System.out::println)
                .blockLast()
        ;

答案1

得分: 1

也许类似这样的代码可以工作,只需将LinkedList替换为支持并发访问的更高级的结构:

Queue<Integer> arguments = new LinkedList<>();

Publisher<String> expensivePublisher =
        Mono.fromSupplier(arguments::remove)
                .map( ... 映射某些值)
                .flatMap( ... 异步调用)
                .map(... 更多映射);

Flux.just(1, 2, 3)
        .flatMap(
                i -> {
                    arguments.add(i);
                    return expensivePublisher;
                })
        .blockLast();

但仍然感觉有些矫揉造作。如果确实有100个内部操作符,建议尝试在该部分进行优化。在那方面的改进可能会比重用Publisher以避免重新创建它具有更高的影响。

英文:

Maybe something like this would work, just replace the LinkedList with something fancier that supports concurrent access:

Queue&lt;Integer&gt; arguments = new LinkedList&lt;&gt;();

Publisher&lt;String&gt; expensivePublisher =
        Mono.fromSupplier(arguments::remove)
                .map( ... map some value)
                .flatMap( ... async call)
                .map(... more mappings);

Flux.just(1, 2, 3)
        .flatMap(
                i -&gt; {
                    arguments.add(i);
                    return expensivePublisher;
                })
        .blockLast();

But it still feels like a hack. If there truly are 100 operators in the inner, then would suggest trying to optimize on that side. Improvements in that area will likely have higher impact than reusing the Publisher to avoid recreating it.

huangapple
  • 本文由 发表于 2020年10月27日 02:20:59
  • 转载请务必保留本文链接:https://go.coder-hub.com/64542856.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定