redis之内存淘汰策略(六)
内存淘汰策略的由来
由于Redis内存是有大小的,并且我可能里面的数据都没有过期,当快满的 时候,我又没有过期的数据进行淘汰,那么这个时候内存也会满。内存满 了,Redis也会不能放入新的数据。 所以,我们不得已需要一些策略来解决这个问题来保证可用性。
淘汰算法分类
- noeviction 写入报错,但是可以读取
- allkeys-lru: Keeps most recently used keys; removes least recently used (LRU) keys 基于伪LRU算法 在所有的key中去淘汰
- allkeys-lfu: Keeps frequently used keys; removes least frequently used (LFU) keys 基于伪LFU算法 在所有的key中去淘汰
- volatile-lru: Removes least recently used keys with the expire field set to true . 基于伪LRU算法 在设置了过期时间的key中去淘汰
- volatile-lfu: Removes least frequently used keys with the expire field set to true . 基于伪LFU算法 在设置了过期时间的key中去淘汰
- allkeys-random: Randomly removes keys to make space for the new data added. 基于随机算法 在所有的key中去淘汰
- volatile-random: Randomly removes keys with expire field set to true . 基于随机算法 在设置了过期时间的key中去淘汰
- volatile-ttl: Removes least frequently used keys with expire field set to true and the shortest remaining time-to-live (TTL) value. 根 据过期时间来,淘汰即将过期的
配置文件配置如下
maxmemory-policy noeviction
淘汰算法原理
淘汰算法要解决如下几个问题
- 什么时候进行淘汰.
- 淘汰哪些数据.
- 淘汰多少数据.
- 一次淘汰后内存还是不足如何处理.
根据上述几个问题,我们进行一一解决.
何时淘汰
关于数据何时淘汰有两种算法:
- 定时检查内存占用情况
- 存储数据时检查
redis目前采用了第二种算法来实现.因为定时检查也需要结合存储时检查.
淘汰算法
对于应该选择哪些数据进行淘汰.那么说明这些数据肯定存在对比关系.如
- 过期时间
- 访问次数
- 最后一次访问时间
- 随机
volatile-ttl
针对数据的有效时间进行比较.而过期时间存储在最外层的集合中redisDb的expires.
volatile-lru
如果redis采用纯粹的lru算法,那么需要额外的空间来构建一次链表,每次操作数据时都需要查找移动数据,这对于redis而言时不可接受的.所以其采用了一种近似lru算法.为每个key记录一个最近一次的访问时间.
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
redisObject中lru这个字段的大小为24bit,那么这个字段记录的是对象操作访问时候的 秒单位时间的后24bit!,获取这个key多长时间没访问的间隔用 当前的时间的秒单位的后24bit去减!,所以间隔值越大说明越久未被访问了.
#define LRU_BITS 24
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
unsigned int getLRUClock(void) {
return (mstime()/LRU_CLOCK_RESOLUTION) & LRU_CLOCK_MAX;
}
根据上述代码lru值获取方式:
- 获取当前毫秒时间/1000得到秒时间.
- 根据秒时间进行&操作得到低24位值.
计算空闲时间: (返回毫秒值)
unsigned long long estimateObjectIdleTime(robj *o) {
unsigned long long lruclock = LRU_CLOCK();
if (lruclock >= o->lru) {
return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
} else {
return (lruclock + (LRU_CLOCK_MAX - o->lru)) *
LRU_CLOCK_RESOLUTION;
}
}
volatile-lfu
如果redis采用纯粹的lru算法,那么需要额外的空间来构建一次链表,每次操作数据时都需要查找移动数据,这对于redis而言时不可接受的.所以其采用了一种近似lru算法.为每个key记录一个最近一次的访问时间.
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
若为lfu算法,那么lru分为2部分:
- 高16位最后一次访问的分钟数
- 低8位记录访问次数
初始次数为5,之所以不是1的原因是: 避免刚存储就被淘汰.
#define LFU_INIT_VAL 5
由于访问次数占用8bit,那么最大值只能为255,所以不能每次访问就进行递增,每隔多久没访问就递减.
lfu-decay-time 1 //多少分钟没操作访问就去衰减一
unsigned long LFUDecrAndReturn(robj *o) {
unsigned long ldt = o->lru >> 8;
unsigned long counter = o->lru & 255;
unsigned long num_periods = server.lfu_decay_time ? LFUTimeElapsed(ldt) / server.lfu_decay_time : 0;
if (num_periods)
counter = (num_periods > counter) ? 0 : counter - num_periods;
return counter;
}
次数递增的计算逻辑:
uint8_t LFULogIncr(uint8_t counter) {
if (counter == 255) return 255;
double r = (double)rand()/RAND_MAX;
double baseval = counter - LFU_INIT_VAL;
if (baseval < 0) baseval = 0;
double p = 1.0/(baseval*server.lfu_log_factor+1);
if (r < p) counter++;
return counter;
}
- 当前次数为255,则直接返回255
- 获取0-1间的随机小数r
- 用当前访问次数减去初始值5,获得差值baseval,如果baseval小于0则为0
- 1/(baseval*增长因子lfu_log_factor+1)得到差值p
- 若差值p大于r,则访问次数加1,否则不处理.
由以上计算方式而言,在lfu_log_factor不变的前提下couter值越大加1的概率越低.而counter不变的lfu_log_factor值越大增加的频率越低.
void updateLFU(robj *val) {
unsigned long counter = LFUDecrAndReturn(val);
counter = LFULogIncr(counter);
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
每次更新LFU值时先递减在递增lfu值.
下图是不同的lfu_log_factor值得情况经过多少次操作得到255的次数.
random
随机算法比较简单: 直接从某个哈希桶中获取key即可.
dictEntry *dictGetRandomKey(dict *d)
{
dictEntry *he, *orighe;
unsigned long h;
int listlen, listele;
if (dictSize(d) == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
if (dictIsRehashing(d)) {
do {
/* We are sure there are no elements in indexes from 0
* to rehashidx-1 */
h = d->rehashidx + (randomULong() % (dictSlots(d) - d->rehashidx));
he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] :
d->ht[0].table[h];
} while(he == NULL);
} else {
do {
h = randomULong() & d->ht[0].sizemask;
he = d->ht[0].table[h];
} while(he == NULL);
}
/* Now we found a non empty bucket, but it is a linked
* list and we need to get a random element from the list.
* The only sane way to do so is counting the elements and
* select a random index. */
listlen = 0;
orighe = he;
while(he) {
he = he->next;
listlen++;
}
listele = random() % listlen;
he = orighe;
while(listele--) he = he->next;
return he;
}
- 获取一个随机数,并&数组长度减1sizeMark 得到下标index.
- 根据下标返回部位空的整个哈希桶.
由于篇幅有限,具体淘汰逻辑将在下一篇展开.
转载自:https://juejin.cn/post/7135082586753204232