测试使用 repeat() 进行订阅

huangapple go评论68阅读模式
英文:

Testing subscription with repeat()

问题

以下是翻译好的内容:

我想要测试一个使用repeat()操作符在延迟的单元上并订阅结果的函数。在测试中,我使用TestPublisher来模拟来自单元的新值。

在一个非常简化的形式下,代码如下所示:

package de.cronos.mad.messages.backend;

import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;

public class RepeatTest {

    private static class TestSubject {
        public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
            Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
        }
    }

    @Test
    public void repeatTest() {
        TestPublisher<String> testPublisher = TestPublisher.create();

        TestSubject testSubject = new TestSubject();
        testSubject.logMonoValues(testPublisher::mono);

        testPublisher.emit("Hello");
        testPublisher.emit("World");
    }
}

在将“Hello”记录到标准输出后,执行挂起。我认为我理解了这是为什么会发生:emit(…)调用是从主线程发生的,并且从那里“驱动”了订阅。

我不知道的是如何修改这个测试以使其完成,即不挂起?

英文:

I want to test a function that uses the repeat() operator on a deferred mono and subscribes to the result. In the test, I use the TestPublisher to simulate new values from the mono.

In a very simplified form, it looks like this:

package de.cronos.mad.messages.backend;

import java.util.function.Supplier;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.publisher.TestPublisher;

public class RepeatTest {

    private static class TestSubject {
        public void logMonoValues(Supplier&lt;Mono&lt;String&gt;&gt; monoSupplier) {
            Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
        }
    }

    @Test
    public void repeatTest() {
        TestPublisher&lt;String&gt; testPublisher = TestPublisher.create();

        TestSubject testSubject = new TestSubject();
        testSubject.logMonoValues(testPublisher::mono);

        testPublisher.emit(&quot;Hello&quot;);
        testPublisher.emit(&quot;World&quot;);
    }
}

The execution hangs after logging "Hello" to stdout. I think I understand why this happens: The emit(…) call happens from the main thread and "drives" the subscription from there.

What I do not know is how to modify this test so that it completes, i.e. does not hang?

答案1

得分: 1

注解:

  1. 使用无参数的.repeat()操作符表示无限重复。一旦原始订阅成功完成,它立即启动新的订阅。终止它的两种方法是:

    1. 限制迭代次数,例如.repeat(10),由 vins 建议;

    2. 通过发送错误信号中止序列,例如使用Mono.error(Throwable)或直接抛出异常。

  2. 调用testPublisher::mono声明此TestPublisher实例遵循Mono合约,即仅发送单个值。

    因此,调用.emit()两次或使用两个参数调用它都不会有帮助。额外的值将被忽略。

  3. TestPublisher的javadoc说,

    > TestPublisher通常是热的,...将第一个终止信号重放给后续订阅者。

    这意味着后加入的订阅者立即接收终止信号。由repeat()操作符创建的订阅立即接收重放的终止信号,从而导致repeat()在紧密循环中重新订阅。

我认为一个可能的解决方案是使用Mono.defer()Mono.fromSupplier().repeat()创建的每个订阅创建一个新值。例如:

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;

public class So63117029 {

  private static class TestSubject {
    public void logMonoValues(Supplier<Mono<String>> monoSupplier) {
      Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
    }
  }

  public static void main(String[] args) {
    LinkedBlockingQueue<String> data = new LinkedBlockingQueue<>(List.of("Hello", "World"));

    TestSubject testSubject = new TestSubject();
    testSubject.logMonoValues(() -> Mono.fromSupplier(data::remove));
  }
}

当队列中没有更多数据时,方法Queue.remove()会抛出NoSuchElementException

英文:

Notes:

  1. The .repeat() operator when used without arguments means to repeat indefinitely. It initiates a new subscription immediately once the original one completes successfully. Two ways to terminate it are:

    1. Limit the count of iterations, e.g. as .repeat(10), as suggested by vins;

    2. Abort the sequence by sending an error signal, e.g. with Mono.error(Throwable) or just by throwing an exception.

  2. Calling testPublisher::mono declares that this instance of TestPublisher adheres to a Mono contract, i.e. only sends a single value.

    Thus neither calling .emit() twice, nor calling it with two arguments will help. Extra values will be ignored.

  3. The javadoc for TestPublisher says,

    > TestPublisher are generally hot, [...] replaying the first termination signal to subsequent subscribers.

    It means that late subscribers immediately receive the termination signal. The subscriptions created by repeat() operator immediately receive the replayed termination signal, which in turn causes repeat() to resubscribe, in a tight loop.

I think that a possible solution is to use Mono.defer() or Mono.fromSupplier() to create a new value for each subscription created by .repeat(). E.g.:

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

import reactor.core.publisher.Mono;

public class So63117029 {

  private static class TestSubject {
    public void logMonoValues(Supplier&lt;Mono&lt;String&gt;&gt; monoSupplier) {
      Mono.defer(monoSupplier).repeat().subscribe(System.out::println);
    }
  }

  public static void main(String[] args) {
    LinkedBlockingQueue&lt;String&gt; data = new LinkedBlockingQueue&lt;&gt;(List.of(&quot;Hello&quot;, &quot;World&quot;));

    TestSubject testSubject = new TestSubject();
    testSubject.logMonoValues(() -&gt; Mono.fromSupplier(data::remove));
  }
}

The method Queue.remove() throws a NoSuchElementException when there is no more data in the queue.

答案2

得分: 0

The TestPublisher emit method accepts an array of values. Once emitted, it closes the source. So you cannot emit one by one. Instead, pass all the values like this.

testPublisher.emit("Hello", "world");

// from TestPublisher emit
public final TestPublisher<T> emit(T... values) {
    Objects.requireNonNull(values, "values array is null, please cast to T if null T required");
    Object[] var2 = values;
    int var3 = values.length;

    for(int var4 = 0; var4 < var3; ++var4) {
        T t = var2[var4];
        this.next(t);
    }

    return this.complete();
}

Regarding the hanging behavior, the repeat here seems to cause the issue as it indefinitely tries to reconnect the source Mono again and again.

Mono.defer(monoSupplier).repeat().subscribe(System.out::println);

Just change it like this to understand the behavior.

Mono.defer(monoSupplier).repeat(10).subscribe(System.out::println);
英文:

The TestPublisher emit method accepts array of values. Once emitted, it closes the source. So you can not emit one by one. Instead pass all the values like this.

testPublisher.emit(&quot;Hello&quot;, &quot;world&quot;);

// from TestPublisher emit
public final TestPublisher&lt;T&gt; emit(T... values) {
    Objects.requireNonNull(values, &quot;values array is null, please cast to T if null T required&quot;);
    Object[] var2 = values;
    int var3 = values.length;

    for(int var4 = 0; var4 &lt; var3; ++var4) {
        T t = var2[var4];
        this.next(t);
    }

    return this.complete();
}

Regarding the hanging behavior, the repeat here seems to cause the issue as it indefinitely tries to reconnect the source Mono again and again.

Mono.defer(monoSupplier).repeat().subscribe(System.out::println);

Just change it like this to understand the behavior.

Mono.defer(monoSupplier).repeat(10).subscribe(System.out::println);

huangapple
  • 本文由 发表于 2020年7月27日 22:03:46
  • 转载请务必保留本文链接:https://go.coder-hub.com/63117029.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定