How do you implement heartbeat-like functionality using Reactor's Flux?

Question

Let's imagine there is a Flux emitting some payloads:

Flux<Integer> payloads = Flux.range(0, 5)
        .delayElements(Duration.ofSeconds(2));

This one just emits the integers 0 through 4, one every 2 seconds.

The task is to mix another signal into this Flux. That other signal simply emits something every N units of time, something like this:

Flux<Integer> heartbeats = Flux.just(-1)
        .repeat()
        .delayElements(Duration.ofSeconds(1));

The requirements are:

  1. The resulting Flux should output the numbers 0 to 4, one every 2 seconds, and also the number -1 every second.
  2. The resulting Flux must complete as soon as the payloads Flux completes.
  3. It should propagate any error signal that either of the two Fluxes emits.

(Please note that the example described above is just for illustration purposes. The real Flux emitting payloads may emit at an unpredictable rate: sometimes fast, sometimes slowly, sometimes not emitting at all for long periods of time.)

The best I could achieve with the out-of-the-box operators was:

Flux<Integer> flux = payloads.mergeWith(heartbeats);

But this violates requirement 2, because the merge result completes only when all merged components complete, and heartbeats never completes.

(Actually, this could be made to work if there were a flag that is set to true when payloads completes, and the repeat(BooleanSupplier) variant were used instead of repeat() on heartbeats, but this would delay the completion of the resulting Flux until the moment of the next heartbeat.)

The next thing to try is to write my own Publisher implementation that works the way my task requires, but I'd like to avoid this, as implementing a Publisher and Subscription correctly seems to be a tricky task.

Is there an easier way?

Answer 1

Score: 1

The FluxSink API may help you implement your requirements. You can access it through Flux::create and use the FluxSink emitter to send signals downstream.

Here is a simple implementation in Kotlin:

val flux = Flux.create<Int> { emitter ->
    payloads.subscribe(
        { emitter.next(it) },
        { emitter.error(it) },
        { emitter.complete() },
    )

    heartbeats.subscribe(
        { emitter.next(it) },
        { emitter.error(it) },
    )
}

Please note that, in the general case, this implementation turns payloads and heartbeats into hot Fluxes.
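
One thing the snippet above leaves open is who stops heartbeats once the sink has completed, failed, or been cancelled. Here is a minimal Java sketch of one possible way to handle that, assuming reactor-core 3.x is on the classpath: keep the Disposable returned by each subscribe call and register both with FluxSink.onDispose, which runs when the sink terminates or is cancelled. The class name SinkMergeSketch, the helper mergeUntilPayloadsComplete, and the millisecond timings are made up for illustration and are not part of the answer.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

public class SinkMergeSketch {

    // Hypothetical helper: forwards both sources into one sink and
    // completes the result as soon as payloads completes.
    static <T> Flux<T> mergeUntilPayloadsComplete(Flux<T> payloads, Flux<T> heartbeats) {
        return Flux.create(sink -> {
            // Heartbeats only forward values and errors; their completion is ignored.
            Disposable hb = heartbeats.subscribe(sink::next, sink::error);
            // Payloads additionally complete the sink.
            Disposable pl = payloads.subscribe(sink::next, sink::error, sink::complete);
            // Dispose both inner subscriptions when the sink terminates or is cancelled.
            sink.onDispose(Disposables.composite(hb, pl));
        });
    }

    public static void main(String[] args) {
        // Shortened timings (assumption) so the demo finishes quickly.
        Flux<Integer> payloads = Flux.range(0, 5)
                .delayElements(Duration.ofMillis(200));
        Flux<Integer> heartbeats = Flux.interval(Duration.ofMillis(100))
                .map(tick -> -1);

        List<Integer> result = mergeUntilPayloadsComplete(payloads, heartbeats)
                .collectList()
                .block(Duration.ofSeconds(5));
        System.out.println(result);
    }
}
```

This sketch assumes heartbeats emits asynchronously, as both timed sources here do. A source that emits an endless stream synchronously inside subscribe would never let the lambda reach the payloads subscription.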

Answer 2

Score: 0

Building on the answer of @zokni, here is what I was able to achieve using Flux.create():

Flux<Integer> flux = Flux.create(sink -> {
    // Hold the inner subscriptions so they can be cancelled later
    AtomicReference<Subscription> payloadSubscription = new AtomicReference<>();
    AtomicReference<Subscription> heartbeatsSubscription = new AtomicReference<>();

    payloads.subscribe(
            t -> {
                sink.next(t);
                // request the next payload only after forwarding this one
                payloadSubscription.get().request(1);
            },
            sink::error,
            () -> {
                // payloads completed: complete the merged Flux and stop the heartbeats
                sink.complete();
                heartbeatsSubscription.get().cancel();
            },
            subscription -> {
                payloadSubscription.set(subscription);
                subscription.request(1);
            }
    );
    // heartbeats: forward values and errors, ignore completion, unbounded demand
    heartbeats.subscribe(sink::next, sink::error, () -> {}, subscription -> {
        heartbeatsSubscription.set(subscription);
        subscription.request(Long.MAX_VALUE);
    });

    // downstream cancellation cancels both inner subscriptions
    sink.onCancel(() -> {
        payloadSubscription.get().cancel();
        heartbeatsSubscription.get().cancel();
    });
});

This solution is pretty verbose, but it has the following handy properties:

  1. The completion of the payloads publisher makes the merged publisher complete as well.
  2. Both the payloads and heartbeats subscriptions are cancelled when the merged publisher's subscription is cancelled.
  3. The heartbeats subscription is cancelled properly when payloads completes.
  4. Data is requested from payloads one element at a time, instead of with unbounded demand.
  5. Repeated subscription to the merged publisher (using Flux.repeat()) produces the expected results.

I'm still not sure whether it's OK to request Long.MAX_VALUE from heartbeats. Maybe it makes sense to do the same there: request the next element only after the previous one was consumed, one by one.

Answer 3

Score: 0

Here is a solution with only out-of-the-box operators:

Flux<Integer> payloadsShared = payloads.publish().refCount(2);
Flux<Integer> flux = Flux.merge(payloadsShared,
        heartbeats.takeUntilOther(payloadsShared.ignoreElements()));

I'm using Flux.takeUntilOther(Publisher) to take elements from the heartbeat Flux until the payloads Flux emits an onComplete signal. Since takeUntilOther completes on the first received signal (onComplete or onNext), I drop the onNext signals with Flux.ignoreElements().

The original payloads Flux still has to be merged with this modified heartbeat Flux. To allow payloads to be consumed by two pipelines, I use Flux.publish().refCount(2), which connects to the source only once both subscribers have arrived.
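
To try this out quickly, here is a minimal runnable sketch that wraps the two lines above in a helper and runs them with shortened timings. It assumes reactor-core 3.x on the classpath. The class name PublishRefCountSketch, the helper withHeartbeats, the use of Flux.interval as the heartbeat source, and the millisecond durations are illustrative choices, not part of the answer.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

public class PublishRefCountSketch {

    // Hypothetical helper wrapping the operator chain from the answer.
    static Flux<Integer> withHeartbeats(Flux<Integer> payloads, Flux<Integer> heartbeats) {
        // Share payloads between two subscribers; connect once both are subscribed.
        Flux<Integer> shared = payloads.publish().refCount(2);
        return Flux.merge(
                shared,
                // ignoreElements() drops onNext, so only the termination of
                // payloads stops the heartbeats.
                heartbeats.takeUntilOther(shared.ignoreElements()));
    }

    public static void main(String[] args) {
        // Shortened timings (assumption) so the demo finishes quickly.
        Flux<Integer> payloads = Flux.range(0, 5)
                .delayElements(Duration.ofMillis(200));
        Flux<Integer> heartbeats = Flux.interval(Duration.ofMillis(100))
                .map(tick -> -1);

        // block() returns only because the merged Flux completes with payloads.
        List<Integer> result = withHeartbeats(payloads, heartbeats)
                .collectList()
                .block(Duration.ofSeconds(5));
        System.out.println(result);
    }
}
```

The printed list should contain the payloads 0 to 4 in order with -1 values interleaved. The exact interleaving depends on scheduler timing, so it is not shown here.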

Answer 4

Score: -1

"seems to be a tricky task" - yes it is tricky unless you use the right tool for this. The solution you need is an actor with one output port and ability to delay, taken from my library df4j:

import org.df4j.core.port.OutFlow;
import org.df4j.core.dataflow.Actor;

class DelayActor extends Actor {
    OutFlow<Integer> out = new OutFlow<>(this);
    int period = 1000; // ms
    int cnt = 0;
    int maxValue = 4;

    @Override
    protected void runAction() throws Throwable {
        out.onNext(-1);
        if (cnt % 2 == 0) {
            int value = cnt / 2;
            out.onNext(value);
            if (value == maxValue) {
                complete();
                out.onComplete();
                return;
            }
        }
        cnt++;
        delay(period);
    }
}

Port OutFlow out implements org.reactivestreams.Publisher and can accept all org.reactivestreams.Subscribers. If Flux is needed, it can be obtained as Flux.from(actor.out).

Answer 5

Score: -1

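So, we'd like to subscribe the same subscriber to two different publishers. For certain reasons, this is prohibited by the Reactive Streams specification. Luckily, my asynchronous library DF4J implements, among others, a communication protocol I named ReverseFlow, in which multiple producers can feed the same consumer, just as many threads can write to the same BlockingQueue, but asynchronously.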

import java.time.Duration;
import org.df4j.core.communicator.AsyncArrayBlockingQueue;
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.df4j.core.port.OutChannel;
import org.junit.Test;
import reactor.core.publisher.Flux;

/** Converts a reactive stream into a ReverseFlow.Producer. */
class Adapter<T> extends Actor {
    public InpFlow<T> inp = new InpFlow<>(this);
    public OutChannel<T> out = new OutChannel<>(this);

    {start();}

    @Override
    protected void runAction() throws Throwable {
        if (inp.isCompletedExceptionally()) {
            out.onError(inp.getCompletionException());
        } else if (inp.isCompleted()) {
            out.onComplete();
        } else {
            out.onNext(inp.remove());
        }
    }
}

@Test
public void mergeTest() throws InterruptedException {
    Flux<Integer> payloads = Flux.range(0, 5)
        .delayElements(Duration.ofSeconds(2));
    Adapter<Integer> adapter1 = new Adapter<>();
    payloads.subscribe(adapter1.inp);
    Flux<Integer> heartbeats = Flux.just(-1)
        .repeat()
        .delayElements(Duration.ofSeconds(1));
    Adapter<Integer> adapter2 = new Adapter<>();
    heartbeats.subscribe(adapter2.inp);
    /* we use a queue with asynchronous interfaces on both ends */
    AsyncArrayBlockingQueue<Integer> queue = new AsyncArrayBlockingQueue<>();
    queue.feedFrom(adapter1.out);
    queue.feedFrom(adapter2.out);
    /* on the output end, AsyncArrayBlockingQueue is an ordinary Publisher */
    Flux<Integer> merged = Flux.from(queue);
    merged.subscribe(System.out::println);
    Thread.sleep(13000);
}

Maven dependencies:

<dependencies>
    <dependency>
        <groupId>org.df4j</groupId>
        <artifactId>df4j-core</artifactId>
        <version>8.3</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.3.2.RELEASE</version>
    </dependency>
</dependencies>

Posted by huangapple on October 11, 2020 at 20:00:02
Source link: https://go.coder-hub.com/64303764.html