六、stream
1、前言
首先吧,我们的弄清楚为什么要学习 Stream 模块?因为就开发应用来说,不明白 Stream 好像没有什么影响,我只要知道如何监听流状态变化,以及会使用 pipe 处理数据就可以了;但是呢,如果需要你实现一个自定义流,阁下又将如何应对?
stream
模块对于创建新类型的流实例很有用
正如 Node 官网所说,理解 stream 模块对于实现自定义流等非常有用;同时,Node.js 本身很多内置模块就是 Stream,如 http 模块的请求响应、fs 的读写流、zlib 压缩流、crypto 加密流等等,理解流能帮我们更好的理解这些模块。
那么问题来了,什么是 Stream 流?为啥使用 Stream 流操作数据会更快?
2、概念
流是用于在 Node.js 中处理流数据的抽象接口;流可以是可读的、可写的、或两者兼而有之。 所有的流都是
EventEmitter
的实例。
以上为 Node.js 官网给出的定义,实在是有够抽象的~
我们之前 fs.readFile
读取文件,是一次性读取到内存中,对于小型的文本文件,这样是 OK 的;但是,像音视频这样几百 MB 或 1G 的文件,这样就有问题了;node 底层采用的 v8 引擎 ,默认情况下 v8 提供的内存大小 1G 左右(与操作系统有关,可以通过 --max-old-space-size
调整)
随便一个大一点的音视频文件或多几个人访问,就会导致服务器的内存爆掉;流式处理应用而生,把文件资源拆分成小块,分段传输,资源就像水流一样进行传输,减轻服务器压力;以下是一个视频文件的分段流数据
每一段数据只有几百 B
2.1、Node Stream 工作流程
在介绍 Stream 流之前,需理解以下几个概念
- chunk,分段流数据
- 缓冲区,每一个流都会有一个缓冲区,当数据的生产和消费都需要时间时,我们可以在下一次消费前提前生产数据存放到缓冲池。它存储在电脑的 RAM 中;缓冲区会设置一个
highWaterMark
标识,达到标识就不会再从可读流中读取数据 - RAM,是与 CPU 交换数据的随机存储器,常用来暂时存储程序、数据、中间结果;特点是断电即丢失数据
- events,事件触发器,Node 提供的内置模块,实现了事件发布订阅功能,在 Stream 中用来在流状态变化时发射对应的事件,触发流的回调
- pipe,管道,管道的两端分别是一个 Stream,上流是一个可读流,下流是一个可写流;所以多个管道中间的流必须是可读且可写的,如 zlib 压缩流、crypto 加密流
我们可以借助自来水使用来理解流的工作流程
- 自来水源头水库,就相当于可读流的源头 source
- 自来水厂的蓄水池就相当于缓冲区
- 水库的水到蓄水池之间,经过水管连接消毒等处理;就相当于管道中间的可读写处理流
- 当蓄水池达到一定
highWaterMark
标识,就不会再从水库中抽水处理;相当于缓冲区的浮标值 - 当用户需要用水时,打开水龙头,就可以使用自来水;相当于分段的 chunk
2.2、流处理数据的优势
- 更快的处理速度,每拿到一段的 chunk 就可以先处理,而不用等待谁有的数据读取完成再处理,具有更高的效率
- 占用内存更小,不再需要一次性读取整个数据到内存中,而是通过 RAM 缓冲区暂存
- 拓展方便,管道的存在,让流数据的处理及拓展更加方便
2.3、四种流
- Readable:
可读流
,能够实现数据的读取;可读流是对消费数据源的抽象 - Writeeale:
可写流
,能够实现数据的写操作;可写流是数据写入目的地的抽象 - Duplex:
双工流
,既可读又可写;双工流是同时实现 Readable 和 Writable 接口的流 - Tranform:
转换流
,可读可写,还能实现数据修改转换
更多内容参考官方文档
2.4、流的源码分析
我并没有介绍各种流的使用方法和事件,因为从源码角度上来看,四种流都继承了公共的基础类,因此只要大致了解了它们的实现,使用只要大致翻阅以下文档即能迅速掌握,先上代码链接
流的原型链分析,以 fs.createReadStream
为例,他的原型链指向为:
fs 模块自身的属性和方法 --> Readable.prototype --> Stream.prototype --> Stream.prototype --> EventEmitter.prototype --> Object.prototype --> null
Stream 类
我们找到 Stream 的源代码的入口文件 lib\internal\streams\legacy.js
它是一个继承了 events 模块的原型,同时又在原型上实现了 pipe 方法(只有这一个)
Readable 类
我们找到 Readable 的源代码的入口文件 lib\internal\streams\readable.js
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
它继承了 Stream 的原型,同时又在原型上实现了一些方法和添加事件监听函数;如 data
、pause
、resume
等事件和 read
、pause
、resume
等方法
Writable 类
我们找到 Writable 的源代码的入口文件 lib\internal\streams\writable.js
ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Writable, Stream);
它继承了 Stream 的原型,同时又在原型上实现了一些方法和添加事件监听函数;如 drain
、finish
等事件和 write
、close
等方法
Duplex 类
双工流类就非常好理解,它设置原型指向 Readable,然后又遍历,将 Writable 的所有属性和方法添加到自己的原型上
Transform 类
ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);
Transform 则是继承于 Duplex,然后实现了 _transform
、_write
、_read
、_final
方法
至此,四大流的核心实现完成,理解这些能非常好的帮助我们使用 Stream 的 API
3、流式文件读、写、复制
fs 基于 Stream 实现了流式方法用来操作文件;注意,Promise 形式的 API 必须先通过 fs.open 打开一个 filehandle 类,然后调用这个类的流式操作 API
3.1、流式读取文件
(async function() {
const fs = require('fs/promises')
let filehandle = await fs.open('./file/static/authInfo.json')
// 接下来就是 readable stream 的操作 API
let readStream = filehandle.createReadStream({ encoding: 'utf-8' })
readStream.on('data', (chunk) => {
console.log(chunk)
})
// 其他事件或方法调用
readStream.on('error', (err) => {
console.log(err)
})
readStream.on('end', (err) => {
console.log(`读取结束`)
})
})()
- data 事件可能会触发多次,取决于文件大小;
- 默认为 Buffer 形式数据,encoding 则是字符串
3.2、流式写文件
(async function() {
const fs = require('fs/promises')
let 写入时创还能 = await fs.open('./file/static/authInfo2.log', 'w')
// 接下来就是 writeable stream 的操作 API
let writeStream = filehandle.createWriteStream({ encoding: 'utf-8' })
writeStream.write(`11111\r\n`)
writeStream.write(`22222\r\n`)
writeStream.end()
writeStream.on('finish', () => {
console.log(`写入结束`)
})
})()
- 写入流创建时时必须传递 flags 参数
w
,默认是可读r
- 通过监听 finish 来判断是否写入结束
3.3、流式复制文件
(async function() {
const fs = require('fs/promises')
const { pipeline } = require('stream/promises');
let fd1 = await fs.open('./file/static/authInfo.json')
let rs = fd1.createReadStream({ encoding: 'utf-8' })
let fd2 = await fs.open('./file/static/authInfo2.log', 'w')
let ws = fd2.createWriteStream({ encoding: 'utf-8' })
await pipeline(rs, ws);
console.log(`复制完成`)
})()
- 写入流创建时必须传递 flags 参数
w
,默认是可读r
- Promise 形式的管道需要借鉴 Stream 模块的 pipeline
总结:对于文件流式操作,Promise 形式的支持并不算好,推荐采用回调形式来处理,更加简洁
4、Stream 的其他应用场景
Stream 的应用场景非常丰富,除了上述的文件读写流,还有 http 模块的请求响应、zlib 压缩流、crypto 加密流等等;以压缩流为例,以下代码将实现 gzip 压缩
(async function() {
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
})()
5、结束语
周末正在马不停蹄地赶来...
只愿,越努力越幸运
转载自:https://juejin.cn/post/7294150742112387098