RxAndroid 2.0 学习笔记

pmdykoif 7年前
   <p>Rxjava 2.x正式版出来已经快两个月了。在之前的项目中也在使用Rx。但却一直没有时间对整个的知识进行梳理,恰好今天抽出时间,也系统的再学习一遍RxJava/RxAndroid</p>    <h2>RxJava的使用</h2>    <h3>一、观察者/被观察者</h3>    <p>1、前奏:</p>    <p>在观察者之前就要前提下 <strong>backpressure</strong> 这个概念。简单来说, <strong>backpressure</strong> 是在异步场景中,被观察者发送事件速度远快于观察者的处理速度时,告诉被观察者降低发送速度的策略。</p>    <p>2、在2.0中有以下几种观察者</p>    <ul>     <li>Observable/Observer</li>     <li>Flowable/Subscriber</li>     <li>Single/SingleObserver</li>     <li>Completable/CompletableObserver</li>     <li>Maybe/MaybeObserver</li>    </ul>    <p>依次的来看一下:</p>    <p>Observable</p>    <pre>  <code class="language-java">Observable  .just(1, 2, 3)  .subscribe(new Observer < Integer > () {  @Override public void onSubscribe(Disposable d) {}  @Override public void onNext(Integer value) {}  @Override public void onError(Throwable e) {}  @Override public void onComplete() {}  });</code></pre>    <p>这里要提的就是onSubscribe(Disposable d),disposable用于取消订阅。</p>    <p>就用简单的just这个操作符来分析一下。</p>    <pre>  <code class="language-java">@SuppressWarnings("unchecked")  @SchedulerSupport(SchedulerSupport.NONE)   public static < T > Observable < T > just(T item1, T item2, T item3, T item4) {      ObjectHelper.requireNonNull(item1, "The first item is null");      ObjectHelper.requireNonNull(item2, "The second item is null");      ObjectHelper.requireNonNull(item3, "The third item is null");      ObjectHelper.requireNonNull(item4, "The fourth item is null");        return fromArray(item1, item2, item3, item4);  }</code></pre>    <pre>  <code class="language-java">@SchedulerSupport(SchedulerSupport.NONE)   public static < T > Observable < T > fromArray(T...items) {      ObjectHelper.requireNonNull(items, "items is null");      if (items.length == 0) {          return empty();      } else if (items.length == 1) {          return just(items[0]);      }      return RxJavaPlugins.onAssembly(new ObservableFromArray < T > (items));  }</code></pre>    <pre>  <code class="language-java">@Override   public void subscribeActual(Observer < ?super T > s) {      FromArrayDisposable < T > d = new FromArrayDisposable < T > (s, array);      s.onSubscribe(d);      if (d.fusionMode) {          return;      }      d.run();  }    @Override   public void dispose() {      disposed = true;  }    @Override   public boolean isDisposed() {      return disposed;  }    void run() {      T[] a = array;      int n = a.length;      for (int i = 0; i < n && !isDisposed(); i++) {          T value = a[i];          if (value == null) {              actual.onError(new NullPointerException("The " + i + "th element is null"));              return;          }          actual.onNext(value);      }      if (!isDisposed()) {          actual.onComplete();      }  }</code></pre>    <p>just实际调用了 fromArray 方法,中创建了 ObservableFromArray 的实例,在这个实例中实现了 Observable 这个接口,在调用 subscribe 方法进行绑定之后,首先调用了 subscribeActual 方法, onSubscribe 就会回调。</p>    <p>在取消绑定是我们可以将Disposable添加到CompositeDisposable中或者直接调用Disposable的dispose() 方法在流的任意位置取消。</p>    <p>此外, 为了简化代码,我使用了Consumer作为观察者(可以当成1.0时候的Action1 、ActionX) subscribe 的返回值就是一个Disposable ( subscribe 的返回值根据传入的参数不同,也有不同)我把这个对象添加到CompositeDisposable,并在中途取消,但发射器仍然会把所有的数据全都发射完。因为LambdaSubscriber(也就是传入Consumer 所构造的观察者)的 dispose 和 isDispose 略有不同,并不是简简单单的true/false, 说实话,我没看懂Consumer的这两个方法干了什么...........尴尬</p>    <pre>  <code class="language-java">LambdaSubscriber 瞅瞅    @Override  public void dispose() {   cancel();  }    @Override  public boolean isDisposed() {      return get() == SubscriptionHelper.CANCELLED;  }</code></pre>    <p>Flowable</p>    <p>是2.0之后用的最多的观察者了,他与上一个的区别在于支持背压,也就是说,下游会知道上游有多少数据,所以他Subscriber会是这样</p>    <pre>  <code class="language-java">Flowable  .just(1, 2, 3, 4)  .subscribe(new Subscriber < Integer > () {  @Override public void onSubscribe(Subscription s) {    s.request(Long.MAX_VALUE);  }  @Override public void onNext(Integer integer) {}  @Override public void onError(Throwable t) {}  @Override public void onComplete() {}  });</code></pre>    <p>onSubscribe 这个回调传出了一个Subscription, 我们要指定他传出数据的大小, 调用他的 request() 方法。如没有要求可以传入一个Long的最大数值 Long.MAX_VALUE 。</p>    <p>要说明一下,request这个方法若不调用,下游的onNext与OnComplete都不会调用;若你写的数量小于,只会传你的个数,但是不会调用onComplete方法,可以看下 FlowableFromArray 的 slowPath 方法</p>    <pre>  <code class="language-java">@Override void slowPath(long r) {      long e = 0;      T[] arr = array;      int f = arr.length;      int i = index;      Subscriber < ?super T > a = actual;      for (;;) {          while (e != r && i != f) {              if (cancelled) {                  return;              }              T t = arr[i];              if (t == null) {                  a.onError(new NullPointerException("array element is null"));                  return;              } else {                  a.onNext(t);              }              e++;              i++;          }          if (i == f) {              if (!cancelled) {                  a.onComplete();              }              return;          }          r = get();          if (e == r) {              index = i;              r = addAndGet( - e);              if (r == 0L) {                  return;              }              e = 0L;          }      }  }  }</code></pre>    <p>需要if (i == f) f 是这个数据的大小,i是当前发送数据的个数,所以不会调用onComplete</p>    <p>休息一下</p>    <p>这是几种被观察者实现的接口</p>    <ul>     <li>Observable 接口 ObservableSource</li>     <li>Flowable 接口 Publisher</li>     <li>Single 接口 SingleSource</li>     <li>Completable 接口 CompletableSource</li>     <li>Maybe 接口 MaybeSource</li>    </ul>    <p>梳理完了两个被观察者,发现Flowable支持背压,父类是Publisher;Observable不支持背压,父类是ObservableSource,他们的实现方式,与其的操作符,到最后的观察者,都有些不同,他们是完全独立开的。各自之间也互不影响。</p>    <p>Single</p>    <p>单值相应的模式: 就是只有一个值呗?</p>    <p>Completable</p>    <p>表示没有任何值但仅指示完成或异常的延迟计算。</p>    <p>Maybe</p>    <p>maybe 可以当成上面两个的合体吧!</p>    <p>后面的三种,就不细掰了,我就是这么不求甚解。</p>    <h3>二、操作符</h3>    <p>操作符基本就没有改变,但还是会发现,我擦,from没了,可以使用fromIterable</p>    <p>之前的actionx 也替换了Action \ Consumer \ BiConsumer</p>    <p>Func也跟action一样, 名字改变了Function</p>    <p>感觉这样是正经Rx了。</p>    <h3>三、线程切换</h3>    <p>当然现场切换没有发生改变,用法还是一样,但是之前没有看过源码,不知道怎样神奇的把线程切换了,难道是来自东方的神秘力量。趁着还有时间,撸一下代码。</p>    <p>在调用 subscribeOn(Schedulers.io()) 之后,会创建ObservableSubscribeOn</p>    <pre>  <code class="language-java">parent.setDisposable(scheduler.scheduleDirect(new Runnable() {  @Override  public void run() {          source.subscribe(parent);      }  }  ));</code></pre>    <p>在这个过程中,会把source也就是ObservableSource在线程中订阅,同时也把把传入的Observer变成SubscribeOnObserver。若指定的是io线程,可以在 IoScheduler 中看见对线程的管理</p>    <p>在调用 observeOn(AndroidSchedulers.mainThread()) 时,会产生一个ObservableObserveOn,同时还会把Observer变成ObserveOnObserver,可以发现在 HandlerScheduler ,在ui线程调用了ObserveOnObserver的 run 方法</p>    <h3>四、Rxjava的数据传递</h3>    <p>Rxjava是我在工作这几个月最喜欢的框架,没有之一。完全解决了我这个有洁癖的人在打代码时的玻璃心。</p>    <p>虽然重复造轮轮子是没有必要的(我也造不出来),但是为了全面的了解Rxjava,我也准备简单的实现一下,数据在每个操作符之中传输的整个过程。</p>    <p>在实现之前先猜想一下大概的过程吧:</p>    <p>我的需求是在一个static方法中产生一个数值,并且通过一层层的接口传递下去,下面的操作符的人参是上一个的返回值,最后输出,我就模仿着Rx的 Maybe 的名字实现吧。</p>    <ul>     <li>首先我要一直‘点’下去的话Maybe 一定要返回自己</li>     <li>接口要一层层的传进去,这样的话就可以在发射数据时,发原始数据传入这个一堆的接口,然后每个接口计算自己的实现。</li>     <li>最后返回结果</li>    </ul>    <p>之后就是仿造源码完成这段需求了,当然这些方法也都简单写了,就是为了弄清楚思路:</p>    <p>1、创建一个MaybeSource,我们的Maybe 和 各个操作符都会实现它。</p>    <pre>  <code class="language-java">public interface MaybeSource {       void subscribe(MaybeObserver observer);  }</code></pre>    <p>2、创建一个MaybeObserver, 这就是最后绑定的时候的接口</p>    <pre>  <code class="language-java">public interface MaybeObserver {      void onSuccess(int value);  }</code></pre>    <p>3、创建Function, 这个在操作符中用于实现</p>    <pre>  <code class="language-java">public interface Function {      int apply(int t);  }</code></pre>    <p>4、当然少不了Maybe, 这里就实现just和map两个方法吧</p>    <pre>  <code class="language-java">public abstract class Maybe implements MaybeSource {      public static Maybe just(int item) {          return new MaybeJust(item);      }        public final Maybe map(Function mapper) {          return new MaybeMap(this, mapper);      }  }</code></pre>    <p>5、just实际返回的对象是MaybeJust,他的父类是Maybe</p>    <pre>  <code class="language-java">public class MaybeJust extends Maybe {      final int value;        public MaybeJust(int value) {          this.value = value;      }        @Override      public void subscribe(MaybeObserver observer) {          observer.onSuccess(value);      }  }</code></pre>    <p>6、map实际返回的对象是MaybeMap,他的父类是Maybe</p>    <pre>  <code class="language-java">public class MaybeMap extends Maybe {      final Function mapper;      final MaybeSource source;        public MaybeMap(MaybeSource source, Function mapper) {          this.source = source;          this.mapper = mapper;      }        @Override      public void subscribe(MaybeObserver observer) {          source.subscribe(new MapMaybeObserver(observer, mapper));      }        static final class MapMaybeObserver implements MaybeObserver {          final MaybeObserver actual;            final Function mapper;            MapMaybeObserver(MaybeObserver actual, Function mapper) {              this.actual = actual;              this.mapper = mapper;          }            @Override          public void onSuccess(int value) {              this.actual.onSuccess(this.mapper.apply(value));          }      }  }</code></pre>    <p>7、在main中可以这么运行</p>    <pre>  <code class="language-java">Maybe  .just(1)  .map(new Function() {        @Override      public int apply(int t) {          return t + 1;      }  }).map(new Function() {        @Override      public int apply(int t) {          return t * 4;      }  }).subscribe(new MaybeObserver() {        @Override      public void onSuccess(int value) {          System.out.println(value);      }  });</code></pre>    <p>8、运行结果,传入1,先+1, 在 * 4,最后结果应该是8</p>    <p><img src="https://simg.open-open.com/show/2c2ac95945cbcdeb1f123b539209f5c3.png"></p>    <p>得到了期望的结果</p>    <h2>RxJava 2.0 + Retrofit 2 .0</h2>    <p>之前做过一个项目,没用什么架构,也没什么封装。但对我帮助最大的是,之前是不能接受这样的代码的,感觉看上去脑袋都大了。但看习惯了, 也就习惯了。</p>    <p>但平时自己弄个小项目还是使用mvp,自己的洁癖可能更加强烈一点</p>    <p>在Retrofit 中选择了Flowable作为返回值,支持背压,在2.0之后应该最为常用</p>    <pre>  <code class="language-java">@GET("/")    Flowable<ResponseBody> getText();</code></pre>    <p>在RxJava 2.0 中使用CompositeDisposable做解除绑定的操作, Consumer 回调中使用了三个Consumer,作为成功、失败、完成的回调</p>    <pre>  <code class="language-java">public <T> void addSubscription(Flowable flowable,          final RxSubscriber<T> subscriber) {          if (mCompositeDisposable == null) {              mCompositeDisposable = new CompositeDisposable();          }            if (subscriber == null) {              Log.e(TAG, "rx callback is null");                return;          }            Disposable disposable = flowable.subscribeOn(Schedulers.io())                                          .observeOn(AndroidSchedulers.mainThread())                                          .subscribe(new Consumer<T>() {                      @Override                      public void accept(T o) throws Exception {                          subscriber.onNext(o);                      }                  },                  new Consumer<Throwable>() {                      @Override                      public void accept(Throwable throwable)                          throws Exception {                          subscriber.onError(throwable);                      }                  },                  new Action() {                      @Override                      public void run() throws Exception {                          subscriber.onComplete();                      }                  });</code></pre>    <p>此外,之前的项目后台接口也是奇葩,同一个人写的接口,接口的返回格式更是多种多样,还不改,没办法,客户端只能将就着服务端,谁叫我们是新来的呢。遇到这种问题,就不直接转成对象格式了,先转成ResponseBody得到Body,再拿出string来。</p>    <p>okhttp中response的body对象就是这个ResponseBody,他的string() 方法就可以获得整个body,然后再做json解析吧</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/4e93354021392f08052e260ba682b912.png"></p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/a7cd3e25122c5128f6756d927750a52d.png"></p>    <p style="text-align:center"> </p>    <p style="text-align:center"> </p>    <p>来自:http://www.jianshu.com/p/a608e71fc5e1</p>    <p> </p>