NodeJS文件可读流, 从理解到手写实现,做全面的前端能手
手写实现文件可读流
api: createReadStream 文件可读流
简介:Node.js 中的文件可读流是基于 Stream API 实现的数据流读取机制,它允许以分块的方式从文件系统中读取文件内容,而非一次性加载到内存中。这对于处理大文件或者需要高效利用内存的场景尤为有用。
常见使用场景
-
大文件读取:当文件过大无法一次性加载到内存时,可以使用可读流逐块读取并进行处理,例如日志分析、数据迁移等任务。
-
实时流处理:在处理网络传输或持续更新的文件时(如日志文件),可以通过可读流实时监听和消费新生成的内容。
-
管道操作:将一个文件可读流与其它可写流(如HTTP响应流)连接起来,实现数据的无缝传输,如服务器端发送大文件给客户端。
-
数据转换:在读取文件的同时对数据进行压缩、加密或格式转换等操作,通过中间件流连接多个处理过程。
实例:
const fs = require('fs');
const path = require('path');
// 创建一个指向特定文件的可读流
const readStream = fs.createReadStream(path.join(__dirname, 'largeFile.txt'), { encoding: 'utf8' });
// 监听data事件,每当有新的数据块可读时触发
readStream.on('data', (chunk) => {
console.log(chunk); // 输出每次读取的数据块
});
// 当所有数据都已读取完毕时触发end事件
readStream.on('end', () => {
console.log('文件读取完成');
});
// 错误处理
readStream.on('error', (err) => {
console.error('读取文件出错:', err);
});
实现自定义基础文件可读流
导入基础模块
由于所有流事件都是基于 events
实现的,我们也可以直接导入
const fs = require("fs");
const EventEmitter = require("events");
定义基础参数
class MyReadStream extends EventEmitter {
constructor(path, options = {}) {
super();
this.path = path;
this.flags = options.flags || "r";
this.mode = options.mode || 438;
this.start = options.start || 0;
this.end = options.end;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true;
this.open();
}
open () {}
}
open 方法实现
...
// 原生 fs open 方法打开指定位置文件
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
return this.emit("error", err);
}
this.fd = fd;
this.emit("open", fd);
});
}
read 方法实现
如果这时我们直接拿 文件 fd 是拿不到的,当 read()
方法被首次调用(如在添加 "data" 事件监听器时),可能会在文件尚未打开的情况下执行。这时,由于没有有效的文件描述符,无法进行读取操作。因此,我们需要等待 "open" 事件触发后再执行读取操作。
如果当前类实例的 this.fd
还不是一个数字(即文件还没有打开),则注册一个一次性监听器来监听 "open" 事件,当文件打开后立即调用 this.read()
方法。
read () {
if (typeof this.id !== "number") {
return this.once("open", this.read);
}
const buf = Buffer.alloc(this.highWaterMark);
let howMuchToRead = this.end
? Math.min(this.end - this.readOffset + 1, this.highWaterMark)
: this.highWaterMark;
fs.read(
this.fd,
buf,
0,
howMuchToRead,
this.readOffset,
(err, readBytes) => {
if (readBytes) {
this.readOffset += readBytes;
this.emit("data", buf.slice(0, this.readBytes));
this.read();
} else {
this.emit("end");
this.close();
}
}
);
}
PIPE 方法
另外,在 Node.js 中,ReadableStream.pipe(destination[, options])
方法是流(Stream)模块的一个核心功能,它提供了一种便捷的方式来连接多个数据流,实现数据从一个源头自动流动到另一个目标。举个栗子🌰
const fs = require("fs");
const rs = fs.createReadStream("./test.txt", {
highWaterMark: 4,
});
const ws = fs.createWriteStream("./file1.txt", {
highWaterMark: 1,
});
rs.pipe(ws);
我们再使用 drain
方法实现一下 pipe
方法:
pipe(ws) {
this.on("data", (data) => {
let flag = ws.write(data);
if (!flag) {
this.pause();
}
ws.on("drain", () => {
this.resume();
});
});
}
总结
完整代码如下:
const fs = require("fs");
const EventEmitter = require("events");
class MyReadStream extends EventEmitter {
constructor(path, options = {}) {
super();
this.path = path;
this.flags = options.flags || "r";
this.mode = options.mode || 438;
this.start = options.start || 0;
this.end = options.end;
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true;
this.readOffset = 0;
this.open();
this.on("newListener", (type) => {
switch (type) {
case "data": {
this.read();
break;
}
}
});
}
open() {
fs.open(this.path, this.flags, this.mode, (err, fd) => {
if (err) {
return this.emit("error", err);
}
this.fd = fd;
this.emit("open", fd);
});
}
read() {
if (typeof this.id !== "number") {
return this.once("open", this.read);
}
const buf = Buffer.alloc(this.highWaterMark);
let howMuchToRead = this.end
? Math.min(this.end - this.readOffset + 1, this.highWaterMark)
: this.highWaterMark;
fs.read(
this.fd,
buf,
0,
howMuchToRead,
this.readOffset,
(err, readBytes) => {
if (readBytes) {
this.readOffset += readBytes;
this.emit("data", buf.slice(0, this.readBytes));
this.read();
} else {
this.emit("end");
this.close();
}
}
)
}
close() {
fs.close(this.fd, (...args) => {
this.emit("close", ...args);
});
}
pipe(ws) {
this.on("data", (data) => {
let flag = ws.write(data);
if (!flag) {
this.pause();
}
ws.on("drain", () => {
this.resume();
});
});
}
}
注意事项(PLUS):
-
错误处理:确保在创建可读流后设置适当的错误监听器,以便在出现任何问题(如文件不存在、权限不足等)时能够捕获错误。
-
缓冲区大小:可通过
highWaterMark
参数控制读取缓冲区的大小,这会影响每次_read
调用时读取多少数据。根据应用需求选择合适的值,太大可能导致内存占用过多,太小可能影响性能。 -
资源管理:关闭流是非常重要的,尤其是在文件流的情况下,确保在不再需要时调用
stream.close()
或者在流结束时会自动关闭。如果忘记关闭流,可能会导致文件描述符泄漏。 -
暂停和恢复:可以根据实际处理能力适时调用
stream.pause()
和stream.resume()
方法来控制数据读取速度。 -
背压:在下游消费者来不及处理数据时,应考虑实施背压策略,避免内存溢出。对于实现了 Backpressure 的 Node.js 流,这是自动处理的。
-
异步边界:尽管
fs.createReadStream()
是异步的,但其内部读取逻辑仍然是基于回调的,因此,在_read
方法中通常会看到异步IO操作。
好了,今天的内容就到分享这里啦,很享受与大家一起学习,沟通交流问题,如果喜欢的话,请为我来个3连吧 !👍
作者:chenuvi
转载自:https://juejin.cn/post/7330807271008960564