likes
comments
collection
share

为 BitCask 存储引擎实现过期删除功能

作者站长头像
站长
· 阅读数 11

最近参与了一个不错的开源项目,是一个基于 BitCask 模型实现的 KV 存储引擎。项目地址:CouloyDB。大家觉得不错的话可以来一个小小的 star

因为功能上想向 Redis 看齐,所以打算实现过期自动删除的功能。

我采取了小顶堆来实现过期删除,并且在 Get 的时候,也会进行惰性删除。

时间堆的实现

堆中的每个元素都是一个 Job

type Job struct {
	Key        string
	Expiration time.Time
}

其中记录了每个Key和它的过期时间Expiration

堆的实现定义如下:

type h struct {
	heap  []*Job
	index map[string]int
}

index存储的是Key对应的Job在数组中的切片,用以快速获取Job而无需遍历切片。

在 Go 中可以通过实现heap.go中的接口来实现堆:

type Interface interface {
	sort.Interface
	Push(x any) // add x as element Len()
	Pop() any   // remove and return element Len() - 1.
}

具体实现可以查看CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub,这里就不给出详细代码了,重点注意实现的时候需要在接口方法里同样对 index 进行更新。

实现了堆的接口之后,h结构体就可以使用堆的一些方法来操作了。额外对 h 封装了一层:

type TimeHeap struct {
	heap h
}

TimeHeap实现了如下方法:

  • Push.
  • Pop.
  • Get.
  • Remove.
  • Peek.
  • IsEmpty.

具体实现同样可以查看CouloyDB/public/ds/timeHeap.go at master · Kirov7/CouloyDB · GitHub

利用时间堆实现 TTL 机制

具体实现可以查看CouloyDB/ttl.go at master · Kirov7/CouloyDB · GitHub,这里就重点说一下exec方法:

func (ttl *ttl) exec() {
	now := time.Now()
	duration := MaxDuration

	ttl.mu.Lock()
	job := ttl.timeHeap.Peek()
	ttl.mu.Unlock()

	if job != nil {
		if job.Expiration.After(now) {
			duration = job.Expiration.Sub(now)
		} else {
			duration = 0
		}
	}

	if duration > 0 {
		timer := time.NewTimer(duration)
		defer timer.Stop()

		select {
		case <-ttl.eventCh:
			return
		case <-timer.C:
		}
	}

	ttl.mu.Lock()
	job = ttl.timeHeap.Pop()
	ttl.mu.Unlock()

	if job == nil {
		return
	}

	go ttl.deleter(job.Key)
}

start方法的执行流程如下:

  1. 从堆顶取出一个 Job;
  2. 判断这个 Key 是否已经过期了,并计算剩余的存活时间;
  3. 如果这个 Key 没有过期,那么启动一个 Timer 来等待这个 Key 过期,Timer 触发之后,会将这个 Job 从堆中 Pop 出来;如果已经过期了那么直接 Pop 出来。
  4. 开启一个协程来异步删除 Key。

这个流程中有一个很值得注意的地方是:

select {
case <-ttl.eventCh:
    return
case <-timer.C:
}

一旦有堆中元素被插入或删除都需要直接返回,跳到下一次exec的调用。这是因为有可能新插入的元素会在当前堆顶元素之前过期,应该要先删除。或是当前堆顶元素已经被删除了,再次 Pop 的话就会出现错误了。

调用接口

Get方法中,如果 Key 存在就要去时间堆中检查一下是否过期了,如果过期了就直接返回错误。

if db.ttl.isExpired(string(key)) {
    // if the key is expired, just return and don't delete the key now
    return nil, public.ErrKeyNotFound
}

然后我将原先的Put方法又重新封装了一下:

func (db *DB) PutWithExpiration(key, value []byte, duration time.Duration) error {
	return db.put(key, value, duration)
}

func (db *DB) Put(key, value []byte) error {
	return db.put(key, value, 0)
}

put方法中可以根据duration是不是 0 来进行相应的操作。

如果duration不为 0,那么就计算出过期时间,并向时间堆中添加/更新一个Job,但duration如果为 0,有可能是要将之间设置了过期时间的 Key 给取消过期时间,所以要进行一次删除。

var expiration int64
if duration != 0 {
    expiration = time.Now().Add(duration).UnixNano()
    db.ttl.add(ds.NewJob(string(key), time.Unix(0, expiration)))
} else {
    // If it is a key without an expiration time set
    // you may need to remove the previously set expiration time
    db.ttl.del(string(key))
}

接着在Del方法中,只需要简单的从时间堆删除对应的Job即可。

db.ttl.del(string(key))

测试

测试了正常的过期删除、数据库重启之后的过期删除(重构时间堆)、取消过期时间、对同一个 Key 再次 Put 来取消过期时间均无问题。具体实现可以查看CouloyDB/ttl_test.go at master · Kirov7/CouloyDB · GitHub

参与项目

转载自:https://juejin.cn/post/7254001262646804517
评论
请登录