Node.js stream接口

2021-09-15 16:32 更新

Stream是什么?

Unix操作系統(tǒng)從很早以前,就有Stream(流)這個(gè)概念,它是不同進(jìn)程之間傳遞數(shù)據(jù)的一種方式。管道命令Pipe就起到在不同命令之間,連接Stream的作用。

Stream把較大的數(shù)據(jù),拆成很小的部分。只要命令部署了Stream接口,就可以把一個(gè)流的輸出接到另一個(gè)流的輸入。Node引入了這個(gè)概念,通過Stream為異步讀寫數(shù)據(jù)提供的統(tǒng)一接口。無論是硬盤數(shù)據(jù)、網(wǎng)絡(luò)數(shù)據(jù),還是內(nèi)存數(shù)據(jù),都可以采用這個(gè)接口讀寫。

讀寫數(shù)據(jù)有兩種方式。一種方式是同步處理,即先將數(shù)據(jù)全部讀入內(nèi)存,然后處理。它的優(yōu)點(diǎn)是符合直覺,流程非常自然,缺點(diǎn)是如果遇到大文件,要花很長時(shí)間,可能要過很久才能進(jìn)入數(shù)據(jù)處理的步驟。另一種方式就是Stream方式,它是系統(tǒng)讀取外部數(shù)據(jù)實(shí)際上的方式,即每次只讀入數(shù)據(jù)的一小塊,像“流水”一樣。所以,Stream方式就是每當(dāng)系統(tǒng)讀入了一小塊數(shù)據(jù),就會(huì)觸發(fā)一個(gè)事件,發(fā)出“新數(shù)據(jù)塊”的信號,只要監(jiān)聽這個(gè)事件,就能掌握進(jìn)展,做出相應(yīng)處理,這樣就提高了程序的性能。

Stream接口最大特點(diǎn)就是通過事件通信,具有readable、writable、drain、data、end、close等事件,既可以讀取數(shù)據(jù),也可以寫入數(shù)據(jù)。讀寫數(shù)據(jù)時(shí),每讀入(或?qū)懭耄┮欢螖?shù)據(jù),就會(huì)觸發(fā)一次data事件,全部讀?。ɑ?qū)懭耄┩戤?,觸發(fā)end事件。如果發(fā)生錯(cuò)誤,則觸發(fā)error事件。

一個(gè)對象只要部署了Stream接口,就可以從讀取數(shù)據(jù),或者寫入數(shù)據(jù)。Node內(nèi)部很多涉及IO處理的對象,都部署了Stream接口,比如HTTP連接、文件讀寫、標(biāo)準(zhǔn)輸入輸出等。

基本用法

Node的I/O操作都是異步的,所以與磁盤和網(wǎng)絡(luò)的交互,都要通過回調(diào)函數(shù)。一個(gè)典型的寫文件操作,可能像下面這樣。

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
  fs.readFile(__dirname + '/data.txt', function (err, data) {
    res.end(data);
  });
});
server.listen(8000);

上面的代碼有一個(gè)問題,那就是它必須將整個(gè)data.txt文件讀入內(nèi)存,然后再輸入。如果data.txt非常大,就會(huì)占用大量的內(nèi)容。一旦有多個(gè)并發(fā)請求,操作就會(huì)變得非常緩慢,用戶不得不等很久,才能得到結(jié)果。

由于參數(shù)req和res都部署了Stream接口,可以使用fs.createReadStream()替代fs.readFile(),就能解決這個(gè)問題。

var http = require('http');
var fs = require('fs');

var server = http.createServer(function (req, res) {
  var stream = fs.createReadStream(__dirname + '/data.txt');
  stream.pipe(res);
});
server.listen(8000);

Stream接口的最大特點(diǎn),就是數(shù)據(jù)會(huì)發(fā)出node和data事件,內(nèi)置的pipe方法會(huì)處理這兩個(gè)事件。

數(shù)據(jù)流通過pipe方法,可以方便地導(dǎo)向其他具有Stream接口的對象。

var fs = require('fs');
var zlib = require('zlib');

fs.createReadStream('wow.txt')
  .pipe(zlib.createGzip())
  .pipe(process.stdout);

上面代碼先打開文本文件wow.txt,然后壓縮,再導(dǎo)向標(biāo)準(zhǔn)輸出。

fs.createReadStream('wow.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('wow.gz'));

上面代碼壓縮文件wow.txt以后,又將其寫回壓縮文件。

下面代碼新建一個(gè)Stream實(shí)例,然后指定寫入事件和終止事件的回調(diào)函數(shù),再將其接到標(biāo)準(zhǔn)輸入之上。

var stream = require('stream');
var Stream = stream.Stream;

var ws = new Stream;
ws.writable = true;

ws.write = function(data) {
  console.log("input=" + data);
}

ws.end = function(data) {
  console.log("bye");
}

process.stdin.pipe(ws);

調(diào)用上面的腳本,會(huì)產(chǎn)生以下結(jié)果。

$ node pipe_out.js
hello
input=hello
^d
bye

上面代碼調(diào)用腳本下,鍵入hello,會(huì)輸出input=hello。然后按下ctrl-d,會(huì)輸出bye。使用管道命令,可以看得更清楚。

$ echo hello | node pipe_out.js
input=hello

bye

Stream接口分成三類。

  • 可讀數(shù)據(jù)流接口,用于讀取數(shù)據(jù)。
  • 可寫數(shù)據(jù)流接口,用于寫入數(shù)據(jù)。
  • 雙向數(shù)據(jù)流接口,用于讀取和寫入數(shù)據(jù),比如Node的tcp sockets、zlib、crypto都部署了這個(gè)接口。

可讀數(shù)據(jù)流

“可讀數(shù)據(jù)流”表示數(shù)據(jù)的來源,只要一個(gè)對象提供“可讀數(shù)據(jù)流”,就表示你可以從其中讀取數(shù)據(jù)。

“可讀數(shù)據(jù)流”有兩種狀態(tài):流動(dòng)態(tài)和暫停態(tài)。處于流動(dòng)態(tài)時(shí),數(shù)據(jù)會(huì)盡快地從數(shù)據(jù)源導(dǎo)向用戶的程序;處于暫停態(tài)時(shí),必須顯式調(diào)用stream.read()等指令,“可讀數(shù)據(jù)流”才會(huì)釋放數(shù)據(jù)。剛剛新建的時(shí)候,“可讀數(shù)據(jù)流”處于暫停態(tài)。

三種方法可以讓暫停態(tài)轉(zhuǎn)為流動(dòng)態(tài)。

  • 添加data事件的監(jiān)聽函數(shù)
  • 調(diào)用resume方法
  • 調(diào)用pipe方法將數(shù)據(jù)送往一個(gè)可寫數(shù)據(jù)流

如果轉(zhuǎn)為流動(dòng)態(tài)時(shí),沒有data事件的監(jiān)聽函數(shù),也沒有pipe方法的目的地,那么數(shù)據(jù)將遺失。

以下兩種方法可以讓流動(dòng)態(tài)轉(zhuǎn)為暫停態(tài)。

  • 不存在pipe方法的目的地時(shí),調(diào)用pause方法
  • 存在pipe方法的目的地時(shí),移除所有data事件的監(jiān)聽函數(shù),并且調(diào)用unpipe方法,移除所有pipe方法的目的地

注意,只移除data事件的監(jiān)聽函數(shù),并不會(huì)自動(dòng)引發(fā)數(shù)據(jù)流進(jìn)入“暫停態(tài)”。另外,存在pipe方法的目的地時(shí),調(diào)用pause方法,并不能保證數(shù)據(jù)流總是處于暫停態(tài),一旦那些目的地發(fā)出數(shù)據(jù)請求,數(shù)據(jù)流有可能會(huì)繼續(xù)提供數(shù)據(jù)。

每當(dāng)系統(tǒng)有新的數(shù)據(jù),該接口可以監(jiān)聽到data事件,從而回調(diào)函數(shù)。

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  data+=chunk;
});

readableStream.on('end', function() {
  console.log(data);
});

上面代碼中,fs模塊的createReadStream方法,是部署了Stream接口的文件讀取方法。該方法對指定的文件,返回一個(gè)對象。該對象只要監(jiān)聽data事件,回調(diào)函數(shù)就能讀到數(shù)據(jù)。

除了data事件,監(jiān)聽readable事件,也可以讀到數(shù)據(jù)。

var fs = require('fs');
var readableStream = fs.createReadStream('file.txt');
var data = '';
var chunk;

readableStream.setEncoding('utf8');

readableStream.on('readable', function() {
  while ((chunk=readableStream.read()) !== null) {
    data += chunk;
  }
});

readableStream.on('end', function() {
  console.log(data)
});

readable事件表示系統(tǒng)緩沖之中有可讀的數(shù)據(jù),使用read方法去讀出數(shù)據(jù)。如果沒有數(shù)據(jù)可讀,read方法會(huì)返回null。

“可讀數(shù)據(jù)流”除了read方法,還有以下方法。

  • Readable.pause() :暫停數(shù)據(jù)流。已經(jīng)存在的數(shù)據(jù),也不再觸發(fā)data事件,數(shù)據(jù)將保留在緩存之中,此時(shí)的數(shù)據(jù)流稱為靜態(tài)數(shù)據(jù)流。如果對靜態(tài)數(shù)據(jù)流再次調(diào)用pause方法,數(shù)據(jù)流將重新開始流動(dòng),但是緩存中現(xiàn)有的數(shù)據(jù),不會(huì)再觸發(fā)data事件。
  • Readable.resume():恢復(fù)暫停的數(shù)據(jù)流。
  • readable.unpipe():從管道中移除目的地?cái)?shù)據(jù)流。如果該方法使用時(shí)帶有參數(shù),會(huì)阻止“可讀數(shù)據(jù)流”進(jìn)入某個(gè)特定的目的地?cái)?shù)據(jù)流。如果使用時(shí)不帶有參數(shù),則會(huì)移除所有的目的地?cái)?shù)據(jù)流。

read()

read方法從系統(tǒng)緩存讀取并返回?cái)?shù)據(jù)。如果讀不到數(shù)據(jù),則返回null。

該方法可以接受一個(gè)整數(shù)作為參數(shù),表示所要讀取數(shù)據(jù)的數(shù)量,然后會(huì)返回該數(shù)量的數(shù)據(jù)。如果讀不到足夠數(shù)量的數(shù)據(jù),返回null。如果不提供這個(gè)參數(shù),默認(rèn)返回系統(tǒng)緩存之中的所有數(shù)據(jù)。

只在“暫停態(tài)”時(shí),該方法才有必要手動(dòng)調(diào)用。“流動(dòng)態(tài)”時(shí),該方法是自動(dòng)調(diào)用的,直到系統(tǒng)緩存之中的數(shù)據(jù)被讀光。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  var chunk;
  while (null !== (chunk = readable.read())) {
    console.log('got %d bytes of data', chunk.length);
  }
});

如果該方法返回一個(gè)數(shù)據(jù)塊,那么它就觸發(fā)了data事件。

setEncoding()

調(diào)用該方法,會(huì)使得數(shù)據(jù)流返回指定編碼的字符串,而不是緩存之中的二進(jìn)制對象。比如,調(diào)用setEncoding('utf8'),數(shù)據(jù)流會(huì)返回UTF-8字符串,調(diào)用setEncoding('hex'),數(shù)據(jù)流會(huì)返回16進(jìn)制的字符串。

該方法會(huì)正確處理多字節(jié)的字符,而緩存的方法buf.toString(encoding)不會(huì)。所以如果想要從數(shù)據(jù)流讀取字符串,應(yīng)該總是使用該方法。

var readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', function(chunk) {
  assert.equal(typeof chunk, 'string');
  console.log('got %d characters of string data', chunk.length);
});

resume()

resume方法會(huì)使得“可讀數(shù)據(jù)流”繼續(xù)釋放data事件,即轉(zhuǎn)為流動(dòng)態(tài)。

var readable = getReadableStreamSomehow();
readable.resume();
readable.on('end', function(chunk) {
  console.log('數(shù)據(jù)流到達(dá)尾部,未讀取任務(wù)數(shù)據(jù)');
});

上面代碼中,調(diào)用resume方法使得數(shù)據(jù)流進(jìn)入流動(dòng)態(tài),只定義end事件的監(jiān)聽函數(shù),不定義data事件的監(jiān)聽函數(shù),表示不從數(shù)據(jù)流讀取任何數(shù)據(jù),只監(jiān)聽數(shù)據(jù)流到達(dá)尾部。

pause()

pause方法使得流動(dòng)態(tài)的數(shù)據(jù)流,停止釋放data事件,轉(zhuǎn)而進(jìn)入暫停態(tài)。任何此時(shí)已經(jīng)可以讀到的數(shù)據(jù),都將停留在系統(tǒng)緩存。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('讀取%d字節(jié)的數(shù)據(jù)', chunk.length);
  readable.pause();
  console.log('接下來的1秒內(nèi)不讀取數(shù)據(jù)');
  setTimeout(function() {
    console.log('數(shù)據(jù)恢復(fù)讀取');
    readable.resume();
  }, 1000);
});

isPaused()

該方法返回一個(gè)布爾值,表示“可讀數(shù)據(jù)流”被客戶端手動(dòng)暫停(即調(diào)用了pause方法),目前還沒有調(diào)用resume方法。

var readable = new stream.Readable

readable.isPaused() // === false
readable.pause()
readable.isPaused() // === true
readable.resume()
readable.isPaused() // === false

pipe()

pipe方法是自動(dòng)傳送數(shù)據(jù)的機(jī)制,就像管道一樣。它從“可讀數(shù)據(jù)流”讀出所有數(shù)據(jù),將其寫出指定的目的地。整個(gè)過程是自動(dòng)的。

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.pipe(writableStream);

上面代碼使用pipe方法,將file1的內(nèi)容寫入file2。整個(gè)過程由pipe方法管理,不用手動(dòng)干預(yù),所以可以將傳送數(shù)據(jù)寫得很簡潔。

pipe方法返回目的地的數(shù)據(jù)流,因此可以使用鏈?zhǔn)綄懛?,將多個(gè)數(shù)據(jù)流操作連在一起。

var fs = require('fs');
var zlib = require('zlib');

fs.createReadStream('input.txt.gz')
  .pipe(zlib.createGunzip())
  .pipe(fs.createWriteStream('output.txt'));

上面代碼采用鏈?zhǔn)綄懛ǎ茸x取文件,然后進(jìn)行壓縮,最后輸出。

下面的寫法模擬了Unix系統(tǒng)的cat命令,將標(biāo)準(zhǔn)輸出寫入標(biāo)準(zhǔn)輸入。

process.stdin.pipe(process.stdout);

當(dāng)來源地的數(shù)據(jù)流讀取完成,默認(rèn)會(huì)調(diào)用目的地的end方法,就不再能夠?qū)懭?。對pipe方法傳入第二個(gè)參數(shù){ end: false },可以讓目的地的數(shù)據(jù)流保持打開。

reader.pipe(writer, { end: false });
reader.on('end', function() {
  writer.end('Goodbye\n');
});

上面代碼中,目的地?cái)?shù)據(jù)流默認(rèn)不會(huì)調(diào)用end方法,只能手動(dòng)調(diào)用,因此“Goodbye”會(huì)被寫入。

unpipe()

該方法移除pipe方法指定的數(shù)據(jù)流目的地。如果沒有參數(shù),則移除所有的pipe方法目的地。如果有參數(shù),則移除該參數(shù)指定的目的地。如果沒有匹配參數(shù)的目的地,則不會(huì)產(chǎn)生任何效果。

var readable = getReadableStreamSomehow();
var writable = fs.createWriteStream('file.txt');
readable.pipe(writable);
setTimeout(function() {
  console.log('停止寫入file.txt');
  readable.unpipe(writable);
  console.log('手動(dòng)關(guān)閉file.txt的寫入數(shù)據(jù)流');
  writable.end();
}, 1000);

上面代碼寫入file.txt的時(shí)間,只有1秒鐘,然后就停止寫入。

事件

(1)readable

readable事件在數(shù)據(jù)流能夠向外提供數(shù)據(jù)時(shí)觸發(fā)。

var readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // there is some data to read now
});

(2)data

對于那些沒有顯式暫停的數(shù)據(jù)流,添加data事件監(jiān)聽函數(shù),會(huì)將數(shù)據(jù)流切換到流動(dòng)態(tài),盡快向外提供數(shù)據(jù)。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});

(3)end

無法再讀取到數(shù)據(jù)時(shí),會(huì)觸發(fā)end事件。也就是說,只有當(dāng)前數(shù)據(jù)被完全讀取完,才會(huì)觸發(fā)end事件,比如不停地調(diào)用read方法。

var readable = getReadableStreamSomehow();
readable.on('data', function(chunk) {
  console.log('got %d bytes of data', chunk.length);
});
readable.on('end', function() {
  console.log('there will be no more data.');
});

(4)close

數(shù)據(jù)源關(guān)閉時(shí),close事件被觸發(fā)。并不是所有的數(shù)據(jù)流都支持這個(gè)事件。

(5)error

當(dāng)讀取數(shù)據(jù)發(fā)生錯(cuò)誤時(shí),error事件被觸發(fā)。

可寫數(shù)據(jù)流

“可寫數(shù)據(jù)流”允許你將數(shù)據(jù)寫入某個(gè)目的地。它是數(shù)據(jù)寫入的一種抽象,不同的數(shù)據(jù)目的地部署了這個(gè)接口以后,就可以用統(tǒng)一的方法寫入。

以下是部署了可寫數(shù)據(jù)流的一些場合。

  • 客戶端的http requests
  • 服務(wù)器的http responses
  • fs write streams
  • zlib streams
  • crypto streams
  • tcp sockets
  • child process stdin
  • process.stdout, process.stderr

下面是fs模塊的可寫數(shù)據(jù)流的例子。

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) {
  writableStream.write(chunk);
});

上面代碼中,fs模塊的createWriteStream方法針對特定文件,創(chuàng)建了一個(gè)“可寫數(shù)據(jù)流”,本質(zhì)上就是對寫入操作部署了Stream接口。然后,“可寫數(shù)據(jù)流”的write方法,可以將數(shù)據(jù)寫入文件。

write()

write方法用于向“可寫數(shù)據(jù)流”寫入數(shù)據(jù)。它接受兩個(gè)參數(shù),一個(gè)是寫入的內(nèi)容,可以是字符串,也可以是一個(gè)stream對象(比如可讀數(shù)據(jù)流),另一個(gè)是寫入完成后的回調(diào)函數(shù)。

它返回一個(gè)布爾值,表示本次數(shù)據(jù)是否處理完成。如果返回true,就表示可以寫入新的數(shù)據(jù)了。如果等待寫入的數(shù)據(jù)被緩存了,就返回false。不過,在返回false的情況下,也可以繼續(xù)傳入新的數(shù)據(jù)等待寫入。只是這時(shí),新的數(shù)據(jù)不會(huì)真的寫入,只會(huì)緩存在內(nèi)存中。為了避免內(nèi)存消耗,比較好的做法還是等待該方法返回true,然后再寫入。

cork(),uncork()

cork方法可以強(qiáng)制等待寫入的數(shù)據(jù)進(jìn)入緩存。當(dāng)調(diào)用uncork方法或end方法時(shí),緩存的數(shù)據(jù)就會(huì)吐出。

setDefaultEncoding()

setDefaultEncoding方法用于將寫入的數(shù)據(jù)編碼成新的格式。它返回一個(gè)布爾值,表示編碼是否成功,如果返回false就表示編碼失敗。

end()

end方法用于終止“可寫數(shù)據(jù)流”。該方法可以接受三個(gè)參數(shù),全部都是可選參數(shù)。第一個(gè)參數(shù)是最后所要寫入的數(shù)據(jù),可以是字符串,也可以是stream對象;第二個(gè)參數(shù)是寫入編碼;第三個(gè)參數(shù)是一個(gè)回調(diào)函數(shù),finish事件觸發(fā)時(shí),會(huì)調(diào)用這個(gè)回調(diào)函數(shù)。

var file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');

上面代碼會(huì)在數(shù)據(jù)寫入結(jié)束時(shí),在尾部寫入“world!”。

調(diào)用end方法之后,再寫入數(shù)據(jù)會(huì)報(bào)錯(cuò)。

var file = fs.createWriteStream('example.txt');
file.end('world!');
file.write('hello, '); // 報(bào)錯(cuò)

事件

(1)drain事件

writable.write(chunk)返回false以后,當(dāng)緩存數(shù)據(jù)全部寫入完成,可以繼續(xù)寫入時(shí),會(huì)觸發(fā)drain事件。

function writeOneMillionTimes(writer, data, encoding, callback) {
  var i = 1000000;
  write();
  function write() {
    var ok = true;
    do {
      i -= 1;
      if (i === 0) {
        writer.write(data, encoding, callback);
      } else {
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      writer.once('drain', write);
    }
  }
}

上面代碼是一個(gè)寫入100萬次的例子,通過drain事件得到可以繼續(xù)寫入的通知。

(2)finish事件

調(diào)用end方法時(shí),所有緩存的數(shù)據(jù)釋放,觸發(fā)finish事件。該事件的回調(diào)函數(shù)沒有參數(shù)。

var writer = getWritableStreamSomehow();
for (var i = 0; i < 100; i ++) {
  writer.write('hello, #' + i + '!\n');
}
writer.end('this is the end\n');
writer.on('finish', function() {
  console.error('all writes are now complete.');
});

(3)pipe事件

“可寫數(shù)據(jù)流”調(diào)用pipe方法,將數(shù)據(jù)流導(dǎo)向?qū)懭肽康牡貢r(shí),觸發(fā)該事件。

該事件的回調(diào)函數(shù),接受發(fā)出該事件的“可讀數(shù)據(jù)流”對象作為參數(shù)。

var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('pipe', function(src) {
  console.error('something is piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);

(4)unpipe事件

“可讀數(shù)據(jù)流”調(diào)用unpipe方法,將可寫數(shù)據(jù)流移出寫入目的地時(shí),觸發(fā)該事件。

該事件的回調(diào)函數(shù),接受發(fā)出該事件的“可讀數(shù)據(jù)流”對象作為參數(shù)。

var writer = getWritableStreamSomehow();
var reader = getReadableStreamSomehow();
writer.on('unpipe', function(src) {
  console.error('something has stopped piping into the writer');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer);

(5)error事件

如果寫入數(shù)據(jù)或pipe數(shù)據(jù)時(shí)發(fā)生錯(cuò)誤,就會(huì)觸發(fā)該事件。

該事件的回調(diào)函數(shù),接受一個(gè)Error對象作為參數(shù)。

HTTP請求

HTTP對象使用Stream接口,實(shí)現(xiàn)網(wǎng)絡(luò)數(shù)據(jù)的讀寫。

var http = require('http');

var server = http.createServer(function (req, res) {
  // req is an http.IncomingMessage, which is a Readable Stream
  // res is an http.ServerResponse, which is a Writable Stream

  var body = '';
  // we want to get the data as utf8 strings
  // If you don't set an encoding, then you'll get Buffer objects
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added
  req.on('data', function (chunk) {
    body += chunk;
  });

  // the end event tells you that you have entire body
  req.on('end', function () {
    try {
      var data = JSON.parse(body);
    } catch (er) {
      // uh oh!  bad json!
      res.statusCode = 400;
      return res.end('error: ' + er.message);
    }

    // write back something interesting to the user:
    res.write(typeof data);
    res.end();
  });
});

server.listen(1337);

// $ curl localhost:1337 -d '{}'
// object
// $ curl localhost:1337 -d '"foo"'
// string
// $ curl localhost:1337 -d 'not json'
// error: Unexpected token o

data事件表示讀取或?qū)懭肓艘粔K數(shù)據(jù)。

req.on('data', function(buf){
  // Do something with the Buffer
});

使用req.setEncoding方法,可以設(shè)定字符串編碼。

req.setEncoding('utf8');
req.on('data', function(str){
    // Do something with the String
});

end事件,表示讀取或?qū)懭霐?shù)據(jù)完畢。

var http = require('http');

http.createServer(function(req, res){
    res.writeHead(200);
    req.on('data', function(data){
        res.write(data);
    });
    req.on('end', function(){
        res.end();
    });
}).listen(3000);

上面代碼相當(dāng)于建立了“回聲”服務(wù),將HTTP請求的數(shù)據(jù)體,用HTTP回應(yīng)原樣發(fā)送回去。

system模塊提供了pump方法,有點(diǎn)像Linux系統(tǒng)的管道功能,可以將一個(gè)數(shù)據(jù)流,原封不動(dòng)得轉(zhuǎn)給另一個(gè)數(shù)據(jù)流。所以,上面的例子也可以用pump方法實(shí)現(xiàn)。

var http = require('http'),
    sys = require('sys');

http.createServer(function(req, res){
    res.writeHead(200);
    sys.pump(req, res);
}).listen(3000);

fs模塊

fs模塊的createReadStream方法用于新建讀取數(shù)據(jù)流,createWriteStream方法用于新建寫入數(shù)據(jù)流。使用這兩個(gè)方法,可以做出一個(gè)用于文件復(fù)制的腳本copy.js。

// copy.js

var fs = require('fs');
console.log(process.argv[2], '->', process.argv[3]);

var readStream = fs.createReadStream(process.argv[2]);
var writeStream = fs.createWriteStream(process.argv[3]);

readStream.on('data', function (chunk) {
  writeStream.write(chunk);
});

readStream.on('end', function () {
  writeStream.end();
});

readStream.on('error', function (err) {
  console.log("ERROR", err);
});

writeStream.on('error', function (err) {
  console.log("ERROR", err);
});d all your errors, you wouldn't need to use domains.

上面代碼非常容易理解,使用的時(shí)候直接提供源文件路徑和目標(biāo)文件路徑,就可以了。

node cp.js src.txt dest.txt

Streams對象都具有pipe方法,起到管道作用,將一個(gè)數(shù)據(jù)流輸入另一個(gè)數(shù)據(jù)流。所以,上面代碼可以重寫成下面這樣:

var fs = require('fs');
console.log(process.argv[2], '->', process.argv[3]);

var readStream = fs.createReadStream(process.argv[2]);
var writeStream = fs.createWriteStream(process.argv[3]);

readStream.on('open', function () {
  readStream.pipe(writeStream);
});

readStream.on('end', function () {
  writeStream.end();
});

錯(cuò)誤處理

下面是壓縮后發(fā)送文件的代碼。

http.createServer(function (req, res) {
  // set the content headers
  fs.createReadStream('filename.txt')
  .pipe(zlib.createGzip())
  .pipe(res)
})

上面的代碼沒有部署錯(cuò)誤處理機(jī)制,一旦發(fā)生錯(cuò)誤,就無法處理。所以,需要加上error事件的監(jiān)聽函數(shù)。

http.createServer(function (req, res) {
  // set the content headers
  fs.createReadStream('filename.txt')
  .on('error', onerror)
  .pipe(zlib.createGzip())
  .on('error', onerror)
  .pipe(res)

  function onerror(err) {
    console.error(err.stack)
  }
})

上面的代碼還是存在問題,如果客戶端中斷下載,寫入的數(shù)據(jù)流就會(huì)收不到close事件,一直處于等待狀態(tài),從而造成內(nèi)存泄漏。因此,需要使用on-finished模塊用來處理這種情況。

http.createServer(function (req, res) {
  var stream = fs.createReadStream('filename.txt')

  // set the content headers
  stream
  .on('error', onerror)
  .pipe(zlib.createGzip())
  .on('error', onerror)
  .pipe(res)

  onFinished(res, function () {
    // make sure the stream is always destroyed
    stream.destroy()
  })
})

參考鏈接

以上內(nèi)容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號