⚡今日算法 -- LOL全球总决赛直播中如何在数据乱序到达时保证按序输出?
场景描述
相信大家都有用过各种流媒体平台观看直播的经历,那么在观看直播时,数据通过网络传输时,一般会基于udp
协议传输,由于udp
不像tcp
那样能够保证数据包的有序传输,所以现在我们就需要设计一个数据结构能够保证得到的数据是按序输出的,尽管它们传输过程中是乱序到达
举个具体的例子,比如在一场LOL全球总决赛
的直播中,有一位选手依次完成了五杀,那么正常来说用户看到的直播数据应当是先击杀第一位敌人,然后第二位,第三位,第四位,第五位
但是由于网络传输的缘故,击杀第五位敌人的画面数据先到达客户端,然后前面几个击杀镜头的数据才到达
如果没有一个按序输出的机制控制的话,那用户看到的就是先五杀然后再击杀前面的几个敌人,很明显不合理
接下来再编写一个单元测试描述一下这个场景有助于我们理解以及后续验证代码具体实现
单元测试
这里我们就暂且将我们的实现作为一个类,名字就叫做MessageBox
吧,这个类提供两个api
:
receive
,接收到消息的时候调用它,第一个参数是消息的序号,第二个则是消息的内容flush
,用于刷新缓冲区,获取到此时可以得到的按序数据
describe('03-数据乱序到达时按序输出', () => {
let box: MessageBox<string>
beforeEach(() => {
box = new MessageBox()
})
test('数据全部按序到达', () => {
box.receive(1, '首次击杀敌人')
box.receive(2, '双杀')
box.receive(3, '三杀')
box.receive(4, '四杀')
box.receive(5, '五杀')
const data = box.flush()
expect(data).toEqual(['首次击杀敌人', '双杀', '三杀', '四杀', '五杀'])
})
test('数据乱序到达', () => {
box.receive(5, '五杀')
box.receive(2, '双杀')
box.receive(1, '首次击杀敌人')
box.receive(4, '四杀')
box.receive(3, '三杀')
const data = box.flush()
expect(data).toEqual(['首次击杀敌人', '双杀', '三杀', '四杀', '五杀'])
})
})
为了让单元测试的覆盖更加完善,我们还要考虑一下如果这里flush
调用的时机是在每次receive
接收到消息之后就立刻调用时会是怎样的
以第二个单测为例,时序1的时候接收到的是五杀的数据,它的消息序列号为5,前面的消息序列号都还没接收到,因此即便调用flush
也不会将其输出
以此类推,该场景下的预期结果如下:
test('每次击杀后调用flush', () => {
box.receive(5, '五杀')
expect(box.flush()).toEqual([])
box.receive(2, '双杀')
expect(box.flush()).toEqual([])
box.receive(1, '首次击杀敌人')
expect(box.flush()).toEqual(['首次击杀敌人', '双杀'])
box.receive(4, '四杀')
expect(box.flush()).toEqual([])
box.receive(3, '三杀')
expect(box.flush()).toEqual(['三杀', '四杀', '五杀'])
})
明白了这个算法的使用场景后,接下来我们就要思考如何实现了
思考实现
在我们的MessageBox
类内部可以维护一个nextMessageIdx
,表示接下来希望接收到的数据包的序列号
再维护两个Map
,分别是头表和尾表,用来记录目前接收到的messageIdx
的连续区间,什么意思呢?
比如目前我接收到的数据包的序列号有2, 3, 4, 7, 8
,那么就存在着两个连续区间 -- [2, 3, 4]
和[7, 8]
假设目前的nextMessageIdx
是1,也就是说只要序列号为1的数据包还没到达,此时无论调用多少次flush
得到的都是空数组,而一旦序列号为1的数据包到达了,调用flush
时就会得到1, 2, 3, 4
序列号对应的数据组成的数组
相当于说我们要有一个缓冲区的机制去维护乱序到达的数据包,并且在flush
后刷新缓冲区,将缓冲区数据输出并将其从缓冲区中销毁,由于输出的时候是按照顺序输出的,因此使用链表的方式是最合适的,甚至可以做成一个迭代器传出去而不是数组或许会更加合适
在输出了数据之后,将已输出的连续数据从缓冲区中销毁也十分简单,由于头尾表始终都只记录连续区间的头部和尾部,我们只需要删除头尾表Map
中对应的key
之后,这个对链表结点的强引用就没了,然后交给垃圾回收去处理即可
上述过程中我们的MessageBox
中数据的状态如下图所示:
缺少序列号为1的数据包时
序列号为1的数据包到达后
可以看到,我们只需要在数据到达的时候不断更新头表尾表即可,而flush
时则根据nextMessageIdx
到头表中找到对应的链表节点然后顺着其next指针往下不断输出即可,输出完后将头表中对应的key
移除,之后的内存销毁工作就交给垃圾回收去完成了
明白了思路后就开始动手干吧!
代码实现
这里我就直接贴出代码实现了,具体逻辑已经在注释中写清楚了,并不复杂,就不过多解释了
/**
* @description 消息链表结点
*/
class MessageNode<T> {
constructor(public data: T, public next: MessageNode<T> | null) {}
}
export class MessageBox<T> {
private nextMessageIdx: number
private headMap: Map<number, MessageNode<T>>
private tailMap: Map<number, MessageNode<T>>
constructor() {
// 初始数据包序列号从 1 开始
this.nextMessageIdx = 1
this.headMap = new Map()
this.tailMap = new Map()
}
/**
* @description 接收数据
*/
receive(messageIdx: number, messageContent: T) {
// 创建结点
const messageNode = new MessageNode<T>(messageContent, null)
if (this.tailMap.has(messageIdx - 1)) {
// 到尾表中查看是否有以 messageIdx - 1 结尾的连续区间
// 有的话则和它合并 将自己作为该连续区间的尾结点
const tailNode = this.tailMap.get(messageIdx - 1)!
tailNode.next = messageNode
this.tailMap.delete(messageIdx - 1)
this.tailMap.set(messageIdx, messageNode)
} else {
// 尾表中没有前一个序列号的结点 -- 那么自己作为头表的结点
this.headMap.set(messageIdx, messageNode)
}
if (this.headMap.has(messageIdx + 1)) {
// 到头表中查看是否有以 messageIdx + 1 开头的连续区间
// 有的话则和它合并 将自己作为该连续区间的头结点
const headNode = this.headMap.get(messageIdx + 1)!
messageNode.next = headNode
this.headMap.delete(messageIdx + 1)
this.headMap.set(messageIdx, messageNode)
} else {
// 头表中没有后一个序列号的结点 -- 那么自己作为尾表的结点
this.tailMap.set(messageIdx, messageNode)
}
}
/**
* @description 刷新已经接收到的按序数据
*/
flush(): T[] {
if (this.headMap.has(this.nextMessageIdx)) {
// 有 nextMessageIdx 为头结点的连续区间时,将这个区间输出
let pHeadNode: MessageNode<T> | null = this.headMap.get(
this.nextMessageIdx,
)!
// 统计连续区间中有几个数据 方便更新 nextMessageIdx
let dataCount = 0
const data: T[] = []
while (pHeadNode !== null) {
data.push(pHeadNode.data)
dataCount++
pHeadNode = pHeadNode.next
}
// 移除头尾表中对应结点
const tailMessageIdx = this.nextMessageIdx + dataCount - 1
this.headMap.delete(this.nextMessageIdx)
this.tailMap.delete(tailMessageIdx)
// 更新 nextMessageIdx
this.nextMessageIdx += dataCount
return data
}
return []
}
}
可以看到顺利通过了单元测试,说明我们的实现没问题,可以愉快地去看LOL全球总决赛
了~~~
转载自:https://juejin.cn/post/7150311952256860190