从二叉堆到时间轮,学习如何更好的处理延迟任务
二叉堆
二叉堆是一种特殊的堆,二叉堆是完全二叉树或者是近似完全二叉树。
二叉堆满足堆特性:
- 父节点的键值总是保持固定的序关系于任何一个子节点的键值
- 每个节点的左子树和右子树都是一个二叉堆。
当父节点的键值总是大于或等于任何一个子节点的键值时为“最大堆”。 当父节点的键值总是小于或等于任何一个子节点的键值时为“最小堆”。
二叉堆一般用数组来表示。如果根节点在数组中的位置是1,第n个位置的子节点分别在2n和 2n+1。因此,第1个位置的子节点在2和3,第2个位置的子节点在4和5。以此类推。这种基于1的数组存储方式便于寻找父节点和子节点。
如果存储数组的下标基于0,那么下标为i的节点的子节点是2i + 1与2i + 2;其父节点的下标是⌊floor((i − 1) ∕ 2)⌋。函数floor(x)的功能是“向下取整”,或者说“向下舍入”,即取不大于x的最大整数(与“四舍五入”不同,向下取整是直接取按照数轴上最接近要求值的左边值,即不大于要求值的最大的那个值)。比如floor(1.1)、floor(1.9)都返回1。
如下图的两个堆:
将这两个堆保存在以1开始的数组中:
位置: 1 2 3 4 5 6 7 8 9 10 11
左图: 1 2 3 4 5 6 7 8 9 10 11
右图: 11 9 10 5 6 7 8 1 2 3 4
对于一个很大的堆,这种存储是低效的。因为节点的子节点很可能在另外一个内存页中。B-heap是一种效率更高的存储方式,把每个子树放到同一内存页。
如果用指针链表存储堆,那么需要能访问叶节点的方法。可以对二叉树“穿线”(threading)方式,来依序遍历这些节点。
在二叉堆上可以进行插入节点、删除节点、取出值最小的节点、减小节点的值等基本操作。下面从Golang代码的角度实现一下二叉堆
import (
"fmt"
)
type Item struct {
key int
value interface{}
}
type BinaryHeap struct {
items []*Item
}
func NewBinaryHeap() *BinaryHeap {
return &BinaryHeap{
items: []*Item{},
}
}
func (h *BinaryHeap) Insert(key int, value interface{}) {
item := &Item{
key: key,
value: value,
}
h.items = append(h.items, item)
h.bubbleUp(len(h.items) - 1)
}
func (h *BinaryHeap) bubbleUp(index int) {
parentIndex := (index - 1) / 2
if parentIndex < 0 {
return
}
if h.items[parentIndex].key > h.items[index].key {
h.swap(parentIndex, index)
h.bubbleUp(parentIndex)
}
}
func (h *BinaryHeap) ExtractMin() (*Item, error) {
if len(h.items) == 0 {
return nil, fmt.Errorf("heap is empty")
}
min := h.items[0]
lastIndex := len(h.items) - 1
h.swap(0, lastIndex)
h.items = h.items[:lastIndex]
h.bubbleDown(0)
return min, nil
}
func (h *BinaryHeap) bubbleDown(index int) {
minIndex := index
leftChildIndex := 2*index + 1
rightChildIndex := 2*index + 2
if leftChildIndex < len(h.items) && h.items[leftChildIndex].key < h.items[minIndex].key {
minIndex = leftChildIndex
}
if rightChildIndex < len(h.items) && h.items[rightChildIndex].key < h.items[minIndex].key {
minIndex = rightChildIndex
}
if minIndex != index {
h.swap(minIndex, index)
h.bubbleDown(minIndex)
}
}
func (h *BinaryHeap) swap(i, j int) {
h.items[i], h.items[j] = h.items[j], h.items[i]
}
下一个基本的测试用例
func TestMain(t *testing.T) {
heap := NewBinaryHeap()
heap.Insert(10, "A")
heap.Insert(4, "B")
heap.Insert(6, "C")
heap.Insert(5, "D")
heap.Insert(1, "E")
heap.Insert(12, "F")
heap.Insert(31, "G")
heap.Insert(193, "H")
heap.Insert(29, "I")
heap.Insert(2, "J")
for len(heap.items) > 0 {
item, err := heap.ExtractMin()
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Key: %d, Value: %s\n", item.key, item.value)
}
}
输出结果如下(注意此结果为本地mac本测试结果,仅供参考):
Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestMain$ workspace/gozerotimewhell
=== RUN TestMain
Key: 1, Value: E
Key: 2, Value: J
Key: 4, Value: B
Key: 5, Value: D
Key: 6, Value: C
Key: 10, Value: A
Key: 12, Value: F
Key: 29, Value: I
Key: 31, Value: G
Key: 193, Value: H
--- PASS: TestMain2 (0.00s)
PASS
ok workspace/gozerotimewhell 0.538s
四叉堆
从二叉堆的基本特性和问题,我们再来看一下四叉堆,直接上代码:
type QuadHeap struct {
items []*Item
}
func NewQuadHeap() *QuadHeap {
return &QuadHeap{
items: []*Item{},
}
}
func (h *QuadHeap) Insert(key int, value interface{}) {
item := &Item{
key: key,
value: value,
}
h.items = append(h.items, item)
h.bubbleUp(len(h.items) - 1)
}
func (h *QuadHeap) bubbleUp(index int) {
parentIndex := (index - 1) / 4
if parentIndex < 0 {
return
}
if h.items[parentIndex].key > h.items[index].key {
h.swap(parentIndex, index)
h.bubbleUp(parentIndex)
}
}
func (h *QuadHeap) ExtractMin() (*Item, error) {
if len(h.items) == 0 {
return nil, fmt.Errorf("heap is empty")
}
min := h.items[0]
lastIndex := len(h.items) - 1
h.swap(0, lastIndex)
h.items = h.items[:lastIndex]
h.bubbleDown(0)
return min, nil
}
func (h *QuadHeap) bubbleDown(index int) {
minIndex := index
childStartIndex := 4*index + 1
childEndIndex := int(math.Min(float64(childStartIndex+4), float64(len(h.items))))
for i := childStartIndex; i < childEndIndex; i++ {
if h.items[i].key < h.items[minIndex].key {
minIndex = i
}
}
if minIndex != index {
h.swap(minIndex, index)
h.bubbleDown(minIndex)
}
}
func (h *QuadHeap) swap(i, j int) {
h.items[i], h.items[j] = h.items[j], h.items[i]
}
单元测试看一下结果:
func TestMain2(t *testing.T) {
heap := NewQuadHeap()
heap.Insert(10, "A")
heap.Insert(4, "B")
heap.Insert(6, "C")
heap.Insert(5, "D")
heap.Insert(1, "E")
heap.Insert(12, "F")
heap.Insert(31, "G")
heap.Insert(193, "H")
heap.Insert(29, "I")
heap.Insert(2, "J")
for len(heap.items) > 0 {
item, err := heap.ExtractMin()
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("Key: %d, Value: %s\n", item.key, item.value)
}
}
测试结果:
Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestMain2$ workspace/gozerotimewhell
=== RUN TestMain2
Key: 1, Value: E
Key: 2, Value: J
Key: 4, Value: B
Key: 5, Value: D
Key: 6, Value: C
Key: 10, Value: A
Key: 12, Value: F
Key: 29, Value: I
Key: 31, Value: G
Key: 193, Value: H
--- PASS: TestMain2 (0.00s)
PASS
ok workspace/gozerotimewhell 0.105s
单从两次测试结果来看,也能发现四叉堆的性能要略高,当然了,这种测试并不是严谨的测试结果,仅供参考。
在golang内置的timer中,就有使用到四叉堆,
// siftupTimer puts the timer at position i in the right place
// in the heap by moving it up toward the top of the heap.
// It returns the smallest changed index.
func siftupTimer(t []*timer, i int) int {
if i >= len(t) {
badTimer()
}
when := t[i].when
if when <= 0 {
badTimer()
}
tmp := t[i]
for i > 0 {
p := (i - 1) / 4 // parent
if when >= t[p].when {
break
}
t[i] = t[p]
i = p
}
if tmp != t[i] {
t[i] = tmp
}
return i
}
时间轮
时间轮算法的核心是,轮询线程不再是负责遍历所有任务,而只在负责处于其当前时间上的任务.就像钟表一样,时间轮有一个指针会一直旋转,每转到一个新的时间,就去执行挂在这一时刻下的所有任务,当然,这些任务通常是交给其他线程去做,指针要做的只是继续往下转,不会关心任务的执行进度.
多层时间轮的概念也非常清晰,将时间轮分为多个,每两个轮之间是进位
的关系,例如最普遍的秒,分,时.
即:
-
秒级时间轮上,设有60个槽, 每两个槽之间的时间为1s.
-
分钟级时间轮上,设有60s个槽,每两个槽之间的时间为1min
-
小时级时间轮上,设有24个槽,每两个槽之间的时间为1h.
这样,秒级每走60个槽,时间过去一分钟,秒级时间轮归零,分级时间轮走一个槽; 分级时间轮每走过60个槽,时间过去一小时,分级时间轮归零,小时级时间轮走一个槽.
通过三级时间轮,只需要遍历60+60+60 =180个槽,就可以成为一个精度为1s, 周期为60x60x60 = 216000s的定时调度器.
多级时间轮的思想在很多开发中间件中都被应用,例如Netty
、Akka
、Quartz
、ZooKeeper
、Kafka
等等.
使用golang实现一个简单的时间轮,然后在看go-zero中如何使用时间轮的
type Timer struct {
interval time.Duration
callback func()
}
type TimeWheel struct {
tickMs int
wheelSize int
currentTime int
ticker *time.Ticker
wheel []map[int]*Timer
}
func NewTimeWheel(tickMs, wheelSize int) *TimeWheel {
tw := &TimeWheel{
tickMs: tickMs,
wheelSize: wheelSize,
currentTime: 0,
ticker: time.NewTicker(time.Millisecond * time.Duration(tickMs)),
wheel: make([]map[int]*Timer, wheelSize),
}
for i := 0; i < wheelSize; i++ {
tw.wheel[i] = make(map[int]*Timer)
}
go tw.start()
return tw
}
func (tw *TimeWheel) start() {
for {
select {
case <-tw.ticker.C:
tw.tick()
}
}
}
func (tw *TimeWheel) tick() {
tw.currentTime++
slot := tw.currentTime % tw.wheelSize
timerList := tw.wheel[slot]
for key, timer := range timerList {
timer.interval -= time.Duration(tw.tickMs)
if timer.interval <= 0 {
timer.callback()
delete(timerList, key)
}
}
}
func (tw *TimeWheel) AddTimer(interval time.Duration, callback func()) {
slot := (tw.currentTime + int(interval)/tw.tickMs) % tw.wheelSize
timer := &Timer{
interval: interval,
callback: callback,
}
tw.wheel[slot][len(tw.wheel[slot])] = timer
}
可以发现,这里我们也是使用slice存储所有的槽,然后每个槽里的事件列表是一个map,随着每个tick后期,依次递减事件列表中元素的internal,直到到期后执行任务,然后从map中删除
Go-Zero中也实现了一套时间轮机制,我们首先看看是如何使用的:
// NewCache returns a Cache with given expire.
func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
cache := &Cache{
data: make(map[string]any),
expire: expire,
lruCache: emptyLruCache,
barrier: syncx.NewSingleFlight(),
unstableExpiry: mathx.NewUnstable(expiryDeviation),
}
for _, opt := range opts {
opt(cache)
}
if len(cache.name) == 0 {
cache.name = defaultCacheName
}
cache.stats = newCacheStat(cache.name, cache.size)
timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v any) {
key, ok := k.(string)
if !ok {
return
}
cache.Del(key)
})
if err != nil {
return nil, err
}
cache.timingWheel = timingWheel
return cache, nil
}
// Del deletes the item with the given key from c.
func (c *Cache) Del(key string) {
c.lock.Lock()
delete(c.data, key)
c.lruCache.remove(key)
c.lock.Unlock()
c.timingWheel.RemoveTimer(key)
}
其实现的逻辑是,定时删除过期的缓存内容,而设置缓存的方法如下:
// Set sets value into c with key.
func (c *Cache) Set(key string, value any) {
c.SetWithExpire(key, value, c.expire)
}
// SetWithExpire sets value into c with key and expire with the given value.
func (c *Cache) SetWithExpire(key string, value any, expire time.Duration) {
c.lock.Lock()
_, ok := c.data[key]
c.data[key] = value
c.lruCache.add(key)
c.lock.Unlock()
expiry := c.unstableExpiry.AroundDuration(expire)
if ok {
c.timingWheel.MoveTimer(key, expiry)
} else {
c.timingWheel.SetTimer(key, value, expiry)
}
}
这里就有如何将任务放入到时间轮的channel中,来让时间轮到期执行,运行一个简单的实例来看一下,如何使用
package main
import (
"fmt"
"net/http"
"time"
"github.com/zeromicro/go-zero/core/collection"
)
func main() {
tw, err := collection.NewTimingWheel(time.Second, 60, func(key, value any) {
count := 0
t := time.NewTicker(time.Minute)
defer t.Stop()
for {
if count >= 3 {
fmt.Printf("第%d轮电话拨打完毕...\n", value)
break
}
if count == 0 {
fmt.Printf("time: %s, 第%d轮第1次拨打电话....\n", time.Now(), value)
count += 1
}
select {
case <-t.C:
count += 1
fmt.Printf("time: %s, 第%d轮第%d次拨打电话....\n", time.Now(), value, count)
}
}
})
if err != nil {
fmt.Println(err)
}
defer tw.Stop()
tw.SetTimer("test1", 1, time.Second>>1)
tw.SetTimer("test2", 2, time.Minute*5)
tw.SetTimer("test3", 3, time.Minute*10)
http.ListenAndServe(":8001", nil)
}
更多详细内容,可以深入go-zero代码中查看
参考链接:
转载自:https://juejin.cn/post/7267441368061460514