likes
comments
collection
share

图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No

作者站长头像
站长
· 阅读数 58

Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 Node.js 中的流是内置对象,能够以小块的形式增量地读取或写入源,而不是一次性将整个数据加载到内存中。

主要特点

Node.js流的一些主要特点:

  1. 数据流动: 数据像水流一样从一个地方"流"到另一个地方。

  2. 类型: 主要有四种类型的流:

    • 可读流(Readable)
    • 可写流(Writable)
    • 双工流(Duplex):既可读又可写
    • 转换流(Transform):可以在写入和读取数据时修改或转换数据
  3. 事件机制: 流基于Node.js的EventEmitter,可以发出事件。

  4. 管道: 可以使用pipe()方法将流连接起来。

  5. 背压(Backpressure)处理: 自动处理读写速度不匹配的情况。

Readable

实现一个可读流的核心是继承 Readable, 并至少实现一个 _read 方法。

const { Readable } = require('stream');

const myReadable = new Readable({
  read(size) {
    // ...
  }
});

可读流,有两种模式,Flowing Mode,Non-Flowing Mode。

所有可读流都开始于暂停模式,可以通过以下方式切换到流动模式, 流动模式后就会不断调用 _read 方法。

  • 添加 'data' 事件句柄。
  • 调用 stream.resume() 方法。
  • 调用 stream.pipe() 方法将数据发送到可写流。
import { Readable } from 'stream'
import fs from 'fs'

function main () {
  const readable = fs.createReadStream('./input.txt')
  readable.on('data', (c) => {
    console.log(c.toString())
  })
}

main()

流程图如下:

图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No

资源并不是直接流向消费者的,而是先到内部的缓冲区,如果数据流入 Stream 的速度比您处理它的速度快,会发生什么?内部缓冲区的数量变大,在最坏的情况下,进程可能会耗尽内存并终止。

为了避免这种情况,Stream 允许您设置内部缓冲区阈值。当数据量超过此阈值时,它会暂时停止从资源中读取数据,以防止内部缓冲区溢出。这种流量控制机制称为背压,阈值称为highWaterMark。

可读流背压处理和恢复机制,如下图

图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No

Writable

可写流原理和可读流比较相似,数据源也就是生产者就写入数据到缓冲区,然后再写入目标。

当生产者写入速度过快,把缓冲区装满了之后,就会出现「背压」,这个时候是需要告诉生产者暂停生产的,当队列释放之后,Writable Stream 会给生产者发送一个 drain 消息,让它恢复生产。

  1. 生产者通过 write() 方法向流中写入数据。

  2. writeOrBuffer() 方法决定是直接写入资源还是先放入缓冲区:

  3. 写入操作返回 true 或 false:

    • true 表示可以继续写入。
    • false 表示内部缓冲已满,应该停止写入直到"drain"事件发生。
  4. 当所有数据都写入完毕时,调用 end() 方法来结束写入过程。

  5. 当写入操作全部完成时,流会触发 "finish" 事件,通知生产者写入已经结束。

流程图如下

图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No

可写流背压处理和恢复机制,如下图

图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No

pipe

pipe 方法主要是处理好可读流与可写流的联动问题, 如一方停止了或者结束了, 会在回调中触发另一方对应的处理函数。

让流处于流动模式的核心是 pipe 方法中 src.on('data', ondata) 让可读流监听了 onData 事件, 当监听该事件时会调用流的 resume 方法, 接着调用 _read 方法开始产生数据, 产生的数据通过 src 可读流的回调函数 ondata 继续调用 dest.write(chunk) 可写流去消费数据。

图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No

如下例子,可用来查看背压的实际效果

import { Readable, Writable } from 'stream'

// 创建一个可读流,模拟数据源
class SlowReadable extends Readable {
  constructor(options) {
    super(options)
    this.index = 0
  }

  _read (size) {
    const i = this.index++
    if (i > 10) {
      this.push(null)
    } else {
      const str = String(i)
      console.log(`Producing: ${str}`)

      // 模拟异步生产数据
      setTimeout(() => {
        if (!this.push(str)) {
          console.log('Backpressure applied, pausing production...')
        }
      }, 10)
    }
  }
}

// 创建一个可写流,模拟慢速消费者
const slowWriter = new Writable({
  write (chunk, encoding, callback) {
    console.log(`Consuming: ${chunk.toString()}`)

    // 模拟慢速处理
    setTimeout(callback, 1000)
  },
  highWaterMark: 2 // 设置较低的高水位线来更容易观察到背压
})

const readable = new SlowReadable({ highWaterMark: 2 })
readable.pipe(slowWriter)

slowWriter.on('drain', () => {
  console.log('Drain event: backpressure relieved, resuming production...')
})

Duplex

双工流结合了可读流和可写流的特性,允许数据在两个方向上流动。它的输入和输出可以没有任何关系。

这种双工流的设计使得它能够高效地处理需要双向数据传输的场景,如网络通信、文件读写等。它提供了灵活的控制机制,可以独立管理数据的读取和写入过程,同时通过内部缓冲机制来平衡数据产生和消费的速度差异。

图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No

Transform

转换流是一种特殊的双工流,它可以在数据写入和读取之间进行数据转换。

工作流程:

  1. 生产者通过 write() 方法将数据写入转换流。

  2. 数据进入 Transformer,在这里进行处理和转换。

  3. 转换后的数据经过 highWaterMark 检查:

    • 如果缓存未满,数据进入缓存池,并返回 true。
    • 如果缓存已满,返回 false,表示需要暂停写入。
  4. 缓存池中的数据通过 "data" 事件发送给消费者。

  5. 消费者可以使用 resume() 和 pause() 方法来控制数据的接收速度。

  6. 当所有数据处理完毕时,生产者调用 end() 方法结束写入。

图解Node.js流:原理、类型与实践Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 No

总结

主要介绍了Node.js中的流(Streams)概念,通过图解方式详细阐述了其工作原理和应用。

  1. 流的基本概念和特点,包括非阻塞处理、事件机制和背压控制
  2. 四种主要类型的流:可读流、可写流、双工流和转换流,及其各自的实现和使用方法
  3. 流的内部工作机制,如数据缓冲、流动模式切换和管道操作
  4. 实际应用示例,展示了如何在文件处理、数据转换等场景中高效使用流
转载自:https://juejin.cn/post/7415914057389129764
评论
请登录