RxJava2 常用方法介绍

AprilFlanne 2年前
   <p>之前看了 <a href="/misc/goto?guid=4959738217635070674" rel="nofollow,noindex">拉丁吴</a> 关于 RxJava 的一些文章,获益匪浅可惜的是大佬并没有对于各个运算符进行介绍,关于 RxJava2 也只给了一个 GitHub 上的 Demo <a href="/misc/goto?guid=4959738217726985527" rel="nofollow,noindex">RxJava2-Android-Samples</a> ,这个工程里面的注释不多,只能自己一个一个去试了,这种小例子感觉还是直接用 Java 来的方便些,于是自己就写了一个 <a href="/misc/goto?guid=4959738217813346560" rel="nofollow,noindex">RxJava2Deme</a></p>    <p>这篇文章就是对这个例子的一个一个操作进行了简单的说明,仅此而已为了简化下面的代码,先定义了一些复用的函数,有点多,就不贴上来了,大概就是 Observer<Integer> getObserver(int n) 这样的。 <a href="/misc/goto?guid=4959738217899665375" rel="nofollow,noindex">文件地址</a></p>    <h2>操作符</h2>    <h3>concat()</h3>    <p><a href="/misc/goto?guid=4959649645906935883" rel="nofollow,noindex">doc</a></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.concat(Observable.just(1, 2, 3), Observable.just(5, 6, 7))          .subscribe(getObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:1  0 onNext:2  0 onNext:3  0 onNext:5  0 onNext:6  0 onNext:7  0 onComplete     </code></pre>    <p>很简单的连接操作</p>    <h3>distinct()</h3>    <p><a href="/misc/goto?guid=4959738218017201762" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/548106706d934af050f67aef7d03bc36.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.just(0, 1, 1, 2, 3)          .distinct()          .subscribe(getObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  0 onNext:1  0 onNext:2  0 onNext:3  0 onComplete     </code></pre>    <p>分析:就是很简单的去重</p>    <h3>filter()</h3>    <p><a href="/misc/goto?guid=4959657463019796329" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/8201a6c7a884f07566e631433950f203.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.just(0, 1, 3, 4)          .filter(i -> i > 2)          .subscribe(getObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:3  0 onNext:4  0 onComplete     </code></pre>    <p>分析:筛选也没啥说的,大家都会了</p>    <h3>buffer()</h3>    <p><a href="/misc/goto?guid=4959738218132423327" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/c2a7eda9f21b13dc1d37193839882b8a.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable<Integer> observable = Observable.just(1, 2 ,3, 4, 5);  observable.buffer(3, 2).subscribe(getListObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0list onSubscribe  0list onNext:[1, 2, 3]  0list onNext:[3, 4, 5]  0list onNext:[5]  0list onComplete     </code></pre>    <p>解析:</p>    <pre>  <code class="language-java">buffer(count, skip)` 从定义就差不多能看出作用了,将 observable 中的数据按 skip(步长)分成最长不超过 count 的 buffer,然后生成一个 observable     </code></pre>    <h3>debounce()</h3>    <p><a href="/misc/goto?guid=4959647703242238889" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/2418495b0a24fef0c7082ba9adcf3a31.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.create((ObservableOnSubscribe<Integer>) e -> {      e.onNext(1);      Thread.sleep(1000);      e.onNext(2);      Thread.sleep(400);      e.onNext(3);      Thread.sleep(1000);      e.onNext(4);      Thread.sleep(400);      e.onNext(5);      Thread.sleep(400);      e.onNext(6);      Thread.sleep(1000);      e.onComplete();  }).subscribeOn(Schedulers.newThread())          .observeOn(Schedulers.newThread())          .debounce(500, TimeUnit.MILLISECONDS)          .subscribe(getObserver(0));  Thread.sleep(10000);     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:1  0 onNext:3  0 onNext:6  0 onComplete     </code></pre>    <p>分析:</p>    <p>这么理解,在一段时间内只能有一次 onNext()</p>    <p>看上面的例子,我们将时间间隔设置为了 500ms</p>    <p>onNext(1) -> sleep(1000) (因为 1000 > 500) 所以 onNext(1) 成功执行 -> onNext(2) -> sleep(400) (因为 400 < 1000 所以 onNext(2) 被除掉了</p>    <p>这样依次下去,需要注意的只要写了 onNext() 时间就会被重置,即使这个 onNext() 被除掉了</p>    <p>在上面这个例子中 4 和 5 隔了 400ms,所以 4 没有触发,但是时间得重新计算</p>    <h3>defer()</h3>    <p><a href="/misc/goto?guid=4959738218238621705" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/c38c7e44f4ff4580ef593f264dcb8d52.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {      @Override      public ObservableSource<Integer> call() throws Exception {          return Observable.just(0, 1, 2, 3);      }  });        observable.subscribe(getObserver(0));  observable.subscribe(getObserver(1));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  0 onNext:1  0 onNext:2  0 onNext:3  0 onComplete  1 onSubscribe  1 onNext:0  1 onNext:1  1 onNext:2  1 onNext:3  1 onComplete     </code></pre>    <p>分析:</p>    <p>就是在每次订阅的时候就会创建一个新的 Observable</p>    <h3>interval()</h3>    <p><a href="/misc/goto?guid=4959738218332401708" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/905d49717fda6b267addd5c6ed16f5c7.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.interval(3, 2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {      @Override      public void accept(Long aLong) throws Exception {          p("accept " + aLong + " at " + System.currentTimeMillis());      }  });     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">accept 0 at 1487498704028  accept 1 at 1487498706028  accept 2 at 1487498708028  accept 3 at 1487498710028  accept 4 at 1487498712028  ...     </code></pre>    <p>分析:</p>    <p>类似于一个定时任务这样吧,也可以像下面这么写:</p>    <pre>  <code class="language-java">Observable.interval(3, 2, TimeUtil.SECONDS);  Observable.just(1, 2, 3).subscribeWith(getDisposableObserver(1));     </code></pre>    <h3>last()</h3>    <p><a href="/misc/goto?guid=4959738218414980893" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/e9ec56e4bb8d1f4cfc072a088c425d52.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.just(0, 1, 2).last(3).subscribe(getSingleObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onSuccess :2     </code></pre>    <p>分析:</p>    <p>就是取出最后一个值,参数是没有值的时候的默认值,比如这样:</p>    <pre>  <code class="language-java">Observable.create((ObservableOnSubscribe<Integer>) Emitter::onComplete).last(3)          .subscribe(getSingleObserver(0));     </code></pre>    <p>输出就是 3 了</p>    <p>另见 lastOrError() 方法,区别就是 lastOrError() 没有默认值,没值就触发错误</p>    <h3>map()</h3>    <p><a href="/misc/goto?guid=4959631851652662337" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/8333a164ba7b155bc0cb4fd902c4cb00.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.just(0, 1, 2, 3)          .map(i -> "string" + i)          .subscribe(getStringObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:string0  0 onNext:string1  0 onNext:string2  0 onNext:string3  0 onComplete     </code></pre>    <p>分析:</p>    <p>把一个 Observable 转成另一个 Observable</p>    <p>###</p>    <p><a href="/misc/goto?guid=4959738218523253816" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/81851b6b9c689b74795c7e9dfbf64059.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.merge(Observable.just(0, 1), Observable.just(3, 4))          .subscribe(getObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  0 onNext:1  0 onNext:3  0 onNext:4  0 onComplete     </code></pre>    <p>分析:</p>    <p>将多个 Observable 合起来,例子中只是两个</p>    <p>参数也支持使用迭代器将更多的组合起来</p>    <p>###</p>    <p><a href="/misc/goto?guid=4959738218621173659" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/f24a081c09ef53c23f4528b5dd847f97.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.just(1, 2, 3)          .reduce((i1, i2) -> i1 + i2)          .subscribe(getMaybeObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onSuccess 6     </code></pre>    <p>分析:</p>    <p>就是依次用一个方法处理每个值,可以有一个 seed 作为初始值</p>    <p>不指定 seed 则第一次传入的就是第一第二个</p>    <p>###</p>    <p><a href="/misc/goto?guid=4959738218702669093" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/d045d1b690551e1546992b94134d450b.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">PublishSubject<Integer> publishSubject = PublishSubject.create();  Observable<Integer> observable = publishSubject.replay().autoConnect();     observable.subscribe(getObserver(0));  publishSubject.onNext(0);  observable.subscribe(getObserver(1));  publishSubject.onNext(1);  publishSubject.onComplete();     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  1 onSubscribe  1 onNext:0  0 onNext:1  1 onNext:1  0 onComplete  1 onComplete     </code></pre>    <p>分析:</p>    <p>从结果可以很明显的看出来,使用了 replay() 后, subscribe(getObserver(1)) 之前的数据也被传入了</p>    <p>相对于 cache() 来说, replay() 提供了更多可控制的选项,在实际使用中可以通过指定 bufferSize 来防止内存占用过大等</p>    <p>还有一个 ReplaySubject 功能和这个应该是差不多的,下面就不单独说了,代码在 GitHub 上都有。</p>    <h3>merge()</h3>    <p><a href="/misc/goto?guid=4959657462922360507" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/58e16ac9c4a7a59b9d1de096babe8259.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.just(0, 1, 2, 3)          .scan((i1, i2) -> i1 + i2)          .subscribe(getObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  0 onNext:1  0 onNext:3  0 onNext:6  0 onComplete     </code></pre>    <p>分析:</p>    <p>scan() 和上面提到的 reduce() 差不多</p>    <p>区别在于 reduce() 只输出最终结果,而 scan() 会将过程中的每一个结果输出</p>    <h3>skip()</h3>    <p><a href="/misc/goto?guid=4959738218827822572" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/8fe614af38414989058713ac8b56d43f.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.just(0, 1, 2, 3)          .skip(2)          .subscribe(getObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:2  0 onNext:3  0 onComplete     </code></pre>    <p>分析:</p>    <p>看名字也知道是干嘛的了,跳过一些数据,例子中跳过的是数据量,也可以跳过时间 skip(long time, TimeUnit unit)</p>    <h3>take()</h3>    <p><a href="/misc/goto?guid=4959646509260271122" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/deb2310825547b2fb88492af66080aaf.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.just(0, 1, 2, 3)          .skip(2)          .subscribe(getObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  0 onNext:1  0 onComplete     </code></pre>    <p>分析:</p>    <p>从数据中取前几个出来</p>    <p>和 skip() 类似,参数可以指定为时间,那就是取出这段时间里的数据了</p>    <h3>sample() throttleFirst() throttleLast()</h3>    <p>这几个是相关的,一起说</p>    <p><a href="/misc/goto?guid=4959738218959715618" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/36084ef074056869b7efb47d8834d5b6.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.create((ObservableOnSubscribe<Integer>) e -> {      e.onNext(0);      Thread.sleep(200);      e.onNext(1);      Thread.sleep(600);      e.onNext(2);      Thread.sleep(300);      e.onNext(3);      Thread.sleep(1100);      e.onNext(4);      Thread.sleep(3000);      e.onComplete();  }).sample(1, TimeUnit.SECONDS) // throttleFirst 和 throttleLast 就将这里的 sample 改掉就行          .subscribe(getObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">sample:  0 onSubscribe  0 onNext:2  0 onNext:3  0 onNext:4  0 onComplete     throttleFirst:  0 onSubscribe  0 onNext:0  0 onNext:3  0 onNext:4  0 onComplete     throttleLast:  0 onSubscribe  0 onNext:2  0 onNext:3  0 onComplete     </code></pre>    <p>分析:</p>    <p>先从 sample() 开始吧,先看下官网上的定义</p>    <p>emit the most recent items emitted by an Observable within periodic time intervals</p>    <p>发出最接近周期点的事件</p>    <p>在例子中,我们使用了 1000 作为 时间间隔,随手画了张图,将就着看下</p>    <p><img src="https://simg.open-open.com/show/3d208088b508bec2605aad0db4d9c1b6.png"></p>    <p>在 A 点之前没有点,B 点之前最近的一个点是 2,C 点之前的是 3,所以输出 2 和 3。</p>    <p>对于 throttleFirst() ,它和 sample() 是相反的,只是要注意一下,图中的 0 点是被当做在 A 点之后处理的</p>    <p>而 throttleLast() 就是 sample() 换了个名字而已</p>    <h3>timer()</h3>    <p><a href="/misc/goto?guid=4959738219062530802" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/414a765397e0a3814b31389b4f6bc877.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.timer(1, TimeUnit.SECONDS)          .subscribe(getLongObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  0 onComplete     </code></pre>    <p>分析:</p>    <p>定时任务了</p>    <h3>window()</h3>    <p><a href="/misc/goto?guid=4959738219181874493" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/83efb47adce43343fade68b3fb8d98fb.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.interval(1, TimeUnit.SECONDS)      .take(9)      .window(3, TimeUnit.SECONDS)      .subscribe(new Consumer<Observable<Long>>() {          int n = 0 ;          @Override          public void accept(Observable<Long> longObservable) throws Exception {              longObservable.subscribe(getLongObserver(n++));          }      });     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  0 onNext:1  0 onComplete  1 onSubscribe  1 onNext:2  1 onNext:3  1 onNext:4  1 onComplete  2 onSubscribe  2 onNext:5  2 onComplete     </code></pre>    <p>分析:</p>    <p>按照时间划分窗口,将数据发送给不同的 observable 。</p>    <h3>zip()</h3>    <p><a href="/misc/goto?guid=4959670499883097657" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/611485bcf4f58398d36a6f3470491331.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Observable.zip((ObservableSource<Integer>) observer -> {      observer.onNext(1);      observer.onNext(2);      observer.onNext(3);     }, (ObservableSource<String>) observer -> {      observer.onNext("str");      observer.onNext("text");      observer.onComplete();  }, (integer, s) -> integer + s).subscribe(getStringObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:1str  0 onNext:2text  0 onComplete     </code></pre>    <p>分析:</p>    <p>功能是可以将不同的观察者进行组合,并且 onNext() 是一对一对的,例子里的 3 就没有对象了(3 真是惨 =.=)</p>    <p>然后只要有一个执行了 onComplete 就会结束掉</p>    <h2>类</h2>    <h3>PublishSubject</h3>    <p><a href="/misc/goto?guid=4959738219307915783" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/f3eae6d22a4f91a43bca9a232df35d88.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">publishSubject.subscribe(getObserver(0));  publishSubject.onNext(0);  publishSubject.subscribe(getObserver(1));  publishSubject.onNext(1);  publishSubject.onComplete();     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onNext:0  1 onSubscribe  0 onNext:1  1 onNext:1  0 onComplete  1 onComplete     </code></pre>    <p>分析:</p>    <p>onNext() 会通知每个观察者,仅此而已</p>    <h3>AsyncSubject</h3>    <p><a href="/misc/goto?guid=4959738219405596915" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/e034246ad488aa7f1309ee9870f2243c.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">AsyncSubject<Integer> subscriber = AsyncSubject.create();  subscriber.subscribe(getObserver(1));  subscriber.onNext(1);     subscriber.subscribe(getObserver(2));  subscriber.onNext(2);  subscriber.onComplete();        </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">Observer1onSubscribe  Observer2onSubscribe  Observer1onNext: 2  Observer1onComplete  Observer2onNext: 2  Observer2onCompelte     </code></pre>    <p>分析:</p>    <p>查看文档,关于 AsyncObject 的介绍,在 调用 onComplete() 之前,除了 subscribe() 其它的操作都会被缓存,在调用 onComplete() 之后只有最后一个 onNext() 会生效</p>    <h3>BehaviorObject</h3>    <p><a href="/misc/goto?guid=4959738219492002090" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/11b540bfdda812ae611cac4bbcdbee6c.png"></p>    <pre>  <code class="language-java">BehaviorSubject<Integer> subscriber = BehaviorSubject.create();     subscriber.subscribe(getObserver(1));     subscriber.onNext(0);  subscriber.onNext(1);        subscriber.subscribe(getObserver(2));  subscriber.onNext(4);     subscriber.onComplete();     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">1 onSubscribe  1 onNext:0  1 onNext:1  2 onSubscribe  2 onNext:1  1 onNext:4  2 onNext:4  1 onComplete  2 onComplete     </code></pre>    <p>分析:</p>    <p>BehaviorSubject 的最后一次 onNext() 操作会被缓存,然后在 subscribe() 后立刻推给新注册的 Observer</p>    <p>对于上面的例子 onNext(1) 这个操作会被缓存,在 subscribe(observer2) 之后会立刻传入 onNext(1) 从而执行</p>    <h3>Completable</h3>    <p><a href="/misc/goto?guid=4959721339967119603" rel="nofollow,noindex">doc</a></p>    <p>先看下文档里的说明:</p>    <p>Represents a deferred computation without any value but only indication for completion or exception. The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?</p>    <p>也就是说 Completable 是没有 onNext 的,要么成功要么出错,不关心过程,在 subscribe 后的某个时间点返回结果</p>    <p>例子:</p>    <pre>  <code class="language-java">Completablecompletable = Completable.timer(1000, TimeUnit.MILLISECONDS);  completable.subscribe(new CompletableObserver() {      @Override      public void onSubscribe(Disposable d) {          p("onSubscribe " + System.currentTimeMillis());      }         @Override      public void onComplete() {          p("onComplete " + System.currentTimeMillis());      }         @Override      public void onError(Throwable e) {          p("onError" + e);      }  });     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">onSubscribe 1487237357951  onComplete 1487237358952     </code></pre>    <h3>Flowable</h3>    <p><a href="/misc/goto?guid=4959738219613950450" rel="nofollow,noindex">doc</a></p>    <p><img src="https://simg.open-open.com/show/e8ada8a7aea89c013f5bbb6bfcfa3770.png"></p>    <p>例子:</p>    <pre>  <code class="language-java">Flowable.just(0, 1, 2, 3)          .reduce(50, (a, b) -> a + b)          .subscribe(getSingleObserver(0));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0 onSubscribe  0 onSuccess :56     </code></pre>    <p>分析:</p>    <p>Flowable 在 RxJava2 中是用来解决背压问题的,至于具体的可以看前面推荐的文章,这里就不展开了。</p>    <h3>CompositeDisposable</h3>    <p>例子:</p>    <pre>  <code class="language-java">new CompositeDisposable().addAll(Observable.just(0, 1, 2, 3)          .subscribeOn(Schedulers.io())          .observeOn(Schedulers.computation())          .subscribeWith(getDisposableObserver(0)),     Observable.just(6, 7, 8, 9)          .subscribeOn(Schedulers.io())          .observeOn(Schedulers.computation())          .subscribeWith(getDisposableObserver(1)));     </code></pre>    <p>输出:</p>    <pre>  <code class="language-java">0  6  1  7  java.lang.InterruptedException: sleepinterrupted      atjava....  java.lang.InterruptedException: sleepinterrupted      atjava...     </code></pre>    <p>分析:</p>    <p>就是一个 Disposable 的集合</p>    <p>就这么多了,应该常用的操作符都差不多有了。</p>    <p>随便写的,如果有什么地方没说清楚欢迎留言,我会尽快回复。</p>    <p>顺便求波赞和星,谢谢。</p>    <p> </p>    <p>来自:https://blog.xujifa.cn/index.php/2017/02/20/rxjava2_introduction/</p>    <p> </p>