likes
comments
collection
share

从二叉堆到时间轮,学习如何更好的处理延迟任务

作者站长头像
站长
· 阅读数 61

二叉堆

二叉堆是一种特殊的堆,二叉堆是完全二叉树或者是近似完全二叉树。

二叉堆满足堆特性

  1. 父节点的键值总是保持固定的序关系于任何一个子节点的键值
  2. 每个节点的左子树和右子树都是一个二叉堆。

当父节点的键值总是大于或等于任何一个子节点的键值时为“最大堆”。 当父节点的键值总是小于或等于任何一个子节点的键值时为“最小堆”。

从二叉堆到时间轮,学习如何更好的处理延迟任务

二叉堆一般用数组来表示。如果根节点在数组中的位置是1,第n个位置的子节点分别在2n和 2n+1。因此,第1个位置的子节点在2和3,第2个位置的子节点在4和5。以此类推。这种基于1的数组存储方式便于寻找父节点和子节点。

如果存储数组的下标基于0,那么下标为i的节点的子节点是2i + 1与2i + 2;其父节点的下标是⌊floor((i − 1) ∕ 2)⌋。函数floor(x)的功能是“向下取整”,或者说“向下舍入”,即取不大于x的最大整数(与“四舍五入”不同,向下取整是直接取按照数轴上最接近要求值的左边值,即不大于要求值的最大的那个值)。比如floor(1.1)、floor(1.9)都返回1。

如下图的两个堆:

从二叉堆到时间轮,学习如何更好的处理延迟任务

将这两个堆保存在以1开始的数组中:

位置:  1  2  3  4  5  6  7  8  9 10 11
左图:  1  2  3  4  5  6  7  8  9 10 11
右图: 11  9 10  5  6  7  8  1  2  3  4

对于一个很大的堆,这种存储是低效的。因为节点的子节点很可能在另外一个内存页中。B-heap是一种效率更高的存储方式,把每个子树放到同一内存页。

如果用指针链表存储堆,那么需要能访问叶节点的方法。可以对二叉树“穿线”(threading)方式,来依序遍历这些节点。

在二叉堆上可以进行插入节点、删除节点、取出值最小的节点、减小节点的值等基本操作。下面从Golang代码的角度实现一下二叉堆


import (
	"fmt"
)

type Item struct {
	key   int
	value interface{}
}

type BinaryHeap struct {
	items []*Item
}

func NewBinaryHeap() *BinaryHeap {
	return &BinaryHeap{
		items: []*Item{},
	}
}

func (h *BinaryHeap) Insert(key int, value interface{}) {
	item := &Item{
		key:   key,
		value: value,
	}
	h.items = append(h.items, item)
	h.bubbleUp(len(h.items) - 1)
}

func (h *BinaryHeap) bubbleUp(index int) {
	parentIndex := (index - 1) / 2
	if parentIndex < 0 {
		return
	}

	if h.items[parentIndex].key > h.items[index].key {
		h.swap(parentIndex, index)
		h.bubbleUp(parentIndex)
	}
}

func (h *BinaryHeap) ExtractMin() (*Item, error) {
	if len(h.items) == 0 {
		return nil, fmt.Errorf("heap is empty")
	}

	min := h.items[0]
	lastIndex := len(h.items) - 1
	h.swap(0, lastIndex)
	h.items = h.items[:lastIndex]
	h.bubbleDown(0)

	return min, nil
}

func (h *BinaryHeap) bubbleDown(index int) {
	minIndex := index
	leftChildIndex := 2*index + 1
	rightChildIndex := 2*index + 2

	if leftChildIndex < len(h.items) && h.items[leftChildIndex].key < h.items[minIndex].key {
		minIndex = leftChildIndex
	}

	if rightChildIndex < len(h.items) && h.items[rightChildIndex].key < h.items[minIndex].key {
		minIndex = rightChildIndex
	}

	if minIndex != index {
		h.swap(minIndex, index)
		h.bubbleDown(minIndex)
	}
}

func (h *BinaryHeap) swap(i, j int) {
	h.items[i], h.items[j] = h.items[j], h.items[i]
}

下一个基本的测试用例

func TestMain(t *testing.T) {
	heap := NewBinaryHeap()
	heap.Insert(10, "A")
	heap.Insert(4, "B")
	heap.Insert(6, "C")
	heap.Insert(5, "D")
	heap.Insert(1, "E")
	heap.Insert(12, "F")
	heap.Insert(31, "G")
	heap.Insert(193, "H")
	heap.Insert(29, "I")
	heap.Insert(2, "J")

	for len(heap.items) > 0 {
		item, err := heap.ExtractMin()
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("Key: %d, Value: %s\n", item.key, item.value)
	}
}

输出结果如下(注意此结果为本地mac本测试结果,仅供参考):

Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestMain$ workspace/gozerotimewhell

=== RUN   TestMain
Key: 1, Value: E
Key: 2, Value: J
Key: 4, Value: B
Key: 5, Value: D
Key: 6, Value: C
Key: 10, Value: A
Key: 12, Value: F
Key: 29, Value: I
Key: 31, Value: G
Key: 193, Value: H
--- PASS: TestMain2 (0.00s)
PASS
ok  	workspace/gozerotimewhell	0.538s

四叉堆

从二叉堆的基本特性和问题,我们再来看一下四叉堆,直接上代码:


type QuadHeap struct {
	items []*Item
}

func NewQuadHeap() *QuadHeap {
	return &QuadHeap{
		items: []*Item{},
	}
}

func (h *QuadHeap) Insert(key int, value interface{}) {
	item := &Item{
		key:   key,
		value: value,
	}
	h.items = append(h.items, item)
	h.bubbleUp(len(h.items) - 1)
}

func (h *QuadHeap) bubbleUp(index int) {
	parentIndex := (index - 1) / 4
	if parentIndex < 0 {
		return
	}

	if h.items[parentIndex].key > h.items[index].key {
		h.swap(parentIndex, index)
		h.bubbleUp(parentIndex)
	}
}

func (h *QuadHeap) ExtractMin() (*Item, error) {
	if len(h.items) == 0 {
		return nil, fmt.Errorf("heap is empty")
	}

	min := h.items[0]
	lastIndex := len(h.items) - 1
	h.swap(0, lastIndex)
	h.items = h.items[:lastIndex]
	h.bubbleDown(0)

	return min, nil
}

func (h *QuadHeap) bubbleDown(index int) {
	minIndex := index
	childStartIndex := 4*index + 1
	childEndIndex := int(math.Min(float64(childStartIndex+4), float64(len(h.items))))

	for i := childStartIndex; i < childEndIndex; i++ {
		if h.items[i].key < h.items[minIndex].key {
			minIndex = i
		}
	}

	if minIndex != index {
		h.swap(minIndex, index)
		h.bubbleDown(minIndex)
	}
}

func (h *QuadHeap) swap(i, j int) {
	h.items[i], h.items[j] = h.items[j], h.items[i]
}

单元测试看一下结果:

func TestMain2(t *testing.T) {
	heap := NewQuadHeap()
	heap.Insert(10, "A")
	heap.Insert(4, "B")
	heap.Insert(6, "C")
	heap.Insert(5, "D")
	heap.Insert(1, "E")
	heap.Insert(12, "F")
	heap.Insert(31, "G")
	heap.Insert(193, "H")
	heap.Insert(29, "I")
	heap.Insert(2, "J")

	for len(heap.items) > 0 {
		item, err := heap.ExtractMin()
		if err != nil {
			fmt.Println(err)
			return
		}
		fmt.Printf("Key: %d, Value: %s\n", item.key, item.value)
	}
}

测试结果:

Running tool: /usr/local/go/bin/go test -timeout 30s -run ^TestMain2$ workspace/gozerotimewhell

=== RUN   TestMain2
Key: 1, Value: E
Key: 2, Value: J
Key: 4, Value: B
Key: 5, Value: D
Key: 6, Value: C
Key: 10, Value: A
Key: 12, Value: F
Key: 29, Value: I
Key: 31, Value: G
Key: 193, Value: H
--- PASS: TestMain2 (0.00s)
PASS
ok      workspace/gozerotimewhell       0.105s

单从两次测试结果来看,也能发现四叉堆的性能要略高,当然了,这种测试并不是严谨的测试结果,仅供参考。

从二叉堆到时间轮,学习如何更好的处理延迟任务

在golang内置的timer中,就有使用到四叉堆,

// siftupTimer puts the timer at position i in the right place
// in the heap by moving it up toward the top of the heap.
// It returns the smallest changed index.
func siftupTimer(t []*timer, i int) int {
	if i >= len(t) {
		badTimer()
	}
	when := t[i].when
	if when <= 0 {
		badTimer()
	}
	tmp := t[i]
	for i > 0 {
		p := (i - 1) / 4 // parent
		if when >= t[p].when {
			break
		}
		t[i] = t[p]
		i = p
	}
	if tmp != t[i] {
		t[i] = tmp
	}
	return i
}

时间轮

时间轮算法的核心是,轮询线程不再是负责遍历所有任务,而只在负责处于其当前时间上的任务.就像钟表一样,时间轮有一个指针会一直旋转,每转到一个新的时间,就去执行挂在这一时刻下的所有任务,当然,这些任务通常是交给其他线程去做,指针要做的只是继续往下转,不会关心任务的执行进度. 多层时间轮的概念也非常清晰,将时间轮分为多个,每两个轮之间是进位的关系,例如最普遍的秒,分,时.

即:

  • 秒级时间轮上,设有60个槽, 每两个槽之间的时间为1s.

  • 分钟级时间轮上,设有60s个槽,每两个槽之间的时间为1min

  • 小时级时间轮上,设有24个槽,每两个槽之间的时间为1h.

从二叉堆到时间轮,学习如何更好的处理延迟任务 这样,秒级每走60个槽,时间过去一分钟,秒级时间轮归零,分级时间轮走一个槽; 分级时间轮每走过60个槽,时间过去一小时,分级时间轮归零,小时级时间轮走一个槽.

通过三级时间轮,只需要遍历60+60+60 =180个槽,就可以成为一个精度为1s, 周期为60x60x60 = 216000s的定时调度器.

多级时间轮的思想在很多开发中间件中都被应用,例如NettyAkkaQuartzZooKeeperKafka等等. 使用golang实现一个简单的时间轮,然后在看go-zero中如何使用时间轮的


type Timer struct {
	interval time.Duration
	callback func()
}

type TimeWheel struct {
	tickMs      int
	wheelSize   int
	currentTime int
	ticker      *time.Ticker
	wheel       []map[int]*Timer
}

func NewTimeWheel(tickMs, wheelSize int) *TimeWheel {
	tw := &TimeWheel{
		tickMs:      tickMs,
		wheelSize:   wheelSize,
		currentTime: 0,
		ticker:      time.NewTicker(time.Millisecond * time.Duration(tickMs)),
		wheel:       make([]map[int]*Timer, wheelSize),
	}

	for i := 0; i < wheelSize; i++ {
		tw.wheel[i] = make(map[int]*Timer)
	}

	go tw.start()

	return tw
}

func (tw *TimeWheel) start() {
	for {
		select {
		case <-tw.ticker.C:
			tw.tick()
		}
	}
}

func (tw *TimeWheel) tick() {
	tw.currentTime++
	slot := tw.currentTime % tw.wheelSize
	timerList := tw.wheel[slot]

	for key, timer := range timerList {
		timer.interval -= time.Duration(tw.tickMs)
		if timer.interval <= 0 {
			timer.callback()
			delete(timerList, key)
		}
	}
}

func (tw *TimeWheel) AddTimer(interval time.Duration, callback func()) {
	slot := (tw.currentTime + int(interval)/tw.tickMs) % tw.wheelSize
	timer := &Timer{
		interval: interval,
		callback: callback,
	}
	tw.wheel[slot][len(tw.wheel[slot])] = timer
}

可以发现,这里我们也是使用slice存储所有的槽,然后每个槽里的事件列表是一个map,随着每个tick后期,依次递减事件列表中元素的internal,直到到期后执行任务,然后从map中删除

Go-Zero中也实现了一套时间轮机制,我们首先看看是如何使用的:


// NewCache returns a Cache with given expire.
func NewCache(expire time.Duration, opts ...CacheOption) (*Cache, error) {
	cache := &Cache{
		data:           make(map[string]any),
		expire:         expire,
		lruCache:       emptyLruCache,
		barrier:        syncx.NewSingleFlight(),
		unstableExpiry: mathx.NewUnstable(expiryDeviation),
	}

	for _, opt := range opts {
		opt(cache)
	}

	if len(cache.name) == 0 {
		cache.name = defaultCacheName
	}
	cache.stats = newCacheStat(cache.name, cache.size)

	timingWheel, err := NewTimingWheel(time.Second, slots, func(k, v any) {
		key, ok := k.(string)
		if !ok {
			return
		}

		cache.Del(key)
	})
	if err != nil {
		return nil, err
	}

	cache.timingWheel = timingWheel
	return cache, nil
}

// Del deletes the item with the given key from c.
func (c *Cache) Del(key string) {
	c.lock.Lock()
	delete(c.data, key)
	c.lruCache.remove(key)
	c.lock.Unlock()
	c.timingWheel.RemoveTimer(key)
}

其实现的逻辑是,定时删除过期的缓存内容,而设置缓存的方法如下:

// Set sets value into c with key.
func (c *Cache) Set(key string, value any) {
	c.SetWithExpire(key, value, c.expire)
}

// SetWithExpire sets value into c with key and expire with the given value.
func (c *Cache) SetWithExpire(key string, value any, expire time.Duration) {
	c.lock.Lock()
	_, ok := c.data[key]
	c.data[key] = value
	c.lruCache.add(key)
	c.lock.Unlock()

	expiry := c.unstableExpiry.AroundDuration(expire)
	if ok {
		c.timingWheel.MoveTimer(key, expiry)
	} else {
		c.timingWheel.SetTimer(key, value, expiry)
	}
}

这里就有如何将任务放入到时间轮的channel中,来让时间轮到期执行,运行一个简单的实例来看一下,如何使用

package main

import (
	"fmt"
	"net/http"
	"time"

	"github.com/zeromicro/go-zero/core/collection"
)

func main() {
	tw, err := collection.NewTimingWheel(time.Second, 60, func(key, value any) {
		count := 0
		t := time.NewTicker(time.Minute)
		defer t.Stop()
		for {
			if count >= 3 {
				fmt.Printf("第%d轮电话拨打完毕...\n", value)
				break
			}
			if count == 0 {
				fmt.Printf("time: %s, 第%d轮第1次拨打电话....\n", time.Now(), value)
				count += 1
			}
			select {
			case <-t.C:
				count += 1
				fmt.Printf("time: %s, 第%d轮第%d次拨打电话....\n", time.Now(), value, count)
			}

		}
	})
	if err != nil {
		fmt.Println(err)
	}

	defer tw.Stop()
	tw.SetTimer("test1", 1, time.Second>>1)
	tw.SetTimer("test2", 2, time.Minute*5)
	tw.SetTimer("test3", 3, time.Minute*10)
	http.ListenAndServe(":8001", nil)
}

从二叉堆到时间轮,学习如何更好的处理延迟任务 更多详细内容,可以深入go-zero代码中查看

参考链接:

  1. 二叉堆
  2. 时间轮
  3. go-zero
转载自:https://juejin.cn/post/7267441368061460514
评论
请登录