How do you implement heartbeat-like functionality using Reactor's Flux?
Question
Let's imagine there is a Flux emitting some payloads:
Flux<Integer> payloads = Flux.range(0, 5)
.delayElements(Duration.ofSeconds(2));
This one just emits the integers 0 to 4, one every 2 seconds.
The task is to mix another signal into this Flux. This other signal just emits something every N units of time. Something like this:
Flux<Integer> heartbeats = Flux.just(-1)
.repeat()
.delayElements(Duration.ofSeconds(1));
- The resulting Flux should output the numbers 0 to 4, one every 2 seconds, and also the number -1 every second.
- The resulting Flux must complete as soon as the payloads Flux completes.
- It should propagate any error signal that either of the two Fluxes emits.
(Please note that the example described above is for illustration purposes only. The real Flux emitting payloads may emit at an unpredictable rate: sometimes fast, sometimes slowly, sometimes not emitting at all for long periods of time.)
The best I could achieve with the out-of-the-box operators was:
Flux<Integer> flux = payloads.mergeWith(heartbeats);
But this violates the second requirement, because the merged result completes only when all of the merged sources complete, and heartbeats never completes.
(Actually, this could be made to work if there were a flag that is set to true when payloads completes, and the repeat(BooleanSupplier) variant were used instead of repeat() on heartbeats, but this would delay the completion of the resulting Flux until the moment of the next heartbeat.)
The next thing to try is to write my own Publisher implementation that works the way my task requires, but I'd like to avoid this, as implementing a Publisher + Subscription correctly seems to be a tricky task.
Is there an easier way?
Answer 1
Score: 1
The FluxSink API might help you implement your requirements. You can access it through Flux::create and use the FluxSink emitter to send signals downstream.
Here is a simple implementation in Kotlin:
val flux = Flux.create<Int> { emitter ->
    payloads.subscribe(
        { emitter.next(it) },
        { emitter.error(it) },
        { emitter.complete() },
    )
    heartbeats.subscribe(
        { emitter.next(it) },
        { emitter.error(it) },
    )
}
Please note that, in the general case, this implementation turns payloads and heartbeats into hot Fluxes.
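One thing the Kotlin snippet above does not do is stop the heartbeats subscription once the sink has completed or been cancelled. The following Java sketch shows one way this might be handled, by keeping the Disposable returned from each subscribe call and registering both with FluxSink.onDispose. The class name HeartbeatSinkSketch, the helper withHeartbeats, and the shortened 200 ms / 100 ms intervals are assumptions made for illustration; they are not part of the original answer. Sources that complete synchronously during subscription are not considered here.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;

public class HeartbeatSinkSketch {

    /** Hypothetical helper: forwards both sources into one sink, completing with payloads. */
    static <T> Flux<T> withHeartbeats(Flux<T> payloads, Flux<T> heartbeats) {
        return Flux.create(sink -> {
            // Heartbeats only forward items and errors; their completion is ignored.
            Disposable hb = heartbeats.subscribe(sink::next, sink::error);
            // Payloads additionally complete the merged Flux.
            Disposable pl = payloads.subscribe(sink::next, sink::error, sink::complete);
            // Dispose both inner subscriptions when the sink terminates or is cancelled.
            sink.onDispose(Disposables.composite(hb, pl));
        });
    }

    /** Runs the question's example with shortened intervals and collects the output. */
    static List<Integer> demo() {
        Flux<Integer> payloads = Flux.range(0, 5)
                .delayElements(Duration.ofMillis(200));
        Flux<Integer> heartbeats = Flux.just(-1)
                .repeat()
                .delayElements(Duration.ofMillis(100));
        return withHeartbeats(payloads, heartbeats)
                .collectList()
                .block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Like the Kotlin version, this subscribes to both sources with unbounded demand, so downstream backpressure is absorbed by the buffering done inside Flux.create rather than passed upstream.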
Answer 2
Score: 0
Building on the answer of @zokni, here is what I was able to achieve using Flux.create():
Flux<Integer> flux = Flux.create(sink -> {
    AtomicReference<Subscription> payloadSubscription = new AtomicReference<>();
    AtomicReference<Subscription> heartbeatsSubscription = new AtomicReference<>();
    payloads.subscribe(
        t -> {
            sink.next(t);
            // request the next payload only after the previous one was forwarded
            payloadSubscription.get().request(1);
        },
        sink::error,
        () -> {
            sink.complete();
            heartbeatsSubscription.get().cancel();
        },
        subscription -> {
            payloadSubscription.set(subscription);
            subscription.request(1);
        }
    );
    heartbeats.subscribe(sink::next, sink::error, () -> {}, subscription -> {
        heartbeatsSubscription.set(subscription);
        subscription.request(Long.MAX_VALUE);
    });
    sink.onCancel(() -> {
        payloadSubscription.get().cancel();
        heartbeatsSubscription.get().cancel();
    });
});
This solution is pretty verbose, but it has the following handy properties:
- The completion of the payloads publisher makes the merged publisher complete as well.
- Both the payloads and the heartbeats subscriptions are cancelled when the merged publisher's subscription is cancelled.
- The heartbeats subscription is cancelled properly on payloads completion.
- Data is requested from payloads one element at a time, instead of with unbounded demand.
- Repeated subscription to the merged publisher (using Flux.repeat()) produces the expected results.
I'm still not sure whether it's OK to request Long.MAX_VALUE from heartbeats. Maybe it makes sense to do the same there: request the next element only after the previous one has been consumed, one by one.
Answer 3
Score: 0
Here is a solution with only out-of-the-box operators:
Flux<Integer> payloadsShared = payloads.publish().refCount(2);
Flux<Integer> flux = Flux.merge(payloadsShared,
        heartbeats.takeUntilOther(payloadsShared.ignoreElements()));
I'm using Flux.takeUntilOther(Publisher) to take elements from the heartbeat Flux until the payloads Flux emits an onComplete signal. Since takeUntilOther completes on the first received signal (onComplete or onNext), I drop the onNext signals with Flux.ignoreElements().
The original payloads Flux still has to be merged with this modified heartbeat Flux. To allow the payloads Flux to be consumed by two pipelines, I use Flux.publish().refCount(2).
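To try the operator-only approach end to end, the two lines above can be wrapped in a small runnable sketch. The class name HeartbeatMergeDemo, the run method, and the shortened 200 ms / 100 ms intervals are assumptions chosen so the demo finishes in about a second; only the operator chain itself comes from the answer.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class HeartbeatMergeDemo {

    /** Merges heartbeats into payloads and returns everything emitted until completion. */
    static List<Integer> run() {
        Flux<Integer> payloads = Flux.range(0, 5)
                .delayElements(Duration.ofMillis(200));
        Flux<Integer> heartbeats = Flux.just(-1)
                .repeat()
                .delayElements(Duration.ofMillis(100));

        // Share payloads between two consumers; the upstream starts
        // once both of them have subscribed.
        Flux<Integer> payloadsShared = payloads.publish().refCount(2);

        Flux<Integer> merged = Flux.merge(
                payloadsShared,
                // ignoreElements() leaves only the terminal signal, so the
                // heartbeats are cut off when payloads terminates.
                heartbeats.takeUntilOther(payloadsShared.ignoreElements()));

        // The timeout guards against the merged Flux failing to complete.
        return merged.collectList().block(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The returned list should contain 0 through 4 in order, interleaved with -1 heartbeats; the exact number and position of the heartbeats depends on timing.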
Answer 4
Score: -1
"seems to be a tricky task" - yes it is tricky unless you use the right tool for this. The solution you need is an actor with one output port and ability to delay, taken from my library df4j:
import org.df4j.core.port.OutFlow;
import org.df4j.core.dataflow.Actor;
class DelayActor extends Actor {
    OutFlow<Integer> out = new OutFlow<>(this);
    int period = 1000; // ms
    int cnt = 0;
    int maxValue = 4;
    @Override
    protected void runAction() throws Throwable {
        out.onNext(-1);
        if (cnt % 2 == 0) {
            int value = cnt / 2;
            out.onNext(value);
            if (value == maxValue) {
                complete();
                out.onComplete();
                return;
            }
        }
        cnt++;
        delay(period);
    }
}
The port OutFlow out implements org.reactivestreams.Publisher and can accept any org.reactivestreams.Subscriber. If a Flux is needed, it can be obtained as Flux.from(actor.out).
Answer 5
Score: -1
So, we'd like to subscribe the same subscriber to two different publishers. For certain reasons, this is prohibited by the Reactive Streams specification. Luckily, my asynchronous library DF4J implements, among others, a communication protocol that I named ReverseFlow, in which multiple producers can feed the same consumer, just as many threads can write to the same BlockingQueue, but asynchronously.
import java.time.Duration;
import org.df4j.core.communicator.AsyncArrayBlockingQueue;
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.df4j.core.port.OutChannel;
import org.junit.Test;
import reactor.core.publisher.Flux;
/** converts reactive stream into ReverseFlow.Producer */
class Adapter<T> extends Actor {
    public InpFlow<T> inp = new InpFlow<>(this);
    public OutChannel<T> out = new OutChannel<>(this);
    {start();}
    @Override
    protected void runAction() throws Throwable {
        if (inp.isCompletedExceptionally()) {
            out.onError(inp.getCompletionException());
        } else if (inp.isCompleted()) {
            out.onComplete();
        } else {
            out.onNext(inp.remove());
        }
    }
}
@Test
public void mergeTest() throws InterruptedException {
    Flux<Integer> payloads = Flux.range(0, 5)
            .delayElements(Duration.ofSeconds(2));
    Adapter<Integer> adapter1 = new Adapter<>();
    payloads.subscribe(adapter1.inp);
    Flux<Integer> heartbeats = Flux.just(-1)
            .repeat()
            .delayElements(Duration.ofSeconds(1));
    Adapter<Integer> adapter2 = new Adapter<>();
    heartbeats.subscribe(adapter2.inp);
    /* we use a queue with asynchronous interfaces on both ends */
    AsyncArrayBlockingQueue<Integer> queue = new AsyncArrayBlockingQueue<>();
    queue.feedFrom(adapter1.out);
    queue.feedFrom(adapter2.out);
    /* on the output end, AsyncArrayBlockingQueue is an ordinary Publisher */
    Flux<Integer> merged = Flux.from(queue);
    merged.subscribe(System.out::println);
    Thread.sleep(13000);
}
<dependencies>
    <dependency>
        <groupId>org.df4j</groupId>
        <artifactId>df4j-core</artifactId>
        <version>8.3</version>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
        <version>3.3.2.RELEASE</version>
    </dependency>
</dependencies>
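The BlockingQueue analogy used in this answer can be illustrated with plain JDK classes, without DF4J or Reactor. The sketch below is only that analogy in blocking form: two producer threads write into one queue and a single consumer reads until an end marker arrives. The class name BlockingQueueMergeSketch, the DONE sentinel, and the 40 ms / 20 ms intervals are assumptions made for this illustration and do not reflect how DF4J works internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueMergeSketch {

    // Hypothetical end-of-stream marker put by the payload producer.
    private static final Integer DONE = Integer.MIN_VALUE;

    /** Two producers feed one queue; the consumer stops at the DONE marker. */
    static List<Integer> run() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        Thread payloads = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(40);
                    queue.put(i);
                }
                queue.put(DONE); // signals completion of the payload stream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread heartbeats = new Thread(() -> {
            try {
                while (true) {
                    Thread.sleep(20);
                    queue.put(-1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        heartbeats.setDaemon(true);

        payloads.start();
        heartbeats.start();

        List<Integer> received = new ArrayList<>();
        try {
            for (Integer item = queue.take(); !item.equals(DONE); item = queue.take()) {
                received.add(item);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while consuming", e);
        } finally {
            heartbeats.interrupt(); // the heartbeat producer ends with the payloads
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Unlike the reactive versions, the consumer here blocks a thread while waiting, which is the cost the asynchronous approach in the answer is meant to avoid.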