如何使用 Reactor 3.x 将 List 转换为 Flux

huangapple go评论97阅读模式
英文:

How to convert List<T> to Flux<T> by using Reactor 3.x

问题

我有一个异步调用的Thrift接口:

  1. public CompletableFuture<List<Long>> getFavourites(Long userId){
  2. CompletableFuture<List<Long>> future = new CompletableFuture();
  3. OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
  4. callback.addObserver(new OctoObserver() {
  5. @Override
  6. public void onSuccess(Object o) {
  7. future.complete((List<Long>) o);
  8. }
  9. @Override
  10. public void onFailure(Throwable throwable) {
  11. future.completeExceptionally(throwable);
  12. }
  13. });
  14. try {
  15. recommendAsyncService.getFavorites(userId, callback);
  16. } catch (TException e) {
  17. log.error("OctoCall RecommendAsyncService.getFavorites", e);
  18. }
  19. return future;
  20. }

现在它返回一个CompletableFuture<List<Long>>。然后我调用它通过使用Flux来进行一些处理。

  1. public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
  2. // 不喜欢这种写法
  3. List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);
  4. System.out.println(recommendList);
  5. return Flux.fromIterable(recommendList)
  6. .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
  7. .userId(userId)
  8. .productId(id)
  9. .productType((int) (Math.random()*100))
  10. .build())))
  11. .take(5)
  12. .publishOn(mdpScheduler);
  13. }

然而,我想要从getFavourites方法中获得一个Flux<Long>,并且我可以在getRecommend方法中使用它。
或者,你可以推荐一个Flux API,然后我可以将List<Long> recommendList 转换为 Flux<Long> recommendFlux

英文:

I have a Asyn call thrift interface:

  1. public CompletableFuture&lt;List&lt;Long&gt;&gt; getFavourites(Long userId){
  2. CompletableFuture&lt;List&lt;Long&gt;&gt; future = new CompletableFuture();
  3. OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
  4. callback.addObserver(new OctoObserver() {
  5. @Override
  6. public void onSuccess(Object o) {
  7. future.complete((List&lt;Long&gt;) o);
  8. }
  9. @Override
  10. public void onFailure(Throwable throwable) {
  11. future.completeExceptionally(throwable);
  12. }
  13. });
  14. try {
  15. recommendAsyncService.getFavorites(userId, callback);
  16. } catch (TException e) {
  17. log.error(&quot;OctoCall RecommendAsyncService.getFavorites&quot;, e);
  18. }
  19. return future;
  20. }

Now it returns a CompletableFuture<List<Long>>. And then I call it to do some processor by using Flux.

  1. public Flux&lt;Product&gt; getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
  2. // do not like it
  3. List&lt;Long&gt; recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);
  4. System.out.println(recommendList);
  5. return Flux.fromIterable(recommendList)
  6. .flatMap(id -&gt; Mono.defer(() -&gt; Mono.just(Product.builder()
  7. .userId(userId)
  8. .productId(id)
  9. .productType((int) (Math.random()*100))
  10. .build())))
  11. .take(5)
  12. .publishOn(mdpScheduler);
  13. }

However, I want to get a Flux<Long> from getFavourites method and I can use it in getRecommend method.<br>
Or, you can recommend a Flux API ,and I can convert the List&lt;Long&gt; recommendList to Flux&lt;Long&gt; recommendFlux.

答案1

得分: 4

要将CompletableFuture<List<T>>转换为Flux<T>,您可以使用Mono#fromFutureMono#flatMapMany

  1. var future = new CompletableFuture<List<Long>>();
  2. future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
  3. CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
  4. Flux<Long> flux = Mono.fromFuture(() -> future).flatMapMany(Flux::fromIterable);
  5. flux.subscribe(System.out::println);

在回调中异步接收的List<T>也可以被转换为Flux<T>,而无需使用CompletableFuture
您可以直接使用Mono#createMono#flatMapMany

  1. Flux<Long> flux = Mono.<List<Long>>create(sink -> {
  2. Callback<List<Long>> callback = new Callback<List<Long>>() {
  3. @Override
  4. public void onResult(List<Long> list) {
  5. sink.success(list);
  6. }
  7. @Override
  8. public void onError(Exception e) {
  9. sink.error(e);
  10. }
  11. };
  12. client.call("query", callback);
  13. }).flatMapMany(Flux::fromIterable);
  14. flux.subscribe(System.out::println);

或者只需使用Flux#create在一次传递中进行多次发射:

  1. Flux<Long> flux = Flux.create(sink -> {
  2. Callback<List<Long>> callback = new Callback<List<Long>>() {
  3. @Override
  4. public void onResult(List<Long> list) {
  5. list.forEach(sink::next);
  6. }
  7. @Override
  8. public void onError(Exception e) {
  9. sink.error(e);
  10. }
  11. };
  12. client.call("query", callback);
  13. });
  14. flux.subscribe(System.out::println);
英文:

To convert a CompletableFuture&lt;List&lt;T&gt;&gt; into a Flux&lt;T&gt; you can use Mono#fromFuture with Mono#flatMapMany:

  1. var future = new CompletableFuture&lt;List&lt;Long&gt;&gt;();
  2. future.completeAsync(() -&gt; List.of(1L, 2L, 3L, 4L, 5L),
  3. CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
  4. Flux&lt;Long&gt; flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);
  5. flux.subscribe(System.out::println);

List&lt;T&gt; received asynchronously in a callback can be also converted into a Flux&lt;T&gt; without using a CompletableFuture.
You can directly use Mono#create with Mono#flatMapMany:

  1. Flux&lt;Long&gt; flux = Mono.&lt;List&lt;Long&gt;&gt;create(sink -&gt; {
  2. Callback&lt;List&lt;Long&gt;&gt; callback = new Callback&lt;List&lt;Long&gt;&gt;() {
  3. @Override
  4. public void onResult(List&lt;Long&gt; list) {
  5. sink.success(list);
  6. }
  7. @Override
  8. public void onError(Exception e) {
  9. sink.error(e);
  10. }
  11. };
  12. client.call(&quot;query&quot;, callback);
  13. }).flatMapMany(Flux::fromIterable);
  14. flux.subscribe(System.out::println);

Or simply using Flux#create with multiple emissions in one pass:

  1. Flux&lt;Long&gt; flux = Flux.create(sink -&gt; {
  2. Callback&lt;List&lt;Long&gt;&gt; callback = new Callback&lt;List&lt;Long&gt;&gt;() {
  3. @Override
  4. public void onResult(List&lt;Long&gt; list) {
  5. list.forEach(sink::next);
  6. }
  7. @Override
  8. public void onError(Exception e) {
  9. sink.error(e);
  10. }
  11. };
  12. client.call(&quot;query&quot;, callback);
  13. });
  14. flux.subscribe(System.out::println);

答案2

得分: 4

简单的解决方案是使用Flux.fromIterable,如下面的示例所示:

  1. public Flux<Integer> fromListToFlux(){
  2. List<Integer> intList = Arrays.asList(1,2,5,7);
  3. return Flux.fromIterable(intList);
  4. }

Spring Boot版本为3.1.0:

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>3.1.0</version>
  5. <relativePath/> <!-- lookup parent from repository -->
  6. </parent>

注意:这不是推荐的方法,因为当您在响应式管道之外工作时,它不会完全保持响应式,这里创建了一个列表。

英文:

Simple solution is to use Flux.fromIterable as shown in below example

  1. public Flux&lt;Integer&gt; fromListToFlux(){
  2. List&lt;Integer&gt; intList = Arrays.asList(1,2,5,7);
  3. return Flux.fromIterable(intList);
  4. }

Springboot version is 3.1.0

  1. &lt;parent&gt;
  2. &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
  3. &lt;artifactId&gt;spring-boot-starter-parent&lt;/artifactId&gt;
  4. &lt;version&gt;3.1.0&lt;/version&gt;
  5. &lt;relativePath/&gt; &lt;!-- lookup parent from repository --&gt;
  6. &lt;/parent&gt;

Note: This is not recommended because it will not be completely reactive when you work out of the reactive pipeline, here creating a List.

huangapple
  • 本文由 发表于 2020年8月24日 15:53:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/63556833.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定