likes
comments
collection
share

从Readable steam(可读流)中读取数据的方式

作者站长头像
站长
· 阅读数 17

可读流是对数据源的抽象,从Readable中读取数据的模式有两种,non-flowing(非流动模式也称为paused)模式和flowing(流动)模式。

非流动模式

  • 在非流动模式下,当流中有可读取的数据时,会触发readable事件,所以该事件被触发时需要反复调用流的read方法从缓存中获取最新的数据直到read方法返回null,表示没有数据可读,新建的可读流默认处于该模式下。常用方式如下:
const stream = fs.createReadStream(path.resolve(__dirname, './xxx.js'));
stream.on('readable', () => {
  let chunk;
  while (null !== (chunk = stream.read())) {
    console.log('chunk: ' + chunk.toString());
  }
});
  • 在没有指定编码方式的前提下,read方法返回的是Buffer,可以调用setEncoding方法指定数据的编码,这样调用read方法时就可以直接获取到string。提前指定encoding的好处是对于多字节字符流会确保多个字节不会被分割到不同的chunk中:
const stream = fs.createReadStream(path.resolve(__dirname, './print.js'), { encoding: 'utf8' });
stream.on('readable', () => {
  let chunk;
  while (null !== (chunk = stream.read())) {
    console.log('chunk: ' + chunk);
  }
});
  • 在非流动模式下调用Readable.resume()或者通过pipe连接到一个Writable,可切换到流动模式,如果此时没有注册data事件处理函数,那么产生的新数据回丢失。

流动模式

  • 可以通过注册data事件处理函数让可读流切换到流动模式,在该模式下当数据源中有新的数据准备就绪时,新的数据会被传递给data事件的处理函数,可以在函数中处理新的数据。如通过以下方式读取文件的内容:
const stream = fs.createReadStream(path.resolve(__dirname, './print.js'), { encoding: 'utf8' });
let chunks = '';
stream.on('data', (chunk) => {
  chunks += chunk;
});

stream.on('end', () => {
  console.log(chunks);
});
  • 在流动模式下,可以通过两种方式切换回pause模式:

    1. 调用pause方法;
    2. 当可读流通过pipe管道连接到其他流是,调用unpipe也会使流切换到pause模式;
  • 在流动模式下注册readable事件处理函数会让流停止流动,需要通过read读取数据;如果readable处理函数被移除,如果存在data处理函数,流会再次进入flowing模式;

异步迭代器

新版本的node<v11.14.0>中的Readable实现了异步迭代器,可以更加方便的读取数据:

const stream = fs.createReadStream(path.resolve(__dirname, './print.js'), { encoding: 'utf8' });

async function getData(stream) {
  let chunks = '';
  for await (let chunk of stream) {
    chunks += chunk;
  }
  return chunks;
}

getData(stream).then(data => console.log(data));

如果循环通过break, return, throw 终止,流会被销毁。每次读取的chunk的大小等于流的highWaterMark选项,没有指点该选项的情况下,默认值为64kb, 因此如果文件的大小小于该值,文件的内容会在单个chunk中被返回。

补充

流中的数据也有两种模式,二进制模式和对象模式。二进制模式下读取的数据类型为Buffer或者String, 前面的例子都是在二进制模式下读取数据。而对象模式下可以返回任意的javascript的对象,下面举一个简单的例子, Readable.from()创建的流默认是objectMode:

const stream = Readable.from([{
  name: 'Bob',
}, {
  name: 'Jenny'
}, {
  name: 'Lili'
}]);

async function getData(stream) {
  for await (let chunk of stream) {
    console.log(chunk);
  }
}

getData(stream);

输出如下:

{ name: 'Bob' }
{ name: 'Jenny' }
{ name: 'Lili' }