通俗的方式理解RxJS

wumingben 3年前
   <h2>通俗的方式理解Rx.js</h2>    <h3>序言</h3>    <p>今早看 民工叔 的文章的时候, 发现对Rxjs所知甚少, 于是去官方看了下教程, 整理出一些东西, 写成此文。</p>    <p>Rxjs据说会在2017年流行起来, 因为其处理异步逻辑,数据流, 事件非常擅长。 但是其学习曲线相比Promise, EventEmitter陡峭了不少。 而且民工叔也说:"由于RxJS的抽象程度很高,所以,可以用很简短代码表达很复杂的含义,这对开发人员的要求也会比较高,需要有比较强的归纳能力。" 本文就Rx.js的几个核心概念做出阐述。 尽可能以通俗易懂的方式解释这些概念。要是本文有误或不完善的地方,欢迎指出。</p>    <h3>Observable到底是什么</h3>    <p>先上代码:</p>    <pre>  <code class="language-javascript">let foo = Rx.Observable.create(observer => {    console.log('Hello');    observer.next(42);  });    foo.subscribe(x => console.log(x));  foo.subscribe(y => console.log(y));</code></pre>    <p>输出</p>    <pre>  <code class="language-javascript">"Hello"  42  "Hello"  42</code></pre>    <p>这里可以把foo想象成一个函数, 这意味着你每次调用foo都会导致传入Rx.Observable.create里的回调函数重新执行一次 , 调用的方式为foo.subscribe(callback), 相当于foo()。 接收函数返回值的方式也从var a = foo()改为通过传入回调函数的方式获取。第三行的observer.next表示返回一个值, 你可以调用多次, 每次调用observer.next后, 会先将next里的值返回给foo.subcribe里的回调函数, 执行完后再返回 。observer.complete, observer.error来控制流程。 具体看代码:</p>    <pre>  <code class="language-javascript">var observable = Rx.Observable.create(observer => {    try {      observer.next(1);      console.log('hello');      observer.next(2);      observer.next(3);      observer.complete();      observer.next(4);    } catch (err) {      observer.error(err);     }  });    let = subcription = observable.subscribe(value => {    console.log(value)  })</code></pre>    <p>运行结果:</p>    <pre>  <code class="language-javascript">1  hello  2  3</code></pre>    <p>如上的第一个回调函数里的结构是推荐的结构。 当observable的执行出现异常的时候,通过observer.error将错误返回, 然而observable.subscribe的回调函数无法接收到.因为observer.complete已经调用, 因此observer.next(4)的返回是无效的. <strong>Observable不是可以返回多个值的Promise</strong> 虽然获得Promise的值的方式也是通过then函数这种类似的方式, 但是new Promise(callback)里的callback回调永远只会执行一次!因为 <strong>Promise的状态是不可逆的</strong> 。</p>    <p>可以使用其他方式创建Observable, 看代码:</p>    <pre>  <code class="language-javascript">var clicks = Rx.Observable.fromEvent(document, 'click');  clicks.subscribe(x => console.log(x));</code></pre>    <p>当用户对document产生一个click行为的时候, 就会打印事件对象到控制台上。</p>    <h3>Observer是什么</h3>    <p>先看代码:</p>    <pre>  <code class="language-javascript">let foo = Rx.Observable.create(observer => {    console.log('Hello');    observer.next(42);  });    let observer = x => console.log(x);  foo.subscribe(observer);</code></pre>    <p>代码中的第二个变量就是observer. 没错, observer就是 <strong>当Observable"返回"值的时候接受那个值的函数!</strong> 第一行中的observer其实就是通过foo.subscribe传入的callback. 只不过稍加封装了。 怎么封装的? 看代码:</p>    <pre>  <code class="language-javascript">let foo = Rx.Observable.create(observer => {    try {      console.log('Hello');      observer.next(42);      observer.complete();      observer.next(10);    } catch(e) { observer.error(e) }      });    let observer = {    next(value) { console.log(value) },    complete() { console.log('completed'),    error(err) { console.error(err) }  }  foo.subscribe(observer);</code></pre>    <p>你看到observer被定义成了一个对象, 其实这才是完整的observer. 传入一个callback到observable.subcribe相当于传入了 { next: callback } 。</p>    <h3>Subcription里的陷阱</h3>    <p>Subscription是什么, 先上代码:</p>    <pre>  <code class="language-javascript">var observable = Rx.Observable.interval(1000);  var subscription = observable.subscribe(x => console.log(x));    setTimeout(() => {    subscription.unsubscribe();  }, 3100)</code></pre>    <p>运行结果:</p>    <p>Rx.Observable.interval可以返回 一个能够发射(返回)0, 1, 2, 3..., n数字的Observable , 返回的时间间隔这里是1000ms。 第二行中的变量就是subscription。 subscription有一个unsubscribe方法, 这个方法可以让 subscription订阅的observable发射的数据被observer忽略掉 .通俗点说就是取消订阅。</p>    <p>unsubscribe存在一个陷阱。 先看代码:</p>    <pre>  <code class="language-javascript">var foo = Rx.Observable.create((observer) => {    var i = 0    setInterval(() => {      observer.next(i++)      console.log('hello')    }, 1000)  })    const subcription = foo.subscribe((i) => console.log(i))  subcription.unsubscribe()</code></pre>    <p>运行结果:</p>    <pre>  <code class="language-javascript">hello  hello  hello  ......  hello</code></pre>    <p>unsubscribe只会让observer忽略掉observable发射的数据,但是setInterval依然会继续执行。 这看起来似乎是一个愚蠢的设计。 所以不建议这样写。</p>    <h3>Subject</h3>    <p>Subject是一种能够发射数据给多个observer的Observable, 这让Subject看起来就好像是EventEmitter。 先上代码:</p>    <pre>  <code class="language-javascript">var subject = new Rx.Subject();    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });  subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    subject.next(1);  subject.next(2);</code></pre>    <p>运行结果:</p>    <pre>  <code class="language-javascript">observerA: 1  observerB: 1  observerA: 2  observerB: 2</code></pre>    <p>与Observable不同的是, Subject发射数据给多个observer。 其次, 定义subject的时候并没有传入callback, 这是因为subject自带next, complete, error等方法。从而可以发射数据给observer。 这和EventEmitter很类似。observer并不知道他subscribe的是Obervable还是Subject。 对observer来说是透明的。 而且Subject还有各种派生, 比如说:</p>    <p>BehaviorSubject 能够保留最近的数据,使得当有subscribe的时候,立马发射出去。看代码:</p>    <pre>  <code class="language-javascript">var subject = new Rx.BehaviorSubject(0); // 0 is the initial value    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });    subject.next(1);  subject.next(2);    subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    subject.next(3);</code></pre>    <p>运行结果:</p>    <pre>  <code class="language-javascript">observerA: 0  observerA: 1  observerA: 2  observerB: 2  observerA: 3  observerB: 3</code></pre>    <p>ReplaySubject 能够保留最近的一些数据, 使得当有subscribe的时候,将这些数据发射出去。看代码:</p>    <pre>  <code class="language-javascript">var subject = new Rx.ReplaySubject(3);     subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });    subject.next(1);  subject.next(2);  subject.next(3);  subject.next(4);    subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    subject.next(5);</code></pre>    <p>输出结果:</p>    <pre>  <code class="language-javascript">observerA: 1  observerA: 2  observerA: 3  observerA: 4  observerB: 2  observerB: 3  observerB: 4  observerA: 5  observerB: 5</code></pre>    <p>第一行的声明表示ReplaySubject最大能够记录的数据的数量是3。</p>    <p>AsyncSubject 只会发射结束前的一个数据。 看代码:</p>    <pre>  <code class="language-javascript">var subject = new Rx.AsyncSubject();    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });    subject.next(1);  subject.next(2);  subject.next(3);  subject.next(4);    subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    subject.next(5);  subject.complete();</code></pre>    <p>输出结果:</p>    <pre>  <code class="language-javascript">observerA: 5  observerB: 5</code></pre>    <p>既然subject有next, error, complete三种方法, 那subject就可以作为observer! 看代码:</p>    <pre>  <code class="language-javascript">var subject = new Rx.Subject();    subject.subscribe({    next: (v) => console.log('observerA: ' + v)  });  subject.subscribe({    next: (v) => console.log('observerB: ' + v)  });    var observable = Rx.Observable.from([1, 2, 3]);    observable.subscribe(subject);</code></pre>    <p>输出结果:</p>    <pre>  <code class="language-javascript">observerA: 1  observerB: 1  observerA: 2  observerB: 2  observerA: 3  observerB: 3</code></pre>    <p>也就是说, observable.subscribe可以传入一个subject来订阅其消息。 这就好像是Rxjs中的一颗语法糖, Rxjs有专门的实现。</p>    <p>Multicasted Observables 是一种借助Subject来将数据发射给多个observer的Observable。 看代码:</p>    <pre>  <code class="language-javascript">var source = Rx.Observable.from([1, 2, 3]);  var subject = new Rx.Subject();  var multicasted = source.multicast(subject);    multicasted.subscribe({    next: (v) => console.log('observerA: ' + v)  });  multicasted.subscribe({    next: (v) => console.log('observerB: ' + v)  });    multicasted.connect();</code></pre>    <p>Rx.Observable.from能够逐一发射数组中的元素, 在multicasted.connect()调用之前的任何subscribe都不会导致source发射数据。multicasted.connect()相当于之前的observable.subscribe(subject)。因此不能将multicasted.connect()写在subscribe的前面。因为这会导致在执行multicasted.connect()的时候source发射数据, 但是subject又没保存数据, 导致两个subscribe无法接收到任何数据。</p>    <p>最好是第一个subscribe的时候能够得到当前已有的数据, 最后一个unsubscribe的时候就 <strong>停止Observable的执行</strong> , 相当于Observable发射的数据都被忽略。</p>    <p>refCount就是能够返回这样的Observable的方法</p>    <pre>  <code class="language-javascript">var source = Rx.Observable.interval(500);  var subject = new Rx.Subject();  var refCounted = source.multicast(subject).refCount();  var subscription1, subscription2, subscriptionConnect;    console.log('observerA subscribed');  subscription1 = refCounted.subscribe({    next: (v) => console.log('observerA: ' + v)  });    setTimeout(() => {    console.log('observerB subscribed');    subscription2 = refCounted.subscribe({      next: (v) => console.log('observerB: ' + v)    });  }, 600);    setTimeout(() => {    console.log('observerA unsubscribed');    subscription1.unsubscribe();  }, 1200);    setTimeout(() => {    console.log('observerB unsubscribed');    subscription2.unsubscribe();  }, 2000);</code></pre>    <p>输出结果:</p>    <pre>  <code class="language-javascript">observerA subscribed  observerA: 0  observerB subscribed  observerA: 1  observerB: 1  observerA unsubscribed  observerB: 2  observerB unsubscribed</code></pre>    <h3>What's Operators?</h3>    <p>Observable上有很多方法, 比如说map, filter, merge等等。 他们基于调用它们的observable,返回一个 <strong>全新的observable</strong> 。 而且他们都是纯方法。 operators分为两种, instance operators 和 static operators。 instance operators是存在于observable实例上的方法, 也就是实例方法; static operators是存在于Observable这个类型上的方法, 也就是静态方法。Rxjs拥有很多强大的 <a href="/misc/goto?guid=4959738923324909720" rel="nofollow,noindex">operators</a> 。</p>    <p>自己实现一个operators:</p>    <pre>  <code class="language-javascript">function multiplyByTen(input) {    var output = Rx.Observable.create(function subscribe(observer) {      input.subscribe({        next: (v) => observer.next(10 * v),        error: (err) => observer.error(err),        complete: () => observer.complete()      });    });    return output;  }    var input = Rx.Observable.from([1, 2, 3, 4]);  var output = multiplyByTen(input);  output.subscribe(x => console.log(x));</code></pre>    <p>输出结果:</p>    <h3>Rx.js实践import React from 'react';</h3>    <pre>  <code class="language-javascript">import ReactDOM from 'react-dom';  import Rx from 'rx';    class Main extends React.Component {    constructor (props) {      super(props);      this.state = {count: 0};    }      // Click events are now observables! No more proactive approach.    componentDidMount () {      const plusBtn = document.getElementById('plus');      const minusBtn = document.getElementById('minus');        const plus$ = Rx.Observable.fromEvent(plusBtn, 'click').map(e => 1);      const minus$ = Rx.Observable.fromEvent(minusBtn, 'click').map(e => -1);        Rx.Observable.merge(plus$, minus$).scan((acc, n) => acc + n)        .subscribe(value => this.setState({count: value}));    }      render () {      return (          <div>            <button id="plus">+</button>            <button id="minus">-</button>            <div>count: {this.state.count}</div>          </div>      );    }  }    ReactDOM.render(<Main/>, document.getElementById('app'));</code></pre>    <p>merge用于合并两个observable产生一个新的observable。 scan类似于Array中的reduce。 <a href="/misc/goto?guid=4959717420613590994" rel="nofollow,noindex">这个例子</a> 实现了点击plus的时候+1, 点击minus的时候-1。</p>    <h3>Rx.js适用的场景</h3>    <ul>     <li> <p>多个复杂的异步或事件组合在一起。</p> </li>     <li> <p>处理多个数据序列</p> </li>    </ul>    <p>假如没有被复杂的异步,事件, 数据序列困扰, 如果promise已经足够的话, 就没必要适用Rx.js。</p>    <h3>Summary</h3>    <ul>     <li> <p>Observable, Observer, Subscription, Subscrib, Subject概念。</p> </li>     <li> <p>RxJS适用于解决复杂的异步,事件问题。</p> </li>    </ul>    <h3>文章参考</h3>    <ul>     <li> <p><a href="/misc/goto?guid=4959738923449698379" rel="nofollow,noindex">让我们一起来学习 RxJS ---by 饿了么前端</a></p> </li>     <li> <p><a href="/misc/goto?guid=4959717421017000839" rel="nofollow,noindex">21-use-rxjs-for-orchestrating-asynchronous-and-event-based-computations</a></p> </li>     <li> <p><a href="/misc/goto?guid=4959717416649186477" rel="nofollow,noindex">RxJS文档</a></p> </li>     <li> <p><a href="/misc/goto?guid=4959738923611055589" rel="nofollow,noindex">RxJS 入门指引和初步应用 ---by 民工叔</a></p> </li>    </ul>    <p> </p>    <p>来自:https://segmentfault.com/a/1190000008464065</p>    <p> </p>