Node.js 流编程

JavaScript/前端
245
0
0
2024-04-23
标签   NodeJs

缓冲模式和流模式

  • 缓冲模式(buffer mode),在这种模式下系统会把某份资源传来的所有数据,都先收集到一个缓冲区里,直到操作完成为止。然后,系统把这些数据当成一个模块回传给调用方。比如 fs.writeFilefs.readFile 等;
  • 流模式(stream mode),在流模式下,系统会把自己从资源端收到的每一块新数据都立刻传给消费方,让后者有机会立刻处理该数据。

假如我们要读取一份特别庞大的文件,这份文件有好几个 GB 大小,这种情况下如果使用缓冲模式是相当糟糕的,而且 V8 引擎对缓冲区的尺寸是有限制的,你可能根本没办法分配一个高达好几 GB 的缓冲区,因此有可能还谈不到物理内存耗尽的问题,你在分配缓冲区的这个环节就已经被卡住了。

在 Node.js 中可以通过 buffer.constants.MAX\_LENGTH 查看某套开发环境最多可支持多少字节的缓冲区。

缓冲模式压缩文件

// main.js

import { gzip } from 'node:zlib';

import { promisify } from 'node:util';

import { promises as fs } from 'node:fs';



const gzipPromise = promisify(gzip);



const filename = process.argv[2];



async function main() {

    const data = await fs.readFile(filename);

    const gzippedData = await gzipPromise(data);

    await fs.writeFile(`${filename}.gz`, gzippedData);

    console.log('File successfully compressed');

}



main();

运行 node ./main.js ./test.txt 就可以把 test.txt 文件压缩成 .gz 格式的压缩包了。

流模式压缩文件

// main.js

import { createGzip } from 'node:zlib';

import { createReadStream, createWriteStream } from 'node:fs';



const filename = process.argv[2];



createReadStream(filename)

    .pipe(createGzip())

    .pipe(createWriteStream(`${filename}.gz`))

    .on('finish', () => console.log('File successfully compressed'));

流对象结构

Node.js 平台里面每一种流对象,在类型上都属于下面这四个基本抽象类中的一个,这些类是由 stream 核心模块提供的:

  • Readable
  • Writable
  • Duplex
  • Transform

每个 stream 类的对象,本身也都是一个 EventEmmiter 实例,所有流对象实际上可以触发许多事件,比如:

  • Readable 流在读取完毕时会触发 end 事件;
  • Writable 流在写入完毕后会触发 finish 事件;
  • 如果操作过程中发生错误,则会触发 error 事件;

流不仅可以处理二进制数据,而且几乎能处理任何一种 JavaScript 值。流对象的操作模式可以分成两种:

  • 二进制模式(Binary mode):以 chunk 的形式串流数据,这种模式可以用来处理缓冲或者字符串;
  • 对象模式(Object mode):以对象序列的形式串流数据(这意味着我们几乎能处理任何一种 JavaScript 值),因此可以像函数式编程那样,把各种处理环节分别表示成相应的流对象,并把这些对象组合起来(比如 Rxjs 这个库);

Readable 流(可读流)

要通过 Readable 流来读取数据,有两种办法可以考虑:非流动模式(non-flowing),也叫暂停模式,另一种是流动模式(flowing)。

非流动模式

下面代码实现了一款简单的程序,把标准输入端(这也是一种 Readable 流)的内容读取进来,并将读到的东西回显到标准输出端。

process.stdin.on('readable', () => {

    let chunk: Buffer | null;

    console.log('New data available');

    while((chunk = process.stdin.read()) !== null) {

        // 回显

        console.log(

            `Chunk read(${chunk.length} bytes: "${chunk.toString()}")`

        );

    }

}).on('end', () => {

    // Windows 上是 Ctrl+Z,Linux和Mac上是 Ctrl+D

    console.log('End of stream');

});

readable 一旦发生(按下 Enter 键),就说明有新的数据可以读取了。

process.stdin.read() 方法是一项同步操作,会从 Readable 流内部缓冲区里面提取一块数据,这种模式下让我们可以根据需要,从流对象里面提取数据。

流动模式

流动模式下,我们不通过 read() 方法提取数据,而是等着流对象把数据推送到 data 监听器里面,只要流对象拿到数据,它就会推过来。上面的代码改为流动模式,就应该这么写:

process.stdin.on('data', (chunk) => {

    console.log('New data available');

    console.log(

        `Chunk read(${chunk.length} bytes: "${chunk.toString()}")`

    );

})

.on('end', () => {

    console.log('End of stream');

});

实现自己的 Readable 流

自己定制新的 Readable 流,首先必须从 stream 模块里面继承 Readable 原型,然后还必须在自己的这个具体类之中,给 \_read([size]) 方法提供实现代码,而这个方法内部又必须 readable.push(chunk) 这种操作向缓冲区里面填入数据。

\_read() 方法和 read() 方法不通,后者是给流对象的消费方使用的,而 \_read() 方法是我们在定制 stream 子类时必须自己实现的一个方法。一旦流准备好接受更多数据,则 \_read() 将在每次调用 this.push(dataChunk) 后再次调用。 \_read() 可能会继续从资源中读取并推送数据,直到 readable.push() 返回 false。

比如下面代码,可以生成随机字符串流对象:

import { Readable, ReadableOptions } from 'node:stream';

import Chance from 'chance';



const chance = new Chance();



export class RandomStream extends Readable {

    emittedBytes = 0;

    constructor(options?: ReadableOptions) {

        super(options); // 继承

    }



    \_read(size: number): void {

        // 生成长度为 size 的随机字符串

        const chunk = chance.string({ length: size });

        // 推入内部缓冲区

        this.push(chunk, 'utf8');

        this.emittedBytes += chunk.length;

        // 百分之 5 的概率返回 true,并推入 null

        // 这样会给内部缓冲区推入 `EOF`(文件结束),表示这条数据就此结束

        if (chance.bool({ likelihood: 5 })) {

            this.push(null);

        }

    }

}
使用 RandomStream
const randomStream = new RandomStream({

    highWaterMark: 10

});

randomStream.on('data', (chunk) => {

    console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);

}).on('end', () => {

    console.log(`Produced (${randomStream.emittedBytes}) bytes of radom data`);

});
\_read() 函数中接收一个 size 数字类型的参数,它是一个建议参数,意思是说,你最好尊重这个参数,只推入调用方所请求的这么多字节(即 highWaterMark 配置项),当然这只是一个建议,不是强迫你必须这么做。

ReadableOptions 接收的 options 参数可能会有这样一些属性:

  • encoding: 表示流对象按照什么样的编码标准,把缓冲区的数据转化成字符串,它的默认值是 null
  • objectMode: 这个属性是个标志,用来表示对象模式是否启用,它的默认值是 false
  • highWaterMark: 这个属性表示内部缓冲区的数据上限,如果数据所占的字节数已经达到该上限,那么这个流对象就不应该再从数据源之中读取数据了,它的默认值是 16KB

简化版定制方案

如果定制的流对象比较简单,可以不用专门编写一个类,而是采用简化版的写法来制作 Readable 流。这种写法只需要调用 new Readable(options),并把一个包含 read() 方法的对象传给 options 参数即可。

let emittedBytes = 0;

const randomStream = new Readable({

    highWaterMark: 10,

    read(size: number): void {

        const chunk = chance.string({ length: size });

        this.push(chunk, 'utf8');

        emittedBytes += chunk.length;

        if (chance.bool({ likelihood: 5 })) {

            this.push(null);

        }

    }

});



randomStream.on('data', (chunk) => {

    console.log(`Chunk received (${chunk.length}) bytes: ${chunk.toString()}`);

}).on('end', () => {

    console.log(`Produced (${emittedBytes}) bytes of radom data`);

});

Readable.from

// main.js

有个叫做 Readable.from() 的辅助函数,让你能够把数组或者生成器、迭代器以及异步迭代器这样的 iterable 对象当做数据源,轻松构建 Readable 流。

import { Readable } from 'node:stream';



const arrStream = Readable.from(['a', 'b', 'c', 'd', 'e', 'f', 'g']);



arrStream.on('data', (char: string) => {

    console.log("🚀 ~ file: exercise.ts:6 ~ arrStream.on ~ char:", char);

}).on('end', () => {

    console.log("end");

});

Writable 流(可写流)

向 Writable 流推送数据,是相当容易的,我们只需要使用 write 方法就行了,方法前面是:

writable.write(chunk, [encoding], [callback])

其中 encoding 参数和 callback 参数是可选的。如果 chunk 是字符串,那么 encoding 参数默认是 utf8,如果 chunk 是 Buffer,那么该参数的值会为系统所忽略。callback 表示这个函数会在系统把数据块写入底层资源的时候,得到调用。

如果想告诉 Writable 流,已经没有数据需要写入了,那么应该调用 end() 方法:

writable.end([chunk], [encoding], [callback]);

下面代码我们创建了一个小的 HTTP 服务器程序,让它输出一些随机字符串:

const chance = new Chance();

const server = createServer((\_, res) => {

    res.writeHead(200, { 'Content-Type': 'text/plain' });

    while(chance.bool({ likelihood: 95 })) {

        res.write(`${chance.string()}\n`);

    }

    res.end('\n\n');

    res.on('finish', () => console.log('All data send.'));

});

server.listen(8082, () => {

    console.log('listening on http://localhost:8082');

});

res 对象不仅是 http.ServerResponse 实例,同时也是一个 Writable 流。上面代码我们使用 curl localhost:8082 命令就可以看到服务器发来的随机字符串了。

实现 Writable 流

要实现一种新的 Writable 流,我们可以继承 Writable 类,并实现 \_write() 方法。

假如我们要实现这样一种 Writable 流,接收下面这种格式的对象:

{

    path: <文件路径>

    content: <字符串或 buffer>

}

每收到这样一个对象,我们就会把 path 所指的路径下创建一份文件,并把 content 属性的内容存入该文件。

大家应该意识到,输入给我们这种 Writable 流的数据,并不是字符串或 Buffer,而应该是对象,因此这种流必须在对象模式下运作。代码如下:

import { dirname } from 'node:path';

import { promises as fs } from 'node:fs';

import { Writable, WritableOptions } from 'node:stream';



interface ChunkType {

    path: string;

    content: string | Buffer;

}

export class ToFileStream extends Writable {

    constructor(options: WritableOptions) {

        super({ ...options, objectMode: true });

    }



    \_write(chunk: ChunkType, \_encoding: BufferEncoding, callback: (error?: Error) => void) {

        // 递归创建多级文件夹,然后写入文件

        fs.mkdir(dirname(chunk.path), { recursive: true })

            .then(() => fs.writeFile(chunk.path, chunk.content))

            .then(() => callback())

            .catch(callback);

    }

}

使用:

tfs.write({

    path: join('files', 'file1.txt'),

    content: 'Hello',

});

tfs.write({

    path: join('files', 'file2.txt'),

    content: 'Node.js',

});

tfs.write({

    path: join('files', 'file3.txt'),

    content: 'streams',

});



tfs.end(() => console.log('All files created.'));

另外,通 Readable 流一样,Writable 流也支持简化版的定制方案:

const tfs = new Writable({

    objectMode: true,

    write(chunk: ChunkType, \_encoding: BufferEncoding, callback) {

        fs.mkdir(dirname(chunk.path), { recursive: true })

            .then(() => fs.writeFile(chunk.path, chunk.content))

            .then(() => callback())

            .catch(callback);

    }

});

backpressure(防拥堵机制)

写入数据的速度可能要比消耗数据的速度要快,为了应对这种情况,流对象会把写进来的数据先放入缓冲区,但如果给该对象写入数据的那个人不知道已经出现这种情况,那么还是会不断地写入,导致内部缓冲区里面的数据越积越多,让内存使用量变得比较高。

为了提醒写入方注意这种问题,writable.write() 方法会在内部缓冲区触碰 highWaterMark(内部缓冲区的数据上限) 上限的时候,返回 false,以表明此时不应该再向其中写入内容。当缓冲区清空时,流对象会触发 drain 事件,以提示现在又可以向里面写入数据了。这套机制就叫做 backpressure(防拥堵机制)。

backpressure 只是一套建议机制,而不是强制实施的。即便 write() 返回 false,我们也还是可以忽略这个信号,继续往里面写入,让缓冲区越变越大。

这套机制其实在 Readable 流中也有类似的体现,在实现 \_read() 方法时,如果发现自己调用 push() 方法得到的结果是 false,那就不应该再向其中推送新数据了。这个问题仅仅需要由实现 Readable 流的人来担心,而不太需要由使用这种流的人负责处理。

下面代码实现的是防拥堵机制的输出随机字符 HTTP 服务器:

const chance = new Chance();

const server = createServer((\_, res) => {

    res.writeHead(200, { 'Content-Type': 'text/plain' });

    function generateMore() {

        while(chance.bool({ likelihood: 95 })) {

            const randomChunk = chance.string({

                length: (16 \* 1024) - 1

            });

            const shouldContinue = res.write(`${randomChunk}\n`);

            if (!shouldContinue) {  // 是否已拥堵

                console.log('back-pressure');

                // 监听 drain 事件,表面现在又可以向里面写入数据了

                return res.on('drain', generateMore);

            }

        }

        res.end('\n\n');

    }

    generateMore();

    res.on('finish', () => console.log('All data send.'));

});

server.listen(8082, () => {

    console.log('listening on http://localhost:8082');

});

以上就是可读流和可写流的全部内容了,双工流和转换流下期分享!