How to convert List<T> to Flux<T> using Reactor 3.x
Question
I have an asynchronous Thrift interface:
public CompletableFuture<List<Long>> getFavourites(Long userId) {
    CompletableFuture<List<Long>> future = new CompletableFuture<>();
    OctoThriftCallback callback = new OctoThriftCallback(thriftExecutor);
    callback.addObserver(new OctoObserver() {
        @Override
        public void onSuccess(Object o) {
            future.complete((List<Long>) o);
        }

        @Override
        public void onFailure(Throwable throwable) {
            future.completeExceptionally(throwable);
        }
    });
    try {
        recommendAsyncService.getFavorites(userId, callback);
    } catch (TException e) {
        log.error("OctoCall RecommendAsyncService.getFavorites", e);
        // Fail the future so callers are not left waiting forever
        future.completeExceptionally(e);
    }
    return future;
}
It returns a CompletableFuture<List<Long>>. I then call it and do some processing with Flux:
public Flux<Product> getRecommend(Long userId) throws InterruptedException, ExecutionException, TimeoutException {
    // I don't like this blocking call
    List<Long> recommendList = wrapper.getRecommend(userId).get(2, TimeUnit.SECONDS);
    System.out.println(recommendList);
    return Flux.fromIterable(recommendList)
            .flatMap(id -> Mono.defer(() -> Mono.just(Product.builder()
                    .userId(userId)
                    .productId(id)
                    .productType((int) (Math.random() * 100))
                    .build())))
            .take(5)
            .publishOn(mdpScheduler);
}
However, I would like the getFavourites method to return a Flux<Long> that I can use in the getRecommend method.
Alternatively, can you recommend a Flux API that converts the List<Long> recommendList into a Flux<Long> recommendFlux?
Answer 1
Score: 4
To convert a CompletableFuture<List<T>> into a Flux<T>, you can use Mono#fromFuture with Mono#flatMapMany:
var future = new CompletableFuture<List<Long>>();
future.completeAsync(() -> List.of(1L, 2L, 3L, 4L, 5L),
CompletableFuture.delayedExecutor(3, TimeUnit.SECONDS));
Flux<Long> flux = Mono.fromFuture(future).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
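Applied to the question, the blocking get(2, TimeUnit.SECONDS) call could be replaced along these lines. This is only a sketch: the getFavourites stub below is a hypothetical stand-in for the asker's Thrift wrapper, and the class and method names are made up for illustration. Mono.defer is used so that the remote call is not started until something subscribes.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FutureToFlux {

    // Hypothetical stand-in for the asker's getFavourites(userId)
    static CompletableFuture<List<Long>> getFavourites(Long userId) {
        return CompletableFuture.supplyAsync(() -> List.of(1L, 2L, 3L, 4L, 5L, 6L, 7L));
    }

    static Flux<Long> favouriteIds(Long userId) {
        // defer: the future is only created when the Flux is subscribed to
        return Mono.defer(() -> Mono.fromFuture(getFavourites(userId)))
                .timeout(Duration.ofSeconds(2))   // replaces future.get(2, TimeUnit.SECONDS)
                .flatMapMany(Flux::fromIterable)  // Mono<List<Long>> -> Flux<Long>
                .take(5);
    }

    public static void main(String[] args) {
        // block() is used here only to print the result in a demo
        System.out.println(favouriteIds(42L).collectList().block()); // prints [1, 2, 3, 4, 5]
    }
}
```

With this shape, getRecommend no longer needs to declare InterruptedException, ExecutionException or TimeoutException; a timeout surfaces as an error signal in the pipeline instead.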
A List<T> received asynchronously in a callback can also be converted into a Flux<T> without using a CompletableFuture.
You can use Mono#create directly with Mono#flatMapMany:
Flux<Long> flux = Mono.<List<Long>>create(sink -> {
    Callback<List<Long>> callback = new Callback<List<Long>>() {
        @Override
        public void onResult(List<Long> list) {
            sink.success(list);
        }

        @Override
        public void onError(Exception e) {
            sink.error(e);
        }
    };
    client.call("query", callback);
}).flatMapMany(Flux::fromIterable);
flux.subscribe(System.out::println);
Or simply use Flux#create to emit all the elements in one pass:
Flux<Long> flux = Flux.create(sink -> {
    Callback<List<Long>> callback = new Callback<List<Long>>() {
        @Override
        public void onResult(List<Long> list) {
            list.forEach(sink::next);
            // Signal completion, otherwise the Flux never terminates
            sink.complete();
        }

        @Override
        public void onError(Exception e) {
            sink.error(e);
        }
    };
    client.call("query", callback);
});
flux.subscribe(System.out::println);
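Since Callback and client are not defined in the snippets above, here is a self-contained sketch of the Flux#create variant. The Callback interface and the Client class are hypothetical and exist only to make the example runnable; a real client would normally invoke the callback from another thread. Note the sink.complete() call after the last element: without it, subscribers such as collectList() would wait forever.

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class CallbackToFlux {

    // Hypothetical callback contract, mirroring the Callback used in the answer
    interface Callback<T> {
        void onResult(T result);

        void onError(Exception e);
    }

    // Hypothetical client that answers immediately on the calling thread
    static class Client {
        void call(String query, Callback<List<Long>> callback) {
            callback.onResult(List.of(1L, 2L, 3L));
        }
    }

    static Flux<Long> query(Client client) {
        return Flux.create(sink -> client.call("query", new Callback<List<Long>>() {
            @Override
            public void onResult(List<Long> list) {
                list.forEach(sink::next); // one onNext per element
                sink.complete();          // then terminate the Flux
            }

            @Override
            public void onError(Exception e) {
                sink.error(e);
            }
        }));
    }

    public static void main(String[] args) {
        System.out.println(query(new Client()).collectList().block()); // prints [1, 2, 3]
    }
}
```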
Answer 2
Score: 4
A simple solution is to use Flux.fromIterable, as shown in the example below:
public Flux<Integer> fromListToFlux() {
    List<Integer> intList = Arrays.asList(1, 2, 5, 7);
    return Flux.fromIterable(intList);
}
The Spring Boot version is 3.1.0:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.0</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
Note: This is not recommended, because the List is built outside the reactive pipeline, so the flow is not completely reactive.
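One possible way to address that concern, offered as a sketch rather than as part of the original answer, is to move the list creation inside the pipeline with Mono.fromCallable, so that nothing is built until a subscriber arrives. The loadInts method and the loads counter are hypothetical and are there only to make the laziness observable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LazyListToFlux {

    // Counts how many times the list has been built
    static final AtomicInteger loads = new AtomicInteger();

    // Hypothetical loader standing in for whatever produces the list
    static List<Integer> loadInts() {
        loads.incrementAndGet();
        return Arrays.asList(1, 2, 5, 7);
    }

    static Flux<Integer> fromListToFlux() {
        // fromCallable defers loadInts() until subscription
        return Mono.fromCallable(LazyListToFlux::loadInts)
                .flatMapMany(Flux::fromIterable);
    }

    public static void main(String[] args) {
        Flux<Integer> flux = fromListToFlux();
        System.out.println(loads.get());                // prints 0: nothing loaded yet
        System.out.println(flux.collectList().block()); // prints [1, 2, 5, 7]
        System.out.println(loads.get());                // prints 1
    }
}
```

If the loader blocks, it could additionally be moved off the calling thread with subscribeOn and a suitable Scheduler.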