likes
comments
collection
share

Redis 源码分析字典(dict)

作者站长头像
站长
· 阅读数 23

「这是我参与2022首次更文挑战的第31天,活动详情查看:2022首次更文挑战」。

字典(dict)简介

字典,又称为符号表(symbol table)、关联数组(associative array )或映射(map), 是一种用于保存键值对(key-value pair)的抽象数据结构。

字典通常作为一种数据结构类型很多高级语言中实现,但是 redis 使用的 c 语言并没有这种数据结构,因此 redis 构建了自己的字典实现。

哈希表节点 dict.h/dictEntry

typedef struct dictEntry {
    // 键
    void *key;
    // 值
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    // 指向下一个 hash 节点
    struct dictEntry *next;     /* Next entry in the same hash bucket. */
    void *metadata[];           /* An arbitrary number of bytes (starting at a
                                 * pointer-aligned address) of size as returned
                                 * by dictType's dictEntryMetadataBytes(). */
} dictEntry;


// dict 相关的操作函数,以函数指针的方式存在。
typedef struct dictType {
    // 计算哈希值函数
    unsigned int (*hashFunction)(const void *key);
    // 复制键的函数
    void *(*keyDup)(void *privdata, const void *key);
    // 复制值的函数
    void *(*valDup)(void *privdata, const void *obj);
    // 对比的函数
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    // 销毁键的函数
    void (*keyDestructor)(void *privdata, void *key);
    // 销毁值的函数
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

字典结构由 dict.h/dict 结构表示

// 字典
typedef struct dict {
    dictEntry **table;
    //类型特定函数
    dictType *type;
    unsigned long size; //hash 表的大小
    unsigned long sizemask; // mask 计算索引值
    unsigned long used; // 表示已经使用的个数
    void *privdata;
} dict;

typedef struct dictIterator {
    dict *ht;
    int index;
    dictEntry *entry, *nextEntry;
} dictIterator;

梳理一下结构, 普通状态下(没有 rehash)的字典。

Redis 源码分析字典(dict)

重点解释

  • dict 中的 size 每次都是 <= 2^s 进行分配
/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size) {
    unsigned long i = DICT_HT_INITIAL_SIZE;

    if (size >= LONG_MAX) return LONG_MAX;
    while(1) {
        if (i >= size)
            return i;
        i *= 2;
    }

  • dict 中的 rehashidex = -1|# 标记是否进行 rehash, 默认 -1 表示未进行, # 表示当前 dt[0] 中第 # 个 tb 实例

等于 -1:

#define dictIsRehashing(d) ((d)->rehashidx != -1)

不等于 -1:

int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht_used[0] != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);
        while(d->ht_table[0][d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht_table[0][d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]);
            de->next = d->ht_table[1][h];
            d->ht_table[1][h] = de;
            d->ht_used[0]--;
            d->ht_used[1]++;
            de = nextde;
        }
        d->ht_table[0][d->rehashidx] = NULL;
        d->rehashidx++;
    }
  • dict 中的 sizemark 用于计算当前 key 位于那个 dictEntry, 其大小等于 size -1. idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]); 使用 & 运算比 % 性能更高。

  • dict 第一次是初始化只会启用 ht_table[0], ht_table[1] 在这个那个 dict 充当临时存储的作用

哈希算法

  • 哈希算法

在 redis 中 hash 算法默认使用的是 siphash 算法(虽然也可以自定义)。计算哈希值和索引值的方法如下:

1、使用字典设置的哈希函数,计算键 key 的哈希值

hash = dict -> type -> hashFunction(Key);

2、使用哈希表的 sizemake 属性和第一步得到哈希值,计算索引值

#define DICTHT_SIZE_MASK(exp) ((exp) == -1 ? 0 : (DICTHT_SIZE(exp))-1)
index = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
  • 哈希冲突

redis 解决哈希冲突的方法是链地址法,而且采用的是头插式

  • 哈希拓容

采用链地址法来解决哈希冲突,如果数据量太大,大量的冲突会导致某个桶上的链表非常长,不利于数据查询,因此需要更具负载因子(负载因子用来评估键冲突的概率)来判断是否需要进行拓容量,就执行函数 _dictExpandIfNeeded。 其中 resize 还与是否进行持久化有关。

void updateDictResizePolicy(void) {
    if (!hasActiveChildProcess())
        dictEnableResize();
    else
        dictDisableResize();
}

_dictExpandIfNeeded 函数

static int _dictExpandIfNeeded(dict *d)
{
    // 如果 rehash 在进行就不需要拓容
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    //如果 ht_size_exp[0] 为空,那么就拓展为默认大小
    /* If the hash table is empty expand it to the initial size. */
    if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0]) &&
        (dict_can_resize ||
         d->ht_used[0]/ DICTHT_SIZE(d->ht_size_exp[0]) > dict_force_resize_ratio) &&
        dictTypeExpandAllowed(d))
    {
        return dictExpand(d, d->ht_used[0] + 1);
    }
    return DICT_OK;
}

思考一下:为什么持久化下,尽量减少 resize?

  • 缩容

Redis 中的 reszie 的条件是由两处决定 htNeedsResize 和 dictResize

  1. 如果了初始值填充小于 10% , 这个说明需要缩容
#define DICT_HT_INITIAL_EXP      2
#define DICT_HT_INITIAL_SIZE     (1<<(DICT_HT_INITIAL_EXP))

/* Hash table parameters */
#define HASHTABLE_MIN_FILL        10      /* Minimal hash table fill 10% */

int htNeedsResize(dict *dict) {
    long long size, used;

    size = dictSlots(dict);
    used = dictSize(dict);
    return (size > DICT_HT_INITIAL_SIZE &&
            (used*100/size < HASHTABLE_MIN_FILL));
}

发生的时机主要是在每次数据移除 serverCron 定时任务中

Redis 源码分析字典(dict)

serverCron 定时任务:

void databasesCron(void) {
    /* Expire keys by random sampling. Not required for slaves
     * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else {
            expireSlaveKeys();
        }
    }

    /* Defrag keys gradually. */
    activeDefragCycle();

    /* Perform hash tables rehashing if needed, but only if there are no
     * other processes saving the DB on disk. Otherwise rehashing is bad
     * as will cause a lot of copy-on-write of memory pages. */
    if (!hasActiveChildProcess()) {
        /* We use global counters so if we stop the computation at a given
         * DB we'll be able to start from the successive in the next
         * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;

        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash, we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
}

rehash 优化

rehash 过程,ht_table[0] 中会存在空节点,为了防止阻塞,每次限定如果连续访问 10 * N (N 表示步长)个空字节后,直接返回。见 dictRehash 函数

int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht_used[0] != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);
        while(d->ht_table[0][d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht_table[0][d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]);
            de->next = d->ht_table[1][h];
            d->ht_table[1][h] = de;
            d->ht_used[0]--;
            d->ht_used[1]++;
            de = nextde;
        }
        d->ht_table[0][d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht_used[0] == 0) {
        zfree(d->ht_table[0]);
        /* Copy the new ht onto the old one */
        d->ht_table[0] = d->ht_table[1];
        d->ht_used[0] = d->ht_used[1];
        d->ht_size_exp[0] = d->ht_size_exp[1];
        _dictReset(d, 1);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}

渐进式 hash , 是指 rehash 操作不是一次性、集中式完成的。

第一类:正在操作上面的 dictRehash 函数。但是对于 redis. 来说如果 hash 表的 key 太多,这样可能导致 rehash 操作需要很长的时间进行,阻塞服务器,所以 redis 本身将 rehash 操作分散在了后续的每次增删改查(以桶为单位)。

第二类: 针对一类渐进式 rehash , 存在一个问题;如果 a 服务器长期处理空闲状态,导致哈希表长期使用 0 和 1 号两个表。为了解决这个问题,在 serverCron 定时函数中,每次拿出 1ms 时间来执行 rehash 操作,每次步长为 100 , 但是需要开启 activerehashing 。 见 databaseCron 函数。

void databasesCron(void) {
    /* Expire keys by random sampling. Not required for slaves
     * as master will synthesize DELs for us. */
    if (server.active_expire_enabled) {
        if (iAmMaster()) {
            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
        } else {
            expireSlaveKeys();
        }
    }

    /* Defrag keys gradually. */
    activeDefragCycle();

    /* Perform hash tables rehashing if needed, but only if there are no
     * other processes saving the DB on disk. Otherwise rehashing is bad
     * as will cause a lot of copy-on-write of memory pages. */
    if (!hasActiveChildProcess()) {
        /* We use global counters so if we stop the computation at a given
         * DB we'll be able to start from the successive in the next
         * cron loop iteration. */
        static unsigned int resize_db = 0;
        static unsigned int rehash_db = 0;
        int dbs_per_call = CRON_DBS_PER_CALL;
        int j;

        /* Don't test more DBs than we have. */
        if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;

        /* Resize */
        for (j = 0; j < dbs_per_call; j++) {
            tryResizeHashTables(resize_db % server.dbnum);
            resize_db++;
        }

        /* Rehash */
        if (server.activerehashing) {
            for (j = 0; j < dbs_per_call; j++) {
                int work_done = incrementallyRehash(rehash_db);
                if (work_done) {
                    /* If the function did some work, stop here, we'll do
                     * more at the next cron loop. */
                    break;
                } else {
                    /* If this db didn't need rehash, we'll try the next one. */
                    rehash_db++;
                    rehash_db %= server.dbnum;
                }
            }
        }
    }
}

这里需要注意的是, 在定时任务中不能话费太长时间,防止其阻塞其他操作

int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    }
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
        return 1; /* already used our millisecond for this loop... */
    }
    return 0;
}

rehash 注意点

  1. rehash 过程中 dictAdd , 只插入 ht_table[1] 中,确保 ht_table[0]. 只减不增。
    entry->next = d->ht_table[htidx][index];
    d->ht_table[htidx][index] = entry;
    d->ht_used[htidx]++;
  1. rehash 过程 dictFind 和 dictDelete , 需要涉及到 0 和 1 两个表。

  2. 字典默认的 hash 算法是 siphash

/* The default hashing function uses SipHash implementation
 * in siphash.c. */

uint64_t siphash(const uint8_t *in, const size_t inlen, const uint8_t *k);
  1. redis 在持久化时,服务器这习惯呢拓展操作所3需要的负载因子并不相同,默认为 5 s.

rehash 后数据结构如下所示:

Redis 源码分析字典(dict)

迭代器

redis 的 dict 迭代器分为两种类型,安全迭代器 (safe = 1) 和非安全迭代器(safe = 0)。

安全模式下,支持边遍历边修改(字典 key 的过期判断),支持字典的添加,删除,查找,但是不支持 rehash 操作(避免重复操作)

void keysCommand(client *c) {
    dictIterator *di;
    dictEntry *de;
    sds pattern = c->argv[1]->ptr;
    int plen = sdslen(pattern), allkeys;
    unsigned long numkeys = 0;
    void *replylen = addReplyDeferredLen(c);

    di = dictGetSafeIterator(c->db->dict);
    allkeys = (pattern[0] == '*' && plen == 1);
    while((de = dictNext(di)) != NULL) {
        sds key = dictGetKey(de);
        robj *keyobj;

        if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
            keyobj = createStringObject(key,sdslen(key));
            if (!keyIsExpired(c->db,keyobj)) {
                addReplyBulk(c,keyobj);
                numkeys++;
            }
            decrRefCount(keyobj);
        }
    }
    dictReleaseIterator(di);
    setDeferredArrayLen(c,replylen,numkeys);
}

非安全模式下,只支持只读操作,使用字典的删除,添加,查找等方法可能造成不可预期的问题,如重复遍历元素或者漏掉元素,但支持 rehash 操作。

if (iter->safe)
  iter->d->iterators++;
else 
  iter->fingerprint- = dictFingerprint(iter->d);

static void _dictRehashStep(dict *d) {
    if (d->pauserehash == 0) dictRehash(d,1);
}

迭代器选择

1、为了避免元素的重复遍历,必须使用得带起的安全模式如 bgaofwrite 以及 bgsave 操作

2、遍历过程需要处理元素,此时也必须使用完全迭代器,比如 keys 命令。

3、允许便利过程中出现个别元素重复时,选择非安全迭代器,比如 scan.

转载自:https://juejin.cn/post/7065498940883337253
评论
请登录