如何使用 Reactor 3.x 将 List 转换为 Flux

huangapple go评论80阅读模式
英文:

How to convert List<T> to Flux<T> by using Reactor 3.x

问题

我有一个异步调用的Thrift接口:

public CompletableFuture<List<Long>> getFavourites(Long userId){
    CompletableFuture<List<Long>> future = new CompletableFuture();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
    }
    return future;
}

现在它返回一个CompletableFuture<List<Long>>。然后我调用它通过使用Flux来进行一些处理。

public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // 不喜欢这种写法
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);

    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random()*100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}

然而,我想要从getFavourites方法中获得一个Flux<Long>,并且我可以在getRecommend方法中使用它。
或者,你可以推荐一个Flux API,然后我可以将List<Long> recommendList 转换为 Flux<Long> recommendFlux

英文:

I have a Asyn call thrift interface:

public CompletableFuture&lt;List&lt;Long&gt;&gt; getFavourites(Long userId){
    CompletableFuture&lt;List&lt;Long&gt;&gt; future = new CompletableFuture();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List&lt;Long&gt;) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error(&quot;OctoCall RecommendAsyncService.getFavorites&quot;, e);
    }
    return future;
}

Now it returns a CompletableFuture<List<Long>>. And then I call it to do some processor by using Flux.

public Flux&lt;Product&gt; getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // do not like it
    List&lt;Long&gt; recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);

    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -&gt; Mono.defer(() -&gt; Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random()*100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}

However, I want to get a Flux<Long> from getFavourites method and I can use it in getRecommend method.<br>
Or, you can recommend a Flux API ,and I can convert the List&lt;Long&gt; recommendList to Flux&lt;Long&gt; recommendFlux.

答案1

得分: 4

要将CompletableFuture<List<T>>转换为Flux<T>,您可以使用Mono#fromFutureMono#flatMapMany

var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
    CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));

Flux<Long> flux = Mono.fromFuture(() -> future).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

在回调中异步接收的List<T>也可以被转换为Flux<T>,而无需使用CompletableFuture
您可以直接使用Mono#createMono#flatMapMany

Flux<Long> flux = Mono.<List<Long>>create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      sink.success(list);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
}).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

或者只需使用Flux#create在一次传递中进行多次发射:

Flux<Long> flux = Flux.create(sink -> {
  Callback<List<Long>> callback = new Callback<List<Long>>() {
    @Override
    public void onResult(List<Long> list) {
      list.forEach(sink::next);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call("query", callback);
});

flux.subscribe(System.out::println);
英文:

To convert a CompletableFuture&lt;List&lt;T&gt;&gt; into a Flux&lt;T&gt; you can use Mono#fromFuture with Mono#flatMapMany:

var future = new CompletableFuture&lt;List&lt;Long&gt;&gt;();
future.completeAsync(() -&gt; List.of(1L, 2L, 3L, 4L, 5L),
    CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));

Flux&lt;Long&gt; flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

List&lt;T&gt; received asynchronously in a callback can be also converted into a Flux&lt;T&gt; without using a CompletableFuture.
You can directly use Mono#create with Mono#flatMapMany:

Flux&lt;Long&gt; flux = Mono.&lt;List&lt;Long&gt;&gt;create(sink -&gt; {
  Callback&lt;List&lt;Long&gt;&gt; callback = new Callback&lt;List&lt;Long&gt;&gt;() {
    @Override
    public void onResult(List&lt;Long&gt; list) {
      sink.success(list);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call(&quot;query&quot;, callback);
}).flatMapMany(Flux::fromIterable);

flux.subscribe(System.out::println);

Or simply using Flux#create with multiple emissions in one pass:

Flux&lt;Long&gt; flux = Flux.create(sink -&gt; {
  Callback&lt;List&lt;Long&gt;&gt; callback = new Callback&lt;List&lt;Long&gt;&gt;() {
    @Override
    public void onResult(List&lt;Long&gt; list) {
      list.forEach(sink::next);
    }

    @Override
    public void onError(Exception e) {
      sink.error(e);
    }
  };
  client.call(&quot;query&quot;, callback);
});

flux.subscribe(System.out::println);

答案2

得分: 4

简单的解决方案是使用Flux.fromIterable,如下面的示例所示:

public Flux<Integer> fromListToFlux(){
    List<Integer> intList = Arrays.asList(1,2,5,7);
    return Flux.fromIterable(intList);
}

Spring Boot版本为3.1.0:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

注意:这不是推荐的方法,因为当您在响应式管道之外工作时,它不会完全保持响应式,这里创建了一个列表。

英文:

Simple solution is to use Flux.fromIterable as shown in below example

public Flux&lt;Integer&gt; fromListToFlux(){
    List&lt;Integer&gt; intList = Arrays.asList(1,2,5,7);
    return Flux.fromIterable(intList);
}

Springboot version is 3.1.0

&lt;parent&gt;
    &lt;groupId&gt;org.springframework.boot&lt;/groupId&gt;
    &lt;artifactId&gt;spring-boot-starter-parent&lt;/artifactId&gt;
    &lt;version&gt;3.1.0&lt;/version&gt;
    &lt;relativePath/&gt; &lt;!-- lookup parent from repository --&gt;
&lt;/parent&gt;

Note: This is not recommended because it will not be completely reactive when you work out of the reactive pipeline, here creating a List.

huangapple
  • 本文由 发表于 2020年8月24日 15:53:02
  • 转载请务必保留本文链接:https://go.coder-hub.com/63556833.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定