RxJava 教程第三部分:驯服数据流之 高级错误处理

tgow0735 8年前
   <p>前面已经知道如何使用 Observer 来处理错误情况。在前面一节中我们通过避免 Monad 使用传统的 Java 方式来处理异常。代码中可以出现各种各样的异常情况,并不是每一个异常都需要告诉上层代码的。在传统的 Java 中,你可以捕获一个异常,然后决定是自己处理该异常还是再次抛出去。同样,在 RxJava 中,你也可以根据异常来执行不同的逻辑而无需结束 Observable,也不再强迫 Observer 处理所有情况。</p>    <p>Resume</p>    <p>onErrorReturn</p>    <p>onErrorReturn 操作函数的功能是:当发生错误的时候,发射一个默认值然后结束数据流。所以 Subscriber 看不到异常信息,看到的是正常的数据流结束状态。</p>    <p><img src="https://simg.open-open.com/show/1d93460ec338238133efe3b6b70f1808.png"></p>    <pre>  <code class="language-java">Observable<String> values = Observable.create(o -> {      o.onNext("Rx");      o.onNext("is");      o.onError(new Exception("adjective unknown"));  });     values      .onErrorReturn(e -> "Error: " + e.getMessage())      .subscribe(v -> System.out.println(v));     </code></pre>    <p>结果:</p>    <pre>  <code class="language-java">Rx  is  Error: adjectiveunknown     </code></pre>    <p>onErrorResumeNext</p>    <p>onErrorResumeNext 的功能是:当错误发生的时候,使用另外一个数据流继续发射数据。在返回的 Observable 中是看不到错误信息的。</p>    <pre>  <code class="language-java">public final Observable<T> onErrorResumeNext(      Observable<? extends T> resumeSequence)  public final Observable<T> onErrorResumeNext(      Func1<java.lang.Throwable,? extends Observable<? extends T>> resumeFunction)     </code></pre>    <p><img src="https://simg.open-open.com/show/ff8e6cf80d6260d4d3294ee051d03ed4.png"></p>    <p>第二个重载的函数可以根据错误的信息来返回不同的 Observable。</p>    <pre>  <code class="language-java">Observable<Integer> values = Observable.create(o -> {      o.onNext(1);      o.onNext(2);      o.onError(new Exception("Oops"));  });     values      .onErrorResumeNext(Observable.just(Integer.MAX_VALUE))      .subscribe(new PrintSubscriber("with onError: "));     </code></pre>    <p>结果:</p>    <pre>  <code class="language-java">Observable<Integer> values = Observable.create(o -> {      o.onNext(1);      o.onNext(2);      o.onError(new Exception("Oops"));  });     values      .onErrorResumeNext(Observable.just(Integer.MAX_VALUE))      .subscribe(new PrintSubscriber("with onError: "));     </code></pre>    <p>利用这个操作函数可以实现把一个异常信息包装起来再次抛出。在传统的 Java 中,如果异常发生的时候发现当前无法处理该异常,则会再次抛出该异常。通常情况下都会包装(Wrap)一下异常信息再抛出。在 Rx 中也可以这样用:</p>    <pre>  <code class="language-java">.onErrorResumeNext(e -> Observable.error(new UnsupportedOperationException(e)))     </code></pre>    <p>onExceptionResumeNext</p>    <p>onExceptionResumeNext 和 onErrorResumeNext 的区别是只捕获 Exception;</p>    <pre>  <code class="language-java">Observable<String> values = Observable.create(o -> {      o.onNext("Rx");      o.onNext("is");      //o.onError(new Throwable() {}); // 这个为 error 不会捕获      o.onError(new Exception()); // 这个为 Exception 会被捕获  });     values      .onExceptionResumeNext(Observable.just("hard"))      .subscribe(v -> System.out.println(v));     </code></pre>    <p>Retry</p>    <p>如果发生了不定性的异常,则通常会重试一下看看是否正常了。 retry 的功能就算重新订阅到事件流,并重头重新开始发射数据。</p>    <pre>  <code class="language-java">public final Observable<T> retry()  public final Observable<T> retry(long count)     </code></pre>    <p><img src="https://simg.open-open.com/show/312bb7633ec42858306e7a0848fb3e8c.png"></p>    <p>没有参数的 retry() 函数会一直重试,直到没有异常发生为止。而带有参数的 retry(n) 函数会重试 N 次, 如果 N 次后还是失败,则不再重试了,数据流发射一个异常信息并结束。</p>    <pre>  <code class="language-java">Randomrandom = new Random();  Observable<Integer> values = Observable.create(o -> {      o.onNext(random.nextInt() % 20);      o.onNext(random.nextInt() % 20);      o.onError(new Exception());  });     values      .retry(1)      .subscribe(v -> System.out.println(v));     </code></pre>    <p>结果:</p>    <pre>  <code class="language-java">0  13  9  15  java.lang.Exception     </code></pre>    <p>上面的示例,发射了两个数字遇到异常信息,然后重试一次,又发射 两个数据遇到异常信息,然后抛出该异常并结束。</p>    <p>请注意:上面的示例中两次发射的数字不一样。说明 retry 并不像 replay 一样会缓存之前的数据。一般情况下,这样的情况都是不合理的。所以一般情况下,只有具有副作用的时候或者 Observable 是 hot 的时候 才应该使用 retry。</p>    <p>retryWhen</p>    <p>retryWhen 更具有控制力。</p>    <pre>  <code class="language-java">public final Observable<T> retryWhen(      Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)     </code></pre>    <p>retryWhen 的参数是一个函数, 该函数的输入参数为一个异常 Observable,返回值为另外一个 Observable。 输入参数中包含了 retryWhen 发生时候遇到的异常信息;返回的 Observable 为一个信号,用来判别何时需要重试的:</p>    <p>– 如果返回的 Observable 发射了一个数据,retryWhen 将会执行重试操作</p>    <p>– 如果返回的 Observable 发射了一个错误信息,retryWhen 将会发射一个错误并不会重试</p>    <p>– 如果返回的 Observable 正常结束了,retryWhen 也正常结束。</p>    <p>参数返回的 Observable 发射的数据类型是无关紧要的。该 Observable 的数据只是用来当做是否重试的信号。数据本身是无用的。</p>    <p>下面一个示例,构造一个等待 100 毫秒再重试的机制:</p>    <pre>  <code class="language-java">Observable<Integer> source = Observable.create(o -> {      o.onNext(1);      o.onNext(2);      o.onError(new Exception("Failed"));  });     source.retryWhen((o) -> o          .take(2)          .delay(100, TimeUnit.MILLISECONDS))      .timeInterval()      .subscribe(          System.out::println,          System.out::println);     </code></pre>    <p>结果:</p>    <pre>  <code class="language-java">TimeInterval [intervalInMilliseconds=21, value=1]  TimeInterval [intervalInMilliseconds=0, value=2]  TimeInterval [intervalInMilliseconds=104, value=1]  TimeInterval [intervalInMilliseconds=0, value=2]  TimeInterval [intervalInMilliseconds=103, value=1]  TimeInterval [intervalInMilliseconds=0, value=2]     </code></pre>    <p>源 Observable 发射两个数字 然后遇到异常;当异常发生的时候,retryWhen 返回的 判断条件 Observable 会获取到这个异常,这里等待 100毫秒然后把这个异常当做数据发射出去告诉 retryWhen 开始重试。take(2) 参数确保判断条件 Observable 只发射两个数据(源 Observable 出错两次)然后结束。所以当源 Observable 出现两次错误以后就不再重试了。</p>    <p>using</p>    <p>using 操作函数是用来管理资源的,如果一个 Observable 需要使用一个资源来发射数据(比如 需要使用一个文件资源,从文件中读取内容),当该 Observable 结束的时候(不管是正常结束还是异常结束)就释放该资源。这样你就不用自己管理资源了, 用 Rx 的方式来管理资源。</p>    <pre>  <code class="language-java">public static final <T,Resource> Observable<T> using(      Func0<Resource> resourceFactory,      Func1<? super Resource,? extends Observable<? extends T>> observableFactory,      Action1<? super Resource> disposeAction)     </code></pre>    <p>using 有三个参数。当 Observable 被订阅的时候,resourceFactory 用来获取到需要的资源;observableFactory 用这个资源来发射数据;当 Observable 完成的时候,disposeAction 来释放资源。</p>    <p>下面的示例中,假设 String 是一个需要管理的资源。</p>    <pre>  <code class="language-java">Observable<Character> values = Observable.using(      () -> {          String resource = "MyResource";          System.out.println("Leased: " + resource);          return resource;      },      (resource) -> {          return Observable.create(o -> {              for (Character c : resource.toCharArray())                  o.onNext(c);              o.onCompleted();          });      },      (resource) -> System.out.println("Disposed: " + resource));     values      .subscribe(          v -> System.out.println(v),          e -> System.out.println(e));     </code></pre>    <p>结果:</p>    <pre>  <code class="language-java">Leased: MyResource  M  y  R  e  s  o  u  r  c  e  Disposed: MyResource     </code></pre>    <p>当订阅到 values 的时候, 调用 resourceFactory 函数返回一个字符串 “MyResource”;observableFactory 使用返回的 “MyResource” 字符串来生成一个 Observable, 该 Observable 发射”MyResource” 字符串中的每个字符;当发生完成的时候, disposeAction 来释放这个字符串资源。</p>    <p>有一点需要注意: 和使用 create 创建 Observable 一样,我们需要自己来结束 Observable 的发射(onCompleted 的调用)。如果你没有结束 Observable,则资源是永远不会释放的。</p>    <p>来自: <a href="/misc/goto?guid=4959671554848267191" rel="nofollow">http://blog.chengyunfeng.com/?p=970</a></p>