Java Reactor: how to properly start an async cancellable side effect

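Question

I'm trying to write something with Reactor that I already know how to write with CompletableFuture. In the Reactor version I get a "Calling subscribe in non-blocking scope" warning.

My goal is to call `turnOn()` with a timeout, after which `turnOff()` should be called. If `turnOn()` is called again, it should cancel the old timeout and start waiting for a new one.

How should I do this? I could go hybrid and use CompletableFuture for the timeout, but Reactor's API is a bit easier.

This test works as expected: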

```java
public class TimeoutTest {

    Service service;

    @BeforeEach
    public void setUp() {
        service = mock(Service.class);
    }

    CompletableFuture<Void> turnOffFuture = null;

    @DisplayName("Should timeout on turnOn with timeout")
    @Test
    public void timeoutCompletableFuture() throws InterruptedException {
        turnOn(Duration.ofMillis(100)).join();
        verify(service).turnOn();
        verify(service,never()).turnOff();
        Thread.sleep(1000);
        verify(service).turnOff();
    }

    private interface Service{
        void turnOn();
        void turnOff();
    }

    public void cancelTimeout() {
        if (turnOffFuture != null)
            turnOffFuture.cancel(false);
        turnOffFuture = null;
    }

    public CompletableFuture<Void> turnOn(Duration timeout) {

        CompletableFuture<Void> turnOnFuture = turnOn();
        cancelTimeout();
        turnOffFuture = turnOnFuture.thenRun(() -> delay(timeout))
                                    .thenRun(this::turnOff);
        return turnOnFuture;
    }


    private void delay(Duration duration) {
        try {
            Thread.sleep(BigDecimal.valueOf(duration.getSeconds())
                                   .scaleByPowerOfTen(3)
                                   .add(BigDecimal.valueOf(duration.getNano(), 6))
                                   .intValue());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private CompletableFuture<Void> turnOn() {
        return CompletableFuture.runAsync(() -> service.turnOn());
    }

    private CompletableFuture<Void> turnOff() {
        return CompletableFuture.runAsync(() -> service.turnOff());
    }
}
```

But my Reactor code does not work as expected:

```java
public class TimeoutMonoTest {

    Service service;

    @BeforeEach
    public void setUp() {
        service = mock(Service.class);
    }

    Disposable turnOffDisposable = null;

    @DisplayName("Should timeout on turnOn with timeout")
    @Test
    public void timeoutMono() throws InterruptedException {
        turnOn(Duration.ofMillis(100)).block(Duration.ofMillis(10));
        verify(service).turnOn();
        verify(service, never()).turnOff();
        Thread.sleep(1000);
        verify(service).turnOff();
    }

    private interface Service {
        void turnOn();
        void turnOff();
    }

    public void cancelTimeout() {
        if (turnOffDisposable != null)
            turnOffDisposable.dispose();
        turnOffDisposable = null;
    }

    public Mono<Void> turnOn(Duration timeout) {

        Mono<Void> turnOnFuture = turnOn();
        cancelTimeout();
        turnOffDisposable = turnOnFuture.delayElement(timeout)
                                        .subscribe(it -> this.turnOff());
        return turnOnFuture;
    }


    private Mono<Void> turnOn() {
        service.turnOn();
        return Mono.just("not empty but mapped to void").then();
    }

    private Mono<Void> turnOff() {
        service.turnOff();
        return Mono.just("not empty but mapped to void").then();
    }
}
```
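Answer 1

Score: 0

The problem lies in mapping to `Mono<Void>` in the `turnOn()` and `turnOff()` methods. A `Mono<Void>` never emits a "next" signal, only a completion ("success") signal. As a result, `delayElement` has no element to delay, and the consumer passed to `subscribe` is never invoked.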
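To see the signal difference in isolation, here is a minimal sketch. The class and method names (`VoidMonoDemo`, `onNextCalls`, `emptyDelayMillis`) are made up for illustration, and it assumes reactor-core is on the classpath. It builds a `Mono<Void>` the same way the question does, counts how often the `onNext` consumer runs, and then measures how long `delayElement` holds back an empty Mono:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;

// Illustrative only: names are hypothetical, not part of the question's code.
public class VoidMonoDemo {

    // Counts how many times the onNext consumer runs for a Mono<Void>.
    public static int onNextCalls() {
        AtomicInteger calls = new AtomicInteger();
        // then() drops the value and keeps only the completion signal.
        Mono<Void> empty = Mono.just("not empty but mapped to void").then();
        empty.subscribe(it -> calls.incrementAndGet());
        return calls.get();
    }

    // Measures how long an empty Mono takes to complete behind delayElement.
    public static long emptyDelayMillis() {
        long start = System.nanoTime();
        Mono.just("not empty but mapped to void")
            .then()
            .delayElement(Duration.ofSeconds(2)) // only delays onNext, and there is none
            .block();                            // returns null once the Mono completes
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        System.out.println("onNext calls: " + onNextCalls()); // prints "onNext calls: 0"
        System.out.println("elapsed ms: " + emptyDelayMillis());
    }
}
```

The elapsed time should be well under the two seconds requested, since `delayElement` does not delay empty Monos.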

The fix is simply to change the `turnOn` method to:

    public Mono<Void> turnOn(Duration timeout) {
        cancelTimeout();
        Mono<Void> turnOnMono = turnOn();
        turnOffDisposable = turnOnMono.delayElement(timeout)
                                      .then(turnOff())
                                      .subscribe();
        return turnOnMono; // reuse the Mono created above; a second turnOn() call would invoke service.turnOn() twice
    }
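
Two caveats apply to the snippet above. `then(turnOff())` evaluates `turnOff()` while the pipeline is being assembled, so `service.turnOff()` runs immediately. And `delayElement` does not delay an empty Mono, so no timeout elapses. A lazy variant might look like the following sketch. It is not the original author's code: `TimeoutSwitch` is a hypothetical name, `Service` stands in for the interface from the question, and the delay is produced with `Mono.delay` instead of `delayElement`. It assumes reactor-core is on the classpath.

```java
import java.time.Duration;

import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Mono;

// Illustrative sketch; class name and structure are assumptions.
public class TimeoutSwitch {

    // Stand-in for the Service interface from the question.
    public interface Service {
        void turnOn();
        void turnOff();
    }

    private final Service service;

    // Holds the pending turnOff timer; update() disposes the one it replaces.
    private final Disposable.Swap pendingTurnOff = Disposables.swap();

    public TimeoutSwitch(Service service) {
        this.service = service;
    }

    // Nothing runs until the returned Mono is subscribed (or blocked on).
    public Mono<Void> turnOn(Duration timeout) {
        return Mono.<Void>fromRunnable(service::turnOn)
                   // Once turnOn has completed, (re)start the timer.
                   .doOnSuccess(ignored -> pendingTurnOff.update(
                           Mono.delay(timeout) // emits a single value after the timeout
                               .subscribe(tick -> service.turnOff())));
    }

    // Cancels a pending turnOff without scheduling a new one.
    public void cancelTimeout() {
        pendingTurnOff.update(null);
    }
}
```

With this shape, `turnOn(Duration.ofMillis(100)).block()` would call `service.turnOn()` and schedule `service.turnOff()` roughly 100 ms later. A second `turnOn` before the timer fires replaces the pending timer, so `turnOff` runs only once.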
huangapple
  • Posted on October 22, 2020, 19:41:44
  • When reposting, please keep the link to this article: https://go.coder-hub.com/64481490.html