RxJava 教程第三部分:驯服数据流之副作用

T-top 8年前
   <p>前面两部分,我们学习到了如何创建 Observable以及如何从 Observable 中获取数据。本部分将介绍一些更高级的用法,以及一些在大型项目中的最佳实践。</p>    <p>Side effects(副作用)</p>    <p>没有副作用的函数通过参数和返回值来程序中的其他函数相互调用。当一个函数中的操作会影响其他函数中的返回结果时,我们称该函数有副作用。写数据到磁盘、记录日志、打印调试信息都是常见的副作用表现。Java 中在一个函数里面修改另外一个函数使用的对象的值是合法的。</p>    <p>副作用有时候很有用也有必要使用。但是有时候也有很多坑。 Rx 开发者应避免非必要的副作用,如果必须使用副作用的时候,则应该写下详细的说明文档。</p>    <p>副作用的问题</p>    <blockquote>     <p>Functional programming 通常避免副作用。带有副作用的函数(尤其是可以修改参数状态的)要求开发者了解跟多实现的细节。增加了函数的复杂度并且导致函数被错误理解和使用,并且难以维护。副作用有故意的和无意的。可以通过封装或者使用不可变对象来避免副作用。有一些明智的封装规则可以显著的提高你 Rx 代码的可维护性。</p>    </blockquote>    <p>我们使用一个带有副作用的示例来演示。 Java 中不可以在 Lambda 或者 匿名函数中引用外层的非 final 变量。 但是 Java 中的 final 变量只是保证了该编译引用的对象地址不变,但是对象本身的状态还是可以改变的。例如,下面是一个用来计数的一个类:</p>    <pre>  <code class="language-java">class Inc {        private int count = 0;        public void inc() {            count++;        }        public int getCount() {            return count;        }    }</code></pre>    <p><br>  </p>    <p>即使是一个 final 的 Inc 变量,还是可以通过调用其函数来修改他的状态。 注意 Java 并没有强制显式使用 final ,如果在你 Lambda 表达式中修改外层变量的引用对象地址(把外层变量重新复制为其他对象),则会出错。</p>    <pre>  <code class="language-java">Observable<String> values = Observable.just("请", "不要", "有", "副作用");        Inc index = new Inc();    Observable<String> indexed =            values.map(w -> {                index.inc();                return w;            });    indexed.subscribe(w -> System.out.println(index.getCount() + ": " + w));</code></pre>    <p>结果:</p>    <pre>  <code>1: 请    2: 不要    3: 有    4: 副作用  </code></pre>    <p>目前还来看不出来问题。但是如果我们在该 Observable 上再次订阅一个 subscriber,则问题就出来了。</p>    <pre>  <code class="language-java">Observable<String> values = Observable.just("请", "不要", "有", "副作用");        Inc index = new Inc();    Observable<String> indexed =            values.map(w -> {                index.inc();                return w;            });    indexed.subscribe(w -> System.out.println("1st observer: " + index.getCount() + ": " + w));    indexed.subscribe(w -> System.out.println("2nd observer: " + index.getCount() + ": " + w));  </code></pre>    <p>结果:</p>    <pre>  <code>1st observer: 1: 请    1st observer: 2: 不要    1st observer: 3: 有    1st observer: 4: 副作用    2nd observer: 5: 请    2nd observer: 6: 不要    2nd observer: 7: 有    2nd observer: 8: 副作用      </code></pre>    <p>第二个 Subscriber 的索引是从 5 开始的。这明显不是我们想要的结果。这里的副作用很容易发现,但是真实应用中的副作用有些很难发现。</p>    <p>在数据流中组织数据</p>    <p>可以通过 scan 函数来计算每个数据的发射顺序:</p>    <pre>  <code class="language-java">class Indexed <T> {        public final int index;        public final T item;        public Indexed(int index, T item) {            this.index = index;            this.item = item;        }    }</code></pre>    <pre>  <code class="language-java">Observable<String> values = Observable.just("No", "side", "effects", "please");        Observable<Indexed<String>> indexed =        values.scan(                new Indexed<String>(0, null),                (prev,v) -> new Indexed<String>(prev.index+1, v))            .skip(1);    indexed.subscribe(w -> System.out.println("1st observer: " + w.index + ": " + w.item));    indexed.subscribe(w -> System.out.println("2nd observer: " + w.index + ": " + w.item));  </code></pre>    <p>结果:</p>    <pre>  <code>1st observer: 1: No    1st observer: 2: side    1st observer: 3: effects    1st observer: 4: please    2nd observer: 1: No    2nd observer: 2: side    2nd observer: 3: effects    2nd observer: 4: please</code></pre>    <p>上面的结果为正确的。 我们把两个 Subscriber 共享的属性给删除了,这样他们就没法相互影响了。</p>    <p>do</p>    <p>像记录日志这样的情况是需要副作用的。subscribe 总是有副作用,否则的话这个函数就没啥用了。虽然可以在 subscriber 中记录日志信息,但是这样做有缺点:<br> 1. 在核心业务代码中混合了不太重要的日志代码<br> 2. 如果想记录数据流中数据的中间状态,比如 执行某个操作之前和之后,则需要一个额外的 Subscriber 来实现。这样可能会导致最终 Subscriber 和 日志 Subscriber 看到的状态是不一样的。</p>    <p>下面的这些函数让我们可以更加简洁的实现需要的功能:</p>    <pre>  <code class="language-java">public final Observable<T> doOnCompleted(Action0 onCompleted)    public final Observable<T> doOnEach(Action1<Notification<? super T>> onNotification)    public final Observable<T> doOnEach(Observer<? super T> observer)    public final Observable<T> doOnError(Action1<java.lang.Throwable> onError)    public final Observable<T> doOnNext(Action1<? super T> onNext)    public final Observable<T> doOnTerminate(Action0 onTerminate)</code></pre>    <p>这些函数在 Observable 每次事件发生的时候执行,并且返回 Observable。 这些函数明确的表明了他们有副作用,使用起来更加不易混淆:</p>    <pre>  <code class="language-java">Observable<String> values = Observable.just("side", "effects");        values        .doOnEach(new PrintSubscriber("Log"))        .map(s -> s.toUpperCase())        .subscribe(new PrintSubscriber("Process"));</code></pre>    <p>结果:</p>    <pre>  <code>Log: side    Process: SIDE    Log: effects    Process: EFFECTS    Log: Completed    Process: Completed      </code></pre>    <p> </p>    <p>这里使用了上一章使用的帮助类 PrintSubscriber 。这些 do 开头的函数并不影响最终的 Subscriber。 例如:</p>    <pre>  <code class="language-java">static Observable<String> service() {        return  Observable.just("First", "Second", "Third")                .doOnEach(new PrintSubscriber("Log"));    }</code></pre>    <p>可以这样使用该函数:</p>    <pre>  <code class="language-java">service()        .map(s -> s.toUpperCase())        .filter(s -> s.length() > 5)        .subscribe(new PrintSubscriber("Process"));      </code></pre>    <p>结果:</p>    <pre>  <code>Log: First    Log: Second    Process: SECOND    Log: Third    Log: Completed    Process: Completed  </code></pre>    <p><br> 即便最终使用的时候过滤了一些数据,但是我们记录了服务器返回的所有结果。</p>    <p>这些函数中 doOnTerminate 在 Observable 结束发射数据之前发生。不管是因为 onCompleted 还是 onError 导致数据流结束。 另外还有一个 finallyDo 函数在 Observable 结束发射之后发生。</p>    <p>doOnSubscribe, doOnUnsubscribe</p>    <pre>  <code class="language-java">public final Observable<T> doOnSubscribe(Action0 subscribe)    public final Observable<T> doOnUnsubscribe(Action0 unsubscribe)      </code></pre>    <p>Subscription 和 unsubscription 并不是 Observable 发射的事件。而是 该 Observable 被 Observer 订阅和取消订阅的事件。</p>    <pre>  <code class="language-java">ReplaySubject<Integer> subject = ReplaySubject.create();    Observable<Integer> values = subject        .doOnSubscribe(() -> System.out.println("New subscription"))        .doOnUnsubscribe(() -> System.out.println("Subscription over"));        Subscription s1 = values.subscribe(new PrintSubscriber("1st"));    subject.onNext(0);    Subscription s2 = values.subscribe(new PrintSubscriber("2st"));    subject.onNext(1);    s1.unsubscribe();    subject.onNext(2);    subject.onNext(3);    subject.onCompleted();</code></pre>    <p>结果:</p>    <pre>  <code class="language-java">New subscription    1st: 0    New subscription    2st: 0    1st: 1    2st: 1    Subscription over    2st: 2    2st: 3    2st: Completed    Subscription over  </code></pre>    <p>使用 AsObservable 函数来封装</p>    <p>Rx 使用面向对象的 Java 语言来实现 functional programming 风格编码。 需要注意 面向对象中的问题。 例如下面一个天真版的返回 observable 的服务:</p>    <pre>  <code class="language-java">public class BrakeableService {        public BehaviorSubject<String> items = BehaviorSubject.create("Greet");        public void play() {            items.onNext("Hello");            items.onNext("and");            items.onNext("goodbye");        }    }  </code></pre>    <p><span style="line-height:1.6">上面的实现中, 调用者可以自己修改 items 引用的对象,也可以修改 Observable 发射的数据。所以需要对调用者隐藏 Subject 接口,只暴露 Observable 接口:</span></p>    <pre>  <code class="language-java">public class BrakeableService {        private final BehaviorSubject<String> items = BehaviorSubject.create("Greet");            public Observable<String> getValues() {            return items;        }            public void play() {            items.onNext("Hello");            items.onNext("and");            items.onNext("goodbye");        }    }</code></pre>    <p>上面的改进版本,看起来我们返回的是一个 Observable,但该返回的对象是不安全的,返回的其实是一个 Subject。</p>    <p>asObservable</p>    <p>由于 Observable 是不可变的,所以 asObservable 函数是为了把一个 Observable 对象包装起来并安全的分享给其他人使用。</p>    <pre>  <code class="language-java">public Observable<String> getValues() {        return items.asObservable();    }</code></pre>    <p>这样的话,我们的 Subject 对象就被合理的保护起来了。这样其他恶意人员也无法修改你的 Observable 返回的数据了,在使用的过程中也可以避免出现错误了。</p>    <p>无法保护可变对象</p>    <p>在 RxJava 中, Rx 传递的是对象引用 而不是 对象的副本。在一个 地方修改了对象,在传递路径的其他地方上也是可见的。例如下面一个可变的对象:</p>    <pre>  <code class="language-java">class Data {        public int id;        public String name;        public Data(int id, String name) {            this.id = id;            this.name = name;        }    }  </code></pre>    <p>使用该对象的一个 Observable 和两个 Subscriber:</p>    <pre>  <code class="language-java">Observable<Data> data = Observable.just(        new Data(1, "Microsoft"),        new Data(2, "Netflix")    );        data.subscribe(d -> d.name = "Garbage");    data.subscribe(d -> System.out.println(d.id + ": " + d.name));      </code></pre>    <p>结果:</p>    <pre>  <code>1: Garbage    2: Garbage  </code></pre>    <p>第一个 Subscriber 先处理每个数据,在第一个 Subscriber 完成后第二个 Subscriber 开始处理数据,由于 Observable 在传递路径中使用的是对象引用,所以 第一个 Subscriber 中对对象做的修改,第二个 Subscriber 也会看到。</p>    <p><br> <br> 来自:http://blog.chengyunfeng.com/?p=968</p>