likes
comments
collection
share

浅析flvjs源码

作者站长头像
站长
· 阅读数 37

最近将公司得一个检测系统项目完成后,闲下来就把在项目中用到得flvjs源码看了一遍,梳理出大致得流程,并整理成文档,分享给大家。

当前使用flvjs,必要得步骤一般都是这几步:

1. 获取video标签
2. 初始化flvjs flvjs.createPlayer(mediaDataSource,config)
3. 将flvjs和video标签关联 flvjs.attachMediaElement(videoDom)
4. 开启加载直播流  flvjs.load()

配置

其中mediaDataSource和config如图所示

浅析flvjs源码

完整的导图在这里

流程图

在进行阅读源码之前,我先放一张整理好的调用流程给你们,绿色代表类,红色线代表调用流程,蓝色线表示类的方法,这个流程图在这里有点模糊,需要高清的可以看这个仓库

浅析flvjs源码

那么首先看一下flv得目录组成,他主要分为下面几个包,

浅析flvjs源码

  • core包主要负责初步处理流数据,并且使用javaScript API MediaSource创建缓冲区接纳后续处理得,能够给video标签播放得视频数据,具体实现下面再细讲。

  • demux包主要是负责解析flv格式视频了,解析完后会丢给core包。

  • io包涉及到的是怎么接收流,里面有webSocket loader和普通得fetch loader,根据url分别使用不同得loader。

  • player包是整个flvjs初始化得起点,也负责事件得分发。

  • remux包负责将流解析封装成video标签可播放得mp4 buffer数据。

  • utils包是工具函数。

整个调用流程作者使用得是node得事件监听Api,通过EventEmitter中得emit和on实现事件得发布和监听。通过上面得流程图就知道这个调用过程其实有点绕,我们化繁为简,只看type = flv,url = webSocket这一种情况,从而窥探一下完整得调用流程,在看下面的分析流程的时候,建议配合上面的流程图一起看。

初始化

引入flvjs后首先调用得就是createPlayer,对应源码在这里

// /flv.js
function createPlayer(mediaDataSource, optionalConfig) {
...
  switch (mds.type) {
    case 'flv':
      return new FlvPlayer(mds, optionalConfig)
    default:
      return new NativePlayer(mds, optionalConfig)
  }
}

所以实际上是创建了一个FlvPlayer类,但构造函数只是初始化了一些参数,有用得是里面得两个方法,attachMediaElement和load,没错,就是上面步骤得第三步和第四步,至此,命运的齿轮开始转动0.0。

首先来看attachMediaElement方法

// player/flv-player.js
// 关联video标签
  attachMediaElement(mediaElement) {
    this._mediaElement = mediaElement
    // 给video标签视频节点绑定监听事件
    mediaElement.addEventListener('loadedmetadata', this.e.onvLoadedMetadata)
    mediaElement.addEventListener('seeking', this.e.onvSeeking)
    mediaElement.addEventListener('canplay', this.e.onvCanPlay)
    mediaElement.addEventListener('stalled', this.e.onvStalled)
    mediaElement.addEventListener('progress', this.e.onvProgress)

    this._msectl = new MSEController(this._config)
		...

    // 去看mse-controller.js里面的attachMediaElement方法
    this._msectl.attachMediaElement(mediaElement)
		...
  }

在这里初始化了一个MSEController实例,在这个类中,完成了video标签和视频流之间的关联

// core/mse-controller.js
attachMediaElement(mediaElement) {
    if (this._mediaSource) {
      throw new IllegalStateException('MediaSource has been attached to an HTMLMediaElement!')
    }
    let ms = (this._mediaSource = new window.MediaSource())
    ms.addEventListener('sourceopen', this.e.onSourceOpen)
    ms.addEventListener('sourceended', this.e.onSourceEnded)
    ms.addEventListener('sourceclose', this.e.onSourceClose)

    this._mediaElement = mediaElement
    // 将MediaSource对象转为url喂给video标签
    this._mediaSourceObjectURL = window.URL.createObjectURL(this._mediaSource)
    // mediaElement就是video标签
    mediaElement.src = this._mediaSourceObjectURL
  }

这几行是关键代码,没有这个关联video标签是看不到视频的。重点在于MediaSource的创建,这个API是js的底层API,用于允许 JavaScript 控制媒体流的生成和管理,以便在 <video><audio> 元素中播放。它允许你动态地生成媒体片段并将其添加到媒体流中,从而实现流式传输和动态加载媒体数据,例如在网络上流式传输视频。所以记住_mediaSource 这个变量,后面有大用。

load

接下来看load函数,这个函数特别关键。关键代码如下

// player/flv-player.js
load() {
	...
	// TODO step1: load()方法关联transmuxer.js
    this._transmuxer = new Transmuxer(this._mediaDataSource, this._config)

    this._transmuxer.on(TransmuxingEvents.INIT_SEGMENT, (type, is) => {
      this._msectl.appendInitSegment(is)
    })

    // TODO step2: 监听流片段过来
    this._transmuxer.on(TransmuxingEvents.MEDIA_SEGMENT, (type, ms) => {
      // TODO step3: 交给mse-controller.js处理流,喂给video标签
      this._msectl.appendMediaSegment(ms)

      // lazyLoad check
      if (this._config.lazyLoad && !this._config.isLive) {
        let currentTime = this._mediaElement.currentTime
        if (ms.info.endDts >= (currentTime + this._config.lazyLoadMaxDuration) * 1000) {
          if (this._progressChecker == null) {
            Log.v(this.TAG, 'Maximum buffering duration exceeded, suspend transmuxing task')
            this._suspendTransmuxer()
          }
        }
      }
    })
  this._transmuxer.open()
}

我们先看这两句

this._transmuxer = new Transmuxer(this._mediaDataSource, this._config)
this._transmuxer.open()

// core/transmuxer.js
constructor(mediaDataSource, config) {
    this.TAG = 'Transmuxer'
    this._emitter = new EventEmitter()
    
    // !是否启用分离线程转换
    if (config.enableWorker && typeof Worker !== 'undefined') {
			...
    } else {
      // 没有采用worker处理数据
      this._controller = new TransmuxingController(mediaDataSource, config)
    }
		// 订阅事件
    if (this._controller) {
      let ctl = this._controller
      ctl.on(TransmuxingEvents.IO_ERROR, this._onIOError.bind(this))
      ctl.on(TransmuxingEvents.DEMUX_ERROR, this._onDemuxError.bind(this))
      ctl.on(TransmuxingEvents.INIT_SEGMENT, this._onInitSegment.bind(this))
      ctl.on(TransmuxingEvents.MEDIA_SEGMENT, this._onMediaSegment.bind(this))
      ctl.on(TransmuxingEvents.LOADING_COMPLETE, this._onLoadingComplete.bind(this))
      ctl.on(TransmuxingEvents.RECOVERED_EARLY_EOF, this._onRecoveredEarlyEof.bind(this))
      ctl.on(TransmuxingEvents.MEDIA_INFO, this._onMediaInfo.bind(this))
      ctl.on(TransmuxingEvents.METADATA_ARRIVED, this._onMetaDataArrived.bind(this))
      ctl.on(TransmuxingEvents.SCRIPTDATA_ARRIVED, this._onScriptDataArrived.bind(this))
      ctl.on(TransmuxingEvents.STATISTICS_INFO, this._onStatisticsInfo.bind(this))
      ctl.on(TransmuxingEvents.RECOMMEND_SEEKPOINT, this._onRecommendSeekpoint.bind(this))
    }
  }

很明显是调用了Transmuxer实例的open方法

// core/transmuxer.js
open() {
    if (this._worker) {
      this._worker.postMessage({ cmd: 'start' })
    } else {
      this._controller.start()
    }
  }

我们没有设置config中的使用worker处理设置的选项,所以直接走else,调用的是TransmuxingController中的start方法

// core/transmuxing-controller.js
start() {
    // 开始加载流片段,从0开始
    this._loadSegment(0)
    this._enableStatisticsReporter()
  }

// 重点是_loadSegment函数
_loadSegment(segmentIndex, optionalFrom) {
    this._currentSegmentIndex = segmentIndex
    let dataSource = this._mediaDataSource.segments[segmentIndex]

    let ioctl = (this._ioctl = new IOController(dataSource, this._config, segmentIndex))
    ioctl.onError = this._onIOException.bind(this)
    ioctl.onSeeked = this._onIOSeeked.bind(this)
    ioctl.onComplete = this._onIOComplete.bind(this)
    ioctl.onRedirect = this._onIORedirect.bind(this)
    ioctl.onRecoveredEarlyEof = this._onIORecoveredEarlyEof.bind(this)

    if (optionalFrom) {
      // 把ioController传给数据处理器
      // 在里面绑定了数据处理函数 this._ioctl.onDataArrival
      // 为下面的open做准备
      // 数据流过来会提交给this._ioctl.onDataArrival处理
      this._demuxer.bindDataSource(this._ioctl)
    } else {
      // 初始化数据处理函数
      ioctl.onDataArrival = this._onInitChunkArrival.bind(this)
    }

    // 打开io
    ioctl.open(optionalFrom)
  }

上面有三行重点代码, 我们来一步一步看

let ioctl = (this._ioctl = new IOController(dataSource, this._config, segmentIndex))
ioctl.onDataArrival = this._onInitChunkArrival.bind(this)
ioctl.open(optionalFrom)
  1. 首先是new IOController()

    // io/io-controller.js
    constructor(dataSource, config, extraData) {
        ...
        this._selectSeekHandler()
        this._selectLoader()
        this._createLoader()
      }
    
    // _slectSeekHandler先不管,看_selectLoder和_createLoader方法
    // 这两个就是上面说得根据url来选择创建不同的loader方式
    // 获取loader类型
      _selectLoader() {
        if (this._config.customLoader != null) {
          this._loaderClass = this._config.customLoader
        } else if (this._isWebSocketURL) {
          // websocket链接
          this._loaderClass = WebSocketLoader
        } else if (FetchStreamLoader.isSupported()) {
          // 普通http链接
          this._loaderClass = FetchStreamLoader
        } else if (MozChunkedLoader.isSupported()) {
          this._loaderClass = MozChunkedLoader
        } else if (RangeLoader.isSupported()) {
          this._loaderClass = RangeLoader
        } else {
          throw new RuntimeException("Your browser doesn't support xhr with arraybuffer responseType!")
        }
      }
    
    // 这里我们进入的是WebSocketLoader
    // 根据loader类型创建loader
      _createLoader() {
        this._loader = new this._loaderClass(this._seekHandler, this._config)
        if (this._loader.needStashBuffer === false) {
          this._enableStash = false
        }
        this._loader.onContentLengthKnown = this._onContentLengthKnown.bind(this)
        this._loader.onURLRedirect = this._onURLRedirect.bind(this)
        // 绑定数据到达调用函数,ws对应在_onWebSocketMessage方法中
        this._loader.onDataArrival = this._onLoaderChunkArrival.bind(this)
        this._loader.onComplete = this._onLoaderComplete.bind(this)
        this._loader.onError = this._onLoaderError.bind(this)
      }
    

    这里注意下onDataArrival ,其实在整个flvjs中,onDataArrival 这个方法至关重要,它是整个二进制数据的流向,它采用的是js class语法中的set,get方法绑定上一层clas的回调函数。

    get onDataArrival() {
      return this._onDataArrival
    }
    
    set onDataArrival(callback) {
      this._onDataArrival = callback
    }
    

    所以在websocket-loader.js中,触发_onDataArrival方法其实就是调用io-controller.js中的_onLoaderChunkArrival方法,其他地方也是如此。

回到 ioctl.onDataArrival = this._onInitChunkArrival.bind(this)这一句,原理相同,这里是绑定了数据处理函数,我们下接着看open方法。

open

open方法自然也是调用的是websocket-loader.js中的open方法

// io/websocket-loader.js
open(dataSource) {
    try {
      // 建立ws连接
      let ws = (this._ws = new self.WebSocket(dataSource.url))
      // 定义接收类型
      ws.binaryType = 'arraybuffer'
      // 绑定事件
      ws.onopen = this._onWebSocketOpen.bind(this)
      ws.onclose = this._onWebSocketClose.bind(this)
      // 重点看这里
      ws.onmessage = this._onWebSocketMessage.bind(this)
      ws.onerror = this._onWebSocketError.bind(this)

      this._status = LoaderStatus.kConnecting
    } catch (e) {
			...
    }
  }

_onWebSocketMessage(e) {
    if (e.data instanceof ArrayBuffer) {
      // TODO websocket数据进来这里
      time.curTime = new Date().getTime()
      this._dispatchArrayBuffer(e.data)
    } else if (e.data instanceof Blob) {
      let reader = new FileReader()
      reader.onload = () => {
        this._dispatchArrayBuffer(reader.result)
      }
      reader.readAsArrayBuffer(e.data)
    } else {
    }
  }

//   调度ArrayBuffer
  _dispatchArrayBuffer(arraybuffer) {
    let chunk = arraybuffer
    let byteStart = this._receivedLength
    this._receivedLength += chunk.byteLength

    if (this._onDataArrival) {
      // 数据开始往上传递了
      this._onDataArrival(chunk, byteStart, this._receivedLength)
    }
  }

在这里建立ws后,相当于打通了数据源,flv流开始源源不断的通过ws过来,然后数据就开始往上传递了。

消费数据流

既然有数据进来了,那下一步自然就是消费数据了。数据从ws进来,上一层就是io-controller,里面绑定的回调函数是_onLoaderChunkArrival

// io/io-controller.js
// TODO 数据到达时处理函数
  _onLoaderChunkArrival(chunk, byteStart, receivedLength) {
    if (!this._onDataArrival) {
      throw new IllegalStateException('IOController: No existing consumer (onDataArrival) callback!')
    }
    if (this._paused) {
      return
    }
    if (this._isEarlyEofReconnecting) {
      // Auto-reconnect for EarlyEof succeed, notify to upper-layer by callback
      this._isEarlyEofReconnecting = false
      if (this._onRecoveredEarlyEof) {
        this._onRecoveredEarlyEof()
      }
    }

    this._speedSampler.addBytes(chunk.byteLength)

    // adjust stash buffer size according to network speed dynamically
    // TODO 获取当前的网速
    let KBps = this._speedSampler.lastSecondKBps
    if (KBps !== 0) {
      // TODO 正规化网速
      let normalized = this._normalizeSpeed(KBps)
      if (this._speedNormalized !== normalized) {
        this._speedNormalized = normalized
        this._adjustStashSize(normalized)
      }
    }

    if (!this._enableStash) {
      // disable stash
      // 缓存中没有数据的情况
      if (this._stashUsed === 0) {
        // dispatch chunk directly to consumer;
        // check ret value (consumed bytes) and stash unconsumed to stashBuffer
        // 直接消费
        let consumed = this._dispatchChunks(chunk, byteStart)
        // 如果有剩余
        if (consumed < chunk.byteLength) {
          // unconsumed data remain.
          // 未处理的数据长度
          let remain = chunk.byteLength - consumed
          // 如果数据超过缓存,则扩展缓存
          if (remain > this._bufferSize) {
            this._expandBuffer(remain)
          }
          // 在_stashBuffer上创建 Uint8Array使其可以操作
          let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
          // 从chunk的 consumed开始获取数据 然后从第0位置开始写入stashArray中
          stashArray.set(new Uint8Array(chunk, consumed), 0)
          // 记录stashUsed的大小
          this._stashUsed += remain
          // 记录整个流中的开始位置
          this._stashByteStart = byteStart + consumed
        }
      } else {
        // else: Merge chunk into stashBuffer, and dispatch stashBuffer to consumer.
        // 缓存中有数据的情况
        // 先扩展缓存 能够放下已存在的和当前获取的
        if (this._stashUsed + chunk.byteLength > this._bufferSize) {
          this._expandBuffer(this._stashUsed + chunk.byteLength)
        }
        let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
        // 先把获取到的chunk 放入缓存中 从_stashUsed的offset开始存放
        stashArray.set(new Uint8Array(chunk), this._stashUsed)
        // 重置_stashUsed
        this._stashUsed += chunk.byteLength
        // 把缓存中的数据全部读出进行消费
        let consumed = this._dispatchChunks(this._stashBuffer.slice(0, this._stashUsed), this._stashByteStart)
        // 如果消费了有剩余
        if (consumed < this._stashUsed && consumed > 0) {
          // unconsumed data remain
          // 从consumed开始截取数据
          let remainArray = new Uint8Array(this._stashBuffer, consumed)
          // 从0开始设置 剩下的数据作为缓存 并且改变_stashUsed 记录缓存的位置
          stashArray.set(remainArray, 0)
        }
        // 重新设置_stashUsed
        this._stashUsed -= consumed
        this._stashByteStart += consumed
      }
    } else {
      // enable stash
      if (this._stashUsed === 0 && this._stashByteStart === 0) {
        // seeked? or init chunk?
        // This is the first chunk after seek action
        this._stashByteStart = byteStart
      }
      if (this._stashUsed + chunk.byteLength <= this._stashSize) {
        // just stash
        let stashArray = new Uint8Array(this._stashBuffer, 0, this._stashSize)
        stashArray.set(new Uint8Array(chunk), this._stashUsed)
        this._stashUsed += chunk.byteLength
      } else {
        // stashUsed + chunkSize > stashSize, size limit exceeded
        let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
        if (this._stashUsed > 0) {
          // There're stash datas in buffer
          // dispatch the whole stashBuffer, and stash remain data
          // then append chunk to stashBuffer (stash)
          // 如果有缓存 先消费缓存中的数据
          let buffer = this._stashBuffer.slice(0, this._stashUsed)
          let consumed = this._dispatchChunks(buffer, this._stashByteStart)
          if (consumed < buffer.byteLength) {
            if (consumed > 0) {
              let remainArray = new Uint8Array(buffer, consumed)
              stashArray.set(remainArray, 0)
              this._stashUsed = remainArray.byteLength
              this._stashByteStart += consumed
            }
          } else {
            this._stashUsed = 0
            this._stashByteStart += consumed
          }
          //  消费完缓存中的数据之后,然后再把这次过来的chunk放入缓存中
          if (this._stashUsed + chunk.byteLength > this._bufferSize) {
            this._expandBuffer(this._stashUsed + chunk.byteLength)
            stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
          }
          stashArray.set(new Uint8Array(chunk), this._stashUsed)
          this._stashUsed += chunk.byteLength
        } else {
          // stash buffer empty, but chunkSize > stashSize (oh, holy shit)
          // dispatch chunk directly and stash remain data
          // 如果缓存中没有数据,直接消费本次数据
          let consumed = this._dispatchChunks(chunk, byteStart)
          if (consumed < chunk.byteLength) {
            let remain = chunk.byteLength - consumed
            if (remain > this._bufferSize) {
              this._expandBuffer(remain)
              stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
            }
            stashArray.set(new Uint8Array(chunk, consumed), 0)
            this._stashUsed += remain
            this._stashByteStart = byteStart + consumed
          }
        }
      }
    }
  }

这里的代码虽然有点多,但仔细看会发现,不管咋样都是调用了_dispatchChunks这个方法来消费数据,先不看其他逻辑,先看看数据的走向是咋样的。


// io/io-controller.js
_dispatchChunks(chunks, byteStart) {
  this._currentRange.to = byteStart + chunks.byteLength - 1
  return this._onDataArrival(chunks, byteStart)
}

又是_onDataArrival这个函数,那这次它调用的又是哪个实例的回调函数呢?这时候我觉得最绕的一个点来了,让我们回归到transmuxing-controllelr.js中的start方法,里面是调用了this._loadSegment(0),我们来看看这个方法

// core/transmuxing-controller.js
_loadSegment(segmentIndex, optionalFrom) {
		...
    if (optionalFrom) {
      this._demuxer.bindDataSource(this._ioctl)
    } else {
      // optionalFrom为null,所以走这个分支
      ioctl.onDataArrival = this._onInitChunkArrival.bind(this)
    }

    // 打开io
    ioctl.open(optionalFrom)
  }

重点来了!!!

请记住 ioctl.onDataArrival = this._onInitChunkArrival.bind(this)这句代码,目前ioctl的数据处理回调函数是绑定_onInitChunkArrival的对吧,所以io-controller.js中的_dispatchChunks最后也是调用了这个_onInitChunkArrival,那我们去看看这段代码。

// core/transmuxing-controller.js
_onInitChunkArrival(data, byteStart) {
    let probeData = null
    let consumed = 0

    if (byteStart > 0) {
			...
    } else if ((probeData = FLVDemuxer.probe(data)).match) {
			// Always create new FLVDemuxer
      this._demuxer = new FLVDemuxer(probeData, this._config)

      if (!this._remuxer) {
        this._remuxer = new MP4Remuxer(this._config)
      }

      let mds = this._mediaDataSource
			...
      this._remuxer.bindDataSource(this._demuxer.bindDataSource(this._ioctl))

      this._remuxer.onInitSegment = this._onRemuxerInitSegmentArrival.bind(this)
      this._remuxer.onMediaSegment = this._onRemuxerMediaSegmentArrival.bind(this)

      consumed = this._demuxer.parseChunks(data, byteStart)
    } else {
				...
    }

    return consumed
  }

数据最开始进来的时候byteStart是0,通过debugger得知进入的是else if分支。如果你有去调试过这段代码,你会发现一个很奇怪的点:数据通过ws是源源不断的进来的,按逻辑数据流是通过了websocket-loader.js → io-controller.js → transmuxing-controller.js 。按照正常逻辑这个方法应该会一直被调用才对,通过打印信息你会发现,只有在最开始,第一个流片段过来后才触发,后面就再也没有调用了!

数据流的转向发生在这一句代码:this._remuxer.bindDataSource(this._demuxer.bindDataSource(this._ioctl))

在这一分支,分别创建了FLVDemuxer和MP4Remuxer,也就是解析flv格式的关键类。首先看FLVDemuxer中的bandDataSource做了什么

// demux/flv-demuxer.js
bindDataSource(loader) {
  // 数据到达就调用parseChunks处理
  loader.onDataArrival = this.parseChunks.bind(this)
  return this
}

没错!在这里ioctl的onDataArrival 被修改了!!后续的数据交给了parseChunks方法处理,并且renturn了当前flv-demuxer.js的作用域,接下来看mp4-remuxer.js的bandDataSource方法

// remux/mp4-remuxer.js

bindDataSource(producer) {
  producer.onDataAvailable = this.remux.bind(this)
  producer.onTrackMetadata = this._onTrackMetadataReceived.bind(this)
  return this
}

请注意,这里的producer就是之前的this,也就是flv-demuxer的实例this._demuxer,所以这里就相当于

this._demuxer.onDataAvailable  = this.remux.bind(this)
this._demuxer.onTrackMetadata = this._onTrackMetadataReceived.bind(this)

所以当onDataAvailable 被调用时相当于调用了remux方法。故而现在的数据从io-controller.js过来后被flv-demuxer.js接管了,流程如下

浅析flvjs源码

接着就是mp4-remuxer.js

浅析flvjs源码

到了_onMediaSegment这一步,已经把数据封装好了,可以通过事件监听的方式向上传递。而_onMediaSegment还是一个关联的回调函数

// remux/mp4-remuxer.js
get onMediaSegment() {
  return this._onMediaSegment
}

set onMediaSegment(callback) {
  this._onMediaSegment = callback
}

那它在哪里关联了呢,在上面这里

// core/transmuxing-controller.js
_onInitChunkArrival(data, byteStart) {
    let probeData = null
    let consumed = 0

    if (byteStart > 0) {
			...
    } else if ((probeData = FLVDemuxer.probe(data)).match) {
			// Always create new FLVDemuxer
      this._demuxer = new FLVDemuxer(probeData, this._config)

      if (!this._remuxer) {
        this._remuxer = new MP4Remuxer(this._config)
      }

      let mds = this._mediaDataSource
			...
      this._remuxer.bindDataSource(this._demuxer.bindDataSource(this._ioctl))

      this._remuxer.onInitSegment = this._onRemuxerInitSegmentArrival.bind(this)

			// 在这里关联了
      this._remuxer.onMediaSegment = this._onRemuxerMediaSegmentArrival.bind(this)

      consumed = this._demuxer.parseChunks(data, byteStart)
    } else {
				...
    }

    return consumed
  }

所以_onMediaSegment方法相当于调用了_onRemuxerMediaSegmentArrival

// core/transmuxing-controller.js
_onRemuxerMediaSegmentArrival(type, mediaSegment) {
    ...
    // 触发事件,将数据往上传
    this._emitter.emit(TransmuxingEvents.MEDIA_SEGMENT, type, mediaSegment)
		...
  }

而在transmuxer.js中,有个事件监听

constructor(mediaDataSource, config) {
    this.TAG = 'Transmuxer'
    this._emitter = new EventEmitter()
// !是否启用分离线程转换
    if (config.enableWorker && typeof Worker !== 'undefined') {
				...
    } else {
      // 没有采用worker处理数据
      this._controller = new TransmuxingController(mediaDataSource, config)
    }
    ...

    if (this._controller) {
      let ctl = this._controller
			...
      ctl.on(TransmuxingEvents.MEDIA_SEGMENT, this._onMediaSegment.bind(this))
			...
		}
  }

再看_onMediaSegment

// core/transmuxer.js
_onMediaSegment(type, mediaSegment) {
  Promise.resolve().then(() => {
    // 继续触发事件,数据往上传
    this._emitter.emit(TransmuxingEvents.MEDIA_SEGMENT, type, mediaSegment)
  })
}

再往上一层回到flv-player.js了

// player/flv-player.js
// TODO step2: 监听流片段过来
this._transmuxer.on(TransmuxingEvents.MEDIA_SEGMENT, (type, ms) => {
  // TODO step3: 交给mse-controller.js处理流,喂给video标签
  this._msectl.appendMediaSegment(ms)

  ...
})

emmm…又到了mse-controller.js

// core/mse-controller.js

appendMediaSegment(mediaSegment) {
  let ms = mediaSegment
  this._pendingSegments[ms.type].push(ms)

  if (this._config.autoCleanupSourceBuffer && this._needCleanupSourceBuffer()) {
    this._doCleanupSourceBuffer()
  }

  // type有video和audio两种,分别维护
  let sb = this._sourceBuffers[ms.type]
  if (sb && !sb.updating && !this._hasPendingRemoveRanges()) {
    this._doAppendSegments()
  }
}
// TODO 将处理完的数据
_doAppendSegments() {
  let pendingSegments = this._pendingSegments

  for (let type in pendingSegments) {
	    ...
      try {
        // _sourceBuffers是上面创建出来的缓冲区对象,与_mediaSource对象有关联
        this._sourceBuffers[type].appendBuffer(segment.data)
        this._isBufferFull = false
        if (type === 'video' && segment.hasOwnProperty('info')) {
          this._idrList.appendArray(segment.info.syncPoints)
        }
      } catch (error) {
				...
      } 
    }
  }
}

关联video标签

this._sourceBuffers[type].appendBuffer(segment.data)这一句就是终点了。要理解这一句是什么意思,就要回到最开始的attachMediaElement方法。还记得最开始要你们记住的_mediaSource 变量吗?那它和_sourceBuffers有啥关系呢?

其实在一开始第一段流过来的时候还触发了一个函数appendInitSegment,我们看看这个函数做了啥

appendInitSegment(initSegment, deferred) {
		...

    Log.v(this.TAG, 'Received Initialization Segment, mimeType: ' + mimeType)
    this._lastInitSegments[is.type] = is

    if (mimeType !== this._mimeTypes[is.type]) {
      if (!this._mimeTypes[is.type]) {
        // empty, first chance create sourcebuffer
        firstInitSegment = true
        try {
          // TODO 和this._mediaSource添加得缓冲区关联起来,后续得this._sourceBuffers[is.type].addSourceBuffer操作会同步在this._mediaSource得缓冲区中
          let sb = (this._sourceBuffers[is.type] = this._mediaSource.addSourceBuffer(mimeType))
          sb.addEventListener('error', this.e.onSourceBufferError)
          sb.addEventListener('updateend', this.e.onSourceBufferUpdateEnd)
        } catch (error) {
        }
      } else {
        Log.v(this.TAG, `Notice: ${is.type} mimeType changed, origin: ${this._mimeTypes[is.type]}, target: ${mimeType}`)
      }
      this._mimeTypes[is.type] = mimeType
    }
		...
  }

let sb = (this._sourceBuffers[is.type] = this._mediaSource.addSourceBuffer(mimeType)),从这一句代码可以知道_sourceBuffers是MediaSource创建的一个缓冲区对象,它们之间就存在了引用关系,is.type其实就是两个类型,audio和video。而在attachMediaElement方法中

// 
attachMediaElement(mediaElement) {
	...
	let ms = (this._mediaSource = new window.MediaSource())
	...
	
	this._mediaElement = mediaElement
	// 将MediaSource对象转为url喂给video标签
	this._mediaSourceObjectURL = window.URL.createObjectURL(this._mediaSource)
	// mediaElement就是video标签
	mediaElement.src = this._mediaSourceObjectURL
	}

最后,当this._sourceBuffers[type].appendBuffer(segment.data)添加新的片段过去的时候。video标签就可以持续播放实时画面了。

以上就是webSocket拉流到实时播放的一个完整流程,至于源码是怎么解析flv格式到mp4格式的封装,目前笔者才疏学浅,还要再学习一下音视频方面的知识才能分析后续的源码,到时候再写一篇文章了。

转载自:https://juejin.cn/post/7267418218620239872
评论
请登录