英文:
Add a side-effect to a Flux after each element is consumed?
问题
这是一个简化的示例,但它说明了要点。
假设我有一个定义如下的方法:
Flux<String> generateFlux() {
return Flux.just("hello", "world"); // (S)
}
是否有一种方法可以使 S
例如在元素被消耗之后打印出一些东西,而不需要 flux 的订阅者做任何事情或了解任何事情?
例如,我想修改 S
,使得这个:
generateFlux().doOnNext(System.out::println).block()
实际上在控制台上打印出这个:
hello
consumed
world
consumed
是否可以使用 Reactor 3.3 实现这个目标,如果可以,应该如何做?
更新
我猜我的问题不太清楚,所以我会添加一些更多的细节。我的问题的要点是,我希望这部分保持_不变_:
generateFlux().doOnNext(System.out::println).block() // A
因此,我想修改 S
,即 Flux.just("hello", "world")
,以适应我的要求,而不是 A
。我知道你可以向 A
添加 subscribe
和 doOnNext
等,但这_不是_我的问题。我想知道是否有一种方法可以修改 S
,以便在没有对 A
进行任何更改的情况下,当订阅者/消费者处理所有项目时,将会打印出这个:
hello
consumed
world
consumed
也就是说,S
在第一个元素被消耗("hello")后以某种方式接收到一个信号,然后它可以执行 doOnNext
(或其他操作),在这种情况下打印 "consumed"。当第二个元素被消耗("world")时,它应该再次打印 "consumed"。
是否有可能实现这个?
英文:
This is a simplified example but it illustrates the point.
Let's say I have a method defined like this:
Flux<String> generateFlux() {
return Flux.just("hello", "world"); // (S)
}
Is there a way to make S
for example print something <i>after</i> an element has been consumed without the subscriber of the flux having to do or know anything about it?
For example, I'd to modify S
so that this:
generateFlux().doOnNext(System.out::println).block()
actually prints out this to the console:
hello
consumed
world
consumed
Is this possible to do this using Reactor 3.3 and if so how?
Update
I guess my question was unclear so I'll add some more details. The gist of my question is that I want this to be unchanged:
generateFlux().doOnNext(System.out::println).block() // A
So I want to modify S
, i.e. Flux.just("hello", "world")
to accommodate what I'm asking, not A
. I know you can add subscribe
and doOnNext
etc to A
, but this is not what I'm asking. I'm asking if there's a way to modify S
, so that without any changes made to A
, this will be printed when the subscriber/consumer of A
has processed all items:
hello
consumed
world
consumed
I.e. that S
somehow receives a signal that the first element has been consumed ("hello"), and then it can perform a doOnNext
(or whatever) and in this case print "consumer". When the second element has been consumed ("world") then it should print "consumed" again.
Is this possible?
答案1
得分: 1
A Flux is a publisher and the correct way to consume data from a flux is using subscribe.
So
generateFlux.subscribe(word -> {
System.out.println(word);
System.out.println("consumed" + word);
});
using subscribe with passing just a Consumer as a parameter you will override what the subscriber will do on next.
Basically behind will be created a Subscriber with having those 2 prints as a behaviour.
How Flux works you need to understand this 2 interfaces
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}
public interface Subscriber<T> {
void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
英文:
A Flux is a publisher and the correct way to consume data from a flux is using subscribe.
So
generateFlux.subscribe(word -> {
System.out.println(word);
System.out.println("consumed" + word);
});
using subscribe with passing just a Consumer as a parameter you will override what the subscriber will do on next.
Basically behind will be created a Subscriber with having those 2 prints as a behaviour.
How Flux works you need to understand this 2 interfaces
public interface Publisher<T> {
void subscribe(Subscriber<? super T> var1);
}
public interface Subscriber<T> {
void onSubscribe(Subscription var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}
答案2
得分: 0
你可以使用doOnEach
在不修改发布者(错误和完成也不修改)的情况下,为给定流中的每个元素添加副作用。
Flux.just("hello", "world")
.doOnEach(item -> System.out.println(item))
.blockLast();
还有许多重载版本用于不同的生命周期,如doOnError
,doOnSuccess
,doOnNext
...
有关Flux
的文档,您可以在这里找到所有操作符和描述:
更新:
根据您的更新,您可能考虑到横切关注点。您可以使用Hooks
在组装时修改响应式流操作符。
英文:
You can use doOnEach
to add side effects to every element consumed in the given stream without modify the publisher (error and completion too).
Flux.just("hello", "world")
.doOnEach(item -> System.out.println(item))
.blockLast();
Also there are lot of overloads for the different life cycles, like doOnError, doOnSuccess, doOnNext...
Documentation of Flux, you can find there all the operators and descriptions to those.
Update:
According to your update, you may think to the cross-cuting concern.
You can use Hooks
to modify the reactive stream operators on assembling time.
答案3
得分: 0
你可以使用doOnNext回调来拦截发布者推送下一个元素的操作,但请使用subscribe而不是block。
generateFlux()
.doOnNext(next -> System.out.println("Next Element pushed by publisher" + next))
.subscribe();
你可以使用重载了所有onNext、onError和onCompleted行为的subscribe签名。Flux中有多种重载的subscribe方法。
英文:
You can use doOnNext callback to intercept the action of the publisher pushing the next element. but use subscribe instead of block
generateFlux()
.doOnNext(next => System.out.println("Next Element pushed by publisher" + next))
.subscribe();
You can use the subscribe signature that overrides all behaviour of onNext, onError & onCompleted. subscribe method is overloaded in Flux
通过集体智慧和协作来改善编程学习和解决问题的方式。致力于成为全球开发者共同参与的知识库,让每个人都能够通过互相帮助和分享经验来进步。
评论