likes
comments
collection
share

详解node.js中的 stream 流操作对象

作者站长头像
站长
· 阅读数 3

node.js中的流就是处理流式数据的抽象接口,是流操作抽象接口的集合,可读,可写,双工,转换是单一抽象具体体现。

流操作的核心功能就是处理数据,node.js中处理数据模块继承了流和EventEmitter

readFile方式读取文件:

详解node.js中的 stream 流操作对象

常见问题:

  • 同步读取资源文件,用户需要等待数据被读取完成
  • 资源文件最终一次性加载自内存,开销较大

流操作文件:

详解node.js中的 stream 流操作对象

流处理数据的优势:

  • 时间效率

流的分段处理可以同时操作多个数据chunk

  • 空间效率

同一时间流无须占据大的内存空间

  • 使用方便

流配合管理,扩展程序变得简单

详解node.js中的 stream 流操作对象

一. 什么是stream

node.js中内置了stream,实现了流操作对象

  1. 流的分类

  • Readable:可读流,能够实现数据的读取
  • Writeable:可写流,能够实现数据的写操作
  • Duplex:双工流,即可读又可写
  • Tranform:转换流,可读可写,还能实现数据转换

二. 可读流

生产供程序消费数据的流

const fs = require('fs')

const rs = fs.createReadStream('test.txt')


rs.pipe(process.stdout)
  1. 自定义可读流?

  • 继承stream里的Readable
  • 重写_read方法调用push产出数据
const { Readable } = require('stream')


// 模拟底层数据
let source = ['lg', 'zce', 'syy']


// 自定义继承 Readable

class MyReadable extends Readable{
    constructor(){
        super(source)
        this.source = source
    }

    // 重写read
    _read(){
        let data = this.source.shift() || null
        this.push(data)
    }
}



// 实例化

let myReadable = new MyReadable(source)

// myReadable.on('readable', ()=>{
//     let data = null
//     while((data = myReadable.read()) !== null){
//         console.log(data.toString()) // lgzce syy
//     }
// })

myReadable.on('data', chunk=>{
    console.log(chunk.toString()) //lg  zce  syy
})

三. 可写流

用于消费数据的流

const { Writable } = require('stream')

class MyWriteable extends Writable{
    constructor(){
        super()
    }

    _write(chunk, en, done){
        process.stdout.write(chunk.toString() + '------>')
        process.nextTick(done)
    }
}

let myWriteable = new MyWriteable()

myWriteable.write('vernon', 'utf-8', ()=>{
    console.log('end')
})

四. 双工流和转换流

Duplex是双工流,既能生产又能消费

Transform也是一个双工流

  1. Duplex自定义转换流

  • 继承Duplex类
  • 重写_read方法,调用push生产数据
  • 重写_write方法,调用write消费数据
let { Duplex } = require('stream')

class MyDuplex extends Duplex{
    constructor(source){
        super()
        this.source = source
    }
    _read(){
        let data = this.source.shift() || null
        this.push(data)
    }

    _write(chunk, en, next){
        process.stdout.write(chunk)
        process.nextTick(next)
    }
}

let source = ['a', 'b', 'c']
let myDuplex = new MyDuplex(source)

// myDuplex.on('data', chunk=>{
//     console.log(chunk.toString())  // a b c
// })

myDuplex.write('vernon', ()=>{
    console.log('2222')  //vernon222
})
  1. Transform自定义转换流

  • 继承Transform类
  • 重写_transform方法,调用push和callback
  • 重写_flush方法,处理剩余数据
const { Transform }  = require('stream')

class MyTransform extends Transform{
    constructor(){
        super()
    }

    _transform(chunk, en, cb){
        this.push(chunk.toString().toUpperCase())
        cb(null)
    }
}

let t = new MyTransform()

t.write('a')

t.on('data', chunk=>{
    console.log(chunk.toString()  //A
})

五. 文件可读流创建和消费

  1. 创建以及消费

readable事件:当流中存在可读取数据时触发

data事件:当流中数据块传给消费者后触发

  • 监听data方式
const fs = require('fs')


let rs = fs.createReadStream('test.txt', {
    flags: 'r', // 以什么方式对文件进行操作,r表示可读模式
    encoding: null, // 编码设置,buffer
    fd: null, // 文件标志符。从3开始
    mode: 438, // 权限位
    autoClose: true, // 是否是自动关闭文件
    start: 0, // 从哪个位置开始读取
    // end: 3, //结束
    highWaterMark: 4 //水位线,表示每次要读多少个字节的数据
})

rs.on('data', chunk=>{
    console.log(chunk.toString()) //01 23 45 67 89
    rs.pause() // 缓存区里的数据读完会暂停
    setTimeout(()=>{
        rs.resume() // 开始读取
    }, 1000)
    // 每一秒钟输出数据
})
  • 监听readable方式
const fs = require('fs')

let rs = fs.createReadStream('test.txt', {
    flags: 'r', // 以什么方式对文件进行操作,r表示可读模式
    encoding: null, // 编码设置,buffer
    fd: null, // 文件标志符。从3开始
    mode: 438, // 权限位
    autoClose: true, // 是否是自动关闭文件
    start: 0, // 从哪个位置开始读取
    // end: 3, //结束
    highWaterMark: 4 //水位线,表示每次要往暂存区放入多少个字节的数据
})

rs.on('readable', ()=>{
    // let data = rs.read()
    let data
    while((data = rs.read(1)) !== null){
        // 表示每次读一个字节
        console.log(data.toString())
        console.log('-------->', rs._readableState.length)
    }
})


0    // 使用read读出的字节,

--------> 3 //使用highWaterMark放入暂存区,被read读出后剩下的

1

--------> 2

2

--------> 1

3

--------> 0

4

--------> 3

5

--------> 2

6

--------> 1

7

--------> 0

8

--------> 1

9

--------> 0
  1. 其他事件

opne

// createReadStream被调用,就会触发

rs.on('open', fd=>{
    console.log(fd, '文件被打开了');
})

data

// 消费数据

rs.on('data',chunk=>{
    console.log(chunk);
})

end

// 当数据被清空时触发

rs.on('end', ()=>{
    console.log('数据被清空');
})

close

// 操作完成之后触发

rs.on('close', ()=>{
    console.log('文件被关闭了');
})

error

// 文件读取出错时触发

rs.on('error', err=>{
    console.log('出错');
})
  1. 案例(处理读取数据)

let bufferAry = []

rs.on('data',chunk=>{
    bufferAry.push(chunk)
})

rs.on('end', ()=>{
    console.log(Buffer.concat(bufferAry).toString()); // 0123456789
})

六. 文件可写流创建与写入

  1. 创建以及写入

const fs = require('fs')

let ws = fs.createWriteStream('test.txt', {
    flags: 'w',
    mode: 438,
    fd: null,
    encoding: "utf-8",
    start: 0,
    highWaterMark: 3
})


//ws.write('vernon', ()=>{
//    console.log('写入成功');
//})

let buf = Buffer.from('abc')
ws.write(buf, ()=>{
    console.log('写入成功');
})

// 只允许输入字符串或buffer
  1. 其他事件

open

ws.on('open', (fd)=>{

    console.log('open', fd);

})

write()

//写入

ws.write('vernon', ()=>{
    console.log('写入成功');
})

end

// 写入操作完成后触发,可以执行最后一次写入
ws.end('hahah')

close

// 全部操作完成时触发

ws.on('close', ()=>{
    console.log('文件被关闭了');
})

error

// 出错时触发

ws.on('error', err=>{
    console.log('出错');
})

七. write执行流程

drain事件

当填入的数据大于等于highWaterMark时触发

模拟文件可读流

const fs = require('fs')
const EventEmitter = require('events')

class MyFileReadStream extends EventEmitter{

  constructor(path, options = {}) {
    super()
    this.path = path
    this.flags = options.flags || "r"
    this.mode = options.mode || 438
    this.autoClose = options.autoClose || true 
    this.start = options.start || 0
    this.end = options.end 
    this.highWaterMark = options.highWaterMark || 64 * 1024 
    this.readOffset = 0

    this.open()

    this.on('newListener', (type) => {
      if (type === 'data') {
        this.read()
      }
    })
  }

  open() {

    // 原生 open 方法来打开指定位置上的文件
    fs.open(this.path, this.flags, this.mode, (err, fd) => {
      if (err) {
        this.emit('error', err)
      }

      // fd帮助我们定位目标文件
      this.fd = fd
      this.emit('open', fd)
    })
  }

  read() {
    if (typeof this.fd !== 'number') {
      return this.once('open', this.read)
    }

    let buf = Buffer.alloc(this.highWaterMark)
    let howMuchToRead

    /* if (this.end) {
      howMuchToRead = Math.min(this.end - this.readOffset + 1, this.highWaterMark)
    } else {
      howMuchToRead = this.highWaterMark
    } */

    howMuchToRead = this.end ? Math.min(this.end - this.readOffset + 1, this.highWaterMark) : this.highWaterMark

    fs.read(this.fd, buf, 0, howMuchToRead, this.readOffset, (err, readBytes) => {

      if (readBytes) {
        this.readOffset += readBytes
        this.emit('data', buf.slice(0, readBytes))
        this.read()

      } else {
        this.emit('end')
        this.close()
      }
    })
  }

  close() {
    fs.close(this.fd, () => {
      this.emit('close')
    })
  }
}

let rs = new MyFileReadStream('test.txt', {
  end: 7,
  highWaterMark: 3
})

rs.on('data', (chunk) => {
  console.log(chunk)
})

模拟文件可写流

const fs = require('fs')
const EventsEmitter = require('events')
const Queue = require('./linkedlist')

class MyWriteStream extends EventsEmitter{

  constructor(path, options={}) {
    super()
    this.path = path
    this.flags = options.flags || 'w'
    this.mode = options.mode || 438
    this.autoClose = options.autoClose || true 
    this.start = options.start || 0
    this.encoding = options.encoding || 'utf8'
    this.highWaterMark = options.highWaterMark || 16*1024

    this.open()

    this.writeoffset = this.start 
    this.writing = false 
    this.writeLen = 0
    this.needDrain = false 
    this.cache = new Queue()
  }

  open() {

    // 原生 fs.open 
    fs.open(this.path, this.flags, (err, fd) => {
      if (err) {
        this.emit('error', err)
      }

      // 正常打开文件
      this.fd = fd 
      this.emit('open', fd)
    })
  }

  write(chunk, encoding, cb) {
    chunk = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)
    this.writeLen += chunk.length
    let flag = this.writeLen < this.highWaterMark
    this.needDrain = !flag

    if (this.writing) {
      // 当前正在执行写入,所以内容应该排队
      this.cache.enQueue({chunk, encoding, cb})
    } else {
      this.writing = true

      // 当前不是正在写入那么就执行写入
      this._write(chunk, encoding, () => {
        cb()
        // 清空排队的内容
        this._clearBuffer()
      })
    }

    return flag

  }

  _write(chunk, encoding, cb) {
    if (typeof this.fd !== 'number') {
      return this.once('open', ()=>{return this._write(chunk, encoding, cb)})
    }

    fs.write(this.fd, chunk, this.start, chunk.length, this.writeoffset, (err, written) => {
      this.writeoffset += written

      this.writeLen -= written

      cb && cb()

    })

  }

  _clearBuffer() {
    let data = this.cache.deQueue()

    if (data) {
      this._write(data.element.chunk, data.element.encoding, ()=>{
        data.element.cb && data.element.cb()
        this._clearBuffer()
      })

    } else {
      if (this.needDrain) {
        this.needDrain = false 
        this.emit('drain')

      }

    }

  }

}

const ws = new MyWriteStream('./f9.txt', {})

ws.on('open', (fd) => {
  console.log('open---->', fd)
})


let flag = ws.write('1', 'utf8', () => {
  console.log('ok1')
})


flag = ws.write('10', 'utf8', () => {

  console.log('ok1')

})

flag = ws.write('拉勾教育', 'utf8', () => {
  console.log('ok3')
})

ws.on('drain', () => {
  console.log('drain')
})

pipe方法

const fs = require('fs')

const rs = fs.createReadStream('test.txt', {
    highWaterMark: 4
})

const ws = fs.createWriteStream('test1.txt', {
    highWaterMark: 1
})

rs.pipe(ws)

模拟实现

pipe(ws){
    this.on('data', data=>{
        let flag = ws.write(data)
        if(!flag){
            this.pause()
        }
    })

    ws.on('drain', ()=>{
        this.resume()
    })
}