缓冲模式和流模式
- 缓冲模式(buffer mode),在这种模式下系统会把某份资源传来的所有数据,都先收集到一个缓冲区里,直到操作完成为止。然后,系统把这些数据当成一个模块回传给调用方。比如
fs.writeFile
、fs.readFile
等; - 流模式(stream mode),在流模式下,系统会把自己从资源端收到的每一块新数据都立刻传给消费方,让后者有机会立刻处理该数据。
假如我们要读取一份特别庞大的文件,这份文件有好几个 GB 大小,这种情况下如果使用缓冲模式是相当糟糕的,而且 V8 引擎对缓冲区的尺寸是有限制的,你可能根本没办法分配一个高达好几 GB 的缓冲区,因此有可能还谈不到物理内存耗尽的问题,你在分配缓冲区的这个环节就已经被卡住了。
在 Node.js 中可以通过 buffer.constants.MAX\_LENGTH
查看某套开发环境最多可支持多少字节的缓冲区。
缓冲模式压缩文件
// main.js
import { gzip } from 'node:zlib';
import { promisify } from 'node:util';
import { promises as fs } from 'node:fs';
const gzipPromise = promisify(gzip);
const filename = process.argv[2];
async function main() {
const data = await fs.readFile(filename);
const gzippedData = await gzipPromise(data);
await fs.writeFile(`${filename}.gz`, gzippedData);
console.log('File successfully compressed');
}
main();
运行 node ./main.js ./test.txt 就可以把 test.txt 文件压缩成 .gz 格式的压缩包了。
流模式压缩文件
// main.js
import { createGzip } from 'node:zlib';
import { createReadStream, createWriteStream } from 'node:fs';
const filename = process.argv[2];
createReadStream(filename)
.pipe(createGzip())
.pipe(createWriteStream(`${filename}.gz`))
.on('finish', () => console.log('File successfully compressed'));
流对象结构
在 Node.js
平台里面每一种流对象,在类型上都属于下面这四个基本抽象类中的一个,这些类是由 stream
核心模块提供的:
Readable
Writable
Duplex
Transform
每个 stream
类的对象,本身也都是一个 EventEmmiter
实例,所有流对象实际上可以触发许多事件,比如:
Readable
流在读取完毕时会触发end
事件;Writable
流在写入完毕后会触发finish
事件;- 如果操作过程中发生错误,则会触发
error
事件;
流不仅可以处理二进制数据,而且几乎能处理任何一种 JavaScript 值。流对象的操作模式可以分成两种:
- 二进制模式(Binary mode):以 chunk 的形式串流数据,这种模式可以用来处理缓冲或者字符串;
- 对象模式(Object mode):以对象序列的形式串流数据(这意味着我们几乎能处理任何一种 JavaScript 值),因此可以像函数式编程那样,把各种处理环节分别表示成相应的流对象,并把这些对象组合起来(比如
Rxjs
这个库);
Readable 流(可读流)
要通过 Readable 流来读取数据,有两种办法可以考虑:非流动模式
(non-flowing),也叫暂停模式,另一种是流动模式
(flowing)。
非流动模式
下面代码实现了一款简单的程序,把标准输入端(这也是一种 Readable 流)的内容读取进来,并将读到的东西回显到标准输出端。
process.stdin.on('readable', () => {
let chunk: Buffer | null;
console.log('New data available');
while((chunk = process.stdin.read()) !== null) {
// 回显
console.log(
`Chunk read(${chunk.length} bytes: "${chunk.toString()}")`
);
}
}).on('end', () => {
// Windows 上是 Ctrl+Z,Linux和Mac上是 Ctrl+D
console.log('End of stream');
});
readable
一旦发生(按下 Enter 键),就说明有新的数据可以读取了。
process.stdin.read()
方法是一项同步操作,会从 Readable 流内部缓冲区里面提取一块数据,这种模式下让我们可以根据需要,从流对象里面提取数据。
流动模式
流动模式下,我们不通过 read()
方法提取数据,而是等着流对象把数据推送到 data
监听器里面,只要流对象拿到数据,它就会推过来。上面的代码改为流动模式,就应该这么写:
process.stdin.on('data', (chunk) => {
console.log('New data available');
console.log(
`Chunk read(${chunk.length} bytes: "${chunk.toString()}")`
);
})
.on('end', () => {
console.log('End of stream');
});
实现自己的 Readable 流
自己定制新的 Readable 流,首先必须从 stream 模块里面继承 Readable 原型,然后还必须在自己的这个具体类之中,给 \_read([size])
方法提供实现代码,而这个方法内部又必须 readable.push(chunk)
这种操作向缓冲区里面填入数据。
\_read()
方法和read()
方法不通,后者是给流对象的消费方使用的,而\_read()
方法是我们在定制 stream 子类时必须自己实现的一个方法。一旦流准备好接受更多数据,则 \_read() 将在每次调用 this.push(dataChunk) 后再次调用。 \_read() 可能会继续从资源中读取并推送数据,直到 readable.push() 返回 false。
比如下面代码,可以生成随机字符串流对象:
import { Readable, ReadableOptions } from 'node:stream';
import Chance from 'chance';
const chance = new Chance();
export class RandomStream extends Readable {
emittedBytes = 0;
constructor(options?: ReadableOptions) {
super(options); // 继承
}
\_read(size: number): void {
// 生成长度为 size 的随机字符串
const chunk = chance.string({ length: size });
// 推入内部缓冲区
this.push(chunk, 'utf8');
this.emittedBytes += chunk.length;
// 百分之 5 的概率返回 true,并推入 null
// 这样会给内部缓冲区推入 `EOF`(文件结束),表示这条数据就此结束
if (chance.bool({ likelihood: 5 })) {
this.push(null);
}
}
}
使用 RandomStream
const randomStream = new RandomStream({
highWaterMark: 10
});
randomStream.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);
}).on('end', () => {
console.log(`Produced (${randomStream.emittedBytes}) bytes of radom data`);
});
\_read()
函数中接收一个size
数字类型的参数,它是一个建议参数,意思是说,你最好尊重这个参数,只推入调用方所请求的这么多字节(即highWaterMark
配置项),当然这只是一个建议,不是强迫你必须这么做。
ReadableOptions
接收的 options 参数可能会有这样一些属性:
encoding
: 表示流对象按照什么样的编码标准,把缓冲区的数据转化成字符串,它的默认值是null
;objectMode
: 这个属性是个标志,用来表示对象模式是否启用,它的默认值是false
;highWaterMark
: 这个属性表示内部缓冲区的数据上限,如果数据所占的字节数已经达到该上限,那么这个流对象就不应该再从数据源之中读取数据了,它的默认值是16KB
;
简化版定制方案
如果定制的流对象比较简单,可以不用专门编写一个类,而是采用简化版的写法来制作 Readable 流。这种写法只需要调用 new Readable(options)
,并把一个包含 read()
方法的对象传给 options 参数即可。
let emittedBytes = 0;
const randomStream = new Readable({
highWaterMark: 10,
read(size: number): void {
const chunk = chance.string({ length: size });
this.push(chunk, 'utf8');
emittedBytes += chunk.length;
if (chance.bool({ likelihood: 5 })) {
this.push(null);
}
}
});
randomStream.on('data', (chunk) => {
console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);
}).on('end', () => {
console.log(`Produced (${emittedBytes}) bytes of radom data`);
});
Readable.from
// main.js
有个叫做 Readable.from()
的辅助函数,让你能够把数组或者生成器、迭代器以及异步迭代器这样的 iterable
对象当做数据源,轻松构建 Readable 流。
import { Readable } from 'node:stream';
const arrStream = Readable.from(['a', 'b', 'c', 'd', 'e', 'f', 'g']);
arrStream.on('data', (char: string) => {
console.log("🚀 ~ file: exercise.ts:6 ~ arrStream.on ~ char:", char);
}).on('end', () => {
console.log("end");
});
Writable 流(可写流)
向 Writable 流推送数据,是相当容易的,我们只需要使用 write
方法就行了,方法前面是:
writable.write(chunk, [encoding], [callback])
其中 encoding 参数和 callback 参数是可选的。如果 chunk 是字符串,那么 encoding 参数默认是 utf8,如果 chunk 是 Buffer,那么该参数的值会为系统所忽略。callback 表示这个函数会在系统把数据块写入底层资源的时候,得到调用。
如果想告诉 Writable 流,已经没有数据需要写入了,那么应该调用 end()
方法:
writable.end([chunk], [encoding], [callback]);
下面代码我们创建了一个小的 HTTP 服务器程序,让它输出一些随机字符串:
const chance = new Chance();
const server = createServer((\_, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
while(chance.bool({ likelihood: 95 })) {
res.write(`${chance.string()}\n`);
}
res.end('\n\n');
res.on('finish', () => console.log('All data send.'));
});
server.listen(8082, () => {
console.log('listening on http://localhost:8082');
});
res
对象不仅是 http.ServerResponse
实例,同时也是一个 Writable
流。上面代码我们使用 curl localhost:8082
命令就可以看到服务器发来的随机字符串了。
实现 Writable 流
要实现一种新的 Writable 流,我们可以继承 Writable 类,并实现 \_write()
方法。
假如我们要实现这样一种 Writable 流,接收下面这种格式的对象:
{
path: <文件路径>
content: <字符串或 buffer>
}
每收到这样一个对象,我们就会把 path 所指的路径下创建一份文件,并把 content 属性的内容存入该文件。
大家应该意识到,输入给我们这种 Writable 流的数据,并不是字符串或 Buffer,而应该是对象,因此这种流必须在对象模式下运作。代码如下:
import { dirname } from 'node:path';
import { promises as fs } from 'node:fs';
import { Writable, WritableOptions } from 'node:stream';
interface ChunkType {
path: string;
content: string | Buffer;
}
export class ToFileStream extends Writable {
constructor(options: WritableOptions) {
super({ ...options, objectMode: true });
}
\_write(chunk: ChunkType, \_encoding: BufferEncoding, callback: (error?: Error) => void) {
// 递归创建多级文件夹,然后写入文件
fs.mkdir(dirname(chunk.path), { recursive: true })
.then(() => fs.writeFile(chunk.path, chunk.content))
.then(() => callback())
.catch(callback);
}
}
使用:
tfs.write({
path: join('files', 'file1.txt'),
content: 'Hello',
});
tfs.write({
path: join('files', 'file2.txt'),
content: 'Node.js',
});
tfs.write({
path: join('files', 'file3.txt'),
content: 'streams',
});
tfs.end(() => console.log('All files created.'));
另外,通 Readable 流一样,Writable 流也支持简化版的定制方案:
const tfs = new Writable({
objectMode: true,
write(chunk: ChunkType, \_encoding: BufferEncoding, callback) {
fs.mkdir(dirname(chunk.path), { recursive: true })
.then(() => fs.writeFile(chunk.path, chunk.content))
.then(() => callback())
.catch(callback);
}
});
backpressure(防拥堵机制)
写入数据的速度可能要比消耗数据的速度要快,为了应对这种情况,流对象会把写进来的数据先放入缓冲区,但如果给该对象写入数据的那个人不知道已经出现这种情况,那么还是会不断地写入,导致内部缓冲区里面的数据越积越多,让内存使用量变得比较高。
为了提醒写入方注意这种问题,writable.write() 方法会在内部缓冲区触碰 highWaterMark
(内部缓冲区的数据上限) 上限的时候,返回 false,以表明此时不应该再向其中写入内容。当缓冲区清空时,流对象会触发 drain
事件,以提示现在又可以向里面写入数据了。这套机制就叫做 backpressure
(防拥堵机制)。
backpressure 只是一套建议机制,而不是强制实施的。即便 write() 返回 false,我们也还是可以忽略这个信号,继续往里面写入,让缓冲区越变越大。
这套机制其实在 Readable 流中也有类似的体现,在实现 \_read()
方法时,如果发现自己调用 push()
方法得到的结果是 false,那就不应该再向其中推送新数据了。这个问题仅仅需要由实现 Readable 流的人来担心,而不太需要由使用这种流的人负责处理。
下面代码实现的是防拥堵机制的输出随机字符 HTTP 服务器:
const chance = new Chance();
const server = createServer((\_, res) => {
res.writeHead(200, { 'Content-Type': 'text/plain' });
function generateMore() {
while(chance.bool({ likelihood: 95 })) {
const randomChunk = chance.string({
length: (16 \* 1024) - 1
});
const shouldContinue = res.write(`${randomChunk}\n`);
if (!shouldContinue) { // 是否已拥堵
console.log('back-pressure');
// 监听 drain 事件,表面现在又可以向里面写入数据了
return res.on('drain', generateMore);
}
}
res.end('\n\n');
}
generateMore();
res.on('finish', () => console.log('All data send.'));
});
server.listen(8082, () => {
console.log('listening on http://localhost:8082');
});
以上就是可读流和可写流的全部内容了,双工流和转换流下期分享!