likes
comments
collection
share

Node.js 生产实践 如何读大文件和数据?

作者站长头像
站长
· 阅读数 17

使用 Node.js File API 读文件最简单的方式就是 fs.readFile,它确实简单易用,可以一次将文件加载到内存中,对文件进行全面的处理和操作。但是遇到大文件怎么办?如果一次读1个2G的文件用 fs.readFile 显然就不合理了,即使内存不足没崩溃,读取大文件时候也会使整个系统变慢,因为操作系统可能会将一部分内存中的数据交换到磁盘上,磁盘访问速度本身就慢,交换操作本身也是有损耗的。真实情况是我们的系统不止仅有1个用户,用户多并发读取多个大文件时,依然会导致内存溢出的问题。这时候我们在生产中通常会从这么几个方面进行优化:

  • 增加服务器内存 无论是将服务部署在 VM 还是 K8S 上,加内存资源一定是最快和最简单的事情,钱先花到位,钱能解决的优化,不要靠架构,人没有硬件资源可靠是软件开发中的黄金定律。

  • 限制并发读取用户数 排队机制:创建一个请求队列来处理文件读取请求。每当有用户请求读取文件时,将其加入队列。然后,按照预定的顺序,逐个处理队列中的请求。这样可以确保一次只有一个用户读取文件,从而避免内存溢出。但是引入一个队列也是有成本的,比如队列如何应对分布式?(是不是要基于redis?),并发控制处理的逻辑,处理错误和异常情况。还可以的是通过 bull, bee-queue都可以快速完成这个排队的业务需求,甚至可以用一些其他特性比如优先级、超时、和延迟任务等。 应对多线程读取:如果读取文件是多个线程读取的,也就是说通过多进程或者 Worker Threads 创建了多个线程来读取文件,那么为了保护服务不被 OOM 可以使用 mutex 或者 lock 这种并发控制库

const mutex = require('mutex'); 
const lock = mutex.lock('file'); 
lock((release) => { 
// 在这里执行关键代码块
// ... 
// 执行完关键代码块后释放锁 release(); 
});
  • 使用缓存机制 如果文件不经常变化,那可太适合缓存了,一提到缓存,不管懂不懂,一定会有人说用这题我会,用 redis,redis 是个内存数据库存储频繁读取的数据,但是它不适合我们今天的主题:大文件,大文件存储在 Redis 中也会有内存不足的问题和影响 Redis 性能的问题。Node.js 的 node-cache 库也是一个道理,适合存一些中小文件,无论是 Redis 还是 node-cache 对于大文件的话适合存一些大文件的源数据或者索引信息,更合理的方式是大文件缓存使用分布式系统比如 HDFS、Ceph 或者直接使用对象存储比如 OSS,S3。

  • 文件分片 如果用户只需要读取分片中的某个部分,那就可以将大型文件分成多个小块进行处理,每个用户只读取和处理自己负责的文件块。这样可以减少单个用户读取的数据量,降低内存的使用。

认识流

上面的方法都是在使用 流(streaming)的前提下再优化,流是 Node.js 读取大文件的天然方式, 可以逐块读取文件内容,避免一次性加载整个文件到内存中,节省内存资源

以下是用流读取文件的一个 demo

const fs = require('fs');

const filePath = 'path/to/your/file.txt';

const readStream = fs.createReadStream(filePath, { encoding: 'utf8' });

readStream.on('data', (chunk) => {
  // 处理文件数据块
  console.log(chunk);
});

readStream.on('end', () => {
  // 文件读取完成
  console.log('文件读取完成');
  readStream.close() // 关闭流
});

readStream.on('error', (error) => {
  // 处理错误
  console.error('读取文件时发生错误:', error);
});

通过看这个 API 我们就可以猜出 流是通过事件发射器(event emitter)来实现的。在Node.js中,可读流(Readable Stream)会发出以下事件:

  • data:当有数据可用时触发,可以通过监听该事件来读取数据。
  • end:当所有数据都已读取完毕时触发,表示数据流结束。
  • error:当在读取数据过程中发生错误时触发。
  • close:当可读流被关闭时触发。

Line 15 当读取完后可以手动关闭流,释放内存,但大多数情况下读取完会自动关闭流。

Line 7 callback 中的 chunk 就是每段数据,那每个 chunk 有多大呢?可以通过 highWaterMark 属性来设置每次读取的数据量 fs.createReadStream('example.txt', { highWaterMark: 1024 }),读了 1M 数据后,我们可以将流暂停,对 1M 数据慢慢进行处理,处理完再继续恢复流通过 pause()resume() 方法

// 每次收到 1M 数据
stream.on('data', (data) => {
  console.log(data);
  // 暂停读取 
  stream.pause();

  // 模拟处理数据的耗时操作, 比如将1M数据传给别人,或者存到哪里之类的
  setTimeout(() => {
    // 处理完继续读取
    stream.resume();
  }, 1000);
});

此外,通过结合 管道,我们可以简单的实现一个前端下载大文件

const filePath = 'path/to/your/file'; // 替换为实际文件路径
const readStream = fs.createReadStream(filePath);

res.setHeader('Content-Type', 'application/octet-stream');
res.setHeader('Content-Disposition', 'attachment; filename="filename.ext"'); // 替换为实际文件名及扩展名
res.setHeader('Content-Transfer-Encoding', 'binary');

readStream.pipe(res);

这段代码中,使用了readStream.pipe(res)的方式将数据返回给前端。这种方式会使用流式传输,将数据分批发送给前端,而不是一次性发送整个文件。这样可以有效减少内存消耗,并提高性能。

更进一步认识流

之前在 Dev.to 看到一个不错的流处理大数据量的例子:从一个csv文件中读取 Github 用户名。假设这个文件里面有100 万个用户名,我们必须等读完100万个再去发请求吗?不,大数据的处理核心是批量的,我们可以边读,边发请求,极大的提升效率。这里用到一个新的东西叫做 stream.Transform, 它是 Node.js 中的一个内置模块,用于创建可读写的数据转换流,来简化数据处理过程,提高代码的可读性和维护性,使用起来也很简单,继承 stream.Transform 并实现 _transform_flush 方法。

让我们通过 stream.Transform 流来处理这个需求,每读 20 个用户名就开始发请求

const axios = require('axios')
const stream = require('stream')

module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
  constructor (options = {}) {
    super({ ...options, objectMode: true })
    this.requests = []
  }

  _transform (chunk, encoding, callback) {
    const username = chunk[0]
    const githubRequest = this.getGithubRepositoriesForUser(username)
    this.requests.push(this.prepareGithubRequest(username, githubRequest))
    if (this.requests.length < 20) {
      return callback()
    }

    this.processRequests(callback)
  }

  _flush (callback) {
    this.processRequests(callback)
  }

  getGithubRepositoriesForUser (username) {
    return axios.get(`https://api.github.com/users/${username}/repos`, {
      headers: {
        Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
      }
    })
  }

  prepareGithubRequest (username, githubRequest) {
    return githubRequest
      .then((response) => {
        let repositories = []
        if (response.data) {
          repositories = response.data.map((repository) => repository.name)
        }

        return {
          username,
          repositories
        }
      })
  }

  processRequests (callback) {
    return Promise.all(this.requests)
      .then((responses) => {
        this.requests = []

        this.push(responses.reduce((accumulator, currentValue) => {
          return accumulator + JSON.stringify(currentValue)
        }, ''))
        callback()
      })
      .catch(callback)
  }
}

核心控制逻辑就是实现的 _transform 函数,每读 20 个用户名, 就用 Promise.all 批量发一次请求,当调用 callback 时则会进行下一次读取。读取完所有数据会调用调用 _flush 钩子函数,给开发人员一个收尾的时机。

Line 53 的 this.push 就是将数据传递给下个"消费者"。

定义好这个转换流该怎么用呢?

const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
const stream = require('stream')

const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))

stream.pipeline(
  readGithubUsernamesStream,
  csvParser,
  transformUsernameToGithubRepos,
  writeStream,
  (error) => {
    if (error) {
      console.error('error ', error)
      return process.exit(1)
    }

    process.exit()
  }
)

Line 12stream.pipeline 是创建一个流水线,以更有组织和模块化的方式来处理数据,也方便做错误处理,第一个参数是读文件流,第二个参数是解析CVS文件流,第三个参数是将username每20个转成1次,最后一个参数是,写文件,写谁呢?写上面代码中this.push中的数据。

Backpressure 背压

在处理大量数据过程中,存在一种称为背压(backpressure)的常见问题,它描述了在数据传输过程中缓冲区后面的数据积累现象。当数据传输的接收端存在复杂操作或由于某种原因较慢时,来自传入源的数据有积累的趋势。面对这种问题,Unix 管道 和TCP socket 就是一个很好的例子。在 Node.js 中如何解决这个问题呢? 就是通过我们刚才介绍过的 流,以及流的方法,这里我们加深一下背压处理方案:

  • 使用可读流和可写流的缓冲区:通过设置流的缓冲区大小,可以控制数据的流动速度,从而处理背压情况。
  • 使用流的暂停和恢复功能:通过暂停可读流或可写流,可以暂时停止数据的传输,从而处理背压问题。
  • 使用流的流动模式:通过监听流的"drain"事件,可以判断流的缓冲区是否已满,从而控制数据的流动,避免背压情况的发生。

举个简单的例子

const fs = require('fs');

// 创建可读流
const readableStream = fs.createReadStream('data.txt');

// 创建可写流
const writableStream = fs.createWriteStream('output.txt');

writableStream.cork();
writableStream.setDefaultEncoding('utf-8');
writableStream.on('drain', () => {
  console.log('缓冲区已清空,可以继续写入数据');
  readableStream.resume();
});

// 通过管道将可读流中的数据传输给可写流
readableStream.pipe(writableStream);

// 当可读流读取到数据时
readableStream.on('data', (chunk) => {
  // 判断可写流的缓冲区是否已满
  if (!writableStream.write(chunk)) {
    console.log('缓冲区已满,暂停传输数据');
    readableStream.pause();
  }
});

// 当可读流读取完所有数据时
readableStream.on('end', () => {
  console.log('数据传输完成');
  writableStream.end();
});

Line 10 writableStream.cork() 方法的作用是将可写流的缓冲区启用,并暂时阻止数据的传输。当调用该方法后,数据将被缓冲在内存中,直到调用 writableStream.uncork() 方法或当缓冲区已满时自动刷新数据。这个方法通常与 writableStream.on('drain', ...) 一起使用,以控制数据的流动速度,处理背压问题。

Line 22 writableStream.write(chunk) 方法返回一个布尔值,表示写入数据的操作是否成功。如果返回 false,表示可写流的缓冲区已满,需要暂停传输数据。如果返回 true,表示数据写入成功,可继续传输数据。

结尾

流处理在 Node.js 的大数据量处理中是基本操作,尤其是你用 Node.js 开发下面这些场景:

  • 网络通信
  • 数据转换(解压缩,加解密)
  • 日志处理
  • 图像或音频处理

都看到这里了点个赞吧 ❤️