likes
comments
collection
share

Node.js Stream 如何工作的

作者站长头像
站长
· 阅读数 31

Node.js Stream 如何工作的

Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 Node.js 中的流是内置对象,能够以小块的形式增量地读取或写入源,而不是一次性将整个数据加载到内存中。

Node.js 中有四种基本类型的流:

可读流:这些流代表可以使用数据的源。例如,从文件读取数据、从 HTTP 请求接收数据或从数据库查询生成数据。每当有新数据可用时,可读流就会发出“data”事件,并且消费者可以使用不同的方法读取这些数据。

可写流:这些流代表可以写入数据的目的地。例如,将数据写入文件、通过 HTTP 响应发送数据或将数据插入数据库。可写流提供诸如 write() 和 end() 之类的方法来将数据发送到流。

双工流:这些流代表可读流和可写流。这意味着可以同时读取和写入这些流。双工流是双向的,通常用于网络通信等任务。

转换流:这些流是一种特殊类型的双工流,当数据从可读端流向可写端时,它们对数据进行操作或转换。转换流对于压缩、加密或数据格式化等任务很有用。它们可用于在数据传输时动态修改数据。

Node.js 中的流根据事件和背压原理工作。数据以块的形式消耗或生成,并发出诸如dataenderror之类的事件来通知应用程序有关流的状态。

背压是一种机制,用于在可写流无法像生成数据一样快地处理传入数据时控制数据流。它允许可读流暂停或减慢数据生成,直到可写流准备好消耗更多数据。

要在 Node.js 中使用流,通常需要创建适当流类型的实例、设置事件处理程序,并使用流 API 提供的各种方法与数据进行交互。还可以通过管道将流连接在一起,这意味着将一个流的输出连接到另一个流的输入,以创建一个自动处理多个流之间的数据流的管道。

下面的这些示例展示了可以在 Node.js 中使用流的不同场景,包括复制文件、压缩数据和转换数据。

const fs = require('fs');
const zlib = require('zlib');
const { Duplex } = require('stream');
const { Transform } = require('stream');

// 将数据文件读取和写入另一个文件:
export const readWriteStream = () => {
    const readableStream = fs.createReadStream('input.txt');
    const writableStream = fs.createWriteStream('output.txt');

    readableStream.pipe(writableStream);

    console.log('File is being copied...');
}

// 读取数据文件,生成zip并写入另一个文件:
export const zipGenerate = () => {
    const readableStream = fs.createReadStream('input.txt');
    const gzipStream = zlib.createGzip();
    const writableStream = fs.createWriteStream('output.txt.gz');

    readableStream.pipe(gzipStream).pipe(writableStream);

    console.log('File is being compressed...');
}

// 读取输入数据,转换为大写并写入输出:
export const convertInputToUpperCase = () => {
    const uppercaseTransform = new Transform({
        transform(chunk, encoding, callback) {
            const uppercasedChunk = chunk.toString().toUpperCase();
            this.push(uppercasedChunk);
            callback();
        }
    });

    process.stdin.pipe(uppercaseTransform).pipe(process.stdout);
}

// Handle with Duplex streams:
/* 
When you run this code, it will write "Hello" to the duplex stream, which will 
be logged as "Writing: Hello". Then, the stream will generate and push characters 
starting from 'A' to the readable side. Each received chunk will be logged as 
"Received: <character>". The stream will continue pushing characters until it 
reaches 'Z', at which point it will end the stream by pushing null. 
*/
export const logChars = () => {
    const myDuplexStream = new Duplex({
        write(chunk, encoding, callback) {
            console.log(`Writing: ${chunk}`);
            callback();
        },
        read(size) {
            if (this.currentCharCode > 90) {
                this.push(null);
                return;
            }
            this.push(String.fromCharCode(this.currentCharCode++));
        }
    });

    myDuplexStream.currentCharCode = 65; // ASCII code for 'A'

    myDuplexStream.on('data', (chunk) => {
        console.log(`Received: ${chunk}`);
    });

    myDuplexStream.write('Hello');

    myDuplexStream.end();
}

实现可读流

  • 继承 Readable
  • 接受迭代器作为参数
  • 实现_read方法
import { Readable } from 'stream'

class IeteratorReadableStream extends Readable {
  constructor(iterator) {
    super()
    this.iterator = iterator
  }

  _read () {
    let data = this.iterator.next()
    // console.log(data);
    if (data.done) {
      this.push(null)
    } else {
      // 必须 push 字符串或者 Buffer
      this.push(data.value + '')
    }
  }
}

function* getData () {
  for (let i = 0; i < 5; i++) {
    yield i
  }
}

let stream = new IeteratorReadableStream(getData())

stream.on('data', data => {
  console.log(data.toString())
})

stream.on('end', () => {
  console.log("迭代结束")
})

同理实现可写流

import { Writable } from 'stream';

class ConsoleWritableStream extends Writable {
  constructor(options) {
    super(options);
  }

  _write(chunk, encoding, callback) {
    console.log(chunk.toString());
    callback();
  }
}

const writableStream = new ConsoleWritableStream();

writableStream.write('Hello, ');
writableStream.write('world!\n');
writableStream.end('结束流');

Web Stream 和 Node Stream 区别

特性Web StreamsNode.js Streams
运行环境浏览器Node.js (服务器端或命令行应用程序)
API 设计基于 Promise 和异步迭代器基于事件和回调函数
主要类型ReadableStream, WritableStream, TransformStreamstream.Readable, stream.Writable, stream.Duplex, stream.Transform
错误处理通过 Promise 拒绝状态处理通过监听 error 事件处理
应用场景文件读写、视频流处理、响应式应用文件处理、网络通信、日志处理、实时数据处理
兼容性取决于浏览器支持取决于 Node.js 版本

总结

这篇文章主要介绍了 Node.js 中的几种数据流类型及其使用场景,并对比了 Web Stream 与 Node.js Stream 的区别

转载自:https://juejin.cn/post/7367997561510772772
评论
请登录