likes
comments
collection
share

从源码分析Redis的扩容机制

作者站长头像
站长
· 阅读数 10

前言

已知Redis中的哈希对象的底层数据结构使用了字典dict数据结构,实际上Redis中的数据就是以key-value的形式存储在dict中,dict数据结构的示意图可以表示如下。

从源码分析Redis的扩容机制

即一个dict数据结构持有两张哈希表dictht,每张dictht中持有一个存储元素的节点数组,每对键值对会被封装成一个dictEntry节点然后添加到节点数组中,当存在哈希冲突时,Redis中使用拉链法解决哈希冲突。但是dictEntry数组的默认容量为4,发生哈希冲突的概率极高,如果不进行扩容,会导致哈希表的时间复杂度恶化为O(logN),所以满足一定条件时,需要进行dictEntry数组的扩容,即进行Redis的扩容,本篇文章将对Redis的扩容机制进行学习。

Redis源码版本:6.0.16

正文

一. Redis的扩容时机

Redis会在如下两种情况触发扩容。

  • 如果没有fork子进程在执行RDB或者AOF的持久化,一旦满足ht[0].used >= ht[0].size,此时触发扩容;
  • 如果有fork子进程在执行RDB或者AOF的持久化时,则需要满足ht[0].used > 5 * ht[0].size,此时触发扩容。

下面将结合源码对Redis的扩容时机进行学习。当向dict添加或者更新数据时,对应的方法是位于dict.c文件中的dictReplace() 方法,如下所示。

int dictReplace(dict *d, void *key, void *val) {
    dictEntry *entry, *existing, auxentry;

    // 如果添加成功,dictAddRaw()方法会把成功添加的dictEntry返回
    // 返回的dictEntry只设置了键,值需要在这里调用dictSetVal()方法进行设置
    entry = dictAddRaw(d,key,&existing);
    if (entry) {
        dictSetVal(d, entry, val);
        return 1;
    }

    // 执行到这里,表明在哈希表中已经存在一个dictEntry的键与当前待添加的键值对的键相等
    // 此时应该做更新值的操作,且existing此时是指向这个已经存在的dictEntry
    auxentry = *existing;
    // 更新值,即为existing指向的dictEntry设置新值
    dictSetVal(d, existing, val);
    // 释放旧值
    dictFreeVal(d, &auxentry);
    return 0;
}

dictReplace() 方法会执行键值对的添加或更新,如果哈希表中不存在dictEntry的键与待添加键值对的键相等,此时会基于待添加键值对新创建一个dictEntry并以头插法插入哈希表中,此时返回1;如果哈希表中存在dictEntry的键与待添加键值对的键相等,此时就为已经存在的dictEntry设置新值并释放旧值,然后返回0。通常要触发扩容,触发时机一般在添加键值对的时候,所以继续分析dictAddRaw() 方法,其源码实现如下所示。

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) {
    long index;
    dictEntry *entry;
    dictht *ht;

    // 判断是否在进行rehash,如果正在进行rehash,则触发渐进式rehash
    // dictIsRehashing()方法在dict.h文件中,如果dict的rehashidx不等于-1,则表明此时在进行rehash
    if (dictIsRehashing(d)) _dictRehashStep(d);

    // 获取待添加键值对在哈希表中的索引index
    // 如果哈希表已经存在dictEntry的键与待添加键值对的键相等,此时_dictKeyIndex()方法返回-1
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;
    
    // 如果在进行rehash,待添加的键值对存放到ht[1],否则存放到ht[0]
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    // 为新dictEntry开辟内存,此时dictEntry的键和值尚未设置
    entry = zmalloc(sizeof(*entry));
    // 头插的方式插入哈希表index位置的哈希桶中
    entry->next = ht->table[index];
    ht->table[index] = entry;
    // 哈希表的当前大小加1
    ht->used++;

    // 为新dictEntry设置键
    dictSetKey(d, entry, key);
    return entry;
}

dictAddRaw() 方法会首先判断当前是否处于rehash阶段(判断当前是否正在扩容),如果正在rehash,则触发一次哈希桶的迁移操作(这一点后面再详细分析),然后通过_dictKeyIndex() 方法获取待添加键值对在哈希表中的索引index,如果获取到的index为-1,表明存在dictEntry的键与待添加键值对的键相等,此时dictAddRaw() 方法返回NULL以告诉方法调用方需要执行更新操作,如果index不为-1,则基于待添加键值对创建新的dictEntry并以头插的方式插入哈希表index位置的哈希桶中,然后更新哈希表的当前大小以及为新dictEntry设置键。扩容的触发在_dictKeyIndex() 方法中,其源码实现如下所示。

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing) {
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;

    // 在_dictExpandIfNeeded()方法中判断是否需要扩容,如果需要,则执行扩容
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    for (table = 0; table <= 1; table++) {
        // 将待添加键值对的键的hash值与哈希表掩码相与以得到待添加键值对在哈希表中的索引
        idx = hash & d->ht[table].sizemask;
        // 遍历哈希表中idx索引位置的哈希桶的链表
        he = d->ht[table].table[idx];
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                // 链表中存在dictEntry的key与待添加键值对的key相等
                // 此时让existing指向这个已经存在的dictEntry,并返回-1
                // 表明存在dictEntry的key与待添加键值对的key相等且existing已经指向了这个存在的dictEntry
                if (existing) *existing = he;
                return -1;
            }
            // 继续遍历链表中的下一个节点
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    // 执行到这里,表明哈希表中不存在dictEntry的key与待添加键值对的key相等,返回索引idx
    return idx;
}

_dictKeyIndex() 方法的一开始,就会调用_dictExpandIfNeeded() 方法来判断是否需要扩容,如果需要,则会执行扩容逻辑,假如_dictExpandIfNeeded() 方法在扩容过程中出现错误,会返回状态码1,也就是DICT_ERR字段表示的值,此时_dictKeyIndex() 方法直接返回-1,如果不需要扩容或者扩容成功,则将待添加键值对的键的hash值与哈希表掩码相与得到待添加键值对在哈希表中的索引,然后遍历索引位置的哈希桶的链表,看是否能够找到一个dictEntry的键与待添加键值对的键相等,如果能够找到一个这样的dictEntry,则返回-1并让existing指向这个dictEntry,否则返回之前计算得到的索引。可知判断扩容以及执行扩容的逻辑都在_dictExpandIfNeeded() 方法中,其源码实现如下所示。

static int _dictExpandIfNeeded(dict *d) {
    // 如果已经在扩容,则返回状态码0
    if (dictIsRehashing(d)) return DICT_OK;

    // 如果哈希表的容量为0,则初始化哈希表的容量为4
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    // 如果哈希表的当前大小大于等于容量,并且dict_can_resize为1或者当前大小大于容量的五倍
    // 此时判断需要扩容,调用dictExpand()方法执行扩容逻辑,且指定扩容后的容量至少为当前大小的2倍
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio)) {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

通过_dictExpandIfNeeded() 方法的源码可知,要触发扩容,首先需要满足的条件就是哈希表当前大小大于等于了哈希表的容量,然后再判断Redis当前是否允许扩容,如果允许扩容,则执行扩容逻辑,如果不允许扩容,那么再判断哈希表当前大小是否已经大于了哈希表容量的5倍,如果已经大于,则强制执行扩容逻辑。在_dictExpandIfNeeded() 方法有两个重要的参数,分别是dict_can_resizedict_force_resize_ratio,这两个参数定义在dict.c文件中,初始值如下所示。

static int dict_can_resize = 1;
static unsigned int dict_force_resize_ratio = 5;

那么现在最后还需要结合源码分析一下,什么时候会更改dict_can_resize的值,在dict.c文件中有如下两个方法,会将dict_can_resize的值设置为1或者0,如下所示。

void dictEnableResize(void) {
    dict_can_resize = 1;
}

void dictDisableResize(void) {
    dict_can_resize = 0;
}

这两个方法会被server.c文件中的updateDictResizePolicy() 方法调用,如下所示。

void updateDictResizePolicy(void) {
    // 如果有正在执行RDB或AOF持久化的子进程,hasActiveChildProcess()方法返回true
    if (!hasActiveChildProcess())
        // 没有正在执行RDB或AOF持久化的子进程时将dict_can_resize设置为1,表示允许扩容
        dictEnableResize();
    else
        // 有正在执行RDB或AOF持久化的子进程时将dict_can_resize设置为0,表示不允许扩容
        dictDisableResize();
}

至此Redis的扩容时机的源码分析就到此为止,现在进行小节:当向Redis添加或者更新数据时,会判断一下存储数据的哈希表的当前大小是否大于等于哈希表容量,如果大于,再判断Redis是否允许扩容,而决定Redis是否允许扩容的依据就是当前是否存在子线程在执行RDB或者AOF的持久化,如果存在就不允许扩容,反之则允许扩容,假如Redis不允许扩容,那么还需要判断一下是否要强制扩容,判断依据就是存储数据的哈希表的当前大小是否已经大于哈希表容量的5倍,如果大于,则强制扩容。

下面给出流程图,对上述整个触发扩容的源码流程进行示意。

从源码分析Redis的扩容机制

二. Redis的扩容步骤

当一旦需要进行扩容时,此时会使用到dict中的ht[1]Redis的扩容步骤如下所示。

  • 计算ht[1]的容量size,即扩容后的容量,ht[1] 的容量为大于等于ht[0].used * 2且同时为2的幂次方的最小值;
  • ht[1]设置sizesizemask字段的值,初始化used字段为0,并为dictEntry数组分配空间;
  • dictrehashidx字段设置为0,表示此时开启渐进式rehashRedis会通过渐进式rehash的方式逐步将ht[0]上的dictEntry迁移到**ht[1]**上;
  • ht[0] 的所有键值对全部存放到ht[1] 中后,释放ht[0] 的内存空间,然后ht[1] 变为ht[0]

特别注意,上述的步骤仅针对正常的扩容,如果是ht[0] 的初始化,则与上述的步骤稍有不同,这里不再赘述。当dict中键值对特别多时,rehash会特别耗时,所以Redis采用一种渐进式rehash的方式来完成扩容,dict中的rehashidx字段用于记录当前已经rehash到的哈希桶的索引,而渐进式rehash就是Redis不会一次性将ht[0] 上的键值对迁移到ht[1] 上,而是会在某些时间点迁移一部分,这些时间点如下所示。

  • 当对数据进行增删改查时会从ht[0] 迁移一个哈希桶到ht[1] 上;
  • Redis会定时的从ht[0] 迁移一部分哈希桶到ht[1] 上。

特别注意,如果在渐进式rehash的过程中有新的键值对添加,那么会直接添加到ht[1] 中。

下面将结合Redis源码对Redis的扩容步骤进行学习。在第一节中已知,执行扩容逻辑的方法是dict.c文件的dictExpand() 方法,其源码实现如下所示。

int dictExpand(dict *d, unsigned long size) {
    // 如果正在rehash或者ht[0]的当前大小大于了扩容后的大小的最小值
    // 此时返回状态码1,表示扩容发生异常
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;

    // n就是扩容后的哈希表
    dictht n;
    // 得到一个大于等于size的满足2的幂次方的最小值作为n的容量
    unsigned long realsize = _dictNextPower(size);

    // 如果扩容后的哈希表的容量与老哈希表容量一样
    // 此时返回状态码1,表示扩容发生异常
    if (realsize == d->ht[0].size) return DICT_ERR;

    // 为n设置容量size
    n.size = realsize;
    // 为n设置掩码sizemask
    n.sizemask = realsize-1;
    // 为n的数组分配空间
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    // 初始化n的当前大小used为0
    n.used = 0;

    // 如果是初始化哈希表,那么直接将ht[0]置为n
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }

    // 执行到这里,表明是非初始化哈希表的扩容,将ht[1]置为n
    d->ht[1] = n;
    // 将dict的rehashidx字段设置为0,表示开启渐进式rehash
    d->rehashidx = 0;
    return DICT_OK;
}

dictExpand() 方法主要完成的逻辑就是为ht[1] 设置sizesizemask字段的值,初始化used字段为0,并为dictEntry数组分配空间,最后将dictrehashidx字段设置为0以开启渐进式rehash

下面再结合源码看一下什么时候进行键值对的迁移,首先在第一节中分析dictAddRaw() 方法时有提到,dictAddRaw() 方法一开始就会判断当前是否处于rehash阶段,如果正在rehash,则触发一次哈希桶的迁移操作,这个迁移操作对应的方法是dict.c文件的_dictRehashStep() 方法,其源码实现如下。

static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}

继续看dictRehash() 方法的实现。

// 参数n表示本次rehash需要迁移的哈希桶个数
int dictRehash(dict *d, int n) {
    // 允许遍历的最大空桶数
    int empty_visits = n*10;
    // 如果没有在进行渐进式rehash,则返回
    if (!dictIsRehashing(d)) return 0;

    // 在ht[0]当前大小不为0的前提下
    // 需要迁移多少个哈希桶,就循环多少次,每次循环迁移一个哈希桶
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        // rehashidx的值不能大于等于ht[0]的容量
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        // 如果哈希表ht[0]的rehashidx位置的哈希桶是空,则继续遍历下一个哈希桶
        // 如果遍历的空桶数达到了empty_visits,则本次rehash结束,直接返回
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        // 得到ht[0]的rehashidx位置的哈希桶
        de = d->ht[0].table[d->rehashidx];
        // 遍历并将rehashidx位置的哈希桶的链表上的节点全部迁移到ht[1]上
        while(de) {
            uint64_t h;

            nextde = de->next;
            // 将链表节点的键的hash值与ht[1]的掩码相与得到当前节点在ht[1]上的索引
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            // 使用头插法插入ht[1]
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            // ht[0]的当前大小减1
            d->ht[0].used--;
            // ht[1]的当前大小加1
            d->ht[1].used++;
            // 继续迁移链表的下一节点
            de = nextde;
        }
        // 全部迁移完成后,将ht[0]的rehashidx位置置为空
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    // 判断是否将ht[0]上的键值对全部迁移到了ht[1]
    if (d->ht[0].used == 0) {
        // 如果ht[0]上的键值对全部迁移到了ht[1]
        // 先释放ht[0]的数组空间
        zfree(d->ht[0].table);
        // 然后将ht[0]置为ht[1]
        d->ht[0] = d->ht[1];
        // 重置ht[1]
        // 即将ht[1]的数组置为空,容量,掩码和当前大小全部置为0
        _dictReset(&d->ht[1]);
        // 将dict的rehashidx字段设置为-1,表示rehash结束
        d->rehashidx = -1;
        return 0;
    }

    return 1;
}

dictRehash() 方法有两个参数,第一个是需要进行rehashdict,第二个是需要迁移的哈希桶的个数,可知如果是对数据的增删改查而触发的rehash,需要迁移的哈希桶的个数为1。

dictRehash() 方法一开始就定义了一个最大空桶数,其值为本次迁移数的10倍,因为在遍历哈希表时,可能会遇到很多空桶,所以为了避免遍历大量空桶而带来的时间消耗,Redis规定本次rehash迁移时,如果遇到的空桶数达到了本次需要迁移的哈希桶数的10倍,则停止迁移并返回。

dictRehash() 方法中对于每一个哈希桶的迁移,其实就是遍历这个哈希桶上的链表,将每个链表节点重新基于ht[1] 计算一个索引并迁移到ht[1] 上。

dictRehash() 方法的最后需要判断一下是否将ht[0] 上的数据全部迁移到了ht[1] 上,如果已经全部完成迁移,此时需要先释放老的ht[0] 的数组空间,然后将ht[0] 置为ht[1],接着重置ht[1] 即将其数组置为空,容量,掩码和当前大小全部置为0,最后将dictrehashidx字段设置为-1,表示rehash结束。

除了对数据的增删改查会调用到dictRehash() 方法来迁移哈希桶外,Redis也会定时的调用到dictRehash() 方法来迁移哈希桶,这个定时任务方法是server.c文件的serverCron() 方法,在该方法中会调用到server.c文件的databasesCron() 方法,该方法会处理Redis数据库中的增量执行的后台操作,这些操作中就包括渐进式rehash,所以在databasesCron() 方法中又通过调用server.c文件的incrementallyRehash() 方法来执行rehash,接着又在incrementallyRehash() 方法中调用到了dict.c文件的dictRehashMilliseconds() 方法,在dictRehashMilliseconds() 方法中就真正调用到了dictRehash() 方法来执行迁移哈希桶的逻辑,dictRehashMilliseconds() 方法的源码实现如下所示。

int dictRehashMilliseconds(dict *d, int ms) {
    long long start = timeInMilliseconds();
    int rehashes = 0;

    // 在1毫秒的时间内循环进行迁移
    // 每次循环迁移100个哈希桶
    while(dictRehash(d,100)) {
        rehashes += 100;
        if (timeInMilliseconds()-start > ms) break;
    }
    return rehashes;
}

那么至此,Redis的扩容步骤的源码就分析完毕。

总结

Redis中的数据存储在字典dict数据结构中,一个dict数据结构持有两张哈希表dictht,每张dictht中持有一个存储数据的dictEntry数组,每份数据会以键值对的形式封装成一个dictEntry节点然后添加到dictEntry数组中,当存在哈希冲突时,Redis中使用拉链法解决哈希冲突,但是dictEntry数组的默认容量为4,发生哈希冲突的概率极高,如果不进行扩容,会导致哈希表的时间复杂度恶化为O(logN),所以满足一定条件时,需要进行dictEntry数组的扩容,即进行Redis的扩容。

Redis的扩容的时机总结如下。

  • 如果没有fork子进程在执行RDB或者AOF的持久化,一旦满足ht[0].used >= ht[0].size,此时触发扩容;
  • 如果有fork子进程在执行RDB或者AOF的持久化时,则需要满足ht[0].used > 5 * ht[0].size,此时触发扩容。

Redisdict数据结构通常只会使用两张哈希表中的其中一张,,即ht[0],但是当需要进行扩容时,此时会使用到dict的另外一张哈希表ht[1]Redis的扩容步骤如下所示。

  • 计算ht[1] 的容量size,即扩容后的容量,ht[1] 的容量为大于等于ht[0].used * 2且同时为2的幂次方的最小值;
  • ht[1] 设置sizesizemask字段的值,初始化used字段为0,并为dictEntry数组分配空间;
  • dictrehashidx字段设置为0,表示此时开启渐进式rehashRedis会通过渐进式rehash的方式逐步将ht[0] 上的dictEntry以哈希桶为单位逐次迁移到ht[1] 上;
  • ht[0] 的所有键值对全部存放到ht[1] 中后,释放ht[0] 的内存空间,然后ht[1] 变为ht[0]

dict中键值对特别多时,rehash会特别耗时,所以Redis采用一种渐进式rehash的方式来完成扩容,dict中的rehashidx字段用于记录当前已经rehash到的哈希桶的索引,而渐进式rehash就是Redis不会一次性将ht[0] 上的键值对迁移到ht[1] 上,而是会在某些时间点迁移一部分,这些时间点如下所示。

  • 当对数据进行增删改查时会从ht[0] 迁移一个哈希桶到ht[1] 上;
  • Redis会定时的从ht[0] 迁移一部分哈希桶到ht[1] 上。

特别注意,如果在渐进式rehash的过程中有新的键值对添加,那么会直接添加到ht[1] 中。