學(xué)習(xí) Node.js 一定要理解的內(nèi)容之一,文中主要涉及到了 EventEmitter 的使用和一些異步情況的處理,比較偏基礎(chǔ),值得一讀。
大多數(shù) Node.js 對象都依賴了 EventEmitter 模塊來監(jiān)聽和響應(yīng)事件,比如我們常用的 HTTP requests, responses, 以及 streams。
const EventEmitter = require('events');
事件驅(qū)動機(jī)制的最簡單形式,是在 Node.js 中十分流行的回調(diào)函數(shù),例如 fs.readFile。 在回調(diào)函數(shù)這種形式中,事件每被觸發(fā)一次,回調(diào)就會被觸發(fā)一次。
我們先來探索下這個最基本的方式。
你準(zhǔn)備好了就叫我哈,Node!
很久很久以前,在 js 里還沒有原生支持 Promise,async/await 還只是一個遙遠(yuǎn)的夢想,回調(diào)函數(shù)是處理異步問題的最原始的方式。
回調(diào)從本質(zhì)上講是傳遞給其他函數(shù)的函數(shù),在 JavaScript 中函數(shù)是第一類對象,這也讓回調(diào)的存在成為可能。
一定要搞清楚的是,回調(diào)在代碼中的并不表示異步調(diào)用。 回調(diào)既可以是同步調(diào)用的,也可以是異步調(diào)用的。
舉個例子,這里有一個宿主函數(shù) fileSize,它接受一個回調(diào)函數(shù) cb,并且可以通過條件判斷來同步或者異步地調(diào)用該回調(diào)函數(shù):
function fileSize (fileName, cb) { if (typeof fileName !== 'string') { // Sync return cb(new TypeError('argument should be string')); } fs.stat(fileName, (err, stats) => { if (err) { // Async return cb(err); } // Async cb(null, stats.size); }); }
這其實也是個反例,這樣寫經(jīng)常會引起一些意外的錯誤,在設(shè)計宿主函數(shù)的時候,應(yīng)當(dāng)盡可能的使用同一種風(fēng)格,要么始終都是同步的使用回調(diào),要么始終都是異步的。
我們來研究下一個典型的異步 Node 函數(shù)的簡單示例,它用回調(diào)樣式編寫:
const readFileAsArray = function(file, cb) { fs.readFile(file, function(err, data) { if (err) { return cb(err); } const lines = data.toString().trim().split('\n'); cb(null, lines); }); };
readFileAsArray 函數(shù)接受兩個參數(shù):一個文件路徑和一個回調(diào)函數(shù)。它讀取文件內(nèi)容,將其拆分成行數(shù)組,并將該數(shù)組作為回調(diào)函數(shù)的參數(shù)傳入,調(diào)用回調(diào)函數(shù)。
現(xiàn)在設(shè)計一個用例,假設(shè)我們在同一目錄中的文件 numbers.txt 包含如下內(nèi)容:
10 11 12 13 14 15
如果我們有一個需求,要求統(tǒng)計該文件中的奇數(shù)數(shù)量,我們可以使用 readFileAsArray 來簡化代碼:
readFileAsArray('./numbers.txt', (err, lines) => { if (err) throw err; const numbers = lines.map(Number); const oddNumbers = numbers.filter(n => n%2 === 1); console.log('Odd numbers count:', oddNumbers.length); });
這段代碼將文件內(nèi)容讀入字符串?dāng)?shù)組中,回調(diào)函數(shù)將其解析為數(shù)字,并計算奇數(shù)的個數(shù)。
這才是最純粹的 Node 回調(diào)風(fēng)格。回調(diào)的第一個參數(shù)要遵循錯誤優(yōu)先的原則,err 可以為空,我們要將回調(diào)作為宿主函數(shù)的最后一個參數(shù)傳遞。你應(yīng)該一直用這種方式這樣設(shè)計你的函數(shù),因為用戶可能會假設(shè)。讓宿主函數(shù)把回調(diào)當(dāng)做其最后一個參數(shù),并讓回調(diào)函數(shù)以一個可能為空的錯誤對象作為其第一個參數(shù)。
回調(diào)在現(xiàn)代 JavaScript 中的替代品
在現(xiàn)代 JavaScript 中,我們有 Promise,Promise 可以用來替代異步 API 的回調(diào)?;卣{(diào)函數(shù)需要作為宿主函數(shù)的一個參數(shù)進(jìn)行傳遞(多個宿主回調(diào)進(jìn)行嵌套就形成了回調(diào)地獄),而且錯誤和成功都只能在其中進(jìn)行處理。而 Promise 對象可以讓我們分開處理成功和錯誤,還允許我們鏈?zhǔn)秸{(diào)用多個異步事件。
如果 readFileAsArray 函數(shù)支持 Promise,我們可以這樣使用它,如下所示:
readFileAsArray('./numbers.txt') .then(lines => { const numbers = lines.map(Number); const oddNumbers = numbers.filter(n => n%2 === 1); console.log('Odd numbers count:', oddNumbers.length); }) .catch(console.error);
我們在宿主函數(shù)的返回值上調(diào)用了一個函數(shù)來處理我們的需求,這個 .then 函數(shù)會把剛剛在回調(diào)版本中的那個行數(shù)組傳遞給這里的匿名函數(shù)。為了處理錯誤,我們在結(jié)果上添加一個 .catch 調(diào)用,當(dāng)發(fā)生錯誤時,它會捕捉到錯誤并讓我們訪問到這個錯誤。
在現(xiàn)代 JavaScript 中已經(jīng)支持了 Promise 對象,因此我們可以很容易的將其使用在宿主函數(shù)之中。下面是支持 Promise 版本的 readFileAsArray 函數(shù)(同時支持舊有的回調(diào)函數(shù)方式):
const readFileAsArray = function(file, cb = () => {}) { return new Promise((resolve, reject) => { fs.readFile(file, function(err, data) { if (err) { reject(err); return cb(err); } const lines = data.toString().trim().split('\n'); resolve(lines); cb(null, lines); }); }); };
我們使該函數(shù)返回一個 Promise 對象,該對象包裹了 fs.readFile 的異步調(diào)用。Promise 對象暴露了兩個參數(shù),一個 resolve 函數(shù)和一個 reject 函數(shù)。
當(dāng)有異常拋出時,我們可以通過向回調(diào)函數(shù)傳遞 error 來處理錯誤,也同樣可以使用 Promise 的 reject 函數(shù)。每當(dāng)我們將數(shù)據(jù)交給回調(diào)函數(shù)處理時,我們同樣也可以用 Promise 的 resolve 函數(shù)。
在這種同時可以使用回調(diào)和 Promise 的情況下,我們需要做的唯一一件事情就是為這個回調(diào)參數(shù)設(shè)置默認(rèn)值,防止在沒有傳遞回調(diào)函數(shù)參數(shù)時,其被執(zhí)行然后報錯的情況。 在這個例子中使用了一個簡單的默認(rèn)空函數(shù):()=> {}。
通過 async/await 使用 Promise
當(dāng)需要連續(xù)調(diào)用異步函數(shù)時,使用 Promise 會讓你的代碼更容易編寫。不斷的使用回調(diào)會讓事情變得越來越復(fù)雜,最終陷入回調(diào)地獄。
Promise 的出現(xiàn)改善了一點,Generator 的出現(xiàn)又改善了一點。 處理異步問題的最新解決方式是使用 async 函數(shù),它允許我們將異步代碼視為同步代碼,使其整體上更加可讀。
以下是使用 async/await 版本的調(diào)用 readFileAsArray 的例子:
async function countOdd () { try { const lines = await readFileAsArray('./numbers'); const numbers = lines.map(Number); const oddCount = numbers.filter(n => n%2 === 1).length; console.log('Odd numbers count:', oddCount); } catch(err) { console.error(err); } } countOdd();
首先,我們創(chuàng)建了一個 async 函數(shù) —— 就是一個普通的函數(shù)聲明之前,加了個 async 關(guān)鍵字。在 async 函數(shù)內(nèi)部,我們調(diào)用了 readFileAsArray 函數(shù),就像把它的返回值賦值給變量 lines 一樣,為了真的拿到 readFileAsArray 處理生成的行數(shù)組,我們使用關(guān)鍵字 await。之后,我們繼續(xù)執(zhí)行代碼,就好像 readFileAsArray 的調(diào)用是同步的一樣。
要讓代碼運(yùn)行,我們可以直接調(diào)用 async 函數(shù)。這讓我們的代碼變得更加簡單和易讀。為了處理異常,我們需要將異步調(diào)用包裝在一個 try/catch 語句中。
有了 async/await 這個特性,我們不必使用任何特殊的API(如 .then 和 .catch )。我們只是把這種函數(shù)標(biāo)記出來,然后使用純粹的 JavaScript 寫代碼。
我們可以把 async/await 這個特性用在支持使用 Promise 處理后續(xù)邏輯的函數(shù)上。但是,它無法用在只支持回調(diào)的異步函數(shù)上(例如setTimeout)。
EventEmitter 模塊
EventEmitter 是一個處理 Node 中各個對象之間通信的模塊。 EventEmitter 是 Node 異步事件驅(qū)動架構(gòu)的核心。 Node 的許多內(nèi)置模塊都繼承自 EventEmitter。
它的概念其實很簡單:emitter 對象會發(fā)出被定義過的事件,導(dǎo)致之前注冊的所有監(jiān)聽該事件的函數(shù)被調(diào)用。所以,emitter 對象基本上有兩個主要特征:
為了使用 EventEmitter,我們需要創(chuàng)建一個繼承自 EventEmitter 的類。
class MyEmitter extends EventEmitter { }
我們從 EventEmitter 的子類實例化的對象,就是 emitter 對象:
const myEmitter = new MyEmitter();
在這些 emitter 對象的生命周期里,我們可以調(diào)用 emit 函數(shù)來觸發(fā)我們想要的觸發(fā)的任何被命名過的事件。
myEmitter.emit('something-happened');
emit 函數(shù)的使用表示發(fā)生某種情況發(fā)生了,讓大家去做該做的事情。 這種情況通常是某些狀態(tài)變化引起的。
我們可以使用 on 方法添加監(jiān)聽器函數(shù),并且每次 emitter 對象觸發(fā)其關(guān)聯(lián)的事件時,將執(zhí)行這些監(jiān)聽器函數(shù)。
事件 !== 異步
先看看這個例子:
const EventEmitter = require('events'); class WithLog extends EventEmitter { execute(taskFunc) { console.log('Before executing'); this.emit('begin'); taskFunc(); this.emit('end'); console.log('After executing'); } } const withLog = new WithLog(); withLog.on('begin', () => console.log('About to execute')); withLog.on('end', () => console.log('Done with execute')); withLog.execute(() => console.log('*** Executing task ***'));
WithLog 是一個事件觸發(fā)器,它有一個方法 —— execute,該方法接受一個參數(shù),即具體要處理的任務(wù)函數(shù),并在其前后包裹 log 以輸出其執(zhí)行日志。
為了看到這里會以什么順序執(zhí)行,我們在兩個命名的事件上都注冊了監(jiān)聽器,最后執(zhí)行一個簡單的任務(wù)來觸發(fā)事件。
下面是上面程序的輸出結(jié)果:
Before executing About to execute *** Executing task *** Done with execute After executing
這里我想證實的是以上的輸出都是同步發(fā)生的,這段代碼里沒有什么異步的成分。
就像普通的回調(diào)一樣,不要以為事件意味著同步或異步代碼。
跟之前的回調(diào)一樣,不要一提到事件就認(rèn)為它是異步的或者同步的,還要具體分析。
如果我們傳遞 taskFunc 是一個異步函數(shù),會發(fā)生什么呢?
// ... withLog.execute(() => { setImmediate(() => { console.log('*** Executing task ***') }); });
輸出結(jié)果變成了這樣:
Before executing About to execute Done with execute After executing *** Executing task ***
這樣就有問題了,異步函數(shù)的調(diào)用導(dǎo)致 "Done with execute" 和 "After executing" 的輸出并不準(zhǔn)確。
要在異步函數(shù)完成后發(fā)出事件,我們需要將回調(diào)(或 Promise)與基于事件的通信相結(jié)合。 下面的例子說明了這一點。
使用事件而不是常規(guī)回調(diào)的一個好處是,我們可以通過定義多個監(jiān)聽器對相同的信號做出多個不同的反應(yīng)。如果使用回調(diào)來完成這件事,我們要在單個回調(diào)中寫更多的處理邏輯。事件是應(yīng)用程序允許多個外部插件在應(yīng)用程序核心之上構(gòu)建功能的好辦法。你可以把它們當(dāng)成鉤子來掛一些由于狀態(tài)變化而引發(fā)執(zhí)行的程序。
異步事件
我們把剛剛那些同步代碼的示例改成異步的:
const fs = require('fs'); const EventEmitter = require('events'); class WithTime extends EventEmitter { execute(asyncFunc, ...args) { this.emit('begin'); console.time('execute'); asyncFunc(...args, (err, data) => { if (err) { return this.emit('error', err); } this.emit('data', data); console.timeEnd('execute'); this.emit('end'); }); } } const withTime = new WithTime(); withTime.on('begin', () => console.log('About to execute')); withTime.on('end', () => console.log('Done with execute')); withTime.execute(fs.readFile, __filename);
用 WithTime 類執(zhí)行 asyncFunc 函數(shù),并通過調(diào)用 console.time 和 console.timeEnd 報告該asyncFunc 所花費的時間。它在執(zhí)行之前和之后都將以正確的順序觸發(fā)相應(yīng)的事件,并且還會發(fā)出 error/data 事件作為處理異步調(diào)用的信號。
我們傳遞一個異步的 fs.readFile 函數(shù)來測試一下 withTime emitter。 我們現(xiàn)在可以直接通過監(jiān)聽 data 事件來處理讀取到的文件數(shù)據(jù),而不用把這套處理邏輯寫到 fs.readFile 的回調(diào)函數(shù)中。
執(zhí)行這段代碼,我們以預(yù)期的順序執(zhí)行了一系列事件,并且得到異步函數(shù)的執(zhí)行時間,這些是十分重要的。
About to execute execute: 4.507ms Done with execute
請注意,我們是將回調(diào)與事件觸發(fā)器 emitter 相結(jié)合實現(xiàn)的這部分功能。 如果 asynFunc 支持Promise,我們可以使用 async/await 函數(shù)來做同樣的事情:
class WithTime extends EventEmitter { async execute(asyncFunc, ...args) { this.emit('begin'); try { console.time('execute'); const data = await asyncFunc(...args); this.emit('data', data); console.timeEnd('execute'); this.emit('end'); } catch(err) { this.emit('error', err); } } }
我認(rèn)為這段代碼比之前的回調(diào)風(fēng)格的代碼以及使用 .then/.catch 風(fēng)格的代碼更具可讀性。async/await 讓我們更加接近 JavaScript 語言本身(不必再使用 .then/.catch 這些 api)。
事件參數(shù)和錯誤
在之前的例子中,有兩個事件被發(fā)出時還攜帶了別的參數(shù)。
error 事件被觸發(fā)時會攜帶一個 error 對象。
this.emit('error', err);
data 事件被觸發(fā)時會攜帶一個 data 對象。
this.emit('data', data);
我們可以在 emit 函數(shù)中不斷的添加參數(shù),當(dāng)然第一個參數(shù)一定是事件的名稱,除去第一個參數(shù)之外的所有參數(shù)都可以在該事件注冊的監(jiān)聽器中使用。
例如,要處理 data 事件,我們注冊的監(jiān)聽器函數(shù)將訪問傳遞給 emit 函數(shù)的 data 參數(shù),而這個 data 也正是由 asyncFunc 返回的數(shù)據(jù)。
withTime.on('data', (data) => { // do something with data });
error 事件比較特殊。在我們基于回調(diào)的那個示例中,如果不使用監(jiān)聽器處理 error 事件,node 進(jìn)程將會退出。
舉個由于錯誤使用參數(shù)而造成程序崩潰的例子:
class WithTime extends EventEmitter { execute(asyncFunc, ...args) { console.time('execute'); asyncFunc(...args, (err, data) => { if (err) { return this.emit('error', err); // Not Handled } console.timeEnd('execute'); }); } } const withTime = new WithTime(); withTime.execute(fs.readFile, ''); // BAD CALL withTime.execute(fs.readFile, __filename);
第一次調(diào)用 execute 將會觸發(fā) error 事件,由于沒有處理 error ,Node 程序隨之崩潰:
events.js:163 throw er; // Unhandled 'error' event ^ Error: ENOENT: no such file or directory, open ''
第二次執(zhí)行調(diào)用將受到此崩潰的影響,并且可能根本不會被執(zhí)行。
如果我們?yōu)檫@個 error 事件注冊一個監(jiān)聽器函數(shù)來處理 error,結(jié)果將大不相同:
withTime.on('error', (err) => { // do something with err, for example log it somewhere console.log(err) });
如果我們執(zhí)行上述操作,將會報告第一次執(zhí)行 execute 時發(fā)送的錯誤,但是這次 node 進(jìn)程不會崩潰退出,其他程序的調(diào)用也都能正常完成:
{ Error: ENOENT: no such file or directory, open '' errno: -2, code: 'ENOENT', syscall: 'open', path: '' }
execute: 4.276ms
需要注意的是,基于 Promise 的函數(shù)有些不同,它們暫時只是輸出一個警告:
UnhandledPromiseRejectionWarning: Unhandled promise rejection (rejection id: 1): Error: ENOENT: no such file or directory, open ''
DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
另一種處理異常的方式是在監(jiān)聽全局的 uncaughtException 進(jìn)程事件。 然而,使用該事件全局捕獲錯誤并不是一個好辦法。
關(guān)于 uncaughtException,一般都會建議你避免使用它,但是如果必須用它,你應(yīng)該讓進(jìn)程退出:
process.on('uncaughtException', (err) => { // something went unhandled. // Do any cleanup and exit anyway! console.error(err); // don't do just that. // FORCE exit the process too. process.exit(1); });
但是,假設(shè)在同一時間發(fā)生多個錯誤事件,這意味著上面的 uncaughtException 監(jiān)聽器將被多次觸發(fā),這可能會引起一些問題。
EventEmitter 模塊暴露了 once 方法,這個方法發(fā)出的信號只會調(diào)用一次監(jiān)聽器。所以,這個方法常與 uncaughtException 一起使用。
監(jiān)聽器的順序
如果針對一個事件注冊多個監(jiān)聽器函數(shù),當(dāng)事件被觸發(fā)時,這些監(jiān)聽器函數(shù)將按其注冊的順序被觸發(fā)。
// first withTime.on('data', (data) => { console.log(`Length: ${data.length}`); }); // second withTime.on('data', (data) => { console.log(`Characters: ${data.toString().length}`); }); withTime.execute(fs.readFile, __filename);
上述代碼會先輸出 Length 信息,再輸出 Characters 信息,執(zhí)行的順序與注冊的順序保持一致。
如果你想定義一個新的監(jiān)聽函數(shù),但是希望它能夠第一個被執(zhí)行,你還可以使用 prependListener 方法:
withTime.on('data', (data) => { console.log(`Length: ${data.length}`); }); withTime.prependListener('data', (data) => { console.log(`Characters: ${data.toString().length}`); }); withTime.execute(fs.readFile, __filename);
上述代碼中,Charaters 信息將首先被輸出。
最后,你可以用 removeListener 函數(shù)來刪除某個監(jiān)聽器函數(shù)。
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。TEL:177 7030 7066 E-MAIL:11247931@qq.com