Nodejs的Stream流的高级操作-Readable(可读流篇)
一、什么是Stream流
Stream
流是一种处理大数据的方式,在上传文件的场景会经常使用到。如图片、视频、文本、文档上传的场景。Stream
在处理数据时,把数据都分成一小块(chunk
)一小块的传输。而不是直接处理整个数据。每处理好一小块(chunk
)就会释放掉当前使用的内存,从而提高了效率和性能。
二、Stream流的类型
在Nodejs
中的有四种基本的Stream
流类型:
Readable
: 是一个读取数据的流,example:fs.createReadStream.Writable
: 是一个可写数据的流,example:fs.createWriteStream.Duplex
: 是一个双工流,即可读和可写的流,example: net.socket.Transform
: 是一个转换流,即可在读与写数据时修改和转换数据的双工流,example:zlib.createDeflate.
三、Readable
可读流的使用
可读流的可以通过Stream.Readable
或fs.createReadStream
的方式创建。
可读流它拥有两种读取数据的模式: flowing
和 paused
。
两种读取模式的区别:
flowing
: 在flowing
流动模式下,数据会自动从底层系统读取,并通过EventEmitter接口使用事件尽可能快地提供给应用程序。paused
: 在paused
暂停模式下,必须显式调用 stream.read() 方法来从流中读取数据块(chunk
)。
所有可读流创建之后,它的数据读取模式默认是以paused
开始。平时开发时,常用的是flowing
流动模式,如果你想要从paused
暂停模式切换为flowing
流动模式的话,可以通过下面几种方式:
- 添加
data
事件 - 显示调用
stream.resume()
方法 - 显示调用
stream.pipe()
方法
反之,想切换回paused
模式的话,可以调用stream.pause()
或 stream.unpipe()
方法。
(1) 添加data
事件:
data
事件: on(event: 'data', listener: (chunk: Buffer | string) => void): this;
。输出的chunk的数据类型默认是Buffer
,如果想要为string
的数据类型可以通过readable.setEncoding('utf-8')
方法设置。
const fs = require('fs');
const createReadable = fs.createReadStream('./test.txt');
createReadable.on('data', (chunk) => {
console.log('chunk: ', chunk.toString());
});
(2)添加readable
事件:
注意: 如果添加了readable
事件时,可读流将自动的停止流动,并且必须通过调用readable.read()
方法来消费数据。
const fs = require('fs');
const createReadable = fs.createReadStream('./test.txt');
createReadable.on('readable', function() {
let chunk;
// this.read() 可用this直接调用,如果是匿名函数记得用createReadable.read()调用
if ((chunk = this.read()) !== null) {
console.log('chunk: ', chunk.toString());
}
});
调用底层的readable.read()
方法可以做更多的事情,比如控制每次流动的数据块(chunk
)的大小。readable.read(size: number)方法接受一个参数size, 单位byte。size可以控制消费的chunk的大小, 但是size必须小于等于1GB。
const fs = require('fs');
const createReadable = fs.createReadStream('./test.txt');
createReadable.on('readable', function() {
let chunk;
// 每次读取10个字节
if ((chunk = this.read(10)) !== null) {
console.log('chunk: ', chunk.byteLength);
}
});
当然也可以通过设置HighWaterMark
的大小来决定chunk的大小。但是设置HighWaterMark
时,需要注意背压问题
。这里简单讲述一下,背压问题就是读取速度高于写入速度,写入速度慢了,就把读取的数据堆积到了数据在缓冲区内,最后导致了Buffer溢出等问题。
HighWaterMark
在可读流中的默认值情况,stream.Readable
的创建的可读流的HighWaterMark默认是16kb,而通过fs.createReadStream
创建的可读流的HighWaterMark默认是64kb。它们决定了每块chunk的数据大小是多少。
const fs = require('fs');
const {Readable} = require('node:stream');
const streamReadable = new Readable();
const createReadable = fs.createReadStream('./test.txt');
console.log('streamReadable HighWaterMark: ', streamReadable.readableHighWaterMark);
// streamReadable HighWaterMark: 16384
console.log('createReadable HighWaterMark: ', createReadable.readableHighWaterMark);
// createReadable HighWaterMark: 65536
我们可以自定义设置HighWaterMark
的大小。当然,也可以在可读流的模式是paused
暂停模式时,显示调用readable.read()
方法也可以控制chunk
的数据大小。
const fs = require('fs');
const createReadable = fs.createReadStream('./test.txt', {highWaterMark: 20}); // 指定为20字节
console.log('createReadable HighWaterMark: ', createReadable.readableHighWaterMark);
// createReadable HighWaterMark: 20
createReadable.on('readable', function() {
let chunk;
// 每次读取10个字节
if ((chunk = this.read(10)) !== null) {
console.log('chunk: ', chunk.byteLength); // 10
}
});
四、Readable
可读流的应用场景
- 想要判断文件是否是同一个的情况下,可以通过可读流进行文件的加密处理后,生成唯一的摘要。
如生成一个图片的md5值:
const fs = require('fs');
const Crypto = require('crypto');
const createReadable = fs.createReadStream('./img.png');
const Sign = Crypto.createHash('md5'); // 创建一个md5的哈希
createReadable.on('data', (chunk) => {
Sign.update(chunk);
});
createReadable.on('end', () => {
const md5 = Sign.digest('hex');
console.log('md5: ', md5); // 图片最后的 md5: 4b10f9f6cbbe42a90247b1012d55bf40
});
拿到这个md5就可以去判断图片的唯一性。
转载自:https://juejin.cn/post/7244819689942990907