likes
comments
collection
share

NodeJS文件可读流, 从理解到手写实现,做全面的前端能手

作者站长头像
站长
· 阅读数 3

手写实现文件可读流

api: createReadStream 文件可读流

简介:Node.js 中的文件可读流是基于 Stream API 实现的数据流读取机制,它允许以分块的方式从文件系统中读取文件内容,而非一次性加载到内存中。这对于处理大文件或者需要高效利用内存的场景尤为有用。

常见使用场景

  1. 大文件读取:当文件过大无法一次性加载到内存时,可以使用可读流逐块读取并进行处理,例如日志分析、数据迁移等任务。

  2. 实时流处理:在处理网络传输或持续更新的文件时(如日志文件),可以通过可读流实时监听和消费新生成的内容。

  3. 管道操作:将一个文件可读流与其它可写流(如HTTP响应流)连接起来,实现数据的无缝传输,如服务器端发送大文件给客户端。

  4. 数据转换:在读取文件的同时对数据进行压缩、加密或格式转换等操作,通过中间件流连接多个处理过程。

实例:

const fs = require('fs');
const path = require('path');

// 创建一个指向特定文件的可读流
const readStream = fs.createReadStream(path.join(__dirname, 'largeFile.txt'), { encoding: 'utf8' });

// 监听data事件,每当有新的数据块可读时触发
readStream.on('data', (chunk) => {
  console.log(chunk); // 输出每次读取的数据块
});

// 当所有数据都已读取完毕时触发end事件
readStream.on('end', () => {
  console.log('文件读取完成');
});

// 错误处理
readStream.on('error', (err) => {
  console.error('读取文件出错:', err);
});

实现自定义基础文件可读流

导入基础模块

由于所有流事件都是基于 events 实现的,我们也可以直接导入

const fs = require("fs");

const EventEmitter = require("events");

定义基础参数

class MyReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "r";
    this.mode = options.mode || 438;
    this.start = options.start || 0; 
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose || true;
    this.open();
  }
  open () {}
 }

open 方法实现

 

...
	// 原生 fs open 方法打开指定位置文件
        open() {
            fs.open(this.path, this.flags, this.mode, (err, fd) => {
              if (err) {
                return this.emit("error", err);
              }
              this.fd = fd;
              this.emit("open", fd);
            });
          }

read 方法实现

如果这时我们直接拿 文件 fd 是拿不到的,当 read() 方法被首次调用(如在添加 "data" 事件监听器时),可能会在文件尚未打开的情况下执行。这时,由于没有有效的文件描述符,无法进行读取操作。因此,我们需要等待 "open" 事件触发后再执行读取操作。

如果当前类实例的 this.fd 还不是一个数字(即文件还没有打开),则注册一个一次性监听器来监听 "open" 事件,当文件打开后立即调用 this.read() 方法。

  read () {
    if (typeof this.id !== "number") {
      return this.once("open", this.read);
    }

    const buf = Buffer.alloc(this.highWaterMark);
    let howMuchToRead = this.end
      ? Math.min(this.end - this.readOffset + 1, this.highWaterMark)
      : this.highWaterMark;
  
    fs.read(
      this.fd,
      buf,
      0,
      howMuchToRead,
      this.readOffset,
      (err, readBytes) => {
        if (readBytes) {
          this.readOffset += readBytes;
          this.emit("data", buf.slice(0, this.readBytes));
          this.read();
        } else {
          this.emit("end");
          this.close();
        }
      }
    );
  }

PIPE 方法

另外,在 Node.js 中,ReadableStream.pipe(destination[, options]) 方法是流(Stream)模块的一个核心功能,它提供了一种便捷的方式来连接多个数据流,实现数据从一个源头自动流动到另一个目标。举个栗子🌰

const fs = require("fs");

const rs = fs.createReadStream("./test.txt", {
   highWaterMark: 4,
});

const ws = fs.createWriteStream("./file1.txt", {
   highWaterMark: 1,
});

rs.pipe(ws);

我们再使用 drain 方法实现一下 pipe 方法:

pipe(ws) {
    this.on("data", (data) => {
      let flag = ws.write(data);
      if (!flag) {
        this.pause();
      }
      ws.on("drain", () => {
        this.resume();
      });
    });
  }

总结

完整代码如下:

const fs = require("fs");
const EventEmitter = require("events");

class MyReadStream extends EventEmitter {
  constructor(path, options = {}) {
    super();
    this.path = path;
    this.flags = options.flags || "r";
    this.mode = options.mode || 438;
    this.start = options.start || 0;
    this.end = options.end;
    this.highWaterMark = options.highWaterMark || 64 * 1024;
    this.autoClose = options.autoClose || true;



    this.readOffset = 0;

    this.open();

    this.on("newListener", (type) => {
      switch (type) {
        case "data": {
          this.read();
          break;
        }
      }
    });
  }

  

  open() {
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        return this.emit("error", err);
      }
      this.fd = fd;
      this.emit("open", fd);
    });
  }

  

  read() {
    if (typeof this.id !== "number") {
      return this.once("open", this.read);
    }

    const buf = Buffer.alloc(this.highWaterMark);
    let howMuchToRead = this.end
      ? Math.min(this.end - this.readOffset + 1, this.highWaterMark)
      : this.highWaterMark;

    fs.read(
      this.fd,
      buf,
      0,
      howMuchToRead,
      this.readOffset,
      (err, readBytes) => {
        if (readBytes) {
          this.readOffset += readBytes;
          this.emit("data", buf.slice(0, this.readBytes));
          this.read();
        } else {
          this.emit("end");
          this.close();
        }
      }
    )
  }

  close() {
    fs.close(this.fd, (...args) => {
      this.emit("close", ...args);
    });
  }
  
  pipe(ws) {
    this.on("data", (data) => {
      let flag = ws.write(data);
      if (!flag) {
        this.pause();
      }

      ws.on("drain", () => {
        this.resume();
      });
    });
  }
}

注意事项(PLUS):

  • 错误处理:确保在创建可读流后设置适当的错误监听器,以便在出现任何问题(如文件不存在、权限不足等)时能够捕获错误。

  • 缓冲区大小:可通过 highWaterMark 参数控制读取缓冲区的大小,这会影响每次 _read 调用时读取多少数据。根据应用需求选择合适的值,太大可能导致内存占用过多,太小可能影响性能。

  • 资源管理:关闭流是非常重要的,尤其是在文件流的情况下,确保在不再需要时调用 stream.close() 或者在流结束时会自动关闭。如果忘记关闭流,可能会导致文件描述符泄漏。

  • 暂停和恢复:可以根据实际处理能力适时调用 stream.pause()stream.resume() 方法来控制数据读取速度。

  • 背压:在下游消费者来不及处理数据时,应考虑实施背压策略,避免内存溢出。对于实现了 Backpressure 的 Node.js 流,这是自动处理的。

  • 异步边界:尽管 fs.createReadStream() 是异步的,但其内部读取逻辑仍然是基于回调的,因此,在 _read 方法中通常会看到异步IO操作。


好了,今天的内容就到分享这里啦,很享受与大家一起学习,沟通交流问题,如果喜欢的话,请为我来个3连吧 !👍

作者:chenuvi

转载自:https://juejin.cn/post/7330807271008960564
评论
请登录