Testing subscription with repeat()
Question
I want to test a function that uses the repeat() operator on a deferred Mono and subscribes to the result. In the test, I use a TestPublisher to simulate new values from the Mono.
In a very simplified form, it looks like this:
package de.cronos.mad.messages.backend;

import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;

public class RepeatTest {

    private static class TestSubject {
        public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
            Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
        }
    }

    @Test
    public void repeatTest() {
        TestPublisher<String> testPublisher = TestPublisher.create();
        TestSubject testSubject = new TestSubject();
        testSubject.logMonoValues(testPublisher::mono);
        testPublisher.emit("Hello");
        testPublisher.emit("World");
    }
}
The execution hangs after logging "Hello" to stdout. I think I understand why this happens: the emit(…) call happens on the main thread and "drives" the subscription from there.
What I do not know is how to modify this test so that it completes, i.e. does not hang.
Answer 1
Score: 1
Notes:
- The .repeat() operator, when used without arguments, repeats indefinitely. It initiates a new subscription as soon as the previous one completes successfully. Two ways to terminate it are:
  - Limit the number of repetitions, e.g. with .repeat(10), as suggested by vins;
  - Abort the sequence by sending an error signal, e.g. with Mono.error(Throwable) or just by throwing an exception.
- Calling testPublisher::mono declares that this instance of TestPublisher adheres to the Mono contract, i.e. it sends only a single value. Thus neither calling .emit() twice nor calling it with two arguments will help. Extra values will be ignored.
- The javadoc for TestPublisher says,
  > TestPublisher are generally hot, [...] replaying the first termination signal to subsequent subscribers.
  It means that late subscribers immediately receive the termination signal. The subscriptions created by the repeat() operator immediately receive the replayed termination signal, which in turn causes repeat() to resubscribe, in a tight loop.
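To make the second termination option from the first note concrete, here is a minimal sketch, assuming reactor-core is on the classpath. The class name and the helper oneMonoPerSubscription are hypothetical and exist only for illustration. The supplier returns a fresh Mono for each subscription and an error Mono once the test data is used up. An error consumer is passed to subscribe() so that the terminating signal can be recorded.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;

public class RepeatUntilErrorSketch {

    // Hypothetical helper: returns one Mono per call, then an error Mono
    // once the given values are used up.
    static Supplier<Mono<String>> oneMonoPerSubscription(List<String> values) {
        Iterator<String> remaining = values.iterator();
        return () -> {
            if (remaining.hasNext()) {
                return Mono.just(remaining.next());
            }
            return Mono.error(new IllegalStateException("no more test data"));
        };
    }

    // Same pipeline shape as logMonoValues, but it records each value and
    // the terminating error instead of printing.
    static List<String> run(List<String> values) {
        List<String> events = new ArrayList<>();
        Mono.defer(oneMonoPerSubscription(values))
                .repeat()
                .subscribe(
                        value -> events.add("next:" + value),
                        error -> events.add("error:" + error.getMessage()));
        return events;
    }

    public static void main(String[] args) {
        // prints [next:Hello, next:World, error:no more test data]
        System.out.println(run(List.of("Hello", "World")));
    }
}
```

Every Mono here completes synchronously, so the whole sequence runs on the calling thread and subscribe() returns once the error has ended the repetition.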
I think that a possible solution is to use Mono.defer() or Mono.fromSupplier() to create a new value for each subscription created by .repeat(). E.g.:
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import reactor.core.publisher.Mono;

public class So63117029 {

    private static class TestSubject {
        public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
            Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> data = new LinkedBlockingQueue<>(List.of("Hello", "World"));
        TestSubject testSubject = new TestSubject();
        // Each subscription created by repeat() takes the next element from the queue.
        testSubject.logMonoValues(() -> Mono.fromSupplier(data::remove));
    }
}
The method Queue.remove() throws a NoSuchElementException when there is no more data in the queue.
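The choice of remove() over poll() matters here. The sketch below uses only the JDK (the class and method names are made up for illustration) to show the difference: remove() throws on an empty queue, while poll() returns null. As far as I can tell, a null result from Mono.fromSupplier() completes the Mono empty instead of signalling an error, so with poll() the repeat() loop would keep resubscribing rather than stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueRemoveSketch {

    // Calls remove() until it throws and returns what was taken before that.
    static List<String> drainUntilEmpty(Queue<String> queue) {
        List<String> taken = new ArrayList<>();
        try {
            while (true) {
                taken.add(queue.remove()); // throws once the queue is empty
            }
        } catch (NoSuchElementException expected) {
            // remove() reports "no more data" with an exception; in the
            // Reactor example this becomes the error signal that ends repeat().
        }
        return taken;
    }

    public static void main(String[] args) {
        Queue<String> data = new LinkedBlockingQueue<>(List.of("Hello", "World"));
        System.out.println(drainUntilEmpty(data)); // prints [Hello, World]
        System.out.println(data.poll());           // prints null: poll() does not throw
    }
}
```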
Answer 2
Score: 0
The TestPublisher emit method accepts an array of values. Once emitted, it closes the source. So you cannot emit one by one. Instead, pass all the values like this.
testPublisher.emit("Hello", "world");
// from TestPublisher emit (decompiled output, loop simplified)
public final TestPublisher<T> emit(T... values) {
    Objects.requireNonNull(values, "values array is null, please cast to T if null T required");
    for (T t : values) {
        this.next(t);
    }
    // emit() always finishes by completing the publisher
    return this.complete();
}
Regarding the hanging behavior, the repeat here seems to cause the issue, as it keeps trying to resubscribe to the source Mono indefinitely.
Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
Just change it like this to understand the behavior.
Mono.defer(monoSupplier).repeat(10).subscribe(System.out::println);