Node.js Stream - 基础篇

lebaobei 8年前
   <h2>背景</h2>    <p>在构建较复杂的系统时,通常将其拆解为功能独立的若干部分。这些部分的接口遵循一定的规范,通过某种方式相连,以共同完成较复杂的任务。譬如,shell通过管道<code>|</code>连接各部分,其输入输出的规范是文本流。</p>    <p>在<a href="/misc/goto?guid=4958867323246690255">Node.js</a>中,内置的<a href="/misc/goto?guid=4958963900143935496">Stream</a>模块也实现了类似功能,各部分通过<code>.pipe()</code>连接。</p>    <p>鉴于目前国内系统性介绍Stream的文章较少,而越来越多的开源工具都使用了Stream,本系列文章将从以下几方面来介绍相关内容:</p>    <ol>     <li>流的基本类型,以及<a href="/misc/goto?guid=4958963900143935496">Stream</a>模块的基本使用方法</li>     <li>流式处理与<a href="/misc/goto?guid=4959675728905058726">back pressure</a>的工作原理</li>     <li>如何开发流式程序,包括对<a href="/misc/goto?guid=4958828538346190979">Gulp</a>与<a href="/misc/goto?guid=4958836570912107601">Browserify</a>的剖析,以及一个实战示例。</li>    </ol>    <p>本文为系列文章的第一篇。</p>    <h2>流的四种类型</h2>    <p>Stream提供了以下四种类型的流:</p>    <pre>  <code class="language-javascript">var Stream = require('stream')    var Readable = Stream.Readable  var Writable = Stream.Writable  var Duplex = Stream.Duplex  var Transform = Stream.Transform  </code></pre>    <p>使用<code>Stream</code>可实现数据的流式处理,如:</p>    <pre>  <code class="language-javascript">var fs = require('fs')  // `fs.createReadStream`创建一个`Readable`对象以读取`bigFile`的内容,并输出到标准输出  // 如果使用`fs.readFile`则可能由于文件过大而失败  fs.createReadStream(bigFile).pipe(process.stdout)  </code></pre>    <h2>Readable</h2>    <p>创建可读流。</p>    <p>实例:流式消耗<a href="/misc/goto?guid=4959675729048156036">迭代器</a>中的数据。</p>    <pre>  <code class="language-javascript">'use strict'  const Readable = require('stream').Readable    class ToReadable extends Readable {    constructor(iterable) {      super()      this.iterator = new function *() {        yield * iterable      }    }      // 子类需要实现该方法    // 这是生产数据的逻辑    _read() {      const res = this.iterator.next()      if (res.done) {        // 数据源已枯竭,调用`push(null)`通知流        this.push(null)      } else {        // 通过`push`方法将数据添加到流中        this.push(res.value + '\n')      }    }  }    module.exports = ToReadable  </code></pre>    <p>实际使用时,<code>new ToReadable(iterable)</code>会返回一个可读流,下游可以流式的消耗迭代器中的数据。</p>    <pre>  <code class="language-javascript">const iterable = function *(limit) {    while (limit--) {      yield Math.random()    }  }(1e10)    const readable = new ToReadable(iterable)    // 监听`data`事件,一次获取一个数据  readable.on('data', data => process.stdout.write(data))    // 所有数据均已读完  readable.on('end', () => process.stdout.write('DONE'))  </code></pre>    <p>执行上述代码,将会有100亿个随机数源源不断地写进标准输出流。</p>    <p>创建可读流时,需要继承<code>Readable</code>,并实现<code>_read</code>方法。</p>    <ul>     <li><code>_read</code>方法是从底层系统读取具体数据的逻辑,即生产数据的逻辑。</li>     <li>在<code>_read</code>方法中,通过调用<code>push(data)</code>将数据放入可读流中供下游消耗。</li>     <li>在<code>_read</code>方法中,可以同步调用<code>push(data)</code>,也可以异步调用。</li>     <li>当全部数据都生产出来后,<strong>必须</strong>调用<code>push(null)</code>来结束可读流。</li>     <li>流一旦结束,便不能再调用<code>push(data)</code>添加数据。</li>    </ul>    <p>可以通过监听<code>data</code>事件的方式消耗可读流。</p>    <ul>     <li>在首次监听其<code>data</code>事件后,<code>readable</code>便会持续不断地调用<code>_read()</code>,通过触发<code>data</code>事件将数据输出。</li>     <li>第一次<code>data</code>事件会在下一个tick中触发,所以,可以安全地将数据输出前的逻辑放在事件监听后(同一个tick中)。</li>     <li>当数据全部被消耗时,会触发<code>end</code>事件。</li>    </ul>    <p>上面的例子中,<code>process.stdout</code>代表标准输出流,实际是一个可写流。下小节中介绍可写流的用法。</p>    <h2>Writable</h2>    <p>创建可写流。</p>    <p>前面通过继承的方式去创建一类可读流,这种方法也适用于创建一类可写流,只是需要实现的是<code>_write(data, enc, next)</code>方法,而不是<code>_read()</code>方法。</p>    <p>有些简单的情况下不需要创建一类流,而只是一个流对象,可以用如下方式去做:</p>    <pre>  <code class="language-javascript">const Writable = require('stream').Writable    const writable = Writable()  // 实现`_write`方法  // 这是将数据写入底层的逻辑  writable._write = function (data, enc, next) {    // 将流中的数据写入底层    process.stdout.write(data.toString().toUpperCase())    // 写入完成时,调用`next()`方法通知流传入下一个数据    process.nextTick(next)  }    // 所有数据均已写入底层  writable.on('finish', () => process.stdout.write('DONE'))    // 将一个数据写入流中  writable.write('a' + '\n')  writable.write('b' + '\n')  writable.write('c' + '\n')    // 再无数据写入流时,需要调用`end`方法  writable.end()  </code></pre>    <ul>     <li>上游通过调用<code>writable.write(data)</code>将数据写入可写流中。<code>write()</code>方法会调用<code>_write()</code>将<code>data</code>写入底层。</li>     <li>在<code>_write</code>中,当数据成功写入底层后,<strong>必须</strong>调用<code>next(err)</code>告诉流开始处理下一个数据。</li>     <li><code>next</code>的调用既可以是同步的,也可以是异步的。</li>     <li>上游<strong>必须</strong>调用<code>writable.end(data)</code>来结束可写流,<code>data</code>是可选的。此后,不能再调用<code>write</code>新增数据。</li>     <li>在<code>end</code>方法调用后,当所有底层的写操作均完成时,会触发<code>finish</code>事件。</li>    </ul>    <h2>Duplex</h2>    <p>创建可读可写流。</p>    <p><code>Duplex</code>实际上就是继承了<code>Readable</code>和<code>Writable</code>的一类流。<br> 所以,一个<code>Duplex</code>对象既可当成可读流来使用(需要实现<code>_read</code>方法),也可当成可写流来使用(需要实现<code>_write</code>方法)。</p>    <pre>  <code class="language-javascript">var Duplex = require('stream').Duplex    var duplex = Duplex()    // 可读端底层读取逻辑  duplex._read = function () {    this._readNum = this._readNum || 0    if (this._readNum > 1) {      this.push(null)    } else {      this.push('' + (this._readNum++))    }  }    // 可写端底层写逻辑  duplex._write = function (buf, enc, next) {    // a, b    process.stdout.write('_write ' + buf.toString() + '\n')    next()  }    // 0, 1  duplex.on('data', data => console.log('ondata', data.toString()))    duplex.write('a')  duplex.write('b')    duplex.end()  </code></pre>    <p>上面的代码中实现了<code>_read</code>方法,所以可以监听<code>data</code>事件来消耗<code>Duplex</code>产生的数据。<br> 同时,又实现了<code>_write</code>方法,可作为下游去消耗数据。</p>    <p>因为它既可读又可写,所以称它有两端:可写端和可读端。<br> 可写端的接口与<code>Writable</code>一致,作为下游来使用;可读端的接口与<code>Readable</code>一致,作为上游来使用。</p>    <h2>Transform</h2>    <p>在上面的例子中,可读流中的数据(0, 1)与可写流中的数据('a', 'b')是隔离开的,但在<code>Transform</code>中可写端写入的数据经变换后会自动添加到可读端。<br> <code>Tranform</code>继承自<code>Duplex</code>,并已经实现了<code>_read</code>和<code>_write</code>方法,同时要求用户实现一个<code>_transform</code>方法。</p>    <pre>  <code class="language-javascript">'use strict'    const Transform = require('stream').Transform    class Rotate extends Transform {    constructor(n) {      super()      // 将字母旋转`n`个位置      this.offset = (n || 13) % 26    }      // 将可写端写入的数据变换后添加到可读端    _transform(buf, enc, next) {      var res = buf.toString().split('').map(c => {        var code = c.charCodeAt(0)        if (c >= 'a' && c <= 'z') {          code += this.offset          if (code > 'z'.charCodeAt(0)) {            code -= 26          }        } else if (c >= 'A' && c <= 'Z') {          code += this.offset          if (code > 'Z'.charCodeAt(0)) {            code -= 26          }        }        return String.fromCharCode(code)      }).join('')        // 调用push方法将变换后的数据添加到可读端      this.push(res)      // 调用next方法准备处理下一个      next()    }    }    var transform = new Rotate(3)  transform.on('data', data => process.stdout.write(data))  transform.write('hello, ')  transform.write('world!')  transform.end()    // khoor, zruog!  </code></pre>    <p>objectMode</p>    <p>前面几节的例子中,经常看到调用<code>data.toString()</code>。这个<code>toString()</code>的调用是必需的吗?<br> 本节介绍完如何控制流中的数据类型后,自然就有了答案。</p>    <p>在shell中,用管道(<code>|</code>)连接上下游。上游输出的是文本流(标准输出流),下游输入的也是文本流(标准输入流)。在本文介绍的流中,默认也是如此。</p>    <p>对于可读流来说,<code>push(data)</code>时,<code>data</code>只能是<code>String</code>或<code>Buffer</code>类型,而消耗时<code>data</code>事件输出的数据都是<code>Buffer</code>类型。对于可写流来说,<code>write(data)</code>时,<code>data</code>只能是<code>String</code>或<code>Buffer</code>类型,<code>_write(data)</code>调用时传进来的<code>data</code>都是<code>Buffer</code>类型。</p>    <p>也就是说,流中的数据默认情况下都是<code>Buffer</code>类型。产生的数据一放入流中,便转成<code>Buffer</code>被消耗;写入的数据在传给底层写逻辑时,也被转成<code>Buffer</code>类型。</p>    <p>但每个构造函数都接收一个配置对象,有一个<code>objectMode</code>的选项,一旦设置为<code>true</code>,就能出现“种瓜得瓜,种豆得豆”的效果。</p>    <p><code>Readable</code>未设置<code>objectMode</code>时:</p>    <pre>  <code class="language-javascript">const Readable = require('stream').Readable    const readable = Readable()    readable.push('a')  readable.push('b')  readable.push(null)    readable.on('data', data => console.log(data))  </code></pre>    <p>输出:</p>    <pre>  <code class="language-javascript"><Buffer 61>  <Buffer 62>  </code></pre>    <p><code>Readable</code>设置<code>objectMode</code>后:</p>    <pre>  <code class="language-javascript">const Readable = require('stream').Readable    const readable = Readable({ objectMode: true })    readable.push('a')  readable.push('b')  readable.push({})  readable.push(null)    readable.on('data', data => console.log(data))  </code></pre>    <p>输出:</p>    <pre>  <code class="language-javascript">a  b  {}  </code></pre>    <p>可见,设置<code>objectMode</code>后,<code>push(data)</code>的数据被原样地输出了。此时,可以生产任意类型的数据。</p>    <h2>系列文章</h2>    <ul>     <li>第一部分:<a href="http://www.open-open.com/lib/view/open1469618649894.html">《Node.js Stream - 基础篇》</a>,介绍<a href="/misc/goto?guid=4958963900143935496">Stream</a>接口的基本使用。</li>     <li>第二部分:<a href="http://www.open-open.com/lib/view/open1469619171753.html">《Node.js Stream - 进阶篇》</a>,重点剖析Stream底层如何支持流式数据处理,及其<a href="/misc/goto?guid=4959675728905058726">back pressure</a>机制。</li>     <li>第三部分:<a href="http://www.open-open.com/lib/view/open1469619230397.html">《Node.js Stream - 实战篇》</a>,介绍如何使用Stream进行程序设计。从<a href="/misc/goto?guid=4958836570912107601">Browserify</a>和<a href="/misc/goto?guid=4958828538346190979">Gulp</a>总结出两种设计模式,并基于Stream构建一个为<a href="/misc/goto?guid=4958964328408675079">Git</a>仓库自动生成changelog的应用作为示例。</li>    </ul>    <p> </p>    <p>参考文献</p>    <ul>     <li>GitHub,<a href="/misc/goto?guid=4959615073390951462">substack/browserify-handbook</a></li>     <li>GitHub,<a href="/misc/goto?guid=4959675729265092912">zoubin/streamify-your-node-program</a></li>    </ul>    <p>来自:http://tech.meituan.com/stream-basics.html</p>    <p> </p>