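A Look Ahead at Redis 8.0: Is BIO About to Be Replaced?
Preface
I hadn't browsed the Redis community in a while. When I went back and looked through the PR list, I stumbled on something big! (PR link)
The PR's author goes by “JimB123” and is an engineer at AWS (Amazon Web Services). The PR's title says it introduces a Background Job Manager (BJM for short) to replace Redis's existing bio.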
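The PR adds 5 new files:
- bjm.h, bjm.c: the background job manager
- fifo.c, fifo.h: a first-in, first-out queue
- lazyfree.h: a header for the lazy-free feature (previously there was only lazyfree.c)
It also deletes bio.c and bio.h and modifies a number of other files.
The PR has not been merged yet, but Redis core team members have said they are willing to move it forward. They plan to consider merging it for 8.0, after the next stable release, 7.2, is out.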
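A Brief Overview of the Background Job Manager
First, here is a translation of the author's stated motivation for the PR:
- BIO (Background I/O) is a misnomer. I suspect it started as a mechanism for performing asynchronous flushes of the AOF file. It has since evolved into a mechanism for other kinds of background activity, such as lazy freeing (lazyfree), which has nothing to do with I/O.
- BIO is not well layered. BIO should be a low-level, general-purpose utility for handling asynchronous background tasks. However, its design requires the "low-level" code to have specific knowledge of the "high-level" application.
- BIO is not very modular. For each new type of background task, the BIO code must be changed: a new thread is created for each task type, and new definitions and logic are added for each new task.
- Performance. BIO creates multiple threads but does not use them effectively, because each task type has its own dedicated thread. During a burst of lazy evictions (memory eviction), only the corresponding lazyfree thread is active while the other BIO threads sit idle. And during a lazy flush, the two dictionaries (main and expires) are processed sequentially rather than in parallel.
- Memory usage. BIO uses a list (a linked-list structure) to maintain its job queues. With a large number of jobs, the new Fifo uses less memory (and performs better).
The main contents of the PR are:
- Fifo: a simple FIFO queue that is over 50% more space-efficient and over 50% faster than using a list (linked list). It is provided as a standalone, modular, reusable general-purpose data structure.
- BJM: a simple fire-and-forget interface for handing work to background threads. It maintains a fixed (configurable) number of threads instead of one per "task type". This prevents having too many active background threads. It also lets a large number of tasks of the same job type be processed faster.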
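Next, let's look at the parts of the bjm code completed so far. Keep in mind that the code may still change before it is finally merged.
Variable and struct definitions in bjm.c: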
static const unsigned BJM_THREAD_STACK_SIZE = 4 * 1024 * 1024; // Background thread stack size
static const int INITIAL_FUNCTION_CAPACITY = 8; // Initial capacity of the functions and jobsByFunc arrays
static int functionsCount;
static int functionsCapacity;
static bjmJobFunc *functions; // Array of job callback function pointers
// A FIFO queue with a mutex to protect access
typedef struct {
    pthread_mutex_t mutex;
    Fifo *fifo;
} MutexFifo;
// A Joblist contains a specific function to be executed with a list of privdata
typedef struct {
    bjmJobFunc func;             // Job callback function
    MutexFifo jobs;              // Queue of privdata; each item is passed to func as its argument
    redisAtomic long job_count;  // Job count, including jobs being processed, so it may exceed the length of jobs
} Joblist;
// This array holds a Joblist* for each known callback function.
static Joblist **jobsByFunc; // Array indexed by index in functions[]
// This FIFO queue holds Joblists from the array above. Each time one of those
// Joblists becomes non-empty, it gets added to the active queue.
static MutexFifo activeJoblists;
static redisAtomic long queuedJobCount;
static redisAtomic long processedJobCount;
static pthread_cond_t wakeup_cond; // Triggered when jobs are submitted
static int threadCount = 0;
static pthread_t *threads; // Array of threads
The three main low-level functions in bjm.c:
// Find the function's index. Adds the function if it's a new one.
static int getFuncIdx(bjmJobFunc func) {
    // It's expected that the function count is small, probably spanning only 1 or 2 cache lines.
    // A simple linear search will be faster than a complex structure like hash.
    for (int i = 0; i < functionsCount; i++) {
        if (functions[i] == func) return i;
    }
    // At this point, we know that the function isn't in the list. Insert at end.
    if (functionsCount == functionsCapacity) increaseFunctionCapacity();
    int idx = functionsCount++;
    functions[idx] = func;
    jobsByFunc[idx] = zmalloc(sizeof(Joblist));
    mutexFifoInit(&jobsByFunc[idx]->jobs);
    jobsByFunc[idx]->func = func;
    atomicSet(jobsByFunc[idx]->job_count, 0);
    return idx;
}
/* Pull one job from the active joblists. Synchronously waits for a job if none available.
 *   privdata_ptr - returns the caller supplied privdata.
 *   joblist_ptr  - returns the joblist that the job was taken from. This is needed by the caller
 *                  in order to (later) decrement the job_count.
 * Returns:
 *   Returns the bjmJobFunc to be called.
 */
static bjmJobFunc waitForJob(void **privdata_ptr, Joblist **joblist_ptr) {
    bjmJobFunc func = NULL;
    mutexFifoLock(&activeJoblists);
    while (fifoLength(activeJoblists.fifo) == 0) {
        pthread_cond_wait(&wakeup_cond, &activeJoblists.mutex);
    }
    Joblist *joblist = fifoPeek(activeJoblists.fifo);
    func = joblist->func;
    *joblist_ptr = joblist;
    mutexFifoLock(&joblist->jobs);
    *privdata_ptr = fifoPop(joblist->jobs.fifo);
    if (fifoLength(joblist->jobs.fifo) == 0) {
        // No jobs left for this function
        fifoPop(activeJoblists.fifo);
    } else if (fifoLength(activeJoblists.fifo) > 1) {
        // Rotate the joblist for this function to the end
        fifoPop(activeJoblists.fifo);
        fifoPush(activeJoblists.fifo, joblist);
    }
    // Keep the lock on the individual joblist until it is properly handled in
    // the activeJobLists. Can't have the size changing.
    mutexFifoUnlock(&joblist->jobs);
    mutexFifoUnlock(&activeJoblists);
    return func;
}
// Entry point of each background thread: repeatedly fetches a job and executes it
static void *pthreadFunction(void *arg) {
    int threadNum = (intptr_t)arg;
    const int MAX_THREAD_NAME = 16;
    char thread_name[MAX_THREAD_NAME];
    snprintf(thread_name, MAX_THREAD_NAME, "bjm thread %d", threadNum);
    redis_set_thread_title(thread_name);
    redisSetCpuAffinity(server.bio_cpulist);
    makeThreadKillable();
    /* Block SIGALRM so only the main thread will receive the watchdog signal. */
    sigset_t sigset;
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    int err = pthread_sigmask(SIG_BLOCK, &sigset, NULL);
    if (err) {
        // pthread_sigmask() returns an error number instead of setting errno
        serverLog(LL_WARNING, "Warning: can't mask SIGALRM in BJM thread: %s", strerror(err));
    }
    while (1) {
        void *privdata;
        Joblist *joblist;
        bjmJobFunc func = waitForJob(&privdata, &joblist);
        func(privdata);                     // Execute the callback
        atomicDecr(joblist->job_count, 1);  // Decrement count AFTER callback finishes
        atomicDecr(queuedJobCount, 1);
        atomicIncr(processedJobCount, 1);
    }
    return NULL;
}
The API provided by bjm:
/* Initialize BJM with the requested number of background threads.
*/
void bjmInit(int numThreads);
/* Register a job function which can process background jobs. A handle is returned for submitting
* jobs & gathering metrics. This function is idempotent - submitting the same function again will
* return the same handle. Handle values will be > 0, so this pattern can be used:
*
* static bjmJobFuncHandle myHandle;
* if (!myHandle) myHandle = bjmRegisterJobFunc(myFunc);
* bjmSubmitJob(myHandle, ...);
*
* This co-locates a static variable at the point of job submission, and avoids repeated
* registration calls.
*/
bjmJobFuncHandle bjmRegisterJobFunc(bjmJobFunc func);
/* Submit a job to BJM. The provided function will be executed on a background thread. privdata
* will be provided as a parameter to the provided function. For fairness, jobs with different
* callback functions will be executed in round-robin fashion. Since jobs are executed across
* multiple threads, there is no guarantee as to ordering or exclusion between jobs.
*/
void bjmSubmitJob(bjmJobFuncHandle funcHandle, void *privdata);
/* Kill all threads in an unclean way. Non-recoverable.
* Only used during collection of debug information.
*/
void bjmKillThreads(void);
/* Count the number of pending/active jobs for the given job function.
* Note that this value is highly volatile as background threads are processing the jobs.
*/
long bjmPendingJobsOfType(bjmJobFuncHandle funcHandle);
/* Provide metrics data for INFO
*/
// Currently reports the number of background threads, registered callback functions, queued jobs and processed jobs
sds bjmCatInfo(sds info);
Next, let's look at the Fifo queue the author designed. Below are the comments the author wrote:
/* The FifoBlock contains up to 7 items (pointers). When compared with adlist, this results in
* roughly 60% memory reduction and 7x fewer memory allocations. Memory reduction is guaranteed
* with 5+ items in queue.
*
* In each block, there are 7 slots for item pointers (pointers to the caller's FIFO item).
* We need to keep track of the first & last slot used. Contextually, we will only need
* a single index - either the first slot used or the last slot used. Based on context,
* we can determine what is needed.
*
* Blocks are linked together in a chain. If the list is empty, there are no blocks.
* For non-empty lists, we will either have a single block OR a chain of blocks.
*
* For a SINGLE BLOCK containing (for example) 4 items, the layout looks like this:
*                +--------+--------+--------+--------+--------+--------+--------+--------+
* SINGLE BLOCK:  | slot 0 | slot 1 | slot 2 | slot 3 | slot 4 | slot 5 | slot 6 | next/  |
*                |  item  |  item  |  item  |  item  |   -    |   -    |   -    | lastIdx|
*                +--------+--------+--------+--------+--------+--------+--------+--------+
*                                               ^
*                                          lastIdx (3)
*
* In single blocks, the items are always shifted so that the first item is in slot 0.
* We need to keep track of the lastIdx so that we will know where to push the next item.
* The last index is stored in the final 3 bits of the (unused) next pointer
*
* When MULTIPLE BLOCKS are chained together, items will be popped from the first block, and
* pushed onto the last block. All blocks in the middle are full(*). In the first block, we keep
* the firstIdx (so we know where to pop) ... on the last block, we keep lastIdx (so we know
* where to push).
*
* (*) While blocks in the middle of a chain are generally full, the Fifo supports O(1) joining of
* two lists. In this case, a block at the join point may not be full. In this case, it will
* look like the FIRST BLOCK below, with the first index stored in the indexing bits.
*
* Example FIRST BLOCK with 2 items remaining:
*                +--------+--------+--------+--------+--------+--------+--------+--------+
* FIRST BLOCK:   | slot 0 | slot 1 | slot 2 | slot 3 | slot 4 | slot 5 | slot 6 | next/  |
*                |   -    |   -    |   -    |   -    |   -    |  item  |  item  |firstIdx|
*                +--------+--------+--------+--------+--------+--------+--------+--------+
*                                                                 ^
*                                                            firstIdx (5)
* Example LAST BLOCK with 3 items pushed so far:
*                +--------+--------+--------+--------+--------+--------+--------+--------+
* LAST BLOCK:    | slot 0 | slot 1 | slot 2 | slot 3 | slot 4 | slot 5 | slot 6 | next/  |
*                |  item  |  item  |  item  |   -    |   -    |   -    |   -    | lastIdx|
*                +--------+--------+--------+--------+--------+--------+--------+--------+
*                                      ^
*                                 lastIdx (2)
*/
typedef struct FifoBlock {
void *items[ITEMS_PER_BLOCK];
union {
/* The last 3 bits of a pointer to a block allocated by malloc must always be zero as a
* minimum of 8-byte alignment is required for all such blocks. These bits are used as
* an index into the block indicating the first or last item in the block, depending on
* context.
*
* This UNION overlays a pointer with an integral value. This allows us to look at the
* pointer OR the integer without casting - but they use the same memory.
*
* If there is MORE THAN ONE block in the chain, the first block has a pointer/index that
* looks like this. However, if there is only a single block, it looks like the LAST block.
* +-----------------------------------------------------------+
* | next pointer                                  | firstIdx  |
* | (61 bits)                                     | (3 bits)  |
* +-----------------------------------------------------------+
* * The next pointer is only valid after zeroing out the last 3 bits.
* * "lastIdx" is implied to be 6 (because there are additional blocks).
* * "firstIdx" represents the first filled index (0..6). POP occurs here.
*
* Any blocks in the middle of the chain have a regular pointer like this:
* +-----------------------------------------------------------+
* | next pointer                                  | 0*        |
* | (61 bits)                                     | (3 bits)  |
* +-----------------------------------------------------------+
* * The next pointer is valid as-is
* * "lastIdx" is implied to be 6 in all middle blocks.
* * "firstIdx" is implied to be 0 in all middle blocks.
* * NOTE: In middle blocks, the index bits(0) are really still the firstIdx value.
* When Fifo's are joined, the O(1) operation may result in a partially
* full middle block. In this case, the items are "right-justified" and
* firstIdx indicates where the items start.
*
* The last (or only) block in the chain contains only the lastIndex, the pointer is unused.
* +-----------------------------------------------------------+
* | 0                                             | lastIdx   |
* | (61 bits)                                     | (3 bits)  |
* +-----------------------------------------------------------+
* * The next pointer is unused and guaranteed NULL.
* * "lastIdx" represents the last filled index (0..6).
* * "firstIdx" is implied to be zero on the last (or only) block.
*/
uintptr_t last_or_first_idx;
struct FifoBlock *next;
} u;
} FifoBlock;
struct Fifo {
long length; // Total number of items in queue
FifoBlock *first;
FifoBlock *last;
};
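The author's Fifo stores items in blocks, and the blocks are linked by pointers into a singly linked list. Each block currently holds 7 items. The author estimates that this cuts memory usage by roughly 60% compared with the original adlist (a doubly linked list).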
The Fifo struct keeps the total item count plus pointers to the first block (used by peek and pop) and the last block (used by push).
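The FifoBlock struct holds the item array, the last-or-first item index, and the pointer to the next block. How last_or_first_idx is read depends on the block's position:
- First block of a multi-block chain: it is firstIdx, the slot of the queue's first item. pop and peek use this index to find the head of the Fifo.
- Last block, or the only block: it is lastIdx, the slot of the queue's last item. push uses it to find where the new item goes (lastIdx + 1).
- Middle block: the index bits are normally 0. As the author's comment notes, an O(1) join of two Fifos can leave a middle block only partially full. In that case those bits still hold its firstIdx.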
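Note that last_or_first_idx and the next pointer share a union, so they occupy the same 8 bytes. The index lives in the low 3 bits. Those bits are always zero in a pointer returned by malloc, because malloc guarantees at least 8-byte alignment. The next pointer is therefore only valid once those 3 bits are cleared, which is why the index bits of a middle block are normally all 0. When pop has taken every item from the first block and is about to move on to the next block, it first clears the low 3 bits: q->first->u.last_or_first_idx &= ~IDX_MASK; (IDX_MASK = 0x0007).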
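Summary
To wrap up, here is how BJM improves on BIO:
- Users can tune the number of background threads to their workload, either to process work faster or to save thread resources. BIO, by contrast, is fixed at 3 background threads.
- Every background thread can execute any type of job, which improves thread utilization. In BIO, each background thread handles only its own job type.
- BJM stores jobs in a Fifo, which uses less memory than the adlist used by BIO: roughly 60% less, in the author's words.
BJM may still change, so this article only gives a brief introduction to the code. Some code details will probably be refined further, but the overall design should stay the same. BJM also looks all but certain to land in Redis 8, so I think it is worth introducing now.
Last but not least, here is a link to my fairly anime-heavy personal website; feel free to drop by 🤣: 金毛败犬の秘密空间
Reposted from: https://juejin.cn/post/7231858374828179513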