Node.js 生产实践 如何读大文件和数据?
使用 Node.js File API 读文件最简单的方式就是 fs.readFile
,它确实简单易用,可以一次将文件加载到内存中,对文件进行全面的处理和操作。但是遇到大文件怎么办?如果一次读1个2G的文件用 fs.readFile
显然就不合理了,即使内存不足没崩溃,读取大文件时候也会使整个系统变慢,因为操作系统可能会将一部分内存中的数据交换到磁盘上,磁盘访问速度本身就慢,交换操作本身也是有损耗的。真实情况是我们的系统不止仅有1个用户,用户多并发读取多个大文件时,依然会导致内存溢出的问题。这时候我们在生产中通常会从这么几个方面进行优化:
-
增加服务器内存 无论是将服务部署在 VM 还是 K8S 上,加内存资源一定是最快和最简单的事情,钱先花到位,钱能解决的优化,不要靠架构,人没有硬件资源可靠是软件开发中的黄金定律。
-
限制并发读取用户数 排队机制:创建一个请求队列来处理文件读取请求。每当有用户请求读取文件时,将其加入队列。然后,按照预定的顺序,逐个处理队列中的请求。这样可以确保一次只有一个用户读取文件,从而避免内存溢出。但是引入一个队列也是有成本的,比如队列如何应对分布式?(是不是要基于redis?),并发控制处理的逻辑,处理错误和异常情况。还可以的是通过
bull
,bee-queue
都可以快速完成这个排队的业务需求,甚至可以用一些其他特性比如优先级、超时、和延迟任务等。 应对多线程读取:如果读取文件是多个线程读取的,也就是说通过多进程或者 Worker Threads 创建了多个线程来读取文件,那么为了保护服务不被 OOM 可以使用mutex
或者lock
这种并发控制库
const mutex = require('mutex');
const lock = mutex.lock('file');
lock((release) => {
// 在这里执行关键代码块
// ...
// 执行完关键代码块后释放锁 release();
});
-
使用缓存机制 如果文件不经常变化,那可太适合缓存了,一提到缓存,不管懂不懂,一定会有人说用这题我会,用 redis,redis 是个内存数据库存储频繁读取的数据,但是它不适合我们今天的主题:大文件,大文件存储在 Redis 中也会有内存不足的问题和影响 Redis 性能的问题。Node.js 的
node-cache
库也是一个道理,适合存一些中小文件,无论是 Redis 还是 node-cache 对于大文件的话适合存一些大文件的源数据或者索引信息,更合理的方式是大文件缓存使用分布式系统比如 HDFS、Ceph 或者直接使用对象存储比如 OSS,S3。 -
文件分片 如果用户只需要读取分片中的某个部分,那就可以将大型文件分成多个小块进行处理,每个用户只读取和处理自己负责的文件块。这样可以减少单个用户读取的数据量,降低内存的使用。
认识流
上面的方法都是在使用 流(streaming)的前提下再优化,流是 Node.js 读取大文件的天然方式, 可以逐块读取文件内容,避免一次性加载整个文件到内存中,节省内存资源
以下是用流读取文件的一个 demo
const fs = require('fs');
const filePath = 'path/to/your/file.txt';
const readStream = fs.createReadStream(filePath, { encoding: 'utf8' });
readStream.on('data', (chunk) => {
// 处理文件数据块
console.log(chunk);
});
readStream.on('end', () => {
// 文件读取完成
console.log('文件读取完成');
readStream.close() // 关闭流
});
readStream.on('error', (error) => {
// 处理错误
console.error('读取文件时发生错误:', error);
});
通过看这个 API 我们就可以猜出 流是通过事件发射器(event emitter)来实现的。在Node.js中,可读流(Readable Stream)会发出以下事件:
data
:当有数据可用时触发,可以通过监听该事件来读取数据。end
:当所有数据都已读取完毕时触发,表示数据流结束。error
:当在读取数据过程中发生错误时触发。close
:当可读流被关闭时触发。
Line 15 当读取完后可以手动关闭流,释放内存,但大多数情况下读取完会自动关闭流。
Line 7 callback 中的 chunk 就是每段数据,那每个 chunk 有多大呢?可以通过 highWaterMark
属性来设置每次读取的数据量 fs.createReadStream('example.txt', { highWaterMark: 1024 })
,读了 1M 数据后,我们可以将流暂停,对 1M 数据慢慢进行处理,处理完再继续恢复流通过 pause()
和 resume()
方法
// 每次收到 1M 数据
stream.on('data', (data) => {
console.log(data);
// 暂停读取
stream.pause();
// 模拟处理数据的耗时操作, 比如将1M数据传给别人,或者存到哪里之类的
setTimeout(() => {
// 处理完继续读取
stream.resume();
}, 1000);
});
此外,通过结合 管道,我们可以简单的实现一个前端下载大文件
const filePath = 'path/to/your/file'; // 替换为实际文件路径
const readStream = fs.createReadStream(filePath);
res.setHeader('Content-Type', 'application/octet-stream');
res.setHeader('Content-Disposition', 'attachment; filename="filename.ext"'); // 替换为实际文件名及扩展名
res.setHeader('Content-Transfer-Encoding', 'binary');
readStream.pipe(res);
这段代码中,使用了readStream.pipe(res)的方式将数据返回给前端。这种方式会使用流式传输,将数据分批发送给前端,而不是一次性发送整个文件。这样可以有效减少内存消耗,并提高性能。
更进一步认识流
之前在 Dev.to 看到一个不错的流处理大数据量的例子:从一个csv文件中读取 Github 用户名。假设这个文件里面有100 万个用户名,我们必须等读完100万个再去发请求吗?不,大数据的处理核心是批量的,我们可以边读,边发请求,极大的提升效率。这里用到一个新的东西叫做 stream.Transform
, 它是 Node.js 中的一个内置模块,用于创建可读写的数据转换流,来简化数据处理过程,提高代码的可读性和维护性,使用起来也很简单,继承 stream.Transform
并实现 _transform
和 _flush
方法。
让我们通过 stream.Transform 流来处理这个需求,每读 20 个用户名就开始发请求
const axios = require('axios')
const stream = require('stream')
module.exports = class TransformUsernameToGithubRepos extends stream.Transform {
constructor (options = {}) {
super({ ...options, objectMode: true })
this.requests = []
}
_transform (chunk, encoding, callback) {
const username = chunk[0]
const githubRequest = this.getGithubRepositoriesForUser(username)
this.requests.push(this.prepareGithubRequest(username, githubRequest))
if (this.requests.length < 20) {
return callback()
}
this.processRequests(callback)
}
_flush (callback) {
this.processRequests(callback)
}
getGithubRepositoriesForUser (username) {
return axios.get(`https://api.github.com/users/${username}/repos`, {
headers: {
Authorization: `Token ${process.env.GITHUB_ACCESS_TOKEN}`
}
})
}
prepareGithubRequest (username, githubRequest) {
return githubRequest
.then((response) => {
let repositories = []
if (response.data) {
repositories = response.data.map((repository) => repository.name)
}
return {
username,
repositories
}
})
}
processRequests (callback) {
return Promise.all(this.requests)
.then((responses) => {
this.requests = []
this.push(responses.reduce((accumulator, currentValue) => {
return accumulator + JSON.stringify(currentValue)
}, ''))
callback()
})
.catch(callback)
}
}
核心控制逻辑就是实现的 _transform 函数,每读 20 个用户名, 就用 Promise.all 批量发一次请求,当调用 callback 时则会进行下一次读取。读取完所有数据会调用调用 _flush 钩子函数,给开发人员一个收尾的时机。
Line 53 的 this.push 就是将数据传递给下个"消费者"。
定义好这个转换流该怎么用呢?
const fs = require('fs')
const path = require('path')
const CsvParser = require('csv-parse')
const TransformUsernameToGithubRepos = require('./transform-username-to-github-repos')
const stream = require('stream')
const readGithubUsernamesStream = fs.createReadStream(path.resolve(__dirname, '../github-usernames.csv'))
const csvParser = new CsvParser({ columns: false })
const transformUsernameToGithubRepos = new TransformUsernameToGithubRepos()
const writeStream = fs.createWriteStream(path.resolve(__dirname, '../github-user-repositories.txt'))
stream.pipeline(
readGithubUsernamesStream,
csvParser,
transformUsernameToGithubRepos,
writeStream,
(error) => {
if (error) {
console.error('error ', error)
return process.exit(1)
}
process.exit()
}
)
Line 12 的 stream.pipeline
是创建一个流水线,以更有组织和模块化的方式来处理数据,也方便做错误处理,第一个参数是读文件流,第二个参数是解析CVS文件流,第三个参数是将username每20个转成1次,最后一个参数是,写文件,写谁呢?写上面代码中this.push
中的数据。
Backpressure 背压
在处理大量数据过程中,存在一种称为背压(backpressure)的常见问题,它描述了在数据传输过程中缓冲区后面的数据积累现象。当数据传输的接收端存在复杂操作或由于某种原因较慢时,来自传入源的数据有积累的趋势。面对这种问题,Unix 管道 和TCP socket 就是一个很好的例子。在 Node.js 中如何解决这个问题呢? 就是通过我们刚才介绍过的 流,以及流的方法,这里我们加深一下背压处理方案:
- 使用可读流和可写流的缓冲区:通过设置流的缓冲区大小,可以控制数据的流动速度,从而处理背压情况。
- 使用流的暂停和恢复功能:通过暂停可读流或可写流,可以暂时停止数据的传输,从而处理背压问题。
- 使用流的流动模式:通过监听流的"drain"事件,可以判断流的缓冲区是否已满,从而控制数据的流动,避免背压情况的发生。
举个简单的例子
const fs = require('fs');
// 创建可读流
const readableStream = fs.createReadStream('data.txt');
// 创建可写流
const writableStream = fs.createWriteStream('output.txt');
writableStream.cork();
writableStream.setDefaultEncoding('utf-8');
writableStream.on('drain', () => {
console.log('缓冲区已清空,可以继续写入数据');
readableStream.resume();
});
// 通过管道将可读流中的数据传输给可写流
readableStream.pipe(writableStream);
// 当可读流读取到数据时
readableStream.on('data', (chunk) => {
// 判断可写流的缓冲区是否已满
if (!writableStream.write(chunk)) {
console.log('缓冲区已满,暂停传输数据');
readableStream.pause();
}
});
// 当可读流读取完所有数据时
readableStream.on('end', () => {
console.log('数据传输完成');
writableStream.end();
});
Line 10 writableStream.cork()
方法的作用是将可写流的缓冲区启用,并暂时阻止数据的传输。当调用该方法后,数据将被缓冲在内存中,直到调用 writableStream.uncork()
方法或当缓冲区已满时自动刷新数据。这个方法通常与 writableStream.on('drain', ...)
一起使用,以控制数据的流动速度,处理背压问题。
Line 22 writableStream.write(chunk) 方法返回一个布尔值,表示写入数据的操作是否成功。如果返回 false,表示可写流的缓冲区已满,需要暂停传输数据。如果返回 true,表示数据写入成功,可继续传输数据。
结尾
流处理在 Node.js 的大数据量处理中是基本操作,尤其是你用 Node.js 开发下面这些场景:
- 网络通信
- 数据转换(解压缩,加解密)
- 日志处理
- 图像或音频处理
都看到这里了点个赞吧 ❤️
转载自:https://juejin.cn/post/7281088405190393871