在每个元素被消耗后向 Flux 添加副作用?

huangapple go评论43阅读模式
英文:

Add a side-effect to a Flux after each element is consumed?

问题

这是一个简化的示例,但它说明了要点。

假设我有一个定义如下的方法:

Flux<String> generateFlux() {
  return Flux.just("hello", "world"); // (S)
}

是否有一种方法可以使 S 例如在元素被消耗之后打印出一些东西,而不需要 flux 的订阅者做任何事情或了解任何事情?

例如,我想修改 S,使得这个:

generateFlux().doOnNext(System.out::println).block()

实际上在控制台上打印出这个:

hello
consumed
world
consumed

是否可以使用 Reactor 3.3 实现这个目标,如果可以,应该如何做?

更新

我猜我的问题不太清楚,所以我会添加一些更多的细节。我的问题的要点是,我希望这部分保持_不变_:

generateFlux().doOnNext(System.out::println).block() // A

因此,我想修改 S,即 Flux.just("hello", "world"),以适应我的要求,而不是 A。我知道你可以向 A 添加 subscribedoOnNext 等,但这_不是_我的问题。我想知道是否有一种方法可以修改 S,以便在没有对 A 进行任何更改的情况下,当订阅者/消费者处理所有项目时,将会打印出这个:

hello
consumed
world
consumed

也就是说,S 在第一个元素被消耗("hello")后以某种方式接收到一个信号,然后它可以执行 doOnNext(或其他操作),在这种情况下打印 "consumed"。当第二个元素被消耗("world")时,它应该再次打印 "consumed"。

是否有可能实现这个?

英文:

This is a simplified example but it illustrates the point.

Let's say I have a method defined like this:

Flux<String> generateFlux() {
  return Flux.just("hello", "world"); // (S)
}

Is there a way to make S for example print something <i>after</i> an element has been consumed without the subscriber of the flux having to do or know anything about it?

For example, I'd to modify S so that this:

generateFlux().doOnNext(System.out::println).block()

actually prints out this to the console:

hello
consumed
world
consumed

Is this possible to do this using Reactor 3.3 and if so how?

Update

I guess my question was unclear so I'll add some more details. The gist of my question is that I want this to be unchanged:

generateFlux().doOnNext(System.out::println).block() // A

So I want to modify S, i.e. Flux.just(&quot;hello&quot;, &quot;world&quot;) to accommodate what I'm asking, not A. I know you can add subscribe and doOnNext etc to A, but this is not what I'm asking. I'm asking if there's a way to modify S, so that without any changes made to A, this will be printed when the subscriber/consumer of A has processed all items:

hello
consumed
world
consumed

I.e. that S somehow receives a signal that the first element has been consumed ("hello"), and then it can perform a doOnNext (or whatever) and in this case print "consumer". When the second element has been consumed ("world") then it should print "consumed" again.

Is this possible?

答案1

得分: 1

A Flux is a publisher and the correct way to consume data from a flux is using subscribe.
So

 generateFlux.subscribe(word -> {
      System.out.println(word);
      System.out.println("consumed" + word);
});

using subscribe with passing just a Consumer as a parameter you will override what the subscriber will do on next.
Basically behind will be created a Subscriber with having those 2 prints as a behaviour.
How Flux works you need to understand this 2 interfaces

   public interface Publisher<T> {
       void subscribe(Subscriber<? super T> var1);
   }

   public interface Subscriber<T> {
       void onSubscribe(Subscription var1);
 
       void onNext(T var1);
 
       void onError(Throwable var1);

       void onComplete();
}
英文:

A Flux is a publisher and the correct way to consume data from a flux is using subscribe.
So

 generateFlux.subscribe(word -&gt; {
      System.out.println(word);
      System.out.println(&quot;consumed&quot; + word);
});

using subscribe with passing just a Consumer as a parameter you will override what the subscriber will do on next.
Basically behind will be created a Subscriber with having those 2 prints as a behaviour.
How Flux works you need to understand this 2 interfaces

   public interface Publisher&lt;T&gt; {
       void subscribe(Subscriber&lt;? super T&gt; var1);
   }

   public interface Subscriber&lt;T&gt; {
       void onSubscribe(Subscription var1);
 
       void onNext(T var1);
 
       void onError(Throwable var1);

       void onComplete();
}

答案2

得分: 0

你可以使用doOnEach在不修改发布者(错误和完成也不修改)的情况下,为给定流中的每个元素添加副作用。

Flux.just("hello", "world")
    .doOnEach(item -> System.out.println(item))
    .blockLast();

还有许多重载版本用于不同的生命周期,如doOnErrordoOnSuccessdoOnNext...

有关Flux的文档,您可以在这里找到所有操作符和描述:

Flux文档

更新:

根据您的更新,您可能考虑到横切关注点。您可以使用Hooks在组装时修改响应式流操作符。

Hooks文档

英文:

You can use doOnEach to add side effects to every element consumed in the given stream without modify the publisher (error and completion too).

  Flux.just(&quot;hello&quot;, &quot;world&quot;)
                .doOnEach(item -&gt; System.out.println(item))
                .blockLast();

doOnEach Documentation

Also there are lot of overloads for the different life cycles, like doOnError, doOnSuccess, doOnNext...

Documentation of Flux, you can find there all the operators and descriptions to those.

Update:

According to your update, you may think to the cross-cuting concern.
You can use Hooks to modify the reactive stream operators on assembling time.

Hooks Doc

答案3

得分: 0

你可以使用doOnNext回调来拦截发布者推送下一个元素的操作,但请使用subscribe而不是block。

generateFlux()
    .doOnNext(next -> System.out.println("Next Element pushed by publisher" + next))
    .subscribe();

你可以使用重载了所有onNext、onError和onCompleted行为的subscribe签名。Flux中有多种重载的subscribe方法。

英文:

You can use doOnNext callback to intercept the action of the publisher pushing the next element. but use subscribe instead of block

generateFlux()
.doOnNext(next =&gt; System.out.println(&quot;Next Element pushed by publisher&quot; + next))
.subscribe();

You can use the subscribe signature that overrides all behaviour of onNext, onError & onCompleted. subscribe method is overloaded in Flux

huangapple
  • 本文由 发表于 2020年8月1日 17:33:14
  • 转载请务必保留本文链接:https://go.coder-hub.com/63203737.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定