检查特定事件是否紧随另一个事件发生,并使用 RxJava 发出成功信号。

huangapple go评论59阅读模式
英文:

Check one specific event is followed by another one and emit success using RxJava

问题

以下是翻译好的内容:

我需要检查一个无限的可观察对象(来自设备的事件),以发出一个特定的事件,我们称之为“Started”,然后紧接着是另一个事件,“Finished”。然而,在这两个事件之间,可以接收任意数量的不同事件,它们必须被忽略。这个问题的结果应该是一个 Completable.complete(),当“Started”事件在设置的超时时间之前被“Finished”事件跟随时,操作是成功的。

我已经有一个解决这个问题的方法,但它看起来很丑陋且过于复杂,我认为可能有一个更优雅/简单的解决方案。我的当前代码如下,我已经将代码泛化,以便更容易理解。在这个示例中,基本上我会检查在 Flowable 发出数字“5”之后,在 10 秒的超时时间内是否接收到数字“8”:

Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
        .publish().autoConnect(1);

return events
        .filter(number -> number == 5)
        .firstElement()
        .concatMapCompletable(number -> {
            if (number == 5) {
                return events
                        .filter(number2 -> number2 == 8)
                        .firstElement()
                        .concatMapCompletable(number2 -> {
                            if (number2 == 8) {
                                return Completable.complete();
                            } else {
                                return Completable.error(new Exception("Number 3 expected, got " + number2));
                            }
                        });
            } else {
                return Completable.error(new Exception("Number 2 expected, got " + number));
            }
        })
        .timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")));

编辑:
我已经找到了一个更清晰的版本,但它似乎有些奇怪,因为我在使用 .filter 操作符,然后在收到第一个元素时完成操作。我在下面引用它供参考:

Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
        .publish().autoConnect(1);

TestObserver testObserver = events
        .filter(number -> number == 5)
        .firstElement()
        .concatMapCompletable(number ->
                events
                        .filter(number2 -> number2 == 8)
                        .firstElement()
                        .concatMapCompletable(number2 ->
                                Completable.complete()))
        .timeout(10, TimeUnit.SECONDS, Completable.error(new Exception("Timeout!")))
        .test();

更新2:
这是我更加满意的版本:

Flowable<Long> events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
        .publish().autoConnect(1);

TestObserver testObserver = events
        .skipWhile(number -> number != 5)
        .firstElement()
        .flatMapCompletable(number -> Completable.fromObservable(events
                .takeUntil(number2 -> number2 == 8)
                .toObservable()
        ));
英文:

I need to check that an infinite observable (events from a device) emits ne specific event lets call it "Started" which is then followed by another one, "Finished". However in between these two events, any number of different events can be received and they must be ignored. The result of this should be a Completable.complete() which is successful when the "Started" event was followed by the "Finished" event before a set timeout.

I have a working solution for this problem, however it looks ugly and too complex and I think there probably is a more elegant/simple solution. My current code looks like this, I have generalized my code so it is easier to understand, basically in this example I check that after the Flowable emits the number "5" before a timeout of 10 seconds the number "8" is received.:

    Flowable&lt;Long&gt; events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
            .publish().autoConnect(1);

    return events
            .filter(number -&gt; number == 5)
            .firstElement()
            .concatMapCompletable(number -&gt; {
                if (number == 5) {
                    return events
                            .filter(number2 -&gt; number2 == 8)
                            .firstElement()
                            .concatMapCompletable(number2 -&gt; {
                                if (number2 == 8) {
                                    return Completable.complete();
                                } else {
                                    return Completable.error(new Exception(&quot;Number 3 expected, got &quot; + number2));
                                }
                            });
                } else {
                    return Completable.error(new Exception(&quot;Number 2 expected, got &quot; + number));
                }
            })
            .timeout(10, TimeUnit.SECONDS, Completable.error(new Exception(&quot;Timeout!&quot;)));

EDIT:
I have found a cleaner version, however it seems weird since I am using the .filter operator to then Complete on the first element received, I post it here below for reference:

    Flowable&lt;Long&gt; events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
            .publish().autoConnect(1);

    TestObserver testObserver = events
            .filter(number -&gt; number == 5)
            .firstElement()
            .concatMapCompletable(number -&gt;
                    events
                            .filter(number2 -&gt; number2 == 8)
                            .firstElement()
                            .concatMapCompletable(number2 -&gt;
                                    Completable.complete()))
            .timeout(10, TimeUnit.SECONDS, Completable.error(new Exception(&quot;Timeout!&quot;)))
            .test();

UPDATE2:
Version which I am much more happy about:

    Flowable&lt;Long&gt; events = Flowable.interval(1, TimeUnit.SECONDS, testScheduler)
            .publish().autoConnect(1);

    TestObserver testObserver = events
            .skipWhile(number -&gt; number != 5)
            .firstElement()
            .flatMapCompletable(number -&gt; Completable.fromObservable(events
                    .takeUntil(number2 -&gt; number2 == 8)
                    .toObservable()
            ));

答案1

得分: 0

以下是您要求的翻译内容:

我不太确定我是否完全理解您想要做什么,但您可以使用类似以下的 bufferwindow 运算符:

Flowable.just(1, 2, 3, 4, 5)
        .buffer(2, 1)
        .filter(e -> e.size() > 1)
        .flatMapCompletable(e -> {
            int first = e.get(0);
            int second = e.get(1);
            if (first == 2) {
                if (second == 3) {
                    return Completable.complete();
                } else {
                    return Completable.error(new Exception("..."));
                }
            }

            return Completable.fromObservable(Observable.just(e));
        })

UPDATE

Observable<Long> source = Observable.interval(1, TimeUnit.SECONDS)
        .share();

source
        .skipWhile(e -> e != 5)
        .flatMapCompletable(e -> Completable.fromObservable(source
                .takeUntil(x -> x == 8)
                .timeout(10, TimeUnit.SECONDS)))
        .subscribe();
英文:

I'm not sure to understand what you want to do exactelly but you can use buffer or window operators like the following:

Flowable.just(1, 2, 3, 4, 5)
        .buffer(2, 1)
        .filter(e -&gt; e.size() &gt; 1)
        .flatMapCompletable(e -&gt; {
            int first = e.get(0);
            int second = e.get(1);
            if (first == 2) {
                if (second == 3) {
                    return Completable.complete();
                } else {
                    return Completable.error(new Exception(&quot;...&quot;));
                }
            }

            return Completable.fromObservable(Observable.just(e));
        })

UPDATE

Observable&lt;Long&gt; source = Observable.interval(1, TimeUnit.SECONDS)
        .share();

source
        .skipWhile(e -&gt; e != 5)
        .flatMapCompletable(e -&gt; Completable.fromObservable(source
                .takeUntil(x -&gt; x == 8)
                .timeout(10, TimeUnit.SECONDS)))
        .subscribe();

huangapple
  • 本文由 发表于 2020年10月20日 19:14:34
  • 转载请务必保留本文链接:https://go.coder-hub.com/64444007.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定