Redis 源码分析字典(dict)
「这是我参与2022首次更文挑战的第31天,活动详情查看:2022首次更文挑战」。
字典(dict)简介
字典,又称为符号表(symbol table)、关联数组(associative array )或映射(map), 是一种用于保存键值对(key-value pair)的抽象数据结构。
字典通常作为一种数据结构类型很多高级语言中实现,但是 redis 使用的 c 语言并没有这种数据结构,因此 redis 构建了自己的字典实现。
哈希表节点 dict.h/dictEntry
typedef struct dictEntry {
// 键
void *key;
// 值
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 指向下一个 hash 节点
struct dictEntry *next; /* Next entry in the same hash bucket. */
void *metadata[]; /* An arbitrary number of bytes (starting at a
* pointer-aligned address) of size as returned
* by dictType's dictEntryMetadataBytes(). */
} dictEntry;
// dict 相关的操作函数,以函数指针的方式存在。
typedef struct dictType {
// 计算哈希值函数
unsigned int (*hashFunction)(const void *key);
// 复制键的函数
void *(*keyDup)(void *privdata, const void *key);
// 复制值的函数
void *(*valDup)(void *privdata, const void *obj);
// 对比的函数
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
// 销毁键的函数
void (*keyDestructor)(void *privdata, void *key);
// 销毁值的函数
void (*valDestructor)(void *privdata, void *obj);
} dictType;
字典结构由 dict.h/dict 结构表示
// 字典
typedef struct dict {
dictEntry **table;
//类型特定函数
dictType *type;
unsigned long size; //hash 表的大小
unsigned long sizemask; // mask 计算索引值
unsigned long used; // 表示已经使用的个数
void *privdata;
} dict;
typedef struct dictIterator {
dict *ht;
int index;
dictEntry *entry, *nextEntry;
} dictIterator;
梳理一下结构, 普通状态下(没有 rehash)的字典。
重点解释
- dict 中的 size 每次都是 <= 2^s 进行分配
/* Our hash table capability is a power of two */
static unsigned long _dictNextPower(unsigned long size) {
unsigned long i = DICT_HT_INITIAL_SIZE;
if (size >= LONG_MAX) return LONG_MAX;
while(1) {
if (i >= size)
return i;
i *= 2;
}
- dict 中的 rehashidex = -1|# 标记是否进行 rehash, 默认 -1 表示未进行, # 表示当前 dt[0] 中第 # 个 tb 实例
等于 -1:
#define dictIsRehashing(d) ((d)->rehashidx != -1)
不等于 -1:
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
while(n-- && d->ht_used[0] != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);
while(d->ht_table[0][d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht_table[0][d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]);
de->next = d->ht_table[1][h];
d->ht_table[1][h] = de;
d->ht_used[0]--;
d->ht_used[1]++;
de = nextde;
}
d->ht_table[0][d->rehashidx] = NULL;
d->rehashidx++;
}
-
dict 中的 sizemark 用于计算当前 key 位于那个 dictEntry, 其大小等于 size -1.
idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
使用 & 运算比 % 性能更高。 -
dict 第一次是初始化只会启用 ht_table[0], ht_table[1] 在这个那个 dict 充当临时存储的作用
哈希算法
- 哈希算法
在 redis 中 hash 算法默认使用的是 siphash 算法(虽然也可以自定义)。计算哈希值和索引值的方法如下:
1、使用字典设置的哈希函数,计算键 key 的哈希值
hash = dict -> type -> hashFunction(Key);
2、使用哈希表的 sizemake 属性和第一步得到哈希值,计算索引值
#define DICTHT_SIZE_MASK(exp) ((exp) == -1 ? 0 : (DICTHT_SIZE(exp))-1)
index = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
- 哈希冲突
redis 解决哈希冲突的方法是链地址法,而且采用的是头插式
- 哈希拓容
采用链地址法来解决哈希冲突,如果数据量太大,大量的冲突会导致某个桶上的链表非常长,不利于数据查询,因此需要更具负载因子(负载因子用来评估键冲突的概率)来判断是否需要进行拓容量,就执行函数 _dictExpandIfNeeded。 其中 resize 还与是否进行持久化有关。
void updateDictResizePolicy(void) {
if (!hasActiveChildProcess())
dictEnableResize();
else
dictDisableResize();
}
_dictExpandIfNeeded 函数
static int _dictExpandIfNeeded(dict *d)
{
// 如果 rehash 在进行就不需要拓容
/* Incremental rehashing already in progress. Return. */
if (dictIsRehashing(d)) return DICT_OK;
//如果 ht_size_exp[0] 为空,那么就拓展为默认大小
/* If the hash table is empty expand it to the initial size. */
if (DICTHT_SIZE(d->ht_size_exp[0]) == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht_used[0] >= DICTHT_SIZE(d->ht_size_exp[0]) &&
(dict_can_resize ||
d->ht_used[0]/ DICTHT_SIZE(d->ht_size_exp[0]) > dict_force_resize_ratio) &&
dictTypeExpandAllowed(d))
{
return dictExpand(d, d->ht_used[0] + 1);
}
return DICT_OK;
}
思考一下:为什么持久化下,尽量减少 resize?
- 缩容
Redis 中的 reszie 的条件是由两处决定 htNeedsResize 和 dictResize
- 如果了初始值填充小于 10% , 这个说明需要缩容
#define DICT_HT_INITIAL_EXP 2
#define DICT_HT_INITIAL_SIZE (1<<(DICT_HT_INITIAL_EXP))
/* Hash table parameters */
#define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */
int htNeedsResize(dict *dict) {
long long size, used;
size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}
发生的时机主要是在每次数据移除 serverCron 定时任务中
serverCron 定时任务:
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* Defrag keys gradually. */
activeDefragCycle();
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
* as will cause a lot of copy-on-write of memory pages. */
if (!hasActiveChildProcess()) {
/* We use global counters so if we stop the computation at a given
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
rehash 优化
rehash 过程,ht_table[0] 中会存在空节点,为了防止阻塞,每次限定如果连续访问 10 * N (N 表示步长)个空字节后,直接返回。见 dictRehash
函数
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
while(n-- && d->ht_used[0] != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(DICTHT_SIZE(d->ht_size_exp[0]) > (unsigned long)d->rehashidx);
while(d->ht_table[0][d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht_table[0][d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & DICTHT_SIZE_MASK(d->ht_size_exp[1]);
de->next = d->ht_table[1][h];
d->ht_table[1][h] = de;
d->ht_used[0]--;
d->ht_used[1]++;
de = nextde;
}
d->ht_table[0][d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
if (d->ht_used[0] == 0) {
zfree(d->ht_table[0]);
/* Copy the new ht onto the old one */
d->ht_table[0] = d->ht_table[1];
d->ht_used[0] = d->ht_used[1];
d->ht_size_exp[0] = d->ht_size_exp[1];
_dictReset(d, 1);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
渐进式 hash , 是指 rehash 操作不是一次性、集中式完成的。
第一类:正在操作上面的 dictRehash 函数。但是对于 redis. 来说如果 hash 表的 key 太多,这样可能导致 rehash 操作需要很长的时间进行,阻塞服务器,所以 redis 本身将 rehash 操作分散在了后续的每次增删改查(以桶为单位)。
第二类: 针对一类渐进式 rehash , 存在一个问题;如果 a 服务器长期处理空闲状态,导致哈希表长期使用 0 和 1 号两个表。为了解决这个问题,在 serverCron 定时函数中,每次拿出 1ms 时间来执行 rehash 操作,每次步长为 100 , 但是需要开启 activerehashing 。 见 databaseCron 函数。
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* Defrag keys gradually. */
activeDefragCycle();
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
* as will cause a lot of copy-on-write of memory pages. */
if (!hasActiveChildProcess()) {
/* We use global counters so if we stop the computation at a given
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
resize_db++;
}
/* Rehash */
if (server.activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
break;
} else {
/* If this db didn't need rehash, we'll try the next one. */
rehash_db++;
rehash_db %= server.dbnum;
}
}
}
}
}
这里需要注意的是, 在定时任务中不能话费太长时间,防止其阻塞其他操作
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
rehash 注意点
- rehash 过程中 dictAdd , 只插入 ht_table[1] 中,确保 ht_table[0]. 只减不增。
entry->next = d->ht_table[htidx][index];
d->ht_table[htidx][index] = entry;
d->ht_used[htidx]++;
-
rehash 过程 dictFind 和 dictDelete , 需要涉及到 0 和 1 两个表。
-
字典默认的 hash 算法是 siphash
/* The default hashing function uses SipHash implementation
* in siphash.c. */
uint64_t siphash(const uint8_t *in, const size_t inlen, const uint8_t *k);
- redis 在持久化时,服务器这习惯呢拓展操作所3需要的负载因子并不相同,默认为 5 s.
rehash 后数据结构如下所示:
迭代器
redis 的 dict 迭代器分为两种类型,安全迭代器 (safe = 1) 和非安全迭代器(safe = 0)。
安全模式下,支持边遍历边修改(字典 key 的过期判断),支持字典的添加,删除,查找,但是不支持 rehash 操作(避免重复操作)
void keysCommand(client *c) {
dictIterator *di;
dictEntry *de;
sds pattern = c->argv[1]->ptr;
int plen = sdslen(pattern), allkeys;
unsigned long numkeys = 0;
void *replylen = addReplyDeferredLen(c);
di = dictGetSafeIterator(c->db->dict);
allkeys = (pattern[0] == '*' && plen == 1);
while((de = dictNext(di)) != NULL) {
sds key = dictGetKey(de);
robj *keyobj;
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
keyobj = createStringObject(key,sdslen(key));
if (!keyIsExpired(c->db,keyobj)) {
addReplyBulk(c,keyobj);
numkeys++;
}
decrRefCount(keyobj);
}
}
dictReleaseIterator(di);
setDeferredArrayLen(c,replylen,numkeys);
}
非安全模式下,只支持只读操作,使用字典的删除,添加,查找等方法可能造成不可预期的问题,如重复遍历元素或者漏掉元素,但支持 rehash 操作。
if (iter->safe)
iter->d->iterators++;
else
iter->fingerprint- = dictFingerprint(iter->d);
static void _dictRehashStep(dict *d) {
if (d->pauserehash == 0) dictRehash(d,1);
}
迭代器选择
1、为了避免元素的重复遍历,必须使用得带起的安全模式如 bgaofwrite 以及 bgsave 操作
2、遍历过程需要处理元素,此时也必须使用完全迭代器,比如 keys 命令。
3、允许便利过程中出现个别元素重复时,选择非安全迭代器,比如 scan.
转载自:https://juejin.cn/post/7065498940883337253