理解RxJava的线程模型

MicEdinburg 7年前
   <p>ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET、JavaScript和C++,Rx近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由ReactiveX这个组织负责维护,比较流行的有RxJava/RxJS/Rx.NET,社区网站是 <a href="/misc/goto?guid=4958979542149358570" rel="nofollow,noindex">reactivex.io</a> 。</p>    <p>Netflix参考微软的Reactive Extensions创建了Java的实现RxJava,主要是为了简化服务器端的并发。2013年二月份,Ben Christensen 和 Jafar Husain发在Netflix技术博客的一篇文章第一次向世界展示了RxJava。</p>    <p>RxJava也在Android开发中得到广泛的应用。</p>    <p>ReactiveX</p>    <p>An API for asynchronous programming with observable streams.</p>    <p>A combination of the best ideas from the Observer pattern, the Iterator pattern, and functional programming.</p>    <p>虽然RxJava是为异步编程实现的库,但是如果不清楚它的使用,或者错误地使用了它的线程调度,反而不能很好的利用它的异步编程提到系统的处理速度。本文通过实例演示错误的RxJava的使用,解释RxJava的线程调度模型,主要介绍 Scheduler 、 observeOn 和 subscribeOn 的使用。</p>    <p>本文中的例子以并发发送http request请求为基础,通过性能检验RxJava的线程调度。</p>    <h3>第一个例子,性能超好?</h3>    <p>我们首先看第一个例子:</p>    <pre>  <code class="language-java">public static void testRxJavaWithoutBlocking(int count) throws Exception {      CountDownLatch finishedLatch = new CountDownLatch(1);      long t = System.nanoTime();      Observable.range(0, count).map(i -> {          //System.out.println("A:" + Thread.currentThread().getName());          return 200;      }).subscribe(statusCode -> {          //System.out.println("B:" + Thread.currentThread().getName());      }, error -> {      }, () -> {          finishedLatch.countDown();      });      finishedLatch.await();      t = (System.nanoTime() - t) / 1000000; //ms      System.out.println("RxJavaWithoutBlocking TPS: " + count * 1000 / t);  }</code></pre>    <p>这个例子是一个基本的RxJava的使用,利用Range创建一个Observable, subscriber处理接收的数据。因为整个逻辑没有阻塞,程序运行起来很快,</p>    <p>输出结果为:</p>    <p>RxJavaWithoutBlocking TPS: <strong>7692307</strong> 。</p>    <h3>2 加上业务的模拟,性能超差</h3>    <p>上面的例子是一个理想化的程序,没雨任何阻塞。我们模拟一下实际的应用,加上业务处理。</p>    <p>业务逻辑是发送一个http的请求,httpserver是一个模拟器,针对每个请求有30毫秒的延迟。subscriber统计请求结果:</p>    <pre>  <code class="language-java">public static void testRxJavaWithBlocking(int count) throws Exception {          URL url = new URL("http://127.0.0.1:8999/");          CountDownLatch finishedLatch = new CountDownLatch(1);          long t = System.nanoTime();          Observable.range(0, count).map(i -> {              try {                  HttpURLConnection conn = (HttpURLConnection) url.openConnection();                  conn.setRequestMethod("GET");                  int responseCode = conn.getResponseCode();                  BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));                  String inputLine;                  while ((inputLine = in.readLine()) != null) {                      //response.append(inputLine);                  }                  in.close();                  return responseCode;              } catch (Exception ex) {                  return -1;              }          }).subscribe(statusCode -> {          }, error -> {          }, () -> {              finishedLatch.countDown();          });          finishedLatch.await();          t = (System.nanoTime() - t) / 1000000; //ms          System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);      }</code></pre>    <p>运行结果如下:</p>    <pre>  <code class="language-java">RxJavaWithBlocking TPS: 29。</code></pre>    <p>@#¥%%……&!</p>    <p>性能怎么突降呢,第一个例子看起来性能超好啊,http server只增加了一个30毫秒的延迟,导致这个方法每秒只能处理29个请求。</p>    <p>如果我们估算一下, 29*30= 870 毫秒,大约1秒,正好和单个线程发送处理所有的请求的TPS差不多。</p>    <p>后面我们也会看到,实际的确是一个线程处理的,你可以在代码中加入</p>    <h3>3 加上调度器,不起作用?</h3>    <p>如果你对 subscribeOn 和 observeOn 方法有些印象的话,可能会尝试使用调度器去解决:</p>    <pre>  <code class="language-java">public static void testRxJavaWithBlocking(int count) throws Exception {          URL url = new URL("http://127.0.0.1:8999/");          CountDownLatch finishedLatch = new CountDownLatch(1);          long t = System.nanoTime();          Observable.range(0, count).map(i -> {              try {                  HttpURLConnection conn = (HttpURLConnection) url.openConnection();                  conn.setRequestMethod("GET");                  int responseCode = conn.getResponseCode();                  BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));                  String inputLine;                  while ((inputLine = in.readLine()) != null) {                      //response.append(inputLine);                  }                  in.close();                  return responseCode;              } catch (Exception ex) {                  return -1;              }          }).subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()).subscribe(statusCode -> {          }, error -> {          }, () -> {              finishedLatch.countDown();          });          finishedLatch.await();          t = (System.nanoTime() - t) / 1000000; //ms          System.out.println("RxJavaWithBlocking TPS: " + count * 1000 / t);      }</code></pre>    <p>加上 .subscribeOn(Schedulers.io()).observeOn(Schedulers.computation()) 看一下性能:</p>    <p>RxJavaWithBlocking TPS: <strong>30</strong> 。</p>    <p>性能没有改观,是时候了解一下RxJava线程调度的问题了。</p>    <h3>4 RxJava的线程模型</h3>    <p>首先,依照 <a href="/misc/goto?guid=4959675738146996205" rel="nofollow,noindex">Observable Contract</a> ,  onNext 是顺序执行的,不会同时由多个线程并发执行。</p>    <p><img src="https://simg.open-open.com/show/48a9c5ec8ed190300a3a91a6b65920ad.gif"></p>    <p>图片来自 http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html</p>    <p>默认情况下,它是在调用subscribe方法的那个线程中执行的。如第一个例子和第二个例子,Rx的操作和消息接收处理都是在同一个线程中执行的。一旦由阻塞,比如第二个例子,久会导致这个线程被阻塞,吞吐量下降。</p>    <p><img src="https://simg.open-open.com/show/abe6429267941ef0757670ed18175b47.png"></p>    <p>图片来自 https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2</p>    <p>但是 subscribeOn 可以改变Observable的运行线程。</p>    <p><img src="https://simg.open-open.com/show/96539345f59ad777040a0f33ad79a464.png"></p>    <p>图片来自 https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2</p>    <p>上图中可以看到,如果你使用了 subscribeOn 方法,则Rx的运行将会切换到另外的线程上,而不是默认的调用线程。</p>    <p>需要注意的是,如果在Observable链中调用了多个 subscribeOn 方法,无论调用点在哪里,Observable链只会使用第一个 subscribeOn 指定的调度器,正所谓”一见倾情”。</p>    <p>但是 onNext 还是顺序执行的,所以第二个例子的性能依然低下。</p>    <p>observeOn 可以中途改变Observable链的线程。前面说了, subscribeOn 方法改变的源Observable的整个的运行线程,要想中途切换线程,就需要 observeOn 方法。</p>    <p><img src="https://simg.open-open.com/show/954c5d0a87b451d686c259c01759c868.gif"></p>    <p>图片来自 http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html</p>    <p>官方的一个简略晦涩的解释如下:</p>    <p>The SubscribeOn operator changes this behavior by specifying a different Scheduler on which the Observable should operate. The ObserveOn operator specifies a different Scheduler that the Observable will use to send notifications to its observers.</p>    <p>一图胜千言:</p>    <p><img src="https://simg.open-open.com/show/dd64b2fa5477b56e1a578d758e4d8ecf.png"></p>    <p>图片来自 http://reactivex.io</p>    <p>注意箭头的颜色和横轴的颜色,不同的颜色代表不同的线程。</p>    <h3>5 Schedulers</h3>    <p>上面我们了解了RxJava可以使用 subscribeOn 和 observeOn 可以改变和切换线程,以及 onNext 是顺序执行的,不是并发执行,至多也就切换到另外一个线程,如果它中间的操作是阻塞的,久会影响整个Rx的执行。</p>    <p>Rx是通过调度器来选择哪个线程执行的,RxJava内置了几种调度器,分别为不同的case提供线程:</p>    <ul>     <li><strong>io()</strong>  : 这个调度器时用于I/O操作, 它可以增长或缩减来确定线程池的大小它是使用CachedThreadScheduler来实现的。需要注意的是,它的线程池是无限制的,如果你使用了大量的线程的话,可能会导致OutOfMemory等资源用尽的异常。</li>     <li><strong>computation()</strong>  : 这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。</li>    </ul>    <p>因为这些方法内部已经调用的调度器,所以你再调用 subscribeOn 是无效的,比如下面的例子总是使用 computation 调度器的线程。</p>    <pre>  <code class="language-java">Observable.just(1,2,3)                  .delay(1, TimeUnit.SECONDS)                  .subscribeOn(Schedulers.newThread())                  .map(i -> {                      System.out.println("map: " + Thread.currentThread().getName());                      return i;                  })                  .subscribe(i -> {});</code></pre>    <ul>     <li><strong>immediate()</strong>  :这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。</li>     <li><strong>newThread()</strong>  :创建一个新的线程只从。</li>     <li><strong>trampoline()</strong>  :为当前线程建立一个队列,将当前任务加入到队列中依次执行。</li>    </ul>    <p>同时, Schedulers 还提供了 from 静态方法,用户可以定制线程池:</p>    <pre>  <code class="language-java">ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());  Schedulers.from(es)</code></pre>    <h3>6 改造,异步执行</h3>    <p>现在,我们已经了解了RxJava的线程运行,以及相关的调度器。可以看到上面的例子还是顺序阻塞执行的,即使是切换到另外的线程上,依然是顺序阻塞执行,显示它的吞吐率非常非常的低。下一步我们就要改造这个例子,让它能异步的执行。</p>    <p>下面是一种改造方案,我先把代码贴出来,再解释:</p>    <pre>  <code class="language-java">public static void testRxJavaWithFlatMap(int count) throws Exception {      ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());      URL url = new URL("http://127.0.0.1:8999/");      CountDownLatch finishedLatch = new CountDownLatch(1);      long t = System.nanoTime();      Observable.range(0, count).subscribeOn(Schedulers.io()).flatMap(i -> {                  //System.out.println("A: " + Thread.currentThread().getName());                  return Observable.just(i).subscribeOn(Schedulers.from(es)).map(v -> {                              //System.out.println("B: " + Thread.currentThread().getName());                              try {                                  HttpURLConnection conn = (HttpURLConnection) url.openConnection();                                  conn.setRequestMethod("GET");                                  int responseCode = conn.getResponseCode();                                  BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));                                  String inputLine;                                  while ((inputLine = in.readLine()) != null) {                                      //response.append(inputLine);                                  }                                  in.close();                                  return responseCode;                              } catch (Exception ex) {                                  return -1;                              }                          }                  );              }      ).observeOn(Schedulers.computation()).subscribe(statusCode -> {          //System.out.println("C: " + Thread.currentThread().getName());      }, error -> {      }, () -> {          finishedLatch.countDown();      });      finishedLatch.await();      t = (System.nanoTime() - t) / 1000000; //ms      System.out.println("RxJavaWithFlatMap TPS: " + count * 1000 / t);      es.shutdownNow();  }</code></pre>    <p>通过 flatmap 可以将源Observable的元素项转成n个Observable,生成的每个Observable可以使用线程池并发的执行,同时flatmap还会将这n个Observable merge成一个Observable。你可以将其中的注释打开,看看线程的执行情况。</p>    <p>性能还不错:</p>    <p>RxJavaWithFlatMap TPS: <strong>3906</strong> 。</p>    <p>FlatMap— transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable</p>    <p><img src="https://simg.open-open.com/show/b40bb376eb8e24fb9b969f4edd053c9c.png"></p>    <p>图片来自 http://reactivex.io</p>    <h3>7 另一种解决方案</h3>    <p>我们已经清楚了要并行执行提高吞吐率的解决办法就是创建多个Observable并且并发执行。基于这种解决方案,我们还可以有其它的解决方案。</p>    <p>上一方案中利用flatmap创建多个Observable,针对我们的例子,我们何不直接创建多个Observable呢?</p>    <pre>  <code class="language-java">public static void testRxJavaWithParallel(int count) throws Exception {      ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactoryBuilder().setNameFormat("SubscribeOn-%d").build());      URL url = new URL("http://127.0.0.1:8999/");      CountDownLatch finishedLatch = new CountDownLatch(count);      long t = System.nanoTime();      for (int k = 0; k < count; k++) {          Observable.just(k).map(i -> {              //System.out.println("A: " + Thread.currentThread().getName());              try {                  HttpURLConnection conn = (HttpURLConnection) url.openConnection();                  conn.setRequestMethod("GET");                  int responseCode = conn.getResponseCode();                  BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));                  String inputLine;                  while ((inputLine = in.readLine()) != null) {                      //response.append(inputLine);                  }                  in.close();                  return responseCode;              } catch (Exception ex) {                  return -1;              }          }).subscribeOn(Schedulers.from(es)).observeOn(Schedulers.computation()).subscribe(statusCode -> {          }, error -> {          }, () -> {              finishedLatch.countDown();          });      }      finishedLatch.await();      t = (System.nanoTime() - t) / 1000000; //ms      System.out.println("RxJavaWithParallel TPS: " + count * 1000 / t);      es.shutdownNow();  }</code></pre>    <p>性能更好一点:</p>    <pre>  <code class="language-java">RxJavaWithParallel2 TPS: 4716。</code></pre>    <p>这个例子没有使用 Schedulers.io() 作为它的调度器,这是因为如果在大并发的情况下,可能会出现创建过多的线程导致资源不错,所以我们限定使用200个线程。</p>    <h3>8 总结</h3>    <ul>     <li><strong>subscribeOn()</strong>  改变的Observable运行(operate)使用的调度器,多次调用无效。</li>     <li><strong>observeOn()</strong>  改变Observable发送notifications的调度器,会影响后续的操作,可以多次调用</li>     <li>默认情况下, 操作链使用的线程是调用 subscribe() 的线程</li>     <li>Schedulers 提供了多个调度器,可以并行运行多个Observable</li>     <li>使用RxJava可以实现异步编程,但是依然要小心线程阻塞。而且由于这种异步的编程,调试代码可能更加的困难</li>    </ul>    <h3>9 参考文档</h3>    <ol>     <li><a href="/misc/goto?guid=4959675738146996205" rel="nofollow,noindex">http://reactivex.io/documentation/contract.html</a></li>     <li><a href="/misc/goto?guid=4959675738246835879" rel="nofollow,noindex">http://reactivex.io/documentation/operators/subscribeon.html</a>   <a href="/misc/goto?guid=4959675738333541493" rel="nofollow,noindex">中文翻译</a></li>     <li><a href="/misc/goto?guid=4959675738407459892" rel="nofollow,noindex">http://reactivex.io/documentation/operators/observeon.html</a>   <a href="/misc/goto?guid=4959675738488914629" rel="nofollow,noindex">中文翻译</a></li>     <li><a href="/misc/goto?guid=4959646508626849535" rel="nofollow,noindex">http://reactivex.io/documentation/scheduler.html</a></li>     <li><a href="/misc/goto?guid=4959675738598390809" rel="nofollow,noindex">http://tomstechnicalblog.blogspot.com/2016/02/rxjava-understanding-observeon-and.html</a></li>     <li><a href="/misc/goto?guid=4959675738678188681" rel="nofollow,noindex">http://tomstechnicalblog.blogspot.com/2015/11/rxjava-achieving-parallelization.html</a></li>     <li><a href="/misc/goto?guid=4959675738753997528" rel="nofollow,noindex">https://medium.com/@diolor/observe-in-the-correct-thread-1939bb9bb9d2</a>   <a href="/misc/goto?guid=4959654701768884017" rel="nofollow,noindex">中文翻译</a></li>     <li><a href="/misc/goto?guid=4959644422599087813" rel="nofollow,noindex">https://github.com/mcxiaoke/RxDocs</a></li>    </ol>    <p> </p>    <p>来自:http://colobu.com/2016/07/25/understanding-rxjava-thread-model/</p>    <p> </p>