Node.js Stream 如何工作的
Node.js Stream 如何工作的
Node.js 流是一个强大的功能,可以让你以非阻塞的方式高效地处理数据流。 Node.js 中的流是内置对象,能够以小块的形式增量地读取或写入源,而不是一次性将整个数据加载到内存中。
Node.js 中有四种基本类型的流:
可读流:这些流代表可以使用数据的源。例如,从文件读取数据、从 HTTP 请求接收数据或从数据库查询生成数据。每当有新数据可用时,可读流就会发出“data”事件,并且消费者可以使用不同的方法读取这些数据。
可写流:这些流代表可以写入数据的目的地。例如,将数据写入文件、通过 HTTP 响应发送数据或将数据插入数据库。可写流提供诸如 write() 和 end() 之类的方法来将数据发送到流。
双工流:这些流代表可读流和可写流。这意味着可以同时读取和写入这些流。双工流是双向的,通常用于网络通信等任务。
转换流:这些流是一种特殊类型的双工流,当数据从可读端流向可写端时,它们对数据进行操作或转换。转换流对于压缩、加密或数据格式化等任务很有用。它们可用于在数据传输时动态修改数据。
Node.js 中的流根据事件和背压原理工作。数据以块的形式消耗或生成,并发出诸如data、end和error之类的事件来通知应用程序有关流的状态。
背压是一种机制,用于在可写流无法像生成数据一样快地处理传入数据时控制数据流。它允许可读流暂停或减慢数据生成,直到可写流准备好消耗更多数据。
要在 Node.js 中使用流,通常需要创建适当流类型的实例、设置事件处理程序,并使用流 API 提供的各种方法与数据进行交互。还可以通过管道将流连接在一起,这意味着将一个流的输出连接到另一个流的输入,以创建一个自动处理多个流之间的数据流的管道。
下面的这些示例展示了可以在 Node.js 中使用流的不同场景,包括复制文件、压缩数据和转换数据。
const fs = require('fs');
const zlib = require('zlib');
const { Duplex } = require('stream');
const { Transform } = require('stream');
// 将数据文件读取和写入另一个文件:
export const readWriteStream = () => {
const readableStream = fs.createReadStream('input.txt');
const writableStream = fs.createWriteStream('output.txt');
readableStream.pipe(writableStream);
console.log('File is being copied...');
}
// 读取数据文件,生成zip并写入另一个文件:
export const zipGenerate = () => {
const readableStream = fs.createReadStream('input.txt');
const gzipStream = zlib.createGzip();
const writableStream = fs.createWriteStream('output.txt.gz');
readableStream.pipe(gzipStream).pipe(writableStream);
console.log('File is being compressed...');
}
// 读取输入数据,转换为大写并写入输出:
export const convertInputToUpperCase = () => {
const uppercaseTransform = new Transform({
transform(chunk, encoding, callback) {
const uppercasedChunk = chunk.toString().toUpperCase();
this.push(uppercasedChunk);
callback();
}
});
process.stdin.pipe(uppercaseTransform).pipe(process.stdout);
}
// Handle with Duplex streams:
/*
When you run this code, it will write "Hello" to the duplex stream, which will
be logged as "Writing: Hello". Then, the stream will generate and push characters
starting from 'A' to the readable side. Each received chunk will be logged as
"Received: <character>". The stream will continue pushing characters until it
reaches 'Z', at which point it will end the stream by pushing null.
*/
export const logChars = () => {
const myDuplexStream = new Duplex({
write(chunk, encoding, callback) {
console.log(`Writing: ${chunk}`);
callback();
},
read(size) {
if (this.currentCharCode > 90) {
this.push(null);
return;
}
this.push(String.fromCharCode(this.currentCharCode++));
}
});
myDuplexStream.currentCharCode = 65; // ASCII code for 'A'
myDuplexStream.on('data', (chunk) => {
console.log(`Received: ${chunk}`);
});
myDuplexStream.write('Hello');
myDuplexStream.end();
}
实现可读流
- 继承 Readable
- 接受迭代器作为参数
- 实现_read方法
import { Readable } from 'stream'
class IeteratorReadableStream extends Readable {
constructor(iterator) {
super()
this.iterator = iterator
}
_read () {
let data = this.iterator.next()
// console.log(data);
if (data.done) {
this.push(null)
} else {
// 必须 push 字符串或者 Buffer
this.push(data.value + '')
}
}
}
function* getData () {
for (let i = 0; i < 5; i++) {
yield i
}
}
let stream = new IeteratorReadableStream(getData())
stream.on('data', data => {
console.log(data.toString())
})
stream.on('end', () => {
console.log("迭代结束")
})
同理实现可写流
import { Writable } from 'stream';
class ConsoleWritableStream extends Writable {
constructor(options) {
super(options);
}
_write(chunk, encoding, callback) {
console.log(chunk.toString());
callback();
}
}
const writableStream = new ConsoleWritableStream();
writableStream.write('Hello, ');
writableStream.write('world!\n');
writableStream.end('结束流');
Web Stream 和 Node Stream 区别
特性 | Web Streams | Node.js Streams |
---|---|---|
运行环境 | 浏览器 | Node.js (服务器端或命令行应用程序) |
API 设计 | 基于 Promise 和异步迭代器 | 基于事件和回调函数 |
主要类型 | ReadableStream, WritableStream, TransformStream | stream.Readable, stream.Writable, stream.Duplex, stream.Transform |
错误处理 | 通过 Promise 拒绝状态处理 | 通过监听 error 事件处理 |
应用场景 | 文件读写、视频流处理、响应式应用 | 文件处理、网络通信、日志处理、实时数据处理 |
兼容性 | 取决于浏览器支持 | 取决于 Node.js 版本 |
总结
这篇文章主要介绍了 Node.js 中的几种数据流类型及其使用场景,并对比了 Web Stream 与 Node.js Stream 的区别
转载自:https://juejin.cn/post/7367997561510772772