Async 模块实现入门浅析

JavaScript   2017-06-13 09:20:25 发布
您的评价:
     
0.0
收藏     0收藏
文件夹
标签
(多个标签用逗号分隔)

在早期的异步开发中, Async 模块是比较有名的异步解决方案。本文会带大家简单看一下 async 模块的几个方法实现思路,具体分别是:

  • async.waterfall
  • async.each
  • async.eachLimit
  • async.whilst

PS:本文有相应视频—— 优酷地址 (声音据说有点小)。

 

waterfall

我们先来看下一个 async.waterfall 的简单使用场景登录:

async.waterfall([
  function (next) {
    user.get(name, next);
  },
  function (user, next) {
    if (!user) {
      return next(new Error('user not found'));
    }
    if (passwd != user.passwd) {
      return next(new Error('wrong password'));
    }
    sign.up(name, next);
  },
  function (reward, next) {
    resource.add(name, reward, next);
  },
], function (err, ...res) {
  if (err) {
    console.error(err.stack);
  }
  console.log(res);
});

async 的思路是将原本容易出现 callback hell 的嵌套,通过数组并列的方式抹平,并且节省每次判断 error 的代码,按照 error first 的约定在内部每次都帮助用户检查异步是否出错。了解了这种想法之后我们可以写个很简单的 waterfall 出来。

// 确认整体结构
exports.waterfall = function (task = [], callback = noop) { // 默认值
  // 拿到 callback 数组
  if (!Array.isArray(task)) {
    return callback(new Error('task should be an array!'));
  }

  // TODO
};

function noop() {}

拿到了 callback 数组之后,我们需要想办法,让这个数组串联的执行起来,即从数组的第一个 callback 开始,一个执行完就自动调用下一个 callback:

exports.waterfall = function (task = [], callback = noop) {
  if (!Array.isArray(task)) {
    return callback(new Error('task should be an array!'));
  }

  (function next() {
    // 取出数组中的第一个 callback 执行
    let fn = task.shift();
    fn.apply(null, [next]); // ①用户自行调用这个 next
  })();
};

关于 ① 处流程自行走下去结合这里看看:

async.waterfall([
    function(callback) {
        callback(null, 'one', 'two'); // ①这里 callback 就调用了 next
    },
// ...

理解了这个剩下的就比较好办了:

exports.waterfall = function (task = [], callback = noop) {
  if (!Array.isArray(task)) {
    return callback(new Error('task should be an array!'));
  }

  (function next(...args) { // args 获取上一个 callback 传的结果
    if (args[0]) { // error first 约定
      // 发现第一个参数存在 error 直接返回结束整个流程
      return callback(args[0]);
    }

    if (task.length) { // 判断 callback 是不是执行完了
      let fn = task.shift();
      // ② 将 args 平摊到下一个 cb 的开头,next 位于最后
      fn.apply(null, [...args.slice(1), next]);
    } else {
      // 如果执行完了就结束流程
      callback.apply(null, args);
    }
  })();
};

关于 ② 可以结合例子来看:

async.waterfall([
    function(callback) {
        callback(null, 'one', 'two');
    },
    function(arg1, arg2, callback) { // ②
        // arg1 now equals 'one' and arg2 now equals 'two'
        callback(null, 'three');
    },
    function(arg1, callback) {
        // arg1 now equals 'three'
        callback(null, 'done');
    }
], function (err, result) {
    // result now equals 'done'
});

那么到这里一个简单的 waterfall 的实现思路已经完全展现出来了。最后说一下可能出现的问题,比如用户多调了一次 cb (这种情况确实可能出现)所以我们需要做一些简单的预防:

exports.waterfall = function (task = [], callback = noop) {
  if (!Array.isArray(task)) {
    return callback(new Error('task should be an array!'));
  }

  (function next(...args) {
    if (args[0]) {
      return callback(args[0]);
    }

    if (task.length) {
      let fn = task.shift();
      fn.apply(null, [...args.slice(1), onlyOnce(next)]); // 保证只被调用一次
    } else {
      callback.apply(null, args);
    }
  })();
};

function onlyOnce(cb) {
  let flag = false;
  return (...args) => {
    if (flag) {
      return cb(new Error('cb already called'));
    }
    cb.apply(null, args);
    flag = true;
  };
}

 

each

async.each 有点像是异步的 arr.map 操作。我们可以来看一个使用的例子:

'use strict';

const fs = require('fs');
const async = require('async');
const request = require('request');

const sites = ['www.baidu.com','github.com','www.npmjs.com', 'www.zhihu.com'];

// 下站站点图标
function downloadFavicon(site, next) {
  let addr = `https://${site}/favicon.ico`;
  let file = `./${site}.ico`;
  request.get(addr)
    .pipe(fs.createWriteStream(file))
    .on('error', (err) => {
      console.error(`${url} Download failed: ${err.message}`);
      next();
    })
    .on('finish', next);
}

// 下载每一个站点的图标
async.each(sites, downloadFavicon, function (err) {
  if (err) {
    console.log('err', err);
  }
  console.log('over');
});

那么按照例子,我们可以先来搭一个 async.each 的架子:

exports.each = function (items = [], iterator, callback = noop) {
  // 判断数组类型
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  // 判断迭代器
  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  // TODO
};

然后我们需要做的事情很简单,只需要将数组的每个一个元素作为参数拿来调用 iterator 函数即可:

exports.each = function (items = [], iterator, callback = noop) {
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  function next(err) {
    // TODO
  }

  items.map((item) => iterator(item, next));
};

然后我们要想办法在所有的异步操作都执行完之后调用 callback 出去

exports.each = function (items = [], iterator, callback = noop) {
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  let completed = 0; // 计数

  function next(err) {
    if (err) { // error first
      return callback(err); // 结束流程
    }

    if (++completed >= items.length) { // 计数判断
      callback(); // 流程结束
    }
  }

  items.map((item) => iterator(item, next));
};

async.each 的实现思路确实如上述例子一样简单,当然还可能会有一些复杂的情况需要判断,更深入的内容各位可以移步 Async 官方的 each 实现 中查看更多。

eachLimit

使用 each 执行操作的时候,在量小的情况下是没有问题的,但是当异步操作的量特别大的时候,就需要对其进行一定的控制。比如写一个爬虫去某种网站上爬图片,那么将图片下载到本地的过程中存在一个文件描述符的限制,即同时打开的文件(保存图片时需要openFile)数目超过一定程度就会收到操作系统的报错。

以 each 中出现过的例子来说 eachLimit 的功能:

const sites = [ ... ]; // 可能非常多站点

// 对 each 操作做 limit,同时最多下载 100 个站点图标
async.eachLimit(sites, 100, downloadFavicon, function (err) {
  if (err) {
    console.log('err', err);
  }
  console.log('over');
});

了解了上述需求之后,我们来搭一个 eachLimit 的架子:

exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) {
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  // 同时执行的异步操作数目 (不能超过 limit)
  let running = 0;

  // TODO
};

我首先需要一个循环来将异步操作加入到执行队列,但是只能加到 limit 的数目为止:

let running = 0;

(function next() {
  while (running < limit) { // 一口气加到队列满为止
    let item = items.shift();
    running++;

    iterator(item, (err) => {
      running--;
      next(); // 每执行完一个异步操作就触发一下加入队列的行为
    });
  }
})();

然后加上结束的操作:

let done = false;
let running = 0;

(function next() {
  if (done && running <= 0) {
    return callback();
  }

  while (running < limit) {
    let item = items.shift();
    running++;
    if (item === undefined) {
      done = true;
      if (running <= 0) {
        callback();
      }
      return;
    }

    iterator(item, (err) => {
      running--;
      next();
    });
  }
})();

最后补上错误处理的完整版:

exports.eachLimit = function (items = [], limit = 1, iterator, callback = noop) {
  if (!Array.isArray(items)) {
    return callback(new Error('items should be an array!'));
  }

  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  let done = false;
  let running = 0;
  let errored = false;

  (function next() {
    if (done && running <= 0) {
      return callback();
    }

    while (running < limit && !errored) {
      let item = items.shift();
      running++;
      if (item === undefined) {
        done = true;
        if (running <= 0) {
          callback();
        }
        return;
      }

      iterator(item, (err) => {
        running--;
        if (err) {
          errored = true;
          return callback(err);
        }
        next();
      });
    }
  })();
};

 

whilst

最后我们来看一个循环异步 whilst 的实现,也是非常的简单。我们先看看使用例子:

'use strict';

const async = require('async');

let count = 0;
async.whilst(
  function () { return count < 5; },
  function (callback) {
    console.log('count', count++);
    setTimeout(callback, 1000);
  },
  function (err) {
    console.log('over');
  }
);

然后因为比较简单,直接来看代码吧:

exports.whilst = function (test, iterator, callback = noop) {
  if (typeof test != 'function') {
    return callback(new Error('iterator should be a function!'));
  }
  if (typeof iterator != 'function') {
    return callback(new Error('iterator should be a function!'));
  }

  (function next() {
    if (test()) {
      iterator((err) => {
        if (err) {
          return callback(err);
        }
        next();
      });
    }
  })();
};

 

小结

综上,本文为 callback 的异步流程封装控制的思路做了一点微小的整理工作。实现上并没有完全遵循原版,而是选择使用 es6 的新特性劲量让代码看起来简(zhuang)洁(bi),整体上是为了展现一个思路可能有不少细节没有处理,完整的部分参见 async 官方文档

async 的优点可以简单的说,由于 async 基于原生的 callback 所以相比 promise/co 等方式性较好(目前最快的方式是专门优化了速度的 neo-async )。并且 async 提供了非常多、非常全面的 60+ 种异步操作方式,功能可谓十分强大。

最后简单提一下 async 的一些缺点:

  1. 基于 error first 的 约定 。约定的意思就是不是强制的,也就存在不了解这个约定或者使用错误方面的问题。
  2. 流程没有状态。
  3. 由于功能太过强大(如 async.auto)存在可能滥用的问题。
  4. 错误栈曲折排查困难。

 

来自:https://zhuanlan.zhihu.com/p/27303127

 

扩展阅读

node.js中文资料导航
以开发者的视角整理编排的前端开发所使用语言的主流学习资源
目前最好的 JavaScript 异步方案 async/await
目前最好的 JavaScript 异步方案 async/await
异步 JS 工具 Async.js

为您推荐

.net , java webSocket 连接 Socket.io (1.4.4版本) 问题
Express入门教程:一个简单的博客
ECMAScript 6新特性介绍
一个基于 Koa2 构建的类似于 Rails 的 nodejs 开源项目
ECMAScript 2015 简易教程

更多

JavaScript
JavaScript开发
相关文档  — 更多
相关经验  — 更多
相关讨论  — 更多