likes
comments
collection
share

六、stream

作者站长头像
站长
· 阅读数 42

1、前言

首先吧,我们的弄清楚为什么要学习 Stream 模块?因为就开发应用来说,不明白 Stream 好像没有什么影响,我只要知道如何监听流状态变化,以及会使用 pipe 处理数据就可以了;但是呢,如果需要你实现一个自定义流,阁下又将如何应对?

stream 模块对于创建新类型的流实例很有用

正如 Node 官网所说,理解 stream 模块对于实现自定义流等非常有用;同时,Node.js 本身很多内置模块就是 Stream,如 http 模块的请求响应、fs 的读写流、zlib 压缩流、crypto 加密流等等,理解流能帮我们更好的理解这些模块。

那么问题来了,什么是 Stream 流?为啥使用 Stream 流操作数据会更快?

2、概念

流是用于在 Node.js 中处理流数据的抽象接口;流可以是可读的、可写的、或两者兼而有之。 所有的流都是 EventEmitter 的实例。

以上为 Node.js 官网给出的定义,实在是有够抽象的~

我们之前 fs.readFile 读取文件,是一次性读取到内存中,对于小型的文本文件,这样是 OK 的;但是,像音视频这样几百 MB 或 1G 的文件,这样就有问题了;node 底层采用的 v8 引擎 ,默认情况下 v8 提供的内存大小 1G 左右(与操作系统有关,可以通过 --max-old-space-size 调整)

随便一个大一点的音视频文件或多几个人访问,就会导致服务器的内存爆掉;流式处理应用而生,把文件资源拆分成小块,分段传输,资源就像水流一样进行传输,减轻服务器压力;以下是一个视频文件的分段流数据

六、stream

每一段数据只有几百 B

2.1、Node Stream 工作流程

在介绍 Stream 流之前,需理解以下几个概念

  • chunk,分段流数据
  • 缓冲区,每一个流都会有一个缓冲区,当数据的生产和消费都需要时间时,我们可以在下一次消费前提前生产数据存放到缓冲池。它存储在电脑的 RAM 中;缓冲区会设置一个 highWaterMark 标识,达到标识就不会再从可读流中读取数据
  • RAM,是与 CPU 交换数据的随机存储器,常用来暂时存储程序、数据、中间结果;特点是断电即丢失数据
  • events,事件触发器,Node 提供的内置模块,实现了事件发布订阅功能,在 Stream 中用来在流状态变化时发射对应的事件,触发流的回调
  • pipe,管道,管道的两端分别是一个 Stream,上流是一个可读流,下流是一个可写流;所以多个管道中间的流必须是可读且可写的,如 zlib 压缩流、crypto 加密流

我们可以借助自来水使用来理解流的工作流程

  • 自来水源头水库,就相当于可读流的源头 source
  • 自来水厂的蓄水池就相当于缓冲区
  • 水库的水到蓄水池之间,经过水管连接消毒等处理;就相当于管道中间的可读写处理流
  • 当蓄水池达到一定 highWaterMark 标识,就不会再从水库中抽水处理;相当于缓冲区的浮标值
  • 当用户需要用水时,打开水龙头,就可以使用自来水;相当于分段的 chunk

2.2、流处理数据的优势

  • 更快的处理速度,每拿到一段的 chunk 就可以先处理,而不用等待谁有的数据读取完成再处理,具有更高的效率
  • 占用内存更小,不再需要一次性读取整个数据到内存中,而是通过 RAM 缓冲区暂存
  • 拓展方便,管道的存在,让流数据的处理及拓展更加方便

2.3、四种流

  • Readable: 可读流,能够实现数据的读取;可读流是对消费数据源的抽象
  • Writeeale: 可写流,能够实现数据的写操作;可写流是数据写入目的地的抽象
  • Duplex双工流,既可读又可写;双工流是同时实现 Readable 和 Writable 接口的流
  • Tranform转换流,可读可写,还能实现数据修改转换

更多内容参考官方文档

2.4、流的源码分析

我并没有介绍各种流的使用方法和事件,因为从源码角度上来看,四种流都继承了公共的基础类,因此只要大致了解了它们的实现,使用只要大致翻阅以下文档即能迅速掌握,先上代码链接

流的原型链分析,以 fs.createReadStream 为例,他的原型链指向为: fs 模块自身的属性和方法 --> Readable.prototype --> Stream.prototype --> Stream.prototype --> EventEmitter.prototype --> Object.prototype --> null

Stream 类

我们找到 Stream 的源代码的入口文件 lib\internal\streams\legacy.js

六、stream

它是一个继承了 events 模块的原型,同时又在原型上实现了 pipe 方法(只有这一个)

Readable 类

我们找到 Readable 的源代码的入口文件 lib\internal\streams\readable.js

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

它继承了 Stream 的原型,同时又在原型上实现了一些方法和添加事件监听函数;如 datapauseresume 等事件和 readpauseresume 等方法

Writable 类

我们找到 Writable 的源代码的入口文件 lib\internal\streams\writable.js

ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Writable, Stream);

它继承了 Stream 的原型,同时又在原型上实现了一些方法和添加事件监听函数;如 drainfinish等事件和 writeclose 等方法

Duplex 类

六、stream

双工流类就非常好理解,它设置原型指向 Readable,然后又遍历,将 Writable 的所有属性和方法添加到自己的原型上

Transform 类

ObjectSetPrototypeOf(Transform.prototype, Duplex.prototype);
ObjectSetPrototypeOf(Transform, Duplex);

Transform 则是继承于 Duplex,然后实现了 _transform_write_read_final 方法

六、stream

至此,四大流的核心实现完成,理解这些能非常好的帮助我们使用 Stream 的 API

3、流式文件读、写、复制

fs 基于 Stream 实现了流式方法用来操作文件;注意,Promise 形式的 API 必须先通过 fs.open 打开一个 filehandle 类,然后调用这个类的流式操作 API

3.1、流式读取文件

(async function() {
  const fs = require('fs/promises')

  let filehandle = await fs.open('./file/static/authInfo.json')
  // 接下来就是 readable stream 的操作 API
  let readStream = filehandle.createReadStream({ encoding: 'utf-8' })
  readStream.on('data', (chunk) => {
      console.log(chunk)
  })

  // 其他事件或方法调用
  readStream.on('error', (err) => {
    console.log(err)
  })
  readStream.on('end', (err) => {
    console.log(`读取结束`)
  })
})()
  • data 事件可能会触发多次,取决于文件大小;
  • 默认为 Buffer 形式数据,encoding 则是字符串

3.2、流式写文件

(async function() {
  const fs = require('fs/promises')

  let 写入时创还能 = await fs.open('./file/static/authInfo2.log', 'w')
  // 接下来就是 writeable stream 的操作 API
  let writeStream = filehandle.createWriteStream({ encoding: 'utf-8' })
  writeStream.write(`11111\r\n`)
  writeStream.write(`22222\r\n`)
  writeStream.end()
    writeStream.on('finish', () => {
    console.log(`写入结束`)
  })
})()
  • 写入流创建时时必须传递 flags 参数 w,默认是可读 r
  • 通过监听 finish 来判断是否写入结束

3.3、流式复制文件

(async function() {
  const fs = require('fs/promises')
  const { pipeline } = require('stream/promises');

  let fd1 = await fs.open('./file/static/authInfo.json')
  let rs = fd1.createReadStream({ encoding: 'utf-8' })
  let fd2 = await fs.open('./file/static/authInfo2.log', 'w')
  let ws = fd2.createWriteStream({ encoding: 'utf-8' })
  await pipeline(rs, ws);
  console.log(`复制完成`)
})()
  • 写入流创建时必须传递 flags 参数 w,默认是可读 r
  • Promise 形式的管道需要借鉴 Stream 模块的 pipeline

总结:对于文件流式操作,Promise 形式的支持并不算好,推荐采用回调形式来处理,更加简洁

4、Stream 的其他应用场景

Stream 的应用场景非常丰富,除了上述的文件读写流,还有 http 模块的请求响应、zlib 压缩流、crypto 加密流等等;以压缩流为例,以下代码将实现 gzip 压缩

(async function() {
    const { pipeline } = require('node:stream/promises');
    const fs = require('node:fs');
    const zlib = require('node:zlib');

    async function run() {
      await pipeline(
        fs.createReadStream('archive.tar'),
        zlib.createGzip(),
        fs.createWriteStream('archive.tar.gz'),
      );
      console.log('Pipeline succeeded.');
    }

    run().catch(console.error);
})()

5、结束语

周末正在马不停蹄地赶来...

只愿,越努力越幸运

转载自:https://juejin.cn/post/7294150742112387098
评论
请登录