How can a subscriber control a Publisher with reactive pull backpressure?

Question
I have a publisher that may publish faster than the subscriber can process the data. To handle this, I started working with backpressure. Because I do not want to discard any data, I use reactive pull backpressure. My understanding is that this lets the Subscriber tell the Publisher when to publish more data, as described here and in the following paragraphs.

The publisher is a Flowable that does its work asynchronously in parallel and is merged into a sequential Flowable afterwards. Data should be buffered up to 10 elements, and when that buffer is full, the Flowable should not publish any more data and should wait for the next request.

The subscriber is a DisposableSubscriber that requests 10 items at the start. Every consumed item requires some computation, after which a new item is requested.
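As an aside, the request(n) contract at the heart of reactive pull can be illustrated without RxJava, using the JDK's built-in java.util.concurrent.Flow API (Java 9+). This is a minimal sketch, not the code from the question: the class name PullDemo and the item counts are made up, and SubmissionPublisher stands in for the Flowable. The key point is that the publisher only delivers what the subscriber has requested, and the subscriber pulls one item at a time from onNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class PullDemo {
    public static void main(String[] args) throws InterruptedException {
        List<Integer> received = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        // SubmissionPublisher is the JDK's reference Flow.Publisher; it only
        // delivers items for which the subscriber has signalled demand.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // initial demand: exactly one item
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);      // the per-item computation would go here
                    subscription.request(1); // pull the next item only when ready
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            for (int i = 0; i < 5; i++) {
                publisher.submit(i); // blocks when the subscriber's buffer is saturated
            }
        } // close() delivers any pending items, then signals onComplete
        done.await();
        System.out.println(received); // prints [0, 1, 2, 3, 4]
    }
}
```

This is the behavior the question expects from the RxJava pipeline: no item is pushed until the subscriber asks for it.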

My minimal working example (MWE) looks like this:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
    src.add(i);
}
Flowable.fromIterable(src)
        .parallel(10, 1)
        .runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
        .flatMap(i -> Single.fromCallable(() -> {
            System.out.println("publisher: " + i);
            Thread.sleep(200);
            return i;
        }).toFlowable())
        .sequential(1)
        .onBackpressureBuffer(10)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .doOnError(Throwable::printStackTrace)
        .subscribeWith(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(10);
            }
            @Override
            public void onNext(Integer integer) {
                System.out.println("subscriber: " + integer);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                request(1);
            }
            @Override
            public void onError(Throwable t) {
            }
            @Override
            public void onComplete() {
            }
        });
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

What I expect this code to do: the subscriber requests the first 10 items, and the publisher publishes the first 10 items. The subscriber then does its computation in onNext and requests more items, which the publisher publishes.

What actually happens: at first, the publisher seems to publish items unboundedly. At some point, e.g. after 14 published items, the subscriber handles its first item. While that happens, the publisher continues to publish items. After around 30 published items, an io.reactivex.exceptions.MissingBackpressureException: Buffer is full is thrown and the stream ends.

My question: what am I doing wrong? **How can I let the subscriber control if and when the publisher publishes data?** Obviously I am doing something horribly wrong; otherwise expectation and reality would not differ this much.

Example output of the above MWE:

publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full

Answer 1

Score: 1

Not an expert in Rx, but let me take a stab at it. observeOn(...) has its own default buffer size of 128, so right from the start it requests more from upstream than your buffer can hold.

observeOn(...) accepts an optional buffer-size override, but even if you supply it, the ParallelFlowable will still invoke your flatMap(...) method more often than you want. I'm not 100% sure why; maybe it performs its own internal buffering when merging the rails back into a sequential stream.

I think you can get closer to your desired behavior by using flatMap(...) instead of parallel(...), supplying a maxConcurrency argument.
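The effect of a maxConcurrency bound can be sketched without RxJava using plain java.util.concurrent: a semaphore caps how many items are in flight, and a new item is only admitted once a previous one releases its permit. This is an analogy, not RxJava's implementation; the class name BoundedConcurrencyDemo and the numbers are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;

public class BoundedConcurrencyDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        Semaphore inFlight = new Semaphore(10); // analogous to flatMap's maxConcurrency
        List<Future<Integer>> results = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            inFlight.acquire(); // block until one of the 10 slots frees up
            final int item = i;
            results.add(pool.submit(() -> {
                try {
                    return item * 2; // stand-in for the per-item work
                } finally {
                    inFlight.release(); // free the slot so the next item is admitted
                }
            }));
        }
        int sum = 0;
        for (Future<Integer> f : results) {
            sum += f.get();
        }
        pool.shutdown();
        System.out.println(sum); // prints 2450 (sum of 2*i for i in [0, 50))
    }
}
```

The acquire-before-submit pattern is what keeps at most 10 items queued or running, which is the behavior the question wants from the pipeline as a whole.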

One other thing to keep in mind: you don't want to call subscribeOn(...) here. It is meant to affect the upstream Flowable in its entirety, so if you are already calling parallel(...).runOn(...), it either has no effect or the effect will be unexpected.

Armed with the above, I think the following gets you closer to what you're looking for:

List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
    src.add(i);
}
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
Flowable.fromIterable(src)
        .flatMap(
                i -> Flowable.just(i)
                        .subscribeOn(scheduler) // subscribeOn(...) here affects only this nested Flowable
                        .map(__ -> {
                            System.out.println("publisher: " + i);
                            Thread.sleep(200);
                            return i;
                        }),
                10) // max concurrency
        .observeOn(Schedulers.newThread(), false, 10) // override the default buffer size
        .doOnError(Throwable::printStackTrace)
        .subscribeWith(new DisposableSubscriber<Integer>() {
            @Override
            protected void onStart() {
                request(10);
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("subscriber: " + integer);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                request(1);
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onComplete() {
            }
        });
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}

huangapple
  • Published on 2020-03-16 21:14:58
  • Please keep this link when reposting: https://go.coder-hub.com/60706697.html