likes
comments
collection
share

node常用内置模块(Stream)

作者站长头像
站长
· 阅读数 25

node常用内置模块(Stream)

一、Stream了解

node.js中的流就是处理流式数据的抽象接口,文件操作系统和网络模块实现了流接口

不使用流的常见问题:

同步读取资源文件,用户与要等待数据读取完成
资源文件最终一次性加载至内存,开销较大

使用流的图解(数据的分段传输):node常用内置模块(Stream)配合管道对需求的加工:node常用内置模块(Stream)流处理数据的优势:

时间效率:流的分段处理可以同时操作多个数据chunk
空间效率:同一时间流无须占据大内存空间
使用方便:流配合管理,扩展程序变得简单

node.js中流的分类:

Readalbe:可读流,能够是实现数据的读取
Writealbe:可写流,能够实现数据的写操作
Duplex:双工流,既可读又可写
Transform:转换流,可读可写,还是实现数据转换

nodejs流特点:

Stream模块实现了四个具体的抽象
所有流都继承自EventEmitter

二、基本API

1.可读流

专门生产供程序消费数据的流

自定义可读流:

继承stream里的Readable
重写_read方法调用push产出数据

可读流基本原理:node常用内置模块(Stream)消费数据:

readable事件:当流中存在可读取的数据是触发
data事件:当流中数据块传给消费者后触发

自定义可读流代码实现:

const { Readable } = require('stream');

// 模拟底层数据
let source = ['lg', 'zce', 'syy'];

class MyReadable extends Readable {
    constructor(source) {
        super();
        this.source = source;
    }
    _read() {
        let data = this.source.shift() || null;     // 如果没有数据,则返回 null
        this.push(data);                            // 将数据推入到流中
    }
}

let mr = new MyReadable(source);

// reaadable 默认是暂停模式 
// mr.on('readable', () => {
//     let data = null;
//     while(data = mr.read(2)){
//         console.log(data.toString());
//     }
// })

// 有可能都不放入缓存中,直接输出
mr.on('data', (chunk) => {
    console.log(chunk.toString());
})

2.可写流

用于消费数据的流

自定义可写流:

继承stream模块的Writeable
重写_write方法,调用write执行写入

可写流事件

pipe事件:可读流调用pipe()方法时触发
unpipe事件:可读流调用unpipe()方法时触发

自定义可写流代码实现:

const { Writable } = require('stream');

class MyWriteable extends Writable {
    constructor() {
        super();
    }
    _write(chunk, encoding, done) {
        process.stdout.write(chunk.toString() + '<-----\n');
        process.nextTick(done);
    }
}

let mw = new MyWriteable();

mw.write('江江学习', 'utf-8', () => {
    console.log('write success');
})

3.双工流

Duplex是双工流,既能生产又能消费,读写相互独立,读操作创建的数据不能当作写操作的数据源去使用

自定义双工流

继承Duplex类
重写_read方法,调用push生产数据
重写_write方法,调用write消费数据

代码实现:

let { Duplex } = require('stream');

class MyDuplex extends Duplex {
    constructor(source) {
        super();
        this.source = source;
    }
    _read() {
        let data = this.source.shift() || null;
        this.push(data);
    }
    _write(chunk, en, next) {
        process.stdout.write(chunk.toString() + '<-----\n');
        process.nextTick(next);
    }
}

let source = ['hello', 'world', '!'];
let md = new MyDuplex(source);

md.write('江江',()=>{
    console.log('write success');
})

md.on('data', (chunk) => {
    console.log(chunk.toString());

})

Transform

Transform也是一个双工流,读写操作进行了联通

Transform自定义实现:

继承Transform类
重写_transform方法,调用push和callback
重写_flush方法,处理剩余数据

transform自定义代码实现:

let { Transform } = require('stream');

class MyTransform extends Transform{
    constructor(){
        super();
    }

    _transform(chunk,en,cb){
        this.push(chunk.toString().toUpperCase());
        cb(null);
    }
}

let t = new MyTransform();
t.write('hello');

t.on('data',(chunk)=>{
    console.log(chunk.toString());

})

三、文件读写流

1.文件可读流

文件可读流代码中使用:

const fs = require('fs');
const path = require('path');


let rs = fs.createReadStream('test.txt', {
    flags: 'r',
    encoding: null,         // 返回buffer
    fd: null,               // 默认值从3开始的, 0、1、2被输入、输出、错误占用了
    mode: 438,              // 权限控制
    autoClose: false,       // 是否自动关闭文件
    start: 0,               // 从文件的某个位置开始读取
    // end: 10,             // 在文件的某个位置结束读取
    highWaterMark: 16        // 每次准备多少个字节的数据让读取(调用push放入缓存区里面),Readable中默认16个,文件可读流中(此处)默认64个
})

// data 事件
// rs.on('data',(chunk)=>{
//     console.log(chunk.toString());
//     rs.pause();         // 流动模式切换到暂停模式
//     setTimeout(()=>{
//         rs.resume();    // 恢复到流动模式
//     },1000)
// })


// readable 事件
rs.on('readable', () => {
    // let data = rs.read();
    // console.log(data)
    let data = null;
    while(data = rs.read(3)){            // 每次从缓存中读取多少个字节
        console.log(data.toString());
        console.log('------',rs._readableState.length);     // 剩余多少个字节
    }
})

其它事件:

const fs = require('fs');
const path = require('path');


let rs = fs.createReadStream('test.txt', {
    flags: 'r',
    encoding: null,         // 返回buffer
    fd: null,               // 默认值从3开始的, 0、1、2被输入、输出、错误占用了
    mode: 438,              // 权限控制
    autoClose: false,       // 是否自动关闭文件
    start: 0,               // 从文件的某个位置开始读取
    // end: 10,             // 在文件的某个位置结束读取
    highWaterMark: 16        // 每次准备多少个字节的数据让读取(调用push放入缓存区里面),Readable中默认16个,文件可读流中(此处)默认64个
})


rs.on('open', (fd) => {
    console.log('fd', fd,'文件打开了');
})


rs.on('close',()=>{
    console.log('文件关闭了')
})
let bufferArr = [];
rs.on('data',(chunk)=>{
    bufferArr.push(chunk)
})

rs.on('end',()=>{
    console.log(Buffer.concat(bufferArr).toString())
    console.log('数据被清空之后')
})

rs.on('error',()=>{
    console.log('出错了')
})

2.文件可写流

可写流常用事件:

const fs = require('fs');
const path = require('path');


const ws = fs.createWriteStream('test.txt', {
    flags: 'w',
    mode: 438,
    fd: null,
    encoding: 'utf-8',
    start: 0,
    highWaterMark: 16    // 默认16kb
})

ws.write('拉钩教育', () => {
    console.log('拉钩教育-数据写完了')
})

// 字符串 或者 buffer ===> fs rs
// ws.write(123456,()=>{
//     console.log('123456-数据写完了')
// })


ws.on('open', (fd) => {
    console.log('open', fd)
})


// colose 是在数据写入操作全部完成之后再执行
ws.on('close',()=>{
    console.log('文件关闭了');
})

ws.write('0');

// end 执行之后就意味着数据写入操作完成
ws.end('jiang');  // 可最后写入一次

// ws.write('2');

ws.on('error',(err)=>{
    console.log('出错了');
})

write执行流程:node常用内置模块(Stream)

drain事件与读写速度:

/**
 * 需求:"江江学习" 写入指定的文件
 * 01 一次性写入
 * 02 分批写入
 * 对比:对内存的压力不同
 */

const fs = require('fs');

let ws = fs.createWriteStream('test.txt', {
    highWaterMark: 3
});

// ws.write('江江学习');


let source = '江江学习'.split('');
let num = 0;
let flag = true;

function executeWrite() {
    
    while (num != source.length && flag) {
        flag = ws.write(source[num++]);  // 当写入的数据大于等于hightWaterMark时,会返回false
    }
}

executeWrite();

ws.on('drain',()=>{     // 缓存中的数据已经被消费完了,才触发
    console.log('drain 执行了');
    flag = true;
    executeWrite();
})

四、背压机制

让数据在的生产者与消费者平滑流动的机制

1.问题发现

看一段代码发现问题:node常用内置模块(Stream)数据从磁盘读取出来的速度是远远大于写入磁盘的速度的(消费者的速度跟不到生产者的速度的),WriteAble内部维护了一个队列,不能即使的消费数据导致的产能过剩,就会放入该队列中,但队列长度是有上限的,所以在当读写的过程中,如果没有实现被压机制的化,就可能会导致

内存溢出
其它进程运行变慢
GC频繁调用

了解读写机制:

Readable运行机制:node常用内置模块(Stream)

Writeable运行机制:node常用内置模块(Stream)

背压机制基本原理代码:

let fs = require('fs');

let rs = fs.createReadStream('test.txt', {
    highWaterMark: 4       // Readable默认是16,fs中createReadStream默认为64
})

let ws = fs.createWriteStream('test1.txt', {
    highWaterMark: 1
})


let flag = true;
rs.on('data',(chunk)=>{
    flag = ws.write(chunk,()=>{
        console.log('写完了');
    })
    if(!flag){
        rs.pause();
    }
})

ws.on('drain',()=>{
    rs.resume();
})

// 可以直接使用pipe
// rs.pipe(ws);

2.模拟可读流

代码实现:

const fs = require('fs');

const EventEmitter = require('events');

class MyFileReadStream extends EventEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'r';
        this.mode = options.mode || 438;
        this.autoClose = options.autoClose || true;
        this.start = options.start || 0;
        this.end = options.end;
        this.highWaterMark = options.highWaterMark || 64 * 1024;
        this.readOffset = 0;
        this.open();

        // 当监听新的事件时,会被触发
        this.on('newListener', (type) => {
            if (type == 'data') {
                this.read();
            }
        })
    }
    open() {
        fs.open(this.path, this.flags, this.mode, (err, fd) => {
            if (err) {
                // 触发自生error事件,这里是回调函数,执行在同步代之后
                this.emit('error', err);
            }
            this.fd = fd;
            this.emit('open', this.fd);
        });
    }

    read() {
        if (typeof this.fd != 'number') {
            return this.once('open', this.read);
        }
        let buf = Buffer.alloc(this.highWaterMark);

        // let howMuchToRead;      // 每次读多少
        // if (this.end) {
        //     // 判断end是否有存在
        //     howMuchToRead = Math.min(this.end - this.readOffset + 1, this.highWaterMark);       // 使用剩余未读的字节数与highWaterMark中较小的一个
        // } else {
        //     howMuchToRead = this.highWaterMark;       // 使用剩余未读的字节数与highWaterMark中较小的一个
        // }
                                                            // 可以取到末尾end下标的值,所以这里要加一
        let howMuchToRead = this.end?Math.min(this.end - this.readOffset + 1, this.highWaterMark):this.highWaterMark

        fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {
            if (readBytes) {
                this.readOffset += readBytes;
                this.emit('data', buf.slice(0, readBytes))
                this.read()
            } else {
                this.emit('end')
                this.close()
            }
        })
    }

    close() {
        fs.close(this.fd, () => {
            this.emit('close')
        });
    }
}

let rs = new MyFileReadStream('test.txt', {
    end: 7, // 结束位置的下标,可以取到
    highWaterMark: 3
});


rs.on('open', (fd) => { // 这里是同步代码,监听该事件在触发事件之前
    console.log('open', fd);
})

rs.on('error', (err) => {
    console.log(err);
})

rs.on('data', (chunk) => {
    console.log(chunk)
})

rs.on('end', () => {
    console.log('end')
})

rs.on('close', () => {
    console.log('close')
})

五、链表

使用wirte时,有些被写入的内容需要放入缓存中被排队等待,而且要遵循先进先出的规则,这里使用链表的数据结构来保存这些数据

为什么不使用数组:

数组存储数据的长度具有上限
数组存在塌陷问题

模拟链表实现队列:

class Node {
    constructor(element, next = null) {
        this.element = element;
        this.next = next;
    }
}

class LinkedList {
    constructor() {
        this.head = null;
        this.size = 0
    }
    // 获取指定位置节点
    _getNode(index) {
        if (index < 0 || index >= this.size) {
            throw new Error('getNode --> index error')
        }

        let currentNode = this.head;
        while (index--) {
            currentNode = currentNode.next;
        }
        return currentNode;
    }

    // 确保该下标的位置合法
    _checkIndex(index) {
        if (index < 0 || index >= this.size) {
            throw new Error('index 参数错误')
        }
    }

    add(index, element) {
        if (arguments.length == 1) {
            element = index;
            index = this.size;
        }
        if (index < 0 || index > this.size) {
            throw new Error('index 参数错误')
        }
        let newNode = new Node(element);


        // index == 1 与 index != 1处理方式不同
        if (index == 0) {
            newNode.next = this.head;
            this.head = newNode;
        } else {
            // 获取指定位置的前一个节点
            let prevNode = this._getNode(--index);
            newNode.next = prevNode.next;
            prevNode.next = newNode;
        }
        this.size++;
    }
    remove(index) {
        if (this.size == 0) return undefined;
        this._checkIndex(index);
        let currentNode = this._getNode(index);

        if (index == 0) {
            this.head = currentNode.next;
        } else {
            let prevNode = this._getNode(index - 1);
            prevNode.next = currentNode.next;

        }
        this.size--;
        currentNode.next = null;
        return currentNode;
    }


    set(index, element) {
        this._checkIndex(index);
        this._getNode(index).element = element;
    }


    get(index) {
        this._checkIndex(index);
        let currentNode = this._getNode(index);
        currentNode.next = null;
        return currentNode;
    }

    clear() {
        this.head = null;
        this.size = 0;
    }
}
class Queue {
    constructor() {
        this.linkedList = new LinkedList();
    }
    enQueue(data) {
        this.linkedList.add(data);
    }
    deQueue() {
        return this.linkedList.remove(0);
    }
}

const q = new Queue();
q.enQueue('node1');
q.enQueue('node2');

console.log(q.deQueue());
console.log(q.deQueue());
console.log(q.deQueue());
console.log(q)

模拟可写流:

const fs = require('fs');
const EventsEmitter = require('events');
const Queue = require('./linkedlist');

class MyWriteStream extends EventsEmitter {
    constructor(path, options = {}) {
        super();
        this.path = path;
        this.flags = options.flags || 'w';
        this.mode = options.mode || 438;
        this.autoClose = options.autoClose || true;
        this.start = options.start || 0;
        this.end = options.end
        this.encoding = options.encoding || 'utf8';
        this.highWaterMark = options.highWaterMark || 16 * 1024;
        this.writeOffset = this.start;
        this.writing = false;
        this.writeLen = 0;
        this.needDrain = false;
        this.cache = new Queue();

        this.open();
    }

    open() {
        // 原生 fs.open
        fs.open(this.path, this.flags, (err, fd) => {
            if (err) {
                this.emit('error', err);
                return;
            }
            this.fd = fd;
            this.emit('open', fd);
        })
    }

    write(chunk, encoding, cb) {
        // 统一成buffer
        chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk);
        this.writeLen += chunk.length;
        let flag = this.writeLen < this.highWaterMark;
        this.needDrain = !flag;

        if (this.writing) {
            // 当前 是 正在写入状态,所以在这里将数据存入队列
            this.cache.enQueue({
                chunk,
                encoding,
                cb
            })
        } else {
            // 当前 不是 正在写入状态,所以在这里执行写入
            this.writing = true;
            this._write(chunk, encoding, cb);
            // this.writing = false;

        }

        return flag;
    }

    _write(chunk, encoding, cb) {
        if (typeof this.fd != 'number') {
            return this.once('open', () => {
                return this._write(chunk, encoding, () => {
                    cb()
                    // 清空排队的内容
                    this._clearBuffer();
                });
            })
        }
        fs.write(this.fd, chunk, this.start, chunk.length, this.writeOffset, (err, written) => {
            this.writeOffset += written;
            this.writeLen -= written;
            cb && cb();
        })
    }

    _clearBuffer() {
        let data = this.cache.deQueue();
        if(data){
            this._write(data.element.chunk,data.element.encoding,()=>{
                data.element.cb();
                this._clearBuffer();
            })
        }else{
            if(this.needDrain){
                this.needDrain = false;
                this.emit('drain')
            }
        }
    }
}

let mws = new MyWriteStream('f04.txt', {
    highWaterMark: 4
});

mws.on('open', (fd) => {
    console.log('open--->', fd)
})

pipe方法的使用:

const fs = require('fs');

const rs = fs.createReadStream('./f04.txt', {
    highWaterMark: 4     // 默认64kb
});

const ws = fs.createWriteStream('./f04_copy.txt', {
    highWaterMark: 1     // 默认16kb
})
rs.pipe(ws);

// data 需要查看数据,可监听rs data事件

自定义的pipe方法(有问题,没找出来):

const fs = require('fs');

const EventEmitter = require('events');

class MyFileReadStream extends EventEmitter {
    constructor(path, options = {}) {...}
    open() {...}
    read() {...}
    close() {...}

    pipe(ws){
        this.on('data',(data)=>{
            let flag = ws.write(data);
            if(!flag){
                // 读数据的缓存满了。开启暂停
                this.pause();
                // 找不到该方法
            }
        });
        this.on('drain',()=>{
            // 缓存中的数据被消费完了,继续开启数据读入缓存
            this.resume();
        })
    }
}