

How can a subscriber control a Publisher with reactive pull backpressure?







I have a publisher that may publish data faster than the subscriber can handle it. To deal with this, I started working with backpressure. Because I do not want to discard any data, I use reactive pull backpressure. My understanding is that the Subscriber can tell the Publisher when to publish more data, as described in this and the following paragraphs.
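To make the "pull" idea concrete: RxJava's Flowable follows the Reactive Streams protocol. In that protocol every `request(n)` adds n to the publisher's outstanding demand, each `onNext` consumes one unit of it, and the publisher must stop once the demand reaches zero. A demand of `Long.MAX_VALUE` may be treated as unbounded. The `DemandCounter` class below is hypothetical. It is a minimal JDK-only sketch of that bookkeeping, not RxJava's implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not RxJava code) showing how a publisher can track the
// demand signalled by its subscriber in a pull-based (Reactive Streams) protocol.
final class DemandCounter {
    private final AtomicLong requested = new AtomicLong();

    /** Subscriber side: request(n) adds n; the sum is capped at Long.MAX_VALUE ("unbounded"). */
    long add(long n) {
        if (n <= 0) {
            // A real Subscription would signal onError(IllegalArgumentException) instead (rule 3.9).
            throw new IllegalArgumentException("request must be positive: " + n);
        }
        while (true) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return current; // already unbounded
            }
            long next = current + n;
            if (next < 0) {
                next = Long.MAX_VALUE; // overflow: treat as unbounded
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** Publisher side: consume one unit before each onNext; false means "stop and wait". */
    boolean tryEmit() {
        while (true) {
            long current = requested.get();
            if (current == 0) {
                return false; // no outstanding demand: onNext must not be called
            }
            if (current == Long.MAX_VALUE) {
                return true; // this sketch never decrements unbounded demand
            }
            if (requested.compareAndSet(current, current - 1)) {
                return true;
            }
        }
    }

    long outstanding() {
        return requested.get();
    }

    public static void main(String[] args) {
        DemandCounter demand = new DemandCounter();
        demand.add(10); // the subscriber calls request(10)
        int emitted = 0;
        while (demand.tryEmit()) {
            emitted++; // the publisher would call onNext(...) here
        }
        System.out.println("emitted " + emitted + " items, then stopped"); // prints "emitted 10 items, then stopped"
    }
}
```

The publisher never decides on its own how much to send. It only spends demand that the subscriber has explicitly granted.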

The publisher is a Flowable that does its work asynchronously and in parallel, and the results are merged back into a sequential Flowable afterwards. Up to 10 elements should be buffered. When this buffer is full, the Flowable should stop publishing and wait for the next request.
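The RxJava 2 documentation suggests that `onBackpressureBuffer(int capacity)` does not make its upstream wait, which contradicts this expectation. It requests from upstream without bound, passes items downstream as demand allows, and holds at most `capacity` of the rest. One more item triggers an error, which is consistent with the `MissingBackpressureException: Buffer is full` in the output. The `OverflowingBuffer` class below is hypothetical. It is a JDK-only sketch of that policy that throws `IllegalStateException` where RxJava would signal `onError`, and it is not RxJava code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical class mimicking a "bounded buffer, error on overflow" policy.
// The producer is never slowed down; it simply fails once the buffer is full.
final class OverflowingBuffer<T> {
    private final int capacity;
    private final Deque<T> queue = new ArrayDeque<>();

    OverflowingBuffer(int capacity) {
        this.capacity = capacity;
    }

    /** Called by the producer; throws instead of waiting when the buffer is full. */
    void offer(T item) {
        if (queue.size() == capacity) {
            throw new IllegalStateException("Buffer is full");
        }
        queue.addLast(item);
    }

    /** Called when the consumer has demand; returns null if nothing is buffered. */
    T poll() {
        return queue.pollFirst();
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        OverflowingBuffer<Integer> buffer = new OverflowingBuffer<>(10);
        int produced = 0;
        try {
            // A producer that ignores demand keeps offering while the consumer polls nothing.
            for (int i = 0; i < 200; i++) {
                buffer.offer(i);
                produced++;
            }
        } catch (IllegalStateException e) {
            // prints "failed after 10 items: Buffer is full"
            System.out.println("failed after " + produced + " items: " + e.getMessage());
        }
    }
}
```

A buffer with a capacity only delays the failure. To make the source actually wait, the demand that reaches the source has to be bounded.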

The subscriber is a DisposableSubscriber that requests 10 items at the start. Each consumed item requires some computation, after which the subscriber requests one new item.

My MWE looks like this:

    import io.reactivex.Flowable;
    import io.reactivex.Single;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.subscribers.DisposableSubscriber;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;

    // The statements below run inside a method body, e.g. main(String[] args).
    List<Integer> src = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
        src.add(i);
    }
    Flowable.fromIterable(src)
            .parallel(10, 1)
            .runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
            .flatMap(i -> Single.fromCallable(() -> {
                System.out.println("publisher: " + i);
                Thread.sleep(200);
                return i;
            }).toFlowable())
            .sequential(1)
            .onBackpressureBuffer(10)
            .observeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.newThread())
            .doOnError(Throwable::printStackTrace)
            .subscribeWith(new DisposableSubscriber<Integer>() {
                @Override
                protected void onStart() {
                    request(10);
                }
                @Override
                public void onNext(Integer integer) {
                    System.out.println("subscriber: " + integer);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(1);
                }
                @Override
                public void onError(Throwable t) {
                }
                @Override
                public void onComplete() {
                }
            });
    try {
        Thread.sleep(1000000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

What I expect this code to do: the subscriber requests the first 10 items, and the publisher publishes those 10 items. The subscriber then does its computation in onNext and requests more items, which the publisher then publishes.
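The expected protocol can be shown in isolation with only the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) rather than RxJava. The `RangePublisher`, `TenThenOneSubscriber` and `PullDemo` classes are hypothetical. Because the publisher emits only against outstanding demand, a subscriber that requests 10 items up front and then 1 per item never has more than 10 items outstanding. This is a single-threaded sketch. It skips the thread safety and argument validation that a real Publisher needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical single-threaded publisher of 0 until count (exclusive) that only emits
// while its subscriber has outstanding demand. No thread safety or argument checks.
final class RangePublisher implements Flow.Publisher<Integer> {
    private final int count;
    long maxOutstanding; // largest outstanding demand seen, recorded for the demo

    RangePublisher(int count) {
        this.count = count;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next;          // next value to emit
            private long requested;    // outstanding demand
            private boolean emitting;  // true while the emit loop below is running
            private boolean done;      // set once completed or cancelled

            @Override
            public void request(long n) {
                requested += n;
                maxOutstanding = Math.max(maxOutstanding, requested);
                if (emitting) {
                    return; // re-entrant call from onNext: the running loop picks up the demand
                }
                emitting = true;
                while (requested > 0 && next < count && !done) {
                    requested--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (next == count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Follows the question's pattern: request(10) up front, then request(1) per processed item.
final class TenThenOneSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed;
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(10);
    }

    @Override
    public void onNext(Integer item) {
        received.add(item); // stands in for the per-item computation
        subscription.request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        completed = true;
    }
}

final class PullDemo {
    public static void main(String[] args) {
        RangePublisher publisher = new RangePublisher(200);
        TenThenOneSubscriber subscriber = new TenThenOneSubscriber();
        publisher.subscribe(subscriber);
        // prints "200 items, completed=true, max outstanding demand=10"
        System.out.println(subscriber.received.size() + " items, completed=" + subscriber.completed
                + ", max outstanding demand=" + publisher.maxOutstanding);
    }
}
```

The request pattern itself works as intended. The question is whether every operator between the source and the subscriber passes that demand through unchanged.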

What actually happens: at first, the publisher seems to publish items without any limit. At some point, e.g. after 14 published items, the subscriber handles its first item. Meanwhile, the publisher keeps publishing items. After around 30 published items, an io.reactivex.exceptions.MissingBackpressureException: Buffer is full is thrown and the stream ends.

My question: what am I doing wrong? How can I let the subscriber control if and when the publisher publishes data? Obviously I am doing something badly wrong; otherwise reality would not differ so much from my expectation.

Example output of the above MWE:

    publisher: 5
    publisher: 7
    publisher: 8
    publisher: 0
    publisher: 2
    publisher: 6
    publisher: 9
    publisher: 3
    publisher: 4
    publisher: 1
    publisher: 18
    publisher: 17
    publisher: 15
    subscriber: 0
    publisher: 11
    publisher: 10
    publisher: 19
    publisher: 13
    publisher: 14
    publisher: 12
    publisher: 16
    publisher: 27
    publisher: 28
    publisher: 23
    publisher: 21
    publisher: 29
    publisher: 20
    publisher: 25
    publisher: 22
    publisher: 26
    io.reactivex.exceptions.MissingBackpressureException: Buffer is full


Answer (score: 1)


I'm not an expert in Rx, but let me take a stab at it. observeOn(...) has its own default buffer size of 128, so right from the start it requests more items from upstream than your buffer can hold.
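A prefetching operator like observeOn(...) asks its upstream for its whole buffer size as soon as it is subscribed. It then replenishes in batches, independently of the subscriber's `request(10)`. The sketch below is a hypothetical JDK-only model, and `PrefetchRelay.upstreamRequests` is not an RxJava method. The replenish threshold of `bufferSize - bufferSize / 4` is an assumption about how RxJava 2's observeOn batches its requests, so treat the exact numbers as illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a prefetching operator (assumed to resemble observeOn):
// it asks upstream for `prefetch` items immediately, then re-requests in batches of
// `limit` as items are handed downstream, regardless of how much the subscriber requested.
final class PrefetchRelay {
    /** Returns the request(n) calls sent upstream while `consumed` items pass through. */
    static List<Long> upstreamRequests(int prefetch, int consumed) {
        int limit = prefetch - (prefetch >> 2); // assumed: replenish after ~75% of the prefetch is used
        List<Long> requests = new ArrayList<>();
        requests.add((long) prefetch); // sent on subscription, before the subscriber asks for anything
        int sinceLastRequest = 0;
        for (int i = 0; i < consumed; i++) {
            if (++sinceLastRequest == limit) {
                requests.add((long) limit);
                sinceLastRequest = 0;
            }
        }
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests(128, 100)); // default buffer size: prints [128, 96]
        System.out.println(upstreamRequests(10, 20));   // like observeOn(scheduler, false, 10): prints [10, 8, 8]
    }
}
```

With the default size, the first upstream request is already 128, far more than an `onBackpressureBuffer(10)` further up can absorb if its own source is not throttled.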

observeOn(...) accepts an optional buffer size override, but even if you supply it, the ParallelFlowable is going to invoke your flatMap(...) method more often than you want. I'm not 100% sure why; maybe it performs its own internal buffering when merging the rails back into a sequential flow.

I think you can get closer to your desired behavior by using flatMap(...) with a maxConcurrency argument instead of parallel(...).
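The idea behind maxConcurrency is that no more than that many inner tasks run at once. As I understand RxJava's flatMap, it also requests only that many items from upstream up front and asks for another as each inner Flowable completes. The sketch below is a hypothetical JDK-only analogue, not RxJava. The `BoundedConcurrency` class uses a `Semaphore`, so the submitting loop blocks until a slot frees up, and it records the peak number of tasks running at the same time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stdlib analogue of flatMap(mapper, maxConcurrency), not RxJava code:
// the submitting loop blocks on a semaphore, so at most maxConcurrency tasks run at once.
final class BoundedConcurrency {
    /** Runs `tasks` jobs of `workMillis` each and returns the peak number running simultaneously. */
    static int run(int tasks, int maxConcurrency, long workMillis) {
        // Deliberately more threads than permits: the semaphore, not the pool size, is the limit.
        ExecutorService pool = Executors.newFixedThreadPool(maxConcurrency * 2);
        Semaphore permits = new Semaphore(maxConcurrency);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            permits.acquireUninterruptibly(); // wait for a running task to finish
            pool.execute(() -> {
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    Thread.sleep(workMillis); // stands in for the 200 ms of work in the question
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                    permits.release(); // only now may the next task start
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + run(50, 10, 20));
    }
}
```

Each permit is released only after the in-flight counter has been decremented, so the reported peak can never exceed maxConcurrency, even though the pool has twice as many threads.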

Also keep in mind that you don't want to call subscribeOn(...) here: it affects the entire upstream Flowable. So if you're already calling parallel(...).runOn(...), it either has no effect or its effect will be unexpected.

Armed with the above, I think this gets you closer to what you're looking for:

    import io.reactivex.Flowable;
    import io.reactivex.Scheduler;
    import io.reactivex.schedulers.Schedulers;
    import io.reactivex.subscribers.DisposableSubscriber;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;

    // The statements below run inside a method body, e.g. main(String[] args).
    List<Integer> src = new ArrayList<>();
    for (int i = 0; i < 200; i++) {
        src.add(i);
    }
    Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
    Flowable.fromIterable(src)
            .flatMap(
                    i -> Flowable.just(i)
                            .subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
                            .map(__ -> {
                                System.out.println("publisher: " + i);
                                Thread.sleep(200);
                                return i;
                            }),
                    10) // max concurrency
            .observeOn(Schedulers.newThread(), false, 10) // override buffer size
            .doOnError(Throwable::printStackTrace)
            .subscribeWith(new DisposableSubscriber<Integer>() {
                @Override
                protected void onStart() {
                    request(10);
                }
                @Override
                public void onNext(Integer integer) {
                    System.out.println("subscriber: " + integer);
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(1);
                }
                @Override
                public void onError(Throwable t) {
                }
                @Override
                public void onComplete() {
                }
            });
    try {
        Thread.sleep(1000000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

  • Posted on March 16, 2020 21:14:58
  • Source: https://go.coder-hub.com/60706697.html



