Node.js + Redis Sorted Set 任务队列

li8548668 8年前
   <p>需求:功能 A 需要调用第三方 API 获取数据,而第三方 API 自身是异步处理方式,在调用后会返回数据与状态 { data: "查询结果", "status": "正在异步处理中" } ,这样就需要间隔一段时间后再去调用第三方 API 获取数据。为了用户在使用功能 A 时不会因为第三方 API 正在异步处理中而必须等待,将用户请求加入任务队列中,返回部分数据并关闭请求。然后定时从任务队列里中取出任务调用第三方 API,若返回状态为”异步处理中“,将该任务再次加入任务队列,若返回状态为”已处理完毕“,将返回数据入库。</p>    <p>根据以上问题,想到使用 Node.js + Redis sorted set 来实现任务队列。Node.js 实现自身应用 API 用来接受用户请求,合并数据库已存数据与 API 返回的部分数据返回给用户,并将任务加入到任务队列中。利用 Node.js child process 与 cron 定时从任务队列中取出任务执行。</p>    <p>在设计任务队列的过程中需要考虑到的几个问题</p>    <ol>     <li> <p>并行执行多个任务</p> </li>     <li> <p>任务唯一性</p> </li>     <li> <p>任务成功或失败后的处理</p> </li>    </ol>    <p>针对以上问题的解决方案</p>    <ol>     <li> <p>并行执行多个任务利用 Promise.all 来实现</p> </li>     <li> <p>任务唯一性利用 Redis sorted set 来实现。使用时间戳作为分值可以实现将 sorted set 作为 list 来使用,在加入任务时判断任务是否已经存在,在取出任务执行时将该任务分值设置为 0,每次取出分值大于 0 的任务来执行,可以避免重复执行任务。</p> </li>     <li> <p>执行任务成功后删除任务,执行任务失败后将任务分值更新为当前时间时间戳,这样就可以将失败的任务重新加入任务队列尾部</p> </li>    </ol>    <p>示例代码</p>    <pre>  <code class="language-javascript">// remote_api.js 模拟第三方 API  'use strict';    const app = require('express')();    app.get('/', (req, res) => {      setTimeout(() => {          let arr = [200, 300];  // 200 代表成功,300 代表失败需要重新请求          res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });      }, 3000);  });    app.listen('9001', () => {      console.log('API 服务监听端口:9001');  });</code></pre>    <pre>  <code class="language-javascript">// producer.js 自身应用 API,用来接受用户请求并将任务加入任务队列  'use strict';    const app = require('express')();  const redisClient = require('redis').createClient();    const QUEUE_NAME = 'queue:example';    function addTaskToQueue(taskName, callback) {      // 先判断任务是否已经存在,存在:跳过,不存在:加入任务队列      redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {          if (error) {              console.log(error);          } else {              if (task) {                  console.log('任务已存在,不新增相同任务');                  callback(null, task);              } else {                  redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {                      if (error) {                          callback(error);                      } else {                          callback(null, result);                      }                  });              }          }      });  }    app.get('/', (req, res) => {      let taskName = req.query['task-name'];      addTaskToQueue(taskName, (error, result) => {          if (error) {              console.log(error);          } else {              res.status(200).send('正在查询中......');          }      });  });    app.listen(9002, () => {      console.log('生产者服务监听端口:9002');  });</code></pre>    <pre>  <code class="language-javascript">// consumer.js 定时获取任务并执行  'use strict';    const redisClient = require('redis').createClient();  const request = require('request');  const schedule = require('node-schedule');    const QUEUE_NAME = 'queue:expmple';  const PARALLEL_TASK_NUMBER = 2;  // 并行执行任务数量    function getTasksFromQueue(callback) {      // 获取多个任务      redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {          if (error) {              callback(error);          } else {              // 将任务分值设置为 0,表示正在处理              if (tasks.length > 0) {                  let tmp = [];                  tasks.forEach((task) => {                      tmp.push(0);                      tmp.push(task);                  });                  redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {                      if (error) {                          callback(error);                      } else {                          callback(null, tasks)                      }                  });              }          }      });  }    function addFailedTaskToQueue(taskName, callback) {      redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {          if (error) {              callback(error);          } else {              callback(null, result);          }      });  }    function removeSucceedTaskFromQueue(taskName, callback) {      redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {          if (error) {              callback(error);          } else {              callback(null, result);          }      })  }    function execTask(taskName) {      return new Promise((resolve, reject) => {          let requestOptions = {              'url': 'http://127.0.0.1:9001',              'method': 'GET',              'timeout': 5000          };          request(requestOptions, (error, response, body) => {              if (error) {                  resolve('failed');                  console.log(error);                  addFailedTaskToQueue(taskName, (error) => {                      if (error) {                          console.log(error);                      } else {                        }                  });              } else {                  try {                      body = typeof body !== 'object' ? JSON.parse(body) : body;                  } catch (error) {                      resolve('failed');                      console.log(error);                      addFailedTaskToQueue(taskName, (error, result) => {                          if (error) {                              console.log(error);                          } else {                            }                      });                      return;                  }                  if (body.status !== 200) {                      resolve('failed');                      addFailedTaskToQueue(taskName, (error, result) => {                          if (error) {                              console.log(error);                          } else {                            }                      });                  } else {                      resolve('succeed');                      removeSucceedTaskFromQueue(taskName, (error, result) => {                          if (error) {                              console.log(error);                          } else {                            }                      });                  }              }          });      });  }    // 定时,每隔 5 秒获取新的任务来执行  let job = schedule.scheduleJob('*/5 * * * * *', () => {      console.log('获取新任务');      getTasksFromQueue((error, tasks) => {          if (error) {              console.log(error);          } else {              if (tasks.length > 0) {                  console.log(tasks);                    Promise.all(tasks.map(execTask))                  .then((results) => {                      console.log(results);                  })                  .catch((error) => {                      console.log(error);                  });                                }          }      });  });</code></pre>    <p> </p>    <p>来自:https://segmentfault.com/a/1190000006933269</p>    <p> </p>