likes
comments
collection
share

Node.js&TypeScript#4 Stream-可读流

作者站长头像
站长
· 阅读数 35

在Nodejs中,Stream是非常重要的,我们可以通过Stream来高效的读写数据。比如在处理文件或者处理http请求时。在这篇文章中,我们主要讲的是readable stream(可读流)

Readable Streams

Stream是一种用于处理无法被一次性完全获取的数据集合。因为有了这种方式,数据不需要一次性全部存在内存中,这使得处理大量数据的时候变得高效。除此之外,你可以在只有部分数据可用时开始处理数据,而不是等待整个数据可用。

在我们之前的例子中,我们通过下面的方式读取文件:

import * as fs from 'fs';

async function readFile() {
  try {
    const content = await fs.promises.readFile('./file.txt')
    console.log(content instanceof Buffer) // true
    console.log(content.toString())
  } catch(err) {
    console.log(err)
  }
}
readFile()

上面的代码并不高效,因为它会等待整个文件加载到内存中然后再执行操作。而Nodejs为我们提供了fs.createReadableStreamAPI来编写可读流的操作。

每一个stream都是EventEmitter的一个实例。通过EventEmitter我们可以监听到数据是否读取到。

import * as fs from 'fs';

const stream = fs.createReadStream('./file.txt');
stream.on('data', (chunk) => {
  console.log('New chunk of data:', chunk);
})

在上面的方法中,文件被存储在了内部的buffer中。根据createReadStream的第二个参数有一个highWaterMark属性(这里默认是64kib)。当触发data事件后,会根据它的限制来读取文件,如果文件大于它的阈值,那么就会被分割为多个chunk

New chunk of data: <Buffer 68 61 61 61 61 61>

如上,每个chunk就是一个buffer实例。你的文件越大,接收到的chunk就越多。

如果想要将buffer转为字符串有以下几种方法:

  1. buffer.toString()
const stream = fs.createReadStream('./file.txt');
stream.on('data', (chunk) => {
  console.log('New chunk of data:', chunk.toString());
})
  1. 使用StringDecoder
import { StringDecoder } from 'string_decoder';

const decoder = new StringDecoder('utf-8')
const stream = fs.createReadStream('./file.txt');

stream.on('data', (chunk) => {
  return console.log('New chunk of data:', decoder.write(chunk as Buffer));
})
  1. 还有一种就是通过在createReadStream中明确定义字符编码
const stream = fs.createReadStream('./file.txt', {encoding: 'utf-8'});

关于流的两个模式

在上面的示例中,我们通过在data事件上添加监听器,使stream开始发出chunk

那么如果我们在创建stream后一段时间再添加回调函数,结果是什么呢?

我们仍然可以监听到数据。

想要更好的理解它,我们需要去看一下可读流的模式。readable stream有两个模式:

  • paused
  • flowing

我们可以看下面的例子:

const stream = fs.createReadStream('./file.txt');
setTimeout(() => {
  stream.on('data', (chunk) => {
    console.log(chunk);
  })
}, 2000);

我们首先创建了一个可读流,然后在setTimeout中延迟读取流。但是最终还是可以正常的读取数据,这是因为可读流可写流都会将文件存储在了内部的buffer中。且所有stream默认都是使用paused模式。我们需要通过添加一个data事件的监听器来自动切换流的模式到flowing。当切换为flowing模式时,才开始读取数据,所以数据并不会因为延迟调用而丢失。

还有一种手动将readable stream切换到flowing模式的方法是调用 stream.resume 方法。

stream.resume();

setTimeout(() => {
  stream.on('data', (data) => {
      console.log(data)
  })
}, 2000);

上面这个例子中,我们是先手动修改模式,不过我们并没有及时的处理。所以当延迟执行后,stream已经丢失了。最终导致上面这个例子什么都不会输出

Readable stream的原理是什么

在使用 fs.createReadableStream 熟悉可读流之后,让我们创建自己的可读流以更好地说明其工作原理。

import { Readable } from 'stream';

const stream = new Readable();

stream.push('Hello');
stream.push('World!');
stream.push(null);

stream.on('data', (chunk) => {
  console.log(chunk.toString());
});

push方法会将数据添加到内部buffer中,以供用户使用。最后push(null)表示流已经完成数据输入。

上面的例子会输出

Hello
World!

Readable实例的read方法和readable事件

readable.read() 是可读流中用于手动触发读取数据的方法。当可读流处于flowing模式时,数据会自动从底层系统读取到内存中,并被放入buffer中,供用户使用。但是,在某些情况下,你可能需要手动控制数据的读取速度,这时候就可以使用 readable.read() 方法手动触发读取。

import { Readable } from 'stream';

const stream = new Readable();

const read = stream.read.bind(stream);
stream.read = function() {
  console.log('read() called');
  return read(2);
}

stream.push('Hello');
stream.push('World!');
stream.push(null);

stream.on('data', (chunk) => {
  console.log(chunk.toString());
});

当我们运行上面代码后,我们可以看到当我们开始读取流时,read函数被多次调用。并且,每次只输出两个字节。

read() called
He
read() called
ll
read() called
oW
read() called
or
read() called
ld
read() called
!
read() called
read() called
read() called
read() called

我们也可以使用readable.on('readable') 来读流是否有数据可供读取

import { Readable } from 'stream';

const stream = new Readable();

stream.push('Hello');
stream.push('World!');
stream.push(null);

stream.on('readable', () => {
  let data;
  // 使用一个循环,以确保我们读取所有当前可用的数据
  while (null !== (data = stream.read())) {
    console.log('Received:', data.toString());
  }
});
stream.on('end', () => {
  console.log('Reached end of stream.');
});

在上面的例子中,当可读流被读取时,会先在 read 方法中推送数据到队列中,然后当调用 stream.read() 时,会从buffer中读取数据。readable.on('readable') 用于监听可读流是否有新数据可供读取,当可读流中有新数据时,会触发回调函数,循环读取buffer中的数据,直到数据为空为止。

总结

在本文中,我们介绍了流是什么以及如何使用它们。虽然在本系列文章的这一部分中,我们着重讨论了可读流,但在接下来的部分中,我们将涵盖可写流、管道等更多内容。

转载自:https://juejin.cn/post/7218228146104320057
评论
请登录