图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No
Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 Node.js 中的流是内置对象,能够以小块的形式增量地读取或写入源,而不是一次性将整个数据加载到内存中。
主要特点
Node.js流的一些主要特点:
-
数据流动: 数据像水流一样从一个地方"流"到另一个地方。
-
类型: 主要有四种类型的流:
- 可读流(Readable)
- 可写流(Writable)
- 双工流(Duplex):既可读又可写
- 转换流(Transform):可以在写入和读取数据时修改或转换数据
-
事件机制: 流基于Node.js的EventEmitter,可以发出事件。
-
管道: 可以使用pipe()方法将流连接起来。
-
背压(Backpressure)处理: 自动处理读写速度不匹配的情况。
Readable
实现一个可读流的核心是继承 Readable, 并至少实现一个 _read 方法。
const { Readable } = require('stream');
const myReadable = new Readable({
read(size) {
// ...
}
});
可读流,有两种模式,Flowing Mode,Non-Flowing Mode。
所有可读流都开始于暂停模式,可以通过以下方式切换到流动模式, 流动模式后就会不断调用 _read 方法。
- 添加 'data' 事件句柄。
- 调用 stream.resume() 方法。
- 调用 stream.pipe() 方法将数据发送到可写流。
import { Readable } from 'stream'
import fs from 'fs'
function main () {
const readable = fs.createReadStream('./input.txt')
readable.on('data', (c) => {
console.log(c.toString())
})
}
main()
流程图如下:
资源并不是直接流向消费者的,而是先到内部的缓冲区,如果数据流入 Stream 的速度比您处理它的速度快,会发生什么?内部缓冲区的数量变大,在最坏的情况下,进程可能会耗尽内存并终止。
为了避免这种情况,Stream 允许您设置内部缓冲区阈值。当数据量超过此阈值时,它会暂时停止从资源中读取数据,以防止内部缓冲区溢出。这种流量控制机制称为背压,阈值称为highWaterMark。
可读流背压处理和恢复机制,如下图
Writable
可写流原理和可读流比较相似,数据源也就是生产者就写入数据到缓冲区,然后再写入目标。
当生产者写入速度过快,把缓冲区装满了之后,就会出现「背压」,这个时候是需要告诉生产者暂停生产的,当队列释放之后,Writable Stream 会给生产者发送一个 drain
消息,让它恢复生产。
-
生产者通过 write() 方法向流中写入数据。
-
writeOrBuffer() 方法决定是直接写入资源还是先放入缓冲区:
-
写入操作返回 true 或 false:
- true 表示可以继续写入。
- false 表示内部缓冲已满,应该停止写入直到"drain"事件发生。
-
当所有数据都写入完毕时,调用 end() 方法来结束写入过程。
-
当写入操作全部完成时,流会触发 "finish" 事件,通知生产者写入已经结束。
流程图如下
可写流背压处理和恢复机制,如下图
pipe
pipe 方法主要是处理好可读流与可写流的联动问题, 如一方停止了或者结束了, 会在回调中触发另一方对应的处理函数。
让流处于流动模式的核心是 pipe 方法中 src.on('data', ondata) 让可读流监听了 onData 事件, 当监听该事件时会调用流的 resume 方法, 接着调用 _read 方法开始产生数据, 产生的数据通过 src 可读流的回调函数 ondata 继续调用 dest.write(chunk) 可写流去消费数据。
如下例子,可用来查看背压的实际效果
import { Readable, Writable } from 'stream'
// 创建一个可读流,模拟数据源
class SlowReadable extends Readable {
constructor(options) {
super(options)
this.index = 0
}
_read (size) {
const i = this.index++
if (i > 10) {
this.push(null)
} else {
const str = String(i)
console.log(`Producing: ${str}`)
// 模拟异步生产数据
setTimeout(() => {
if (!this.push(str)) {
console.log('Backpressure applied, pausing production...')
}
}, 10)
}
}
}
// 创建一个可写流,模拟慢速消费者
const slowWriter = new Writable({
write (chunk, encoding, callback) {
console.log(`Consuming: ${chunk.toString()}`)
// 模拟慢速处理
setTimeout(callback, 1000)
},
highWaterMark: 2 // 设置较低的高水位线来更容易观察到背压
})
const readable = new SlowReadable({ highWaterMark: 2 })
readable.pipe(slowWriter)
slowWriter.on('drain', () => {
console.log('Drain event: backpressure relieved, resuming production...')
})
Duplex
双工流结合了可读流和可写流的特性,允许数据在两个方向上流动。它的输入和输出可以没有任何关系。
这种双工流的设计使得它能够高效地处理需要双向数据传输的场景,如网络通信、文件读写等。它提供了灵活的控制机制,可以独立管理数据的读取和写入过程,同时通过内部缓冲机制来平衡数据产生和消费的速度差异。
Transform
转换流是一种特殊的双工流,它可以在数据写入和读取之间进行数据转换。
工作流程:
-
生产者通过 write() 方法将数据写入转换流。
-
数据进入 Transformer,在这里进行处理和转换。
-
转换后的数据经过 highWaterMark 检查:
- 如果缓存未满,数据进入缓存池,并返回 true。
- 如果缓存已满,返回 false,表示需要暂停写入。
-
缓存池中的数据通过 "data" 事件发送给消费者。
-
消费者可以使用 resume() 和 pause() 方法来控制数据的接收速度。
-
当所有数据处理完毕时,生产者调用 end() 方法结束写入。
总结
主要介绍了Node.js中的流(Streams)概念,通过图解方式详细阐述了其工作原理和应用。
- 流的基本概念和特点,包括非阻塞处理、事件机制和背压控制
- 四种主要类型的流:可读流、可写流、双工流和转换流,及其各自的实现和使用方法
- 流的内部工作机制,如数据缓冲、流动模式切换和管道操作
- 实际应用示例,展示了如何在文件处理、数据转换等场景中高效使用流
转载自:https://juejin.cn/post/7415914057389129764