RxJava学习笔记(变换Observables)

xjc1027 7年前
   <h3><strong>转换Observables</strong></h3>    <p>转换Observables,顾名思义就是变换可观测序列,来创建一个能够更好的满足我们需求的序列。</p>    <p><strong>map家族</strong></p>    <p>RxJava提供了几个mapping函数: map() , flatMap() , concatMap() , flatMapIterable() 以及 switchMap() .所有这些函数都作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们。</p>    <p><strong>Map</strong></p>    <p>Rxjava的map函数接收一个指定的Func1对象,然后将它应用到每一个由Observable发射的值上。Func1是一个接口,两个泛型,内部只有一个 call 方法,一个泛型是参数类型,一个泛型是返回值类型,在这里就可以理解这个 call 方法就是将可观测序列元素转换的转换方法。so,意思就是我们可以自己定制自己需要的转换方法。</p>    <ul>     <li> <p>比如下面的代码,我们让可观测序列元素的值乘以5发射。</p> <pre>  <code class="language-java">public static void main(String... args) {        Observable.create(new Observable.OnSubscribe<Integer>() {            @Override            public void call(Subscriber<? super Integer> subscriber) {                subscriber.onNext(1);                subscriber.onNext(2);                subscriber.onNext(3);            }        }).map(new Func1<Integer, Integer>() {            @Override            public Integer call(Integer integer) {                return integer * 5;            }        }).subscribe(new Observer<Integer>() {            @Override            public void onCompleted() {                System.out.println("completed");            }              @Override            public void onError(Throwable e) {                System.out.println("something is error");            }              @Override            public void onNext(Integer integer) {                System.out.println("i = " + integer);            }        });    }</code></pre> </li>    </ul>    <p>打印结果:</p>    <p>i = 5</p>    <p>i = 10</p>    <p>i = 15</p>    <p><strong>FlatMap</strong></p>    <p>flatMap同样也是转换元素的,不过和 map 不同的是,flatMap转换的返回的是Observable对象。要对其进行理解做一个对比比较容易理解。已知:两个javabean类,School和Student:</p>    <pre>  <code class="language-java">public class School {      private List<Student> mStudents = new ArrayList<>();        public List<Student> getStudents() {          return mStudents;      }        public void setStudents(List<Student> students) {          mStudents = students;      }  }</code></pre>    <p>就School特殊一点,只写了一个Student的集合,当然,当中还可以包含其他数据,这里只是体现其中有一个数据集合,Student类就是一个普通bean类,包含name,age两个成员变量及其get/set方法。然后假设我们获取到一个School变量,需要从中得到所有的Student的具体数据,用 map 我们可以这样干:</p>    <pre>  <code class="language-java">public static void main(String... args) {      School school = new School();      school.getStudents().add(new Student("张三" , 23));      school.getStudents().add(new Student("李四" , 24));      school.getStudents().add(new Student("王五" , 25));        Observable.just(school)              .map(new Func1<School, List<Student>>() {                  @Override                  public List<Student> call(School school) {                      return school.getStudents();                  }              })              .subscribe(new Observer<List<Student>>() {                  @Override                  public void onCompleted() {                    }                    @Override                  public void onError(Throwable e) {                    }                    @Override                  public void onNext(List<Student> students) {                      for (Student student : students) {                          System.out.println("name = " + student.getName());                      }                  }              });  }</code></pre>    <p>接收一个Student数据集合遍历获取当中的数据,但是如果我们不想在代码中使用for循环遍历,而是希望在 Subscriber 中直接传入单个的 Student 对象呢,用 map 显然是行不通的,因为 map 仅仅是一对一的转化,而现在要求的是一对多的转换,那么如何实现呢,我们用 flatMap :</p>    <pre>  <code class="language-java">public static void main(String... args) {      School school = new School();      school.getStudents().add(new Student("张三" , 23));      school.getStudents().add(new Student("李四" , 24));      school.getStudents().add(new Student("王五" , 25));        Observable.just(school)              .flatMap(new Func1<School, Observable<Student>>() {                  @Override                  public Observable<Student> call(School school) {                      return Observable.from(school.getStudents());                  }              })              .subscribe(new Observer<Student>() {                  @Override                  public void onCompleted() {                    }                    @Override                  public void onError(Throwable e) {                    }                    @Override                  public void onNext(Student student) {                      System.out.println("name = " + student.getName());                  }              });  }</code></pre>    <p>打印的结果都是:</p>    <p>name = 张三</p>    <p>name = 李四</p>    <p>name = 王五</p>    <p>我们有一个数据序列,它发射一个数据序列,这些数据本身自己也可以发射Observable,flatMap就可以合并这些Observable发射的数据,然后将合并后的结果作为最终的Observable。但是它的合并允许交叉,也就意味着 flatMap() 不能够保证在最终生成的Observable中源Observables确切的发射顺序。</p>    <p><strong>ConcatMap</strong></p>    <p>RxJava的 concatMap() 函数解决了 flatMap() 的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们。</p>    <pre>  <code class="language-java">public static void main(String... args) {      School school = new School();      school.setAddress("China");      school.getStudents().add(new Student("张三" , 23));      school.getStudents().add(new Student("李四" , 24));      school.getStudents().add(new Student("王五" , 25));        Observable<School> observable = Observable.just(school);      observable.subscribe(new Action1<School>() {          @Override          public void call(School school) {              System.out.println("school address = " + school.getAddress());          }      });      observable.concatMap(new Func1<School, Observable<Student>>() {          @Override          public Observable<Student> call(School school) {              return Observable.from(school.getStudents());          }      }).subscribe(new Observer<Student>() {          @Override          public void onCompleted() {            }            @Override          public void onError(Throwable e) {            }            @Override          public void onNext(Student student) {              System.out.println("student name = " + student.getName());          }      });  }</code></pre>    <p>这里给School类添加一个属性地址,打印结果:</p>    <p>school address = China</p>    <p>student name = 张三</p>    <p>student name = 李四</p>    <p>student name = 王五</p>    <p><strong>FlatMapIterable</strong></p>    <p>flatMapIterable()可以将数据包装成Iterable,在Iterable中我们可以随意对数据进行加工。</p>    <pre>  <code class="language-java">public static void main(String... args) {      Observable.just(1,2,3,4)              .flatMapIterable(new Func1<Integer, Iterable<Integer>>() {                  @Override                  public Iterable<Integer> call(Integer integer) {                      List<Integer> list = new ArrayList();                      list.add(integer);                      list.add(5);                      return list;                  }              }).subscribe(new Action1<Integer>() {          @Override          public void call(Integer integer) {              System.out.println("i = " + integer);          }      });  }</code></pre>    <p>这里在每个数据创建的Iterable中都加入了一个元素"5",打印结果如下:</p>    <p>i = 1</p>    <p>i = 5</p>    <p>i = 2</p>    <p>i = 5</p>    <p>i = 3</p>    <p>i = 5</p>    <p>i = 4</p>    <p>i = 5</p>    <p><strong>SwitchMap</strong></p>    <p>每当源Observable发射一个新的数据项( Observable) 时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。</p>    <p><strong>Scan</strong></p>    <p>RxJava的 scan() 函数可以看做是一个累积函数。 scan() 函数对原始Observable发射的每一项数据都应用一个函数,计算出函数的结果值,并将该值填充回可观测序列,等待和下一次发射的数据一起使用。</p>    <pre>  <code class="language-java">public static void main(String... args) {      Observable.just(1,2,3,4,5)              .scan(new Func2<Integer, Integer, Integer>() {                  @Override                  public Integer call(Integer integer, Integer integer2) {                      return integer + integer2;                  }              })              .subscribe(new Observer<Integer>() {                  @Override                  public void onCompleted() {                      System.out.println("rxjava onCompleted");                  }                    @Override                  public void onError(Throwable e) {                      System.out.println("rxjava onError");                  }                    @Override                  public void onNext(Integer integer) {                      System.out.println("result = " + integer);                  }              });  }</code></pre>    <p>这里对每一个数据元素都进行了相加的操作,看结果更明了:</p>    <p>result = 1</p>    <p>result = 3</p>    <p>result = 6</p>    <p>result = 10</p>    <p>result = 15</p>    <p>rxjava onCompleted</p>    <p>还有一个 scan() 函数的变体 scan(R initialValue, Func2<R, ? super T, R> accumulator) ,它可以设置一个初始值作为第一个发射的元素值,</p>    <pre>  <code class="language-java">public static void main(String... args) {      Observable.just(1,2,3,4,5)              .scan(10, new Func2<Integer, Integer, Integer>() {                  @Override                  public Integer call(Integer integer, Integer integer2) {                      return integer + integer2;                  }              })              .subscribe(new Observer<Integer>() {                  @Override                  public void onCompleted() {                      System.out.println("rxjava onCompleted");                  }                    @Override                  public void onError(Throwable e) {                      System.out.println("rxjava onError");                  }                    @Override                  public void onNext(Integer integer) {                      System.out.println("result = " + integer);                  }              });  }</code></pre>    <p>这里我们为 scan() 设置了一个初始值10,那么发射的第一个元素就是这个10,之后又与其相加,打印结果如下:</p>    <p>result = 10</p>    <p>result = 11</p>    <p>result = 13</p>    <p>result = 16</p>    <p>result = 20</p>    <p>result = 25</p>    <p>rxjava onCompleted</p>    <p><strong>GroupBy</strong></p>    <p>groupBy()将源Observable变换成一个发射Observables的新的Observable。它们中的每一个新的Observable都发射一组指定的数据。实际使用中,我们需要提供一个生成key的规则(也就是Func1中的call方法),所有key相同的数据会包含在同一个小的Observable中。</p>    <pre>  <code class="language-java">public static void main(String... args) {      Student lisi = new Student("李四", 24);      Student wangwu = new Student("王五", 25);      Student zhangsan = new Student("张三", 23);      Student jianjian = new Student("jianjian", 24);      Student wenwen = new Student("wenwen", 25);        Observable<GroupedObservable<Integer, Student>> GroupedObservable = Observable.just(lisi, wangwu, zhangsan, jianjian, wenwen)              .groupBy(new Func1<Student, Integer>() {                  @Override                  public Integer call(Student student) {                      return student.getAge();                  }              });      Observable.concat(GroupedObservable)              .subscribe(new Action1<Student>() {                  @Override                  public void call(Student student) {                      System.out.println("student = " + student.getName() + ", age = " + student.getAge());                  }              });  }</code></pre>    <p>这里我们首先创建了一个新的Observable: GroupedObservable ,它将会发送一个带有GroupedObservable的序列(也就是指发送的数据项的类型为GroupedObservable)。GroupedObservable是一个特殊的Observable,它基于一个分组的key,在这个例子中,key就是 student 的年龄age,代表的意思就是年龄相同的数据会包含在一起。打印结果:</p>    <p>student = 李四, age = 24</p>    <p>student = jianjian, age = 24</p>    <p>student = 王五, age = 25</p>    <p>student = wenwen, age = 25</p>    <p>student = 张三, age = 23</p>    <p><strong>Buffer</strong></p>    <p>RxJava中的 buffer() 函数将源Observable变换一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。</p>    <pre>  <code class="language-java">public static void main(String... args) {      Observable.just(1,2,3,4,5,6)              .buffer(2)              .subscribe(new Action1<List<Integer>>() {                  @Override                  public void call(List<Integer> integers) {                      for (Integer i : integers) {                          System.out.println("i = " + i);                      }                      System.out.println("-------------------");                  }              });  }</code></pre>    <p>这里的 buffer(2) 指定了缓冲容量的大小为2,打印结果:</p>    <p>i = 1</p>    <p>i = 2</p>    <p>-------------------</p>    <p>i = 3</p>    <p>i = 4</p>    <p>-------------------</p>    <p>i = 5</p>    <p>i = 6</p>    <p>-------------------</p>    <p>实际上, buffer() 函数还有几种重载方法,其中一个允许你指定一个skip值,此后每 skip 项数据,然后又用count项数据填充缓冲区。</p>    <pre>  <code class="language-java">public static void main(String... args) {      Observable.just(1,2,3,4,5,6)              .buffer(2, 3)              .subscribe(new Action1<List<Integer>>() {                  @Override                  public void call(List<Integer> integers) {                      for (Integer i : integers) {                          System.out.println("i = " + i);                      }                      System.out.println("-------------------");                  }              });  }</code></pre>    <p>这里buffer(2, 3)设置了每3个元素中用两个元素填充缓冲区,打印结果:</p>    <p>i = 1</p>    <p>i = 2</p>    <p>-------------------</p>    <p>i = 4</p>    <p>i = 5</p>    <p>-------------------</p>    <p>buffer() 还可以带一个 timespan 的参数,会创建一个每隔timespan时间段就会发射一个列表的Observable。</p>    <p><strong>Window</strong></p>    <p>Rxjava的 window() 和 buffer() 很像,但是它发射的是Observable而不是数据列表,发射的这些Observables都是源Observable数据的一个子集,数量由参数count来定,最后发送一个 onCompleted() 结束。</p>    <pre>  <code class="language-java">public static void main(String... args) {      Observable.just(1,2,3,4,5)              .window(3)              .subscribe(new Observer<Observable<Integer>>() {                  @Override                  public void onCompleted() {                      System.out.println("window onCompleted");                  }                    @Override                  public void onError(Throwable e) {                    }                    @Override                  public void onNext(Observable<Integer> integerObservable) {                      integerObservable.subscribe(new Observer<Integer>() {                          @Override                          public void onCompleted() {                              System.out.println("onCompleted");                          }                            @Override                          public void onError(Throwable e) {                            }                            @Override                          public void onNext(Integer integer) {                              System.out.println("i = " + integer);                          }                      });                  }              });  }</code></pre>    <p>这里的 window(3) 设置了子集的大小为3,打印结果:</p>    <p>i = 1</p>    <p>i = 2</p>    <p>i = 3</p>    <p>onCompleted</p>    <p>i = 4</p>    <p>i = 5</p>    <p>onCompleted</p>    <p>window onCompleted</p>    <p>同样的, window() 也有带skip参数的重载方法。</p>    <p><strong>Cast</strong></p>    <p>Rxjava的 cast() 可以将源Observable中的每一项数据都转换为新的类型,把它变成了不同的Class。第一次看到这个,我还以为可以随便转,就用一个Integer转成String试试咯,好吧原谅我的天真,java.lang.ClassCastException立马打我脸,接着我创建了一个父类一个子类,里面很简单。</p>    <p>父类:</p>    <pre>  <code class="language-java">public class Father {      public void eat(){          System.out.println("father was eat");      }  }</code></pre>    <p>子类:</p>    <pre>  <code class="language-java">public class Son extends Father {      public void eat(){          System.out.println("son was eat");      }  }</code></pre>    <p>接着:</p>    <pre>  <code class="language-java">public static void main(String... args) {      Observable.just(new Son())              .cast(Father.class)              .subscribe(new Action1<Father>() {                  @Override                  public void call(Father f) {                      f.eat();                  }              });  }</code></pre>    <p>注意这里只能向上转型,也就是说子类转成父类,构成这样: Father f = new Son() ,要记住一点,"编译看左边,运行看右边",打印结果:</p>    <p> </p>    <p> </p>    <p>来自:http://www.jianshu.com/p/eccc241928cf</p>    <p> </p>