浅析flvjs源码
最近将公司得一个检测系统项目完成后,闲下来就把在项目中用到得flvjs源码看了一遍,梳理出大致得流程,并整理成文档,分享给大家。
当前使用flvjs,必要得步骤一般都是这几步:
1. 获取video标签
2. 初始化flvjs flvjs.createPlayer(mediaDataSource,config)
3. 将flvjs和video标签关联 flvjs.attachMediaElement(videoDom)
4. 开启加载直播流 flvjs.load()
配置
其中mediaDataSource和config如图所示
完整的导图在这里
流程图
在进行阅读源码之前,我先放一张整理好的调用流程给你们,绿色代表类,红色线代表调用流程,蓝色线表示类的方法,这个流程图在这里有点模糊,需要高清的可以看这个仓库
那么首先看一下flv得目录组成,他主要分为下面几个包,
-
core包主要负责初步处理流数据,并且使用javaScript API MediaSource创建缓冲区接纳后续处理得,能够给video标签播放得视频数据,具体实现下面再细讲。
-
demux包主要是负责解析flv格式视频了,解析完后会丢给core包。
-
io包涉及到的是怎么接收流,里面有
webSocket loader
和普通得fetch loader
,根据url分别使用不同得loader。 -
player包是整个flvjs初始化得起点,也负责事件得分发。
-
remux包负责将流解析封装成video标签可播放得mp4 buffer数据。
-
utils包是工具函数。
整个调用流程作者使用得是node得事件监听Api,通过EventEmitter中得emit和on实现事件得发布和监听。通过上面得流程图就知道这个调用过程其实有点绕,我们化繁为简,只看type = flv,url = webSocket这一种情况,从而窥探一下完整得调用流程,在看下面的分析流程的时候,建议配合上面的流程图一起看。
初始化
引入flvjs后首先调用得就是createPlayer,对应源码在这里
// /flv.js
function createPlayer(mediaDataSource, optionalConfig) {
...
switch (mds.type) {
case 'flv':
return new FlvPlayer(mds, optionalConfig)
default:
return new NativePlayer(mds, optionalConfig)
}
}
所以实际上是创建了一个FlvPlayer类,但构造函数只是初始化了一些参数,有用得是里面得两个方法,attachMediaElement和load,没错,就是上面步骤得第三步和第四步,至此,命运的齿轮开始转动0.0。
首先来看attachMediaElement方法
// player/flv-player.js
// 关联video标签
attachMediaElement(mediaElement) {
this._mediaElement = mediaElement
// 给video标签视频节点绑定监听事件
mediaElement.addEventListener('loadedmetadata', this.e.onvLoadedMetadata)
mediaElement.addEventListener('seeking', this.e.onvSeeking)
mediaElement.addEventListener('canplay', this.e.onvCanPlay)
mediaElement.addEventListener('stalled', this.e.onvStalled)
mediaElement.addEventListener('progress', this.e.onvProgress)
this._msectl = new MSEController(this._config)
...
// 去看mse-controller.js里面的attachMediaElement方法
this._msectl.attachMediaElement(mediaElement)
...
}
在这里初始化了一个MSEController实例,在这个类中,完成了video标签和视频流之间的关联
// core/mse-controller.js
attachMediaElement(mediaElement) {
if (this._mediaSource) {
throw new IllegalStateException('MediaSource has been attached to an HTMLMediaElement!')
}
let ms = (this._mediaSource = new window.MediaSource())
ms.addEventListener('sourceopen', this.e.onSourceOpen)
ms.addEventListener('sourceended', this.e.onSourceEnded)
ms.addEventListener('sourceclose', this.e.onSourceClose)
this._mediaElement = mediaElement
// 将MediaSource对象转为url喂给video标签
this._mediaSourceObjectURL = window.URL.createObjectURL(this._mediaSource)
// mediaElement就是video标签
mediaElement.src = this._mediaSourceObjectURL
}
这几行是关键代码,没有这个关联video标签是看不到视频的。重点在于MediaSource的创建,这个API是js的底层API,用于允许 JavaScript 控制媒体流的生成和管理,以便在 <video>
和 <audio>
元素中播放。它允许你动态地生成媒体片段并将其添加到媒体流中,从而实现流式传输和动态加载媒体数据,例如在网络上流式传输视频。所以记住_mediaSource
这个变量,后面有大用。
load
接下来看load函数,这个函数特别关键。关键代码如下
// player/flv-player.js
load() {
...
// TODO step1: load()方法关联transmuxer.js
this._transmuxer = new Transmuxer(this._mediaDataSource, this._config)
this._transmuxer.on(TransmuxingEvents.INIT_SEGMENT, (type, is) => {
this._msectl.appendInitSegment(is)
})
// TODO step2: 监听流片段过来
this._transmuxer.on(TransmuxingEvents.MEDIA_SEGMENT, (type, ms) => {
// TODO step3: 交给mse-controller.js处理流,喂给video标签
this._msectl.appendMediaSegment(ms)
// lazyLoad check
if (this._config.lazyLoad && !this._config.isLive) {
let currentTime = this._mediaElement.currentTime
if (ms.info.endDts >= (currentTime + this._config.lazyLoadMaxDuration) * 1000) {
if (this._progressChecker == null) {
Log.v(this.TAG, 'Maximum buffering duration exceeded, suspend transmuxing task')
this._suspendTransmuxer()
}
}
}
})
this._transmuxer.open()
}
我们先看这两句
this._transmuxer = new Transmuxer(this._mediaDataSource, this._config)
this._transmuxer.open()
// core/transmuxer.js
constructor(mediaDataSource, config) {
this.TAG = 'Transmuxer'
this._emitter = new EventEmitter()
// !是否启用分离线程转换
if (config.enableWorker && typeof Worker !== 'undefined') {
...
} else {
// 没有采用worker处理数据
this._controller = new TransmuxingController(mediaDataSource, config)
}
// 订阅事件
if (this._controller) {
let ctl = this._controller
ctl.on(TransmuxingEvents.IO_ERROR, this._onIOError.bind(this))
ctl.on(TransmuxingEvents.DEMUX_ERROR, this._onDemuxError.bind(this))
ctl.on(TransmuxingEvents.INIT_SEGMENT, this._onInitSegment.bind(this))
ctl.on(TransmuxingEvents.MEDIA_SEGMENT, this._onMediaSegment.bind(this))
ctl.on(TransmuxingEvents.LOADING_COMPLETE, this._onLoadingComplete.bind(this))
ctl.on(TransmuxingEvents.RECOVERED_EARLY_EOF, this._onRecoveredEarlyEof.bind(this))
ctl.on(TransmuxingEvents.MEDIA_INFO, this._onMediaInfo.bind(this))
ctl.on(TransmuxingEvents.METADATA_ARRIVED, this._onMetaDataArrived.bind(this))
ctl.on(TransmuxingEvents.SCRIPTDATA_ARRIVED, this._onScriptDataArrived.bind(this))
ctl.on(TransmuxingEvents.STATISTICS_INFO, this._onStatisticsInfo.bind(this))
ctl.on(TransmuxingEvents.RECOMMEND_SEEKPOINT, this._onRecommendSeekpoint.bind(this))
}
}
很明显是调用了Transmuxer实例的open方法
// core/transmuxer.js
open() {
if (this._worker) {
this._worker.postMessage({ cmd: 'start' })
} else {
this._controller.start()
}
}
我们没有设置config中的使用worker处理设置的选项,所以直接走else,调用的是TransmuxingController中的start方法
// core/transmuxing-controller.js
start() {
// 开始加载流片段,从0开始
this._loadSegment(0)
this._enableStatisticsReporter()
}
// 重点是_loadSegment函数
_loadSegment(segmentIndex, optionalFrom) {
this._currentSegmentIndex = segmentIndex
let dataSource = this._mediaDataSource.segments[segmentIndex]
let ioctl = (this._ioctl = new IOController(dataSource, this._config, segmentIndex))
ioctl.onError = this._onIOException.bind(this)
ioctl.onSeeked = this._onIOSeeked.bind(this)
ioctl.onComplete = this._onIOComplete.bind(this)
ioctl.onRedirect = this._onIORedirect.bind(this)
ioctl.onRecoveredEarlyEof = this._onIORecoveredEarlyEof.bind(this)
if (optionalFrom) {
// 把ioController传给数据处理器
// 在里面绑定了数据处理函数 this._ioctl.onDataArrival
// 为下面的open做准备
// 数据流过来会提交给this._ioctl.onDataArrival处理
this._demuxer.bindDataSource(this._ioctl)
} else {
// 初始化数据处理函数
ioctl.onDataArrival = this._onInitChunkArrival.bind(this)
}
// 打开io
ioctl.open(optionalFrom)
}
上面有三行重点代码, 我们来一步一步看
let ioctl = (this._ioctl = new IOController(dataSource, this._config, segmentIndex))
ioctl.onDataArrival = this._onInitChunkArrival.bind(this)
ioctl.open(optionalFrom)
-
首先是new IOController()
// io/io-controller.js constructor(dataSource, config, extraData) { ... this._selectSeekHandler() this._selectLoader() this._createLoader() } // _slectSeekHandler先不管,看_selectLoder和_createLoader方法 // 这两个就是上面说得根据url来选择创建不同的loader方式 // 获取loader类型 _selectLoader() { if (this._config.customLoader != null) { this._loaderClass = this._config.customLoader } else if (this._isWebSocketURL) { // websocket链接 this._loaderClass = WebSocketLoader } else if (FetchStreamLoader.isSupported()) { // 普通http链接 this._loaderClass = FetchStreamLoader } else if (MozChunkedLoader.isSupported()) { this._loaderClass = MozChunkedLoader } else if (RangeLoader.isSupported()) { this._loaderClass = RangeLoader } else { throw new RuntimeException("Your browser doesn't support xhr with arraybuffer responseType!") } } // 这里我们进入的是WebSocketLoader // 根据loader类型创建loader _createLoader() { this._loader = new this._loaderClass(this._seekHandler, this._config) if (this._loader.needStashBuffer === false) { this._enableStash = false } this._loader.onContentLengthKnown = this._onContentLengthKnown.bind(this) this._loader.onURLRedirect = this._onURLRedirect.bind(this) // 绑定数据到达调用函数,ws对应在_onWebSocketMessage方法中 this._loader.onDataArrival = this._onLoaderChunkArrival.bind(this) this._loader.onComplete = this._onLoaderComplete.bind(this) this._loader.onError = this._onLoaderError.bind(this) }
这里注意下onDataArrival ,其实在整个flvjs中,onDataArrival 这个方法至关重要,它是整个二进制数据的流向,它采用的是js class语法中的set,get方法绑定上一层clas的回调函数。
get onDataArrival() { return this._onDataArrival } set onDataArrival(callback) { this._onDataArrival = callback }
所以在websocket-loader.js中,触发_onDataArrival方法其实就是调用io-controller.js中的_onLoaderChunkArrival方法,其他地方也是如此。
回到 ioctl.onDataArrival = this._onInitChunkArrival.bind(this)
这一句,原理相同,这里是绑定了数据处理函数,我们下接着看open方法。
open
open方法自然也是调用的是websocket-loader.js中的open方法
// io/websocket-loader.js
open(dataSource) {
try {
// 建立ws连接
let ws = (this._ws = new self.WebSocket(dataSource.url))
// 定义接收类型
ws.binaryType = 'arraybuffer'
// 绑定事件
ws.onopen = this._onWebSocketOpen.bind(this)
ws.onclose = this._onWebSocketClose.bind(this)
// 重点看这里
ws.onmessage = this._onWebSocketMessage.bind(this)
ws.onerror = this._onWebSocketError.bind(this)
this._status = LoaderStatus.kConnecting
} catch (e) {
...
}
}
_onWebSocketMessage(e) {
if (e.data instanceof ArrayBuffer) {
// TODO websocket数据进来这里
time.curTime = new Date().getTime()
this._dispatchArrayBuffer(e.data)
} else if (e.data instanceof Blob) {
let reader = new FileReader()
reader.onload = () => {
this._dispatchArrayBuffer(reader.result)
}
reader.readAsArrayBuffer(e.data)
} else {
}
}
// 调度ArrayBuffer
_dispatchArrayBuffer(arraybuffer) {
let chunk = arraybuffer
let byteStart = this._receivedLength
this._receivedLength += chunk.byteLength
if (this._onDataArrival) {
// 数据开始往上传递了
this._onDataArrival(chunk, byteStart, this._receivedLength)
}
}
在这里建立ws后,相当于打通了数据源,flv流开始源源不断的通过ws过来,然后数据就开始往上传递了。
消费数据流
既然有数据进来了,那下一步自然就是消费数据了。数据从ws进来,上一层就是io-controller,里面绑定的回调函数是_onLoaderChunkArrival
// io/io-controller.js
// TODO 数据到达时处理函数
_onLoaderChunkArrival(chunk, byteStart, receivedLength) {
if (!this._onDataArrival) {
throw new IllegalStateException('IOController: No existing consumer (onDataArrival) callback!')
}
if (this._paused) {
return
}
if (this._isEarlyEofReconnecting) {
// Auto-reconnect for EarlyEof succeed, notify to upper-layer by callback
this._isEarlyEofReconnecting = false
if (this._onRecoveredEarlyEof) {
this._onRecoveredEarlyEof()
}
}
this._speedSampler.addBytes(chunk.byteLength)
// adjust stash buffer size according to network speed dynamically
// TODO 获取当前的网速
let KBps = this._speedSampler.lastSecondKBps
if (KBps !== 0) {
// TODO 正规化网速
let normalized = this._normalizeSpeed(KBps)
if (this._speedNormalized !== normalized) {
this._speedNormalized = normalized
this._adjustStashSize(normalized)
}
}
if (!this._enableStash) {
// disable stash
// 缓存中没有数据的情况
if (this._stashUsed === 0) {
// dispatch chunk directly to consumer;
// check ret value (consumed bytes) and stash unconsumed to stashBuffer
// 直接消费
let consumed = this._dispatchChunks(chunk, byteStart)
// 如果有剩余
if (consumed < chunk.byteLength) {
// unconsumed data remain.
// 未处理的数据长度
let remain = chunk.byteLength - consumed
// 如果数据超过缓存,则扩展缓存
if (remain > this._bufferSize) {
this._expandBuffer(remain)
}
// 在_stashBuffer上创建 Uint8Array使其可以操作
let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
// 从chunk的 consumed开始获取数据 然后从第0位置开始写入stashArray中
stashArray.set(new Uint8Array(chunk, consumed), 0)
// 记录stashUsed的大小
this._stashUsed += remain
// 记录整个流中的开始位置
this._stashByteStart = byteStart + consumed
}
} else {
// else: Merge chunk into stashBuffer, and dispatch stashBuffer to consumer.
// 缓存中有数据的情况
// 先扩展缓存 能够放下已存在的和当前获取的
if (this._stashUsed + chunk.byteLength > this._bufferSize) {
this._expandBuffer(this._stashUsed + chunk.byteLength)
}
let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
// 先把获取到的chunk 放入缓存中 从_stashUsed的offset开始存放
stashArray.set(new Uint8Array(chunk), this._stashUsed)
// 重置_stashUsed
this._stashUsed += chunk.byteLength
// 把缓存中的数据全部读出进行消费
let consumed = this._dispatchChunks(this._stashBuffer.slice(0, this._stashUsed), this._stashByteStart)
// 如果消费了有剩余
if (consumed < this._stashUsed && consumed > 0) {
// unconsumed data remain
// 从consumed开始截取数据
let remainArray = new Uint8Array(this._stashBuffer, consumed)
// 从0开始设置 剩下的数据作为缓存 并且改变_stashUsed 记录缓存的位置
stashArray.set(remainArray, 0)
}
// 重新设置_stashUsed
this._stashUsed -= consumed
this._stashByteStart += consumed
}
} else {
// enable stash
if (this._stashUsed === 0 && this._stashByteStart === 0) {
// seeked? or init chunk?
// This is the first chunk after seek action
this._stashByteStart = byteStart
}
if (this._stashUsed + chunk.byteLength <= this._stashSize) {
// just stash
let stashArray = new Uint8Array(this._stashBuffer, 0, this._stashSize)
stashArray.set(new Uint8Array(chunk), this._stashUsed)
this._stashUsed += chunk.byteLength
} else {
// stashUsed + chunkSize > stashSize, size limit exceeded
let stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
if (this._stashUsed > 0) {
// There're stash datas in buffer
// dispatch the whole stashBuffer, and stash remain data
// then append chunk to stashBuffer (stash)
// 如果有缓存 先消费缓存中的数据
let buffer = this._stashBuffer.slice(0, this._stashUsed)
let consumed = this._dispatchChunks(buffer, this._stashByteStart)
if (consumed < buffer.byteLength) {
if (consumed > 0) {
let remainArray = new Uint8Array(buffer, consumed)
stashArray.set(remainArray, 0)
this._stashUsed = remainArray.byteLength
this._stashByteStart += consumed
}
} else {
this._stashUsed = 0
this._stashByteStart += consumed
}
// 消费完缓存中的数据之后,然后再把这次过来的chunk放入缓存中
if (this._stashUsed + chunk.byteLength > this._bufferSize) {
this._expandBuffer(this._stashUsed + chunk.byteLength)
stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
}
stashArray.set(new Uint8Array(chunk), this._stashUsed)
this._stashUsed += chunk.byteLength
} else {
// stash buffer empty, but chunkSize > stashSize (oh, holy shit)
// dispatch chunk directly and stash remain data
// 如果缓存中没有数据,直接消费本次数据
let consumed = this._dispatchChunks(chunk, byteStart)
if (consumed < chunk.byteLength) {
let remain = chunk.byteLength - consumed
if (remain > this._bufferSize) {
this._expandBuffer(remain)
stashArray = new Uint8Array(this._stashBuffer, 0, this._bufferSize)
}
stashArray.set(new Uint8Array(chunk, consumed), 0)
this._stashUsed += remain
this._stashByteStart = byteStart + consumed
}
}
}
}
}
这里的代码虽然有点多,但仔细看会发现,不管咋样都是调用了_dispatchChunks这个方法来消费数据,先不看其他逻辑,先看看数据的走向是咋样的。
// io/io-controller.js
_dispatchChunks(chunks, byteStart) {
this._currentRange.to = byteStart + chunks.byteLength - 1
return this._onDataArrival(chunks, byteStart)
}
又是_onDataArrival
这个函数,那这次它调用的又是哪个实例的回调函数呢?这时候我觉得最绕的一个点来了,让我们回归到transmuxing-controllelr.js
中的start
方法,里面是调用了this._loadSegment(0)
,我们来看看这个方法
// core/transmuxing-controller.js
_loadSegment(segmentIndex, optionalFrom) {
...
if (optionalFrom) {
this._demuxer.bindDataSource(this._ioctl)
} else {
// optionalFrom为null,所以走这个分支
ioctl.onDataArrival = this._onInitChunkArrival.bind(this)
}
// 打开io
ioctl.open(optionalFrom)
}
重点来了!!!
请记住 ioctl.onDataArrival = this._onInitChunkArrival.bind(this)这句代码,目前ioctl的数据处理回调函数是绑定_onInitChunkArrival的对吧,所以io-controller.js中的_dispatchChunks最后也是调用了这个_onInitChunkArrival,那我们去看看这段代码。
// core/transmuxing-controller.js
_onInitChunkArrival(data, byteStart) {
let probeData = null
let consumed = 0
if (byteStart > 0) {
...
} else if ((probeData = FLVDemuxer.probe(data)).match) {
// Always create new FLVDemuxer
this._demuxer = new FLVDemuxer(probeData, this._config)
if (!this._remuxer) {
this._remuxer = new MP4Remuxer(this._config)
}
let mds = this._mediaDataSource
...
this._remuxer.bindDataSource(this._demuxer.bindDataSource(this._ioctl))
this._remuxer.onInitSegment = this._onRemuxerInitSegmentArrival.bind(this)
this._remuxer.onMediaSegment = this._onRemuxerMediaSegmentArrival.bind(this)
consumed = this._demuxer.parseChunks(data, byteStart)
} else {
...
}
return consumed
}
数据最开始进来的时候byteStart是0,通过debugger得知进入的是else if分支。如果你有去调试过这段代码,你会发现一个很奇怪的点:数据通过ws是源源不断的进来的,按逻辑数据流是通过了websocket-loader.js → io-controller.js → transmuxing-controller.js 。按照正常逻辑这个方法应该会一直被调用才对,通过打印信息你会发现,只有在最开始,第一个流片段过来后才触发,后面就再也没有调用了!
数据流的转向发生在这一句代码:this._remuxer.bindDataSource(this._demuxer.bindDataSource(this._ioctl))
在这一分支,分别创建了FLVDemuxer和MP4Remuxer,也就是解析flv格式的关键类。首先看FLVDemuxer中的bandDataSource做了什么
// demux/flv-demuxer.js
bindDataSource(loader) {
// 数据到达就调用parseChunks处理
loader.onDataArrival = this.parseChunks.bind(this)
return this
}
没错!在这里ioctl的onDataArrival 被修改了!!后续的数据交给了parseChunks方法处理,并且renturn了当前flv-demuxer.js的作用域,接下来看mp4-remuxer.js的bandDataSource方法
// remux/mp4-remuxer.js
bindDataSource(producer) {
producer.onDataAvailable = this.remux.bind(this)
producer.onTrackMetadata = this._onTrackMetadataReceived.bind(this)
return this
}
请注意,这里的producer就是之前的this,也就是flv-demuxer的实例this._demuxer,所以这里就相当于
this._demuxer.onDataAvailable = this.remux.bind(this)
this._demuxer.onTrackMetadata = this._onTrackMetadataReceived.bind(this)
所以当onDataAvailable 被调用时相当于调用了remux方法。故而现在的数据从io-controller.js过来后被flv-demuxer.js接管了,流程如下
接着就是mp4-remuxer.js
到了_onMediaSegment这一步,已经把数据封装好了,可以通过事件监听的方式向上传递。而_onMediaSegment还是一个关联的回调函数
// remux/mp4-remuxer.js
get onMediaSegment() {
return this._onMediaSegment
}
set onMediaSegment(callback) {
this._onMediaSegment = callback
}
那它在哪里关联了呢,在上面这里
// core/transmuxing-controller.js
_onInitChunkArrival(data, byteStart) {
let probeData = null
let consumed = 0
if (byteStart > 0) {
...
} else if ((probeData = FLVDemuxer.probe(data)).match) {
// Always create new FLVDemuxer
this._demuxer = new FLVDemuxer(probeData, this._config)
if (!this._remuxer) {
this._remuxer = new MP4Remuxer(this._config)
}
let mds = this._mediaDataSource
...
this._remuxer.bindDataSource(this._demuxer.bindDataSource(this._ioctl))
this._remuxer.onInitSegment = this._onRemuxerInitSegmentArrival.bind(this)
// 在这里关联了
this._remuxer.onMediaSegment = this._onRemuxerMediaSegmentArrival.bind(this)
consumed = this._demuxer.parseChunks(data, byteStart)
} else {
...
}
return consumed
}
所以_onMediaSegment方法相当于调用了_onRemuxerMediaSegmentArrival
// core/transmuxing-controller.js
_onRemuxerMediaSegmentArrival(type, mediaSegment) {
...
// 触发事件,将数据往上传
this._emitter.emit(TransmuxingEvents.MEDIA_SEGMENT, type, mediaSegment)
...
}
而在transmuxer.js中,有个事件监听
constructor(mediaDataSource, config) {
this.TAG = 'Transmuxer'
this._emitter = new EventEmitter()
// !是否启用分离线程转换
if (config.enableWorker && typeof Worker !== 'undefined') {
...
} else {
// 没有采用worker处理数据
this._controller = new TransmuxingController(mediaDataSource, config)
}
...
if (this._controller) {
let ctl = this._controller
...
ctl.on(TransmuxingEvents.MEDIA_SEGMENT, this._onMediaSegment.bind(this))
...
}
}
再看_onMediaSegment
// core/transmuxer.js
_onMediaSegment(type, mediaSegment) {
Promise.resolve().then(() => {
// 继续触发事件,数据往上传
this._emitter.emit(TransmuxingEvents.MEDIA_SEGMENT, type, mediaSegment)
})
}
再往上一层回到flv-player.js了
// player/flv-player.js
// TODO step2: 监听流片段过来
this._transmuxer.on(TransmuxingEvents.MEDIA_SEGMENT, (type, ms) => {
// TODO step3: 交给mse-controller.js处理流,喂给video标签
this._msectl.appendMediaSegment(ms)
...
})
emmm…又到了mse-controller.js
// core/mse-controller.js
appendMediaSegment(mediaSegment) {
let ms = mediaSegment
this._pendingSegments[ms.type].push(ms)
if (this._config.autoCleanupSourceBuffer && this._needCleanupSourceBuffer()) {
this._doCleanupSourceBuffer()
}
// type有video和audio两种,分别维护
let sb = this._sourceBuffers[ms.type]
if (sb && !sb.updating && !this._hasPendingRemoveRanges()) {
this._doAppendSegments()
}
}
// TODO 将处理完的数据
_doAppendSegments() {
let pendingSegments = this._pendingSegments
for (let type in pendingSegments) {
...
try {
// _sourceBuffers是上面创建出来的缓冲区对象,与_mediaSource对象有关联
this._sourceBuffers[type].appendBuffer(segment.data)
this._isBufferFull = false
if (type === 'video' && segment.hasOwnProperty('info')) {
this._idrList.appendArray(segment.info.syncPoints)
}
} catch (error) {
...
}
}
}
}
关联video标签
this._sourceBuffers[type].appendBuffer(segment.data)
这一句就是终点了。要理解这一句是什么意思,就要回到最开始的attachMediaElement方法。还记得最开始要你们记住的_mediaSource
变量吗?那它和_sourceBuffers有啥关系呢?
其实在一开始第一段流过来的时候还触发了一个函数appendInitSegment,我们看看这个函数做了啥
appendInitSegment(initSegment, deferred) {
...
Log.v(this.TAG, 'Received Initialization Segment, mimeType: ' + mimeType)
this._lastInitSegments[is.type] = is
if (mimeType !== this._mimeTypes[is.type]) {
if (!this._mimeTypes[is.type]) {
// empty, first chance create sourcebuffer
firstInitSegment = true
try {
// TODO 和this._mediaSource添加得缓冲区关联起来,后续得this._sourceBuffers[is.type].addSourceBuffer操作会同步在this._mediaSource得缓冲区中
let sb = (this._sourceBuffers[is.type] = this._mediaSource.addSourceBuffer(mimeType))
sb.addEventListener('error', this.e.onSourceBufferError)
sb.addEventListener('updateend', this.e.onSourceBufferUpdateEnd)
} catch (error) {
}
} else {
Log.v(this.TAG, `Notice: ${is.type} mimeType changed, origin: ${this._mimeTypes[is.type]}, target: ${mimeType}`)
}
this._mimeTypes[is.type] = mimeType
}
...
}
let sb = (this._sourceBuffers[is.type] = this._mediaSource.addSourceBuffer(mimeType)),从这一句代码可以知道_sourceBuffers是MediaSource创建的一个缓冲区对象,它们之间就存在了引用关系,is.type其实就是两个类型,audio和video。而在attachMediaElement方法中
//
attachMediaElement(mediaElement) {
...
let ms = (this._mediaSource = new window.MediaSource())
...
this._mediaElement = mediaElement
// 将MediaSource对象转为url喂给video标签
this._mediaSourceObjectURL = window.URL.createObjectURL(this._mediaSource)
// mediaElement就是video标签
mediaElement.src = this._mediaSourceObjectURL
}
最后,当this._sourceBuffers[type].appendBuffer(segment.data)
添加新的片段过去的时候。video标签就可以持续播放实时画面了。
以上就是webSocket拉流到实时播放的一个完整流程,至于源码是怎么解析flv格式到mp4格式的封装,目前笔者才疏学浅,还要再学习一下音视频方面的知识才能分析后续的源码,到时候再写一篇文章了。
转载自:https://juejin.cn/post/7267418218620239872