使用OkHttp3和ReactiveX Java实现长轮询的正确方法

huangapple go评论52阅读模式
英文:

Correct way to implement long polling using OkHttp3 and ReactiveX Java

问题

以下是翻译好的部分:

使用OkHttp3(v4.4.1)实现长轮询,以获取RxJava(v2.2.11)Observable<String>,用于处理响应的每一行。是否可以在不阻塞线程的情况下完成,以保持对行的持续读取?如果需要阻塞某些线程,那么应阻塞哪个线程?是否有关于使用OkHttp3实现长轮询的通用示例?谷歌在这个主题上对我来说总是避而不谈...

TL;DR

我正在使用OKHttp3作为HTTP客户端,并通过makeGetObservable方法调用进行了封装,该调用返回使用newCall回调发射事件到Observable的Observable Response。现在我正在尝试为长轮询服务添加支持,并且我关心线程处理的问题。

下面的代码演示了我正在尝试做的事情(并且似乎有效),但我相当确定这样做是不好的。

// 返回Observable&lt;Response&gt;
makeGetObservable("http://my.service.com/api/events")
  // 检查错误并映射为Observable&lt;ResponseBody&gt;
  .map(this::mapRespBodyOrError)
  // flat map到表示长轮询响应行的Observable&lt;String&gt;
  .flatMap(respBody -> Observable.create(emitter -> {
    // 在响应体流上打开读取器
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
      String line;
      // 阻塞并等待从输入读取一行
      while((line = reader.readLine()) !=null) {
        // 一旦从响应体输入流中读取了行,将其作为observable事件发射出去
        emitter.onNext(line);
      }
    }
  }));

如果您需要进一步的翻译或有其他问题,请随时提问。

英文:

How do I implement long polling using OkHttp3 (v4.4.1) to get RxJava (v2.2.11) Observable<String> for each line of the response? Can it be done without blocking a thread to keep reading lines? If I need to block some thread then which thread do I block? Any general examples on implementation of long polling using OkHttp3? Google's been very shy with me on this topic...

TL;DR

I am using OKHttp3 as a HTTP client and got it wrapped in makeGetObservable method call which returns Observable Response using the newCall callback to emit events to Observable. Now I am trying to add support for long polling service and I am concerned about threading.

Below code demonstrates what I am attempting to do (and seems to work) but I am pretty sure it is not OK.

// return Observable&lt;Response&gt;
makeGetObservable(&quot;http://my.service.com/api/events&quot;)
  // check for error and map to Observable&lt;ResponseBody&gt;
  .map(this::mapRespBodyOrError)
  // flat map to Observable&lt;String&gt; representing line of long polling response
  .flatMap(respBody -&gt; Observable.create(emitter -&gt; {
    // open reader on response body stream
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
      String line;
      // block and wait to read a line from input
      while((line = reader.readLine()) !=null) {
        // once line was read from response body input stream emit it as observable event
        emitter.onNext(line);
      }
    }
  }));

答案1

得分: 0

经过一些研究,我发现被阻塞的线程是OkHttp客户端线程。这是由于我实现的makeGetObservable,它从OkHttp客户端的newCall..enqueue回调中发出。默认情况下,OkHttp客户端为一个资源拥有5个并发连接的线程池。因此,每次我订阅另一个长轮询资源时,我都会阻塞其中一个线程。在阻塞了5个线程之后,OkHttp客户端停止工作,因为没有线程来处理响应。

根据@Progman的建议,我使用了在IO调度程序中使用subscribeOn,它为每个阻塞IO操作生成一个新线程。在使用此调度程序时,必须小心并正确处理资源的释放。

我的实现目前如下(添加了调度程序、完成和错误事件):

// 返回Observable<Response>
makeGetObservable("http://my.service.com/api/events")
  // 检查错误并映射为Observable<ResponseBody>
  .map(this::mapRespBodyOrError)
  // 使用在IO调度程序中生成新线程来处理阻塞操作
  .observeOn(Schedulers.io())
  // flat map到代表长轮询响应行的Observable<String>
  .flatMap(respBody -> Observable.create(emitter -> {
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
      // 阻塞读取可用的行
      String line;
      while ((line = reader.readLine()) != null) {
        // 发出每一行的事件
        emitter.onNext(line);
      }
      // 发出完成事件以指示我们完成了
      emitter.onComplete();
    } catch (IOException | RuntimeException err) {
      // 发出可能发生的任何错误
      emitter.onError(err);
    }
  }));
英文:

After some research I found that the thread which is being blocked is OkHttp client thread. This is due to my implementation of makeGetObservable which emits from newCall..enqueue callback of OkHttp client.
OkHttp client by default has pool of 5 threads for 5 concurrent connections for a resource. So I was blocking one of those threads every time I subscribed to another long-polling resource. After 5 blocked threads OkHttp client stopped working as it had no threads to process responses.

As suggested by @Progman I went with subscribeOn using IO scheduler which spawns new thread for every blocking IO operation. One has to be careful and properly dispose of resources with this scheduler.

My implementation currently looks as follows (added scheduler, completion and error events)

// return Observable&lt;Response&gt;
makeGetObservable(&quot;http://my.service.com/api/events&quot;)
  // check for error and map to Observable&lt;ResponseBody&gt;
  .map(this::mapRespBodyOrError)
  // use IO scheduler that spawns new threads to take care of blocking operations
  .observeOn(Schedulers.io())
  // flat map to Observable&lt;String&gt; representing line of long polling response
  .flatMap(respBody -&gt; Observable.create(emitter -&gt; {
    try (BufferedReader reader = new BufferedReader(respBody.charStream())) {
      // blocking read lines while available
      String line;
      while ((line = reader.readLine()) != null) {
        // emit event for every line
        emitter.onNext(line);
      }
      // emit the completion event to indicate we are done
      emitter.onComplete();
    } catch (IOException | RuntimeException err) {
      // emit any error that might have occurred
      emitter.onError(err);
    }
  }));

huangapple
  • 本文由 发表于 2020年10月4日 21:38:37
  • 转载请务必保留本文链接:https://go.coder-hub.com/64195326.html
匿名

发表评论

匿名网友

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:

确定