RxJava学习记录

qiyue 7年前
   <p>RxJava是基于JVM实现的Reactive扩展. 它是一种使用Reactor模式实现的, 异步的, 事件驱动的编程方式. 它支持数据/事件序列, 可以使用操作符将序列结合在一起, 而不用关心底层的细节, 比如线程安全, 同步等问题.</p>    <p>Reactor模式</p>    <p>Reactor模式是事件驱动的, 有一个或多个并发的事件输入源, 有一个Service Handler集中收集输入事件, 并分发到相应的Request Handler中进行处理.</p>    <p> </p>    <pre>  <code class="language-java">+-------+ +-----------------+  |Input|-----+ +---->|Request Handler|  +-------+ | |+-----------------+   | |  +-------+ |+-----------------+|+-----------------+  |Input|-----+-->|Service Handler|---+---->|Request Handler|  +-------+ |+-----------------+|+-----------------+   | |  +-------+ | |+-----------------+  |Input|-----+ +---->|Request Handler|  +-------+ +-----------------+  </code></pre>    <p>角色</p>    <p>RxJava主要有2个角色: Observable和Observer, 他们之间通过订阅建立联系, Observable发射出数据, 期间通过 操作符 进行处理, 最后被 Observer 得到</p>    <ul>     <li>Observable: 产生数据</li>     <li>Observer: 消费数据</li>     <li>操作符: 转换数据</li>    </ul>    <pre>  <code class="language-java">+------------+  | Observable |  +------------+  |  subscribe  |  +------------+  | Observer |  +------------+  </code></pre>    <p>RxJava的执行过程</p>    <ul>     <li>创建一个Observable</li>     <li>使用各种操作符进行变换</li>     <li>创建一个Observer订阅Observable</li>    </ul>    <p>当Observable发射数据时, Observer进行相应的动作</p>    <p>伪代码如下</p>    <pre>  <code class="language-java">Observable   .operator()   .operator()   .operator()   .operator()   .subscribe()  </code></pre>    <h2>Observable和Observer</h2>    <p>Observable</p>    <p>Observable是发射数据的来源, 并实现了以下3种方法:</p>    <ul>     <li>onNext()<br> Observable调用该方法来发射一条数据</li>     <li>onError()<br> 当遇到错误时, Observable会调用该方法. 调用该方法后便不会再调用onNext和onComplete</li>     <li>onComplete()<br> 当Observable最后一次调用onNext完毕后, 会调用该方法.</li>    </ul>    <p>Observable的 onNext 可以被调用0次或多次</p>    <p>最后, 会调用 onComplete 或者 onError , 这两者的调用标志着序列结束.</p>    <p>onNext 通常被叫做 发射 , onComplete 和 onError 通常被叫做 通知</p>    <p>冷Observable, 热Observable, 可连接的Observable</p>    <ul>     <li>热Observable<br> 在创建时就开始发射数据, 如果某个observer在发射中途订阅了这个Observable, 那么他接收到的数据也从中途开始</li>     <li>冷Observable<br> 会等待Observer来订阅它, 然后才开始发射数据</li>     <li>可连接的Observable<br> 当调用 connect 方法后, 才开始发射数据, 无论是否有Observer对他进行订阅</li>    </ul>    <p>Observer</p>    <p>Subscriber是Observer的实现. Subscriber会订阅Observable发出的事件, 它有两种基本操作:</p>    <ul>     <li>subscribe</li>     <li>unsubscribe</li>    </ul>    <h2>Observable约定</h2>    <p>通知</p>    <p>Observable通过以下 <em>通知</em> 与订阅它的Observer进行通信:</p>    <ul>     <li>onNext<br> 将Observable发射的数据传递给Observer</li>     <li>onComplete<br> 表示Observable成功完成了所有数据的发射, 并将不再发射任何数据</li>     <li>onError<br> 表示Observable由于某种错误被终止, 并将不在发射任何数据</li>     <li>onSubscribe (可选)<br> 表示Observable已经准备好接收Observer的请求通知了</li>    </ul>    <p>Observer通过以下通知来与Observable进行通信:</p>    <ul>     <li>subscribe<br> 表示Observer已经准备好从Observable接收通知了</li>     <li>unsubscribe<br> 表示Observer不再希望接收Observable的通知</li>     <li>request (可选)<br> 表示Observer希望不再从Observable接收多余某种特定数量的onNext通知</li>    </ul>    <p>通知的约束</p>    <p>一个Observable可以创建0个或多个onNext通知, 每个通知代表一个单独发射的数据, 最后跟随一个onCompleted或onError通知, 两者之一. 当发出onCompleted或onError通知后, 它将不再发射任何其他通知.</p>    <p>一个Observable可以不发射任何数据. 一个Observable也可以永远不通过onCompleted和onError来终止. 也就是说, Observable可以不发出任何通知, 或只发出onCompleted或onError通知, 或执法处onNext通知.</p>    <p>Observable必须按顺序向Observer发出通知, 而不能并行发出通知. Observable可以在不同的线程中发出通知, 但通知之间必须存在 <em>happens-before</em> 的关系.</p>    <p>Observable的终止</p>    <p>如果一个Observable没有发出onCompleted或onError通知, 那么Observer会认为该Observable仍然是活动的(即使已经不再发射任何数据), 并且该Observer可能会向其发出通知(比如unsubscribe或是request通知). 当一个Observable发出onCompleted或onError通知时, 该Observable可能会释放他的资源, 并终止, 他的Observer则不应该再与其进行通信.</p>    <p>onError通知必须包含错误的原因 (也就是说, 调用带有null值的onError是无效的)</p>    <p>在Observable终止之前, 它必须向订阅它的Observer发出onCompleted或onError之一</p>    <p>subscribe和unsubscribe</p>    <p>Observable在接收到Observer发出的subscribe通知后, 会开始发出自己的通知</p>    <p>当一个Observer向Observable发出unsubscribe通知时, Observable会尝试停止向该Observer发出通知. 但并不保证这种情况.</p>    <p>当Observable向Observer发出onError或onCompleted通知时, 会终止订阅关系. Observer不再需要想Observable发出unsubscribe通知</p>    <p>多个Observer</p>    <p>如果有第二个Observer订阅了Observable, 而这个Observable此时已经向第一个Observer发射了一些数据, 那么该Observable是否会向第二个Observer继续发射数据, 或者是否将完整的数据序列重新发射给第二个Observer, 或者是否会向第二个Observer发射完全不同的数据, 以上这些都取决于该Observable的设置. 并不会保证订阅同一个Observable的两个Observer会接收到相同的数据序列.</p>    <p>背压(backpressure)</p>    <p>背压是可选的; 并不是所有的RX语言都实现了背压, 并且在实现了背压的RX语言中, 也并不是所有的Observable或操作符会推荐背压. (@todo 不再翻译了)</p>    <h2>操作符</h2>    <p>大多数的操作符都会返回Observable对象, 所以可以利用这一点进行操作符的链式调用, 完成一系列的操作</p>    <p>创建操作符</p>    <p>用于创建Observable</p>    <ul>     <li>create<br> 手动创建一个Observable, 手动调用observer的方法</li>     <li>defer<br> 当Observer进行subscribe时, 才创建Observable, 并且是为每个observer都创建一个新的Observable</li>     <li>empty/never/throw<br> 创建有限行为的Observable</li>     <li>from<br> 将其他对象或数据结构转换为Observable</li>     <li>interval<br> 创建一个根据指定时间间隔, 发射整形数字队列的Observable</li>     <li>just<br> 将一个对象或一个对象集合转换为一个Observable, 并将它们发射</li>     <li>range<br> 创建一个发射某个范围内整形数字的Observable</li>     <li>repeat<br> 创建一个重复发射某一个或某个序列数据的Observable</li>     <li>start<br> 创建一个发射某个方法返回值的Observable</li>     <li>timer<br> 创建一个每隔一定时间发射一个数据的Observable</li>    </ul>    <p>变换操作符</p>    <p>用于将Observable发射的数据进行变换</p>    <ul>     <li>buffer<br> 间接性收集Observable发射的数据, 将这些数据放入bundle中, 并发射这个bundle</li>     <li>flatMap<br> 将Observable发射的多个数据变换为多个Observable, 然后将他们扁平化, 并放入一个Observable中</li>     <li>groupBy<br> 将一个Observable拆分为多个Observable的集合, 每次发射其中一组Observable, 通过key来结组</li>     <li>map<br> 通过某个函数将一个Observable发射的数据进行变换</li>     <li>scan<br> 对一个Observable发射的每个数据都按顺序应用某个方法, 并将返回值发射</li>     <li>window<br> 间接地从一个Observable中拆分数据, 放入window中, 并从window中一次发射一个数据</li>    </ul>    <p>过滤操作符</p>    <p>用于从Observable中有选择地发射数据</p>    <ul>     <li>debounce<br> 只发射Observable中指定间隔之后的数据</li>     <li>distinct<br> 忽略Observable中重复的数据</li>     <li>elementAt<br> 只发射Observable中指定位置的数据</li>     <li>filter<br> 只发射Observable中满足条件的数据</li>     <li>first<br> 只发射Observable中第一个数据, 或第一个满足条件的数据</li>     <li>ignoreElements<br> 不发射数据, 而只通知结束(onError或onComplete)</li>     <li>last<br> 只发射Observable中最后一个数据</li>     <li>sample<br> 定期采样Observable的数据, 并发送距离上次采样时间最近发射的那个数据</li>     <li>skip<br> 忽略Observable的前n个数据</li>     <li>skipLast<br> 忽略Observable的后n个数据</li>     <li>take<br> 发射Observable的前n个数据</li>     <li>takeLast<br> 发射Observable的后n个数据</li>    </ul>    <p>组合操作符</p>    <p>用于将多个Observable组合为一个单独的Observable</p>    <ul>     <li>and/then/when<br> 利用Pattern和Plan作为中介, 将多个Observable发射的数据合并到一个Observable</li>     <li>combineLatest<br> 将两个Observable最新发射的数据结合, 通过某个方法进行运算, 并将该方法的结果发射出去</li>     <li>join<br> 将两个Observable发射的, 在同一时间窗口内的数据结合起来</li>     <li>merge<br> 将多个Observable的数据合并为一个Observable</li>     <li>startWith<br> 在发射Observable的数据之前, 先发射指定序列的数据</li>     <li>switch<br> 将发射Observable的多个Observable转换为一个单独的Observable</li>     <li>zip<br> 将多个Observable的发射数据通过一个指定的方法结合在一起, 并将通过该方法结合后的数据发射出去</li>    </ul>    <p>错误处理操作符</p>    <p>从错误通知中恢复</p>    <ul>     <li>catch<br> 从onError中恢复, 并继续执行序列</li>     <li>retry<br> 如果Observable发送了一个onError通知, 则重新subscribe这个Observable</li>    </ul>    <p>Observable工具操作符</p>    <p>工具</p>    <ul>     <li>delay<br> 延时后发射</li>     <li>do<br> 注册一个action来处理Observable的生命周期事件</li>     <li>meterialize/dematerialize<br> 将onNext, onError, onComplete转换为数据序列, 由Observable发出; 或者反之</li>     <li>observeOn<br> 指定observer观察Observable所在的线程</li>     <li>Serialize<br> 强制序列同步执行</li>     <li>subscribe<br> 对Observable进行发射和通知操作</li>     <li>subscribeOn<br> 指定当Observable被订阅时, 所应使用的线程</li>     <li>timeInterval<br> 将打算发射的数据转换为发射数据的时间</li>     <li>timeout<br> 如果某段时间内没有发射任何数据, 则发出onError通知</li>     <li>timestamp<br> 为Observable发射的每个数据都附加一个时间戳</li>     <li>using<br> 创建一个一次性的资源, 该资源和Observable具有相同的生命周期</li>    </ul>    <p>条件和布尔操作符</p>    <p>对Observable和数据进行判断</p>    <ul>     <li>all<br> 判断是否所有发射的数据都符合某个约束</li>     <li>amb<br> 将多个Observable的数据由第一个Observable来发射</li>     <li>contains<br> 判断某个Observable是否发射了指定的数据</li>     <li>defaultIfEmpty<br> 发射Observable的数据, 如果Observable没有数据可以发射时, 发射一个默认数据</li>     <li>sequenceEqual<br> 判断两个Observable发射的数据序列是否相同</li>     <li>skipUntil<br> 放弃一个Observable所发射的数据, 直到另一个Observable开始发射数据</li>     <li>skipWhile<br> 放弃一个Observable所发射的数据, 直到某个指定条件变为false</li>     <li>takeUntil<br> 在一个Observable开始发射或结束发射数据后, 放弃另一个Observable所发射的数据</li>     <li>takeWhile<br> 当某个指定条件变为false时, 放弃一个Observable所发射的数据</li>    </ul>    <p>数学和汇总操作符</p>    <p>操作数据序列</p>    <ul>     <li>average<br> 计算一个Observable发射数据的数量的平均值, 并将该平均值发射</li>     <li>concat<br> 拼接多个Observable所发射的数据, 并将所有数据发射</li>     <li>count<br> 计算Observable所发射数据的数量, 并将该数量发射</li>     <li>max<br> 计算并发射Observable所发射的最大的数据</li>     <li>min<br> 计算并发射Observable所发射的最小的数据</li>     <li>Reduce<br> 对Observable所发射的数据按顺序应用一个方法, 并将方法值发射</li>     <li>sum<br> 计算Observable所发射的数据数量总和, 并将该总和值发送</li>    </ul>    <p>背压操作符</p>    <p>@todo</p>    <p>连接操作符</p>    <p>特殊的Observable, 拥有更多特性</p>    <ul>     <li>connect<br> 指示一个可连接的Observable开始发射数据到它的subscriber</li>     <li>publish<br> 将原始Observable转换为一个可连接的Observable</li>     <li>refCount<br> 使一个可连接的Observable表现的和原始Observable一样</li>     <li>replay<br> 确保所有Observable发射数据序列的顺序是相同的, 即使当subscribe时Observable已经开始发射数据</li>    </ul>    <p>转换操作符</p>    <ul>     <li>to<br> 将一个Observable转换为另一个对象或数据结构</li>    </ul>    <p>异步操作符</p>    <p>用于将同步方法转换为Observable</p>    <ul>     <li>start()</li>     <li>toAsync()<br> 将一个 方法 转换为 Observable 来执行方法并发射返回值</li>     <li>asyncAction()</li>     <li>asyncFunc()</li>     <li>startFuture()<br> 将一个 返回值为Future的方法 转换为一个 Observable , 并发射Future的返回值</li>     <li>deferFuture()<br> 将一个 返回值为Observable的Future 转换为一个 Observable , 当有Subscriber订阅时, 才返回Future的Observable返回值(有点绕)</li>     <li>forEachFuture()<br> 将Subscriber方法转入一个Observable中, 直到complete时才执行</li>     <li>fromAction()<br> 将一个Action转换为一个Observable, 当Subscriber订阅时, 执行该动作并发射结果</li>     <li>fromCallable()<br> 将一个Callable转换为一个 Observable , 当Subscriber订阅时, 执行callable并发射其结果或异常</li>     <li>fromRunnable()<br> 将一个 Runnable 转换为一个 Observable , 当Subscriber订阅时, 执行runnable并发射其结果</li>     <li>runAsync()<br> 返回一个 StoppableObservable , 它可以发射某个Action或Scheduler指定的多个action</li>    </ul>    <p>操作符的选择树</p>    <p>以下可以帮助你选择合适的操作符:</p>    <pre>  <code class="language-java">我想创建一个新的Observable  |_ 它只发射一个数据: `just`  | |_ 该数据是在subscribe时, 由一个方法返回的: `start`  | |_ 该数据是在subscribe时, 由Action, Callable, Runnable或类似的返回的: `from`  | |_ 该数据在某段时间后发射: `timer`  |_ 它是从某个Array, Iterrable或类似的发射数据的: `from`  |_ 它是从一个Future获取到的: `start`  |_ 它是从一个Future获取的数据序列: `from`  |_ 它会重复发射数据序列: `repeat`  |_ 它是从某个自定义逻辑创建的: `create`  |_ 它为每个subscribe的Observer都创建一个新的Observable: `defer`  |_ 它发射一个整形数字序列: `range`  | |_ 该序列会根据某种时间间隔发射: `interval`  | |_ 并且会在某段延时后才开始发射: `timer`  |_ 它不发射任何数据就会结束: `empty`  |_ 它什么都不做: `never`  </code></pre>    <pre>  <code class="language-java">我想通过组合多个Observable来创建一个新的Observable   |_ 它会发射所有的Observable中的数据, 顺序按照数据默认的顺序: `merge`   |_ 它会发射所有的Observable中的数据, 一次只发射一个Observable的数据: `concat`   |_ 它会将多个Observable的数据按顺序组合, 生成一个新的数据来发射   | |_ 并且将每个Observable发射的数据通过某个方法结合, 发射该方法处理后的数据: `zip`   | |_ 并且将每个Observable最新发射的数据结合为一个数据进行发射: `combinLatest`   | |_ 并且将同一window内的数据结合为一个数据进行发射   | |_ 通过Pattern和Plan中介来发射: `and/then/when`   |_ 它是从这些Observable最近发射的数据中发射数据: `switch`  </code></pre>    <pre>  <code class="language-java">我想将Observable的数据进行变换后再发射  |_ 变换的方式是通过某个方法一次发射一个数据: `map`  |_ 变换的方式是发射多个Observable中的所有数据: `flatMap`  | |_ 按照发射时间顺序, 一次发射一个Observable: `concatMap`  |_ based on all of the items that preceded them: `scan`  |_ 变换的方式是为每个数据附加一个时间戳: `timestamp`  |_ 变换的方式是将数据转变为距离上次发射的时间间隔: `timeInterval`  </code></pre>    <pre>  <code class="language-java">我想延长数据发射时间: `delay`  </code></pre>    <pre>  <code class="language-java">我想将数据和通知都转换为该Observable的数据, 并重新进行发射  |_ 通过将他们封装在Notification对象中: `materialize`  |_ 并且我还可以再次解除封装: `dematerialize`  </code></pre>    <pre>  <code class="language-java">我想忽略Observable的所有数据, 只发射complete和error通知: `ignoreElements`  </code></pre>    <pre>  <code class="language-java">我想复制一个Observable, 并在它的数据序列前添加其他的数据序列: `startWith`  |_ 并且仅在该Observable数据序列为空的情况下才添加其他数据序列: `defaultEmpty`  </code></pre>    <pre>  <code class="language-java">我想从一个Observable中收集数据, 并通过一个数据的缓冲重新发射: `buffer`  |_ 该缓冲只包含最后一个数据: `takeLastBuffer`  </code></pre>    <pre>  <code class="language-java">我想将一个Observable拆分为多个Observable: `window`  |_ 并且相似的数据可以在同一个Observable中: `groupBy`  </code></pre>    <pre>  <code class="language-java">我想从一个Observable发射的数据中获取某个特定的数据  |_ 要获取的是在complete前发射的最后一个数据: `last`  |_ 要获取的是一个单独的数据: `single`  |_ 要获取的是第一个发射的数据: `first`  </code></pre>    <pre>  <code class="language-java">我想重新发射一个Observable中的某些数据  |_ 只要满足过滤条件的数据: `filter`  |_ 只要第一个数据: `first`  |_ 只要前n个数据: `take`  |_ 只要最后一个数据: `last`  |_ 只要第n个数据: `elementAt`   |_ 只要前几个数据之后的数据  | |_ 即, 跳过前n个数据: `skip`  | |_ 即, 直到某个数据满足某种特定条件之后所发射的数据: `skipWhile`  | |_ 即, 在开始发射某段时间后的数据: `skip`  | |_ 即, 在另一个Observable开始发射数据之后, 原Observable所发射的数据: `skipUtil`   |_ 只要除最后几个数据之外的数据  | |_ 即, 除最后n个数据之外的数据: `skipLast`  | |_ 即, 在某个数据满足某种特定条件之前所发射的数据: `takeWhile`  | |_ 即, 在complete某段时间之前所发射的数据: `skipLast`  | |_ 即, 在另一个Observable开始发射数据之前, 原Observable所发射的数据: `takeUtil`  |_ 只要间歇采样的数据: `sample`  |_ 只要某段时间内不会再有数据发射的数据: `debounce`  |_ 只要与已发射数据不重复的数据: `distinct`  | |_ if they immediately follow the item they are duplicates of: `distinctUntilChanged`  |_ 只要在Observable开始发射数据后, 我的subscriber进行subscribe以后的数据: `delaySubscription`  </code></pre>    <pre>  <code class="language-java">如果某个Observable是一个Observable集合的第一个, 则重新发射他的数据: `amb`  </code></pre>    <pre>  <code class="language-java">我想对Observable发射的数据序列进行判断  |_ 判断是否所有数据都满足条件, 然后发射一个单独的boolean值: `all`  |_ 判断是否其中某个数据满足条件, 然后发射一个单独的boolena值: `contains`  |_ 判断是否Observable没有发射任何数据, 然后发射一个单独的boolean值: `isEmpty`  |_ 判断是否该Observable的数据序列和另一个Observable的数据序列一样, 然后发射一个单独的boolean值: `sequenceEqual`  |_ 发射所有数据的平均值: `average`  |_ 发射所有数据的总和: `sum`  |_ 发射数据的个数: `count`  |_ 发射数据序列中的最大值: `max`  |_ 发射数据序列中的最小值: `min`  |_ 通过对每个数据应用一个方法, 并发射该方法的结果: `scan`  </code></pre>    <pre>  <code class="language-java">我想将Observable发射的整个数据序列转换为另一种数据结构: `to`  </code></pre>    <pre>  <code class="language-java">我想控制操作符进行操作所在的线程: `subscribeOn`  |_ 想控制通知Observer的线程: `observeOn`  </code></pre>    <pre>  <code class="language-java">我想创建一个在某种事件发生后, 可以激活某个特定的action的Observable: `do`  </code></pre>    <pre>  <code class="language-java">我想创建一个可以通知Observer发生错误的Observable: `throw`  |_ 如果在某段时间内没有发射任何数据, 则通知错误: `timeout`  </code></pre>    <pre>  <code class="language-java">我想创建一个可以从错误中恢复的Observable  |_ 它可以通过转换到一个备份Observable来从超时中恢复: `timeout`  |_ 它可以从上游错误通知中恢复: `catch`  |_ 通过尝试重新subscribe上游的Observable: `retry`  </code></pre>    <pre>  <code class="language-java">我想创建一个与Observable有相同生命周期的对象: `using`  </code></pre>    <pre>  <code class="language-java">我想subscribe一个Observable, 并一直阻塞, 直到该Observable完成时, 接收一个Future: `start`  </code></pre>    <pre>  <code class="language-java">我想创建一个Observable, 在subscribe时并不发射数据, 直到我要求它才发射: `publish`  |_ 它只发射最后一个数据: `publishLast`  |_ 它发射全部数据序列, 无论在序列发射后是否有其他进行subscribe: `replay`  |_ 当所有subscriber都取消subscribe时, 我要放弃发射: `refCount`  |_ 我要要求它开始发射: `connect`  </code></pre>    <h2>Scheduler线程切换</h2>    <p>如果你想在操作符中引入多线程, 你可以使用 <em>Schedulers</em></p>    <p>默认情况下, Observable和操作链会在调用 subscribe 方法的线程中进行操作和发出通知. subscribeOn 操作符可以指定操作Observable的具体线程. observeOn 操作符可以指定Observable向Observer发出通知的线程</p>    <ul>     <li>observeOn      <ul>       <li>指定Observable向Observer发出通知的线程</li>       <li>可以在操作链多次调用</li>       <li>作用范围: 从本次observeOn调用开始, 到下次observeOn操作符结束.</li>      </ul> </li>     <li>subscribeOn      <ul>       <li>指定操作Observable的具体线程</li>       <li>可以在操作链中多次调用</li>       <li>作用范围: 从 创建操作符 或 doOnSubscribe操作符 开始, 到下次subscribeOn操作符结束</li>      </ul> </li>    </ul>    <p>注意: subscribe中的通知回调方法是 observeOn 指定的线程, 而不是subscribeOn指定的线程</p>    <p> </p>    <p>来自:http://blog.lixplor.com/2016/10/16/rxjava/</p>    <p> </p>