likes
comments
collection
share

Nodejs的Stream流的高级操作-Readable(可读流篇)

作者站长头像
站长
· 阅读数 13

一、什么是Stream流

Stream流是一种处理大数据的方式,在上传文件的场景会经常使用到。如图片、视频、文本、文档上传的场景。Stream在处理数据时,把数据都分成一小块(chunk)一小块的传输。而不是直接处理整个数据。每处理好一小块(chunk)就会释放掉当前使用的内存,从而提高了效率和性能。

二、Stream流的类型

Nodejs中的有四种基本的Stream流类型:

  1. Readable: 是一个读取数据的流,example:fs.createReadStream.
  2. Writable: 是一个可写数据的流,example:fs.createWriteStream.
  3. Duplex: 是一个双工流,即可读和可写的流,example: net.socket.
  4. Transform: 是一个转换流,即可在读与写数据时修改和转换数据的双工流,example:zlib.createDeflate.

三、Readable可读流的使用

可读流的可以通过Stream.Readablefs.createReadStream的方式创建。

可读流它拥有两种读取数据的模式: flowingpaused

两种读取模式的区别:

  • flowing: 在flowing流动模式下,数据会自动从底层系统读取,并通过EventEmitter接口使用事件尽可能快地提供给应用程序。
  • paused: 在paused暂停模式下,必须显式调用 stream.read() 方法来从流中读取数据块(chunk)。

所有可读流创建之后,它的数据读取模式默认是以paused开始。平时开发时,常用的是flowing流动模式,如果你想要从paused暂停模式切换为flowing流动模式的话,可以通过下面几种方式:

  1. 添加data事件
  2. 显示调用stream.resume()方法
  3. 显示调用stream.pipe()方法

反之,想切换回paused模式的话,可以调用stream.pause()stream.unpipe()方法。

(1) 添加data事件: data事件: on(event: 'data', listener: (chunk: Buffer | string) => void): this;。输出的chunk的数据类型默认是Buffer,如果想要为string的数据类型可以通过readable.setEncoding('utf-8')方法设置。

const fs = require('fs');

const createReadable = fs.createReadStream('./test.txt'); 

createReadable.on('data', (chunk) => {
	console.log('chunk: ', chunk.toString());
});

(2)添加readable事件: 注意: 如果添加了readable事件时,可读流将自动的停止流动,并且必须通过调用readable.read()方法来消费数据

const fs = require('fs');
const createReadable = fs.createReadStream('./test.txt'); 

createReadable.on('readable', function() {
	let chunk;
	// this.read() 可用this直接调用,如果是匿名函数记得用createReadable.read()调用
	if ((chunk = this.read()) !== null) {
		console.log('chunk: ', chunk.toString());
	}
});

调用底层的readable.read()方法可以做更多的事情,比如控制每次流动的数据块(chunk)的大小。readable.read(size: number)方法接受一个参数size, 单位byte。size可以控制消费的chunk的大小, 但是size必须小于等于1GB。

const fs = require('fs');
const createReadable = fs.createReadStream('./test.txt'); 

createReadable.on('readable', function() {
	let chunk;
	// 每次读取10个字节
	if ((chunk = this.read(10)) !== null) {
		console.log('chunk: ', chunk.byteLength);
	}
});

当然也可以通过设置HighWaterMark的大小来决定chunk的大小。但是设置HighWaterMark时,需要注意背压问题。这里简单讲述一下,背压问题就是读取速度高于写入速度,写入速度慢了,就把读取的数据堆积到了数据在缓冲区内,最后导致了Buffer溢出等问题HighWaterMark在可读流中的默认值情况,stream.Readable的创建的可读流的HighWaterMark默认是16kb,而通过fs.createReadStream创建的可读流的HighWaterMark默认是64kb。它们决定了每块chunk的数据大小是多少

const fs = require('fs');
const {Readable} = require('node:stream');

const streamReadable = new Readable();
const createReadable = fs.createReadStream('./test.txt'); 

console.log('streamReadable HighWaterMark: ', streamReadable.readableHighWaterMark);
// streamReadable HighWaterMark:  16384

console.log('createReadable HighWaterMark: ', createReadable.readableHighWaterMark);
// createReadable HighWaterMark:  65536

我们可以自定义设置HighWaterMark的大小。当然,也可以在可读流的模式是paused暂停模式时,显示调用readable.read()方法也可以控制chunk的数据大小。

const fs = require('fs');

const createReadable = fs.createReadStream('./test.txt', {highWaterMark: 20});  // 指定为20字节

console.log('createReadable HighWaterMark: ', createReadable.readableHighWaterMark);
// createReadable HighWaterMark:  20

createReadable.on('readable', function() {
	let chunk;
	// 每次读取10个字节
	if ((chunk = this.read(10)) !== null) {
		console.log('chunk: ', chunk.byteLength); // 10
	}
});

四、Readable可读流的应用场景

  1. 想要判断文件是否是同一个的情况下,可以通过可读流进行文件的加密处理后,生成唯一的摘要。

如生成一个图片的md5值:

const fs = require('fs');
const Crypto = require('crypto');

const createReadable = fs.createReadStream('./img.png');

const Sign = Crypto.createHash('md5'); // 创建一个md5的哈希

createReadable.on('data', (chunk) => {
	Sign.update(chunk);
});

createReadable.on('end', () => {
	const md5 = Sign.digest('hex');
	console.log('md5: ', md5); // 图片最后的 md5: 4b10f9f6cbbe42a90247b1012d55bf40
});

拿到这个md5就可以去判断图片的唯一性。