异步编程的主要解决方案有如下三种:
Node 自身提供的 events
模块是发布/订阅模式的简单实现,Node 中部分模块都继承于它,这个模块比前端浏览器大量 DOM 事件绑定的机制简单,不存在事件冒泡等概念。该模块具有 addListener/on()
、once()
、removeListener()
、removeAllListeners()
和 emit()
等基本的事件监听模式的方法实现。
🌰 基本示例:
// 订阅emitter.on('event', function(message) {console.log(message);});// 发布emitter.emit('event', 'I am message!');
订阅事件就是一个高阶函数的应用。事件发布/订阅模式可以实现一个事件与多个回调函数的关联,这些回调函数又称为事件侦听器。通过 emit()
发布事件后,消息会立即传递给当前事件的所有侦听器执行。侦听器可以很灵活地添加和删除,使得事件和具体处理逻辑之间可以很轻松地关联和解耦。
事件发布/订阅模式常常用于解耦业务逻辑,事件发布者无须关注订阅的侦听器如何实现业务逻辑,甚至不用关注有多少个侦听器存在,数据通过消息的方式可以很灵活地传递。有一些典型场景中,可以通过事件发布/订阅模式进行组件封装,将不变的部分封装在组件内部,将容易变化、需自定义的部分通过事件暴露给外部处理,这是一种典型的逻辑分离方式。在这种事件发布/订阅组件中,事件的设计非常重要,因为它关乎外部调用组件时是否优雅,从某种角度来说事件的设计就是组件的接口设计。
从另一个角度来看,事件侦听器模式也是一种钩子(hook)机制,利用钩子导出内部数据或状态给外部的调用者。Node 中的很多对象大多具有黑盒的特点,功能点较少,如果不通过事件钩子的形式,我们就无法获取对象在运行期间的中间值或内部状态。这种通过事件钩子的方式,可以使编程者不用关注组件是如何启动和执行的,只需关注在需要的事件点上即可。
Node 对事件发布/订阅机制做了额外的处理:
emitter.setMaxListeners(0)
可将限制去除。error
事件,EventEmitter 会检查是否有对 error
事件添加过侦听器。如果添加了,这个错误将会交由该侦听器处理,否则这个错误将会作为异常抛出。如果外部没有捕获这个异常,将会引起线程推出。🌰 基本示例:Stream 对象继承 EventEmitter
const events = require('events');function Stream() {events.EventEmitter.call(this);}util.inherits(Stream, events.EventEmitter);
在事件订阅/发布模式中,通常也有一个 once()
方法,通过它添加的侦听器只能执行一次,在执行之后就会将它与事件的关联移除。这个特性常常可以帮助我们过滤一些重复性的事件响应。
在计算机中,缓存由于存放在内存中,访问速度十分快,常常用于加速数据访问,让绝大多数的请求不必重复去做一些低效的数据读取。所谓雪崩问题,就是在高访问量、大并发量的情况下缓存失效的情景,此时大量的请求同时涌入数据库中,数据无法同时承受如此大的查询请求,进而往前影响到网站整体的响应速度。
🌰 基本示例:数据库查询语句
var select = function(callback) {db.select('SQL', function(results) {callback(results);});};
如果站点刚好启动,这时缓存中是不存在数据的,而如果访问量巨大,同一句 SQL 会被发送到数据库中反复查询,会影响服务的整体性能。
改进方案可以通过添加一个状态锁解决:
var status = 'ready';var select = function(callback) {if (status === 'ready') {status = 'pending';db.select('SQL', function(result) {status = 'ready';callback(results);});}};
这种情景下,连续多次调用 select()
时,只有首次调用是生效的,后续的 select()
是没有数据服务的,这个时候可以引入事件队列。
var proxy = new events.EventEmitter();var status = 'ready';var select = function(callback) {proxy.once('selected', callback);if (status === 'ready') {status = 'pending';db.select('SQL', function(results) {proxy.emit('selected', results);status = 'ready';});}};
这里我们利用了 once()
方法,将所有请求的回调都压入事件队列中,利用其执行一次就会将监视器移除的特点,保证每个回调都只会被执行一次。对于相同的 SQL 语句,保证在同一个查询开始到结束的过程中永远只有一次。SQL 在进行查询时,新到来的相同调用只需在队列中等待数据就绪即可,一旦查询结束,得到的结果可以被这些调用共同使用。这种方式能节省重复的数据库调用产生的开销。由于 Node 单线程执行的原因,此处无须担心状态同步问题。这种方式其实也可以应用到其它远程调用的场景中,即使外部没有缓存策略,也能有效节省重复开销。
利用高阶函数的优势,侦听器作为回调函数可以随意添加和删除,它帮助开发者轻松处理随时可能添加的业务逻辑。也可以隔离业务逻辑,保持业务逻辑单元的职责单一。一般而言,事件与侦听器的关系是一对多,但在异步编程中,也会出现事件与侦听器的关系是多对一的情况,也就是说一个业务逻辑可能依赖两个通过回调或事件传递的结果。
每个中间件传递请求对象、响应对象和尾触发函数,通过队列形成一个处理流。
中间件机制使得在处理网络请求时,可以像面向切面编程一样进行过滤、验证、日志等功能,而不与具体业务逻辑产生关联,以致产生耦合。
同步 I/O 因为每个 I/O 都是彼此阻塞的,在循环体中,总是一个接着一个调用,不会出现耗用文件描述符太多的情况,同时性能也是地下的。
对于异步 I/O,虽然并发容易实现,但是由于太容易实现,依然需要控制。换言之,尽管是要压榨底层系统的性能,但还是需要给予一定的过载保护,以防止过犹不及。
bagpipe 的 API 主要暴露了一个 push()
方法和 full
事件。
🌰 标准示例:
var Bagpipe = require('bagpipe');// 设定最大并发数为10var bagpipe = new Bagpipe(10);for (var i = 0; i < 100; i++) {bagpipe.push(async, function() {// 异步回调执行});}bagpipe.on('full', function(length) {console.warn('底层系统处理不能及时完成,队列拥堵,目前队列长度为:' + length);});
实现细节类似于前文的 smooth()
。push()
方法依然是通过函数变换的方式实现,假设第一个参数是方法,最后一个参数是回调函数,其余为其它参数,其核心实现如下:
/*** 推入方法,参数。最后一个参数为回调函数* @param {Function} method 异步方法* @param {Mix} args 参数列表,最后一个参数为回调函数*/Bagpine.prototype.push = function(method) {var args = [].slice.call(arguments, 1);var callback = args[args.length - 1];if (typeof callback !== 'function') {args.push(function() {});}if (this.options.disabled || this.limit < 1) {method.apply(null, args);return this;}// 队列长度不超过限制值时if (this.queue.length < this.queueLength || !this.options.refuse) {this.queue.push({method: method,args: args,});} else {var err = new Error('Too much async call in queue');err.name = 'TooMuchAstncCallError';callback(err);}if (this.queue.length > 1) {this.emit('full', this.queue.length);}this.next();return this;};
将调用推入队列后,调用一次 next()
方法尝试触发。next()
方法的定义如下:
/*** 继续执行队列中的后续动作*/Bagpepe.prototype.next = function () {var that = this;if (that.active < that limit && that.queue.length) {var req = that.queue.shift();that.run(req.method, req.args);}};
next()
方法主要判断活跃调用的数量,如果正常,将调用内部方法 run()
来执行真正的调用。这里为了判断回调函数是否执行,采用了一个注入代码的技巧。
/*!* 执行队列中的方法*/Bagpipe.prototype.run = function(method, args) {var that = this;that.active++;var callback = args[args.length - 1];var timer = null;var called = false;// inject logicargs[args.length - 1] = function(err) {// anyway, clear the timerif (timer) {clearTimeout(timer);timer = null;}// if timeout, don't executeif (!called) {that._next();callback.apply(null, arguments);} else {// pass the outdated errorif (err) {that.emit('outdated', err);}}};var timeout = that.options.timeout;if (timeout) {timer = setTimer(function() {// set called as truecalled = true;that._next();// pass the exceptionvar err = new Error(timeout + 'ms timeout');err.name = 'BgapipeTimeoutError';err.data = {name: method.name,method: method.toString(),args: args.slice(0, -1),};callback(err);}, timeout);}method.apply(null, args);};
用户传入的回调函数被真正执行前,被封装替换过。这个封装的回调函数内部的逻辑将活跃值的计数器减 1 后,主动调用 next()
执行后续等待的异步调用。
bagpipe 类似于打开了一道窗口,允许异步调用并行进行,但是严格限定上线。仅仅在调用 push()
时分开传递,并不对原有 API 有任何侵入。
事实上,bagpipe 还有一些深度的使用方式。对于大量的异步调用,也需要分场景进行区分,因为设计并发控制,必然会造成部分调用需要进行等待。如果调用有实时方面的需求,那么需要快速返回,因为等到方法被真正执行时,可能已经超过了等待时间,即使返回了数据,也没有意义了。这种场景下需要快速失败,让调用方尽早返回,而不用浪费不必要的等待时间。bagpipe 为此支持了拒绝模式。
拒绝模式的使用只要设置下参数即可,相关代码如下:
// 设定最大并发数为 10var bagpipe = new Bagpipe(10, {refuse: true,});
在拒绝模式下,如果等待的调用队列也满了之后,心来的调用就直接返回它一个队列太忙的拒绝异常。
造成队列用塞的主要原因是异步调用耗时太久,调用产生的速度远远高于执行的速度。为了防止某些异步调用使用了太多的时间,我们需要设置一个时间基线,将那些执行时间太久的异步调用清理出活跃队列,让排队中的异步调用尽快执行。否则在拒绝模式下,会有太多的调用因为某个执行得慢,导致得到拒绝异常。相对而言,这种场景下得到拒绝异常显得比较无辜。为了公平地对待在实时需求场景下的每个调用,必须要控制每个调用的执行时间,将那些害群之马提出队伍。
为此,bagpipe 也提供了超时控制。超时控制是为了异步调用设置一个时间阀值,如果异步调用没有在规定时间内完成,我们先执行用户传入的回调函数,让用户得到一个超时异常,以尽早返回。然后让下一个等待队列中的调用孩子醒。
超时的设置如下:
// 设置最大并发数 10var bagpipe = new Bagpiep(10, {timeout: 3000,});
异步调用的并发限制在不同场景下的需求不同:非实时场景下,让超出限制的并发暂时等待执行已经可以满足需求;但在实时场景下,需要更细粒度、更合理的控制。
async
也提供了一个方法用于处理异步调用的限制:parallelLimit()
。如下是 async
的示例代码:
async.parallelLimt([function(callback) {fs.readFile('file1.txt', 'utf-8', callback);},function(callback) {fs.readFile('file2.txt', 'utf-8', callback);},],1,function(err, results) {// TODO});
parallelLimit()
与 paralletl()
类似,但多了一个用于限制并发数量的参数,使得任务只能同时并发一定数量,而不是无限制并发。
parallelLimt()
方法的缺陷在于无法动态地增加并行任务。为此,async
提供了 queue()
方法来满足该需求,这对于遍历文件目录等操作十分有效。
var q = async.queue(function(file, callback) {fs.readFile(file, 'utf-8', callback);}, 2);q.drain = function() {// 完成了队列中的所有任务};fs.readdirSync('.').forEach(function(file) {q.push(file, function(err, data) {// TODO});});
尽管 queue()
实现了动态添加并行任务,但是相比 parallelLimit()
,由于 queue()
接收的参数是固定的,它丢失了 parallelLimit()
的多样性,我私心地认为 bagpipe 更灵活,可以添加任意类型的异步任务,也可以动态添加异步任务,同时还能够在实时处理场景中加入拒绝模式的超时控制。