likes
comments
collection
share

Redis 8.0 前瞻——BIO 要被换掉了?

作者站长头像
站长
· 阅读数 45

前言

  好久没逛 Redis 社区了,回去看了看 PR 列表,结果发现了一个不得了的东西!(PR 链接

Redis 8.0 前瞻——BIO 要被换掉了?

  PR 的作者昵称是 “JimB123”,是 AWS(亚马逊云) 的大佬。 这个 PR 标题意思是使用 Background Job Manager,后台任务管理器,简称 BJM,用于替代 Redis 原本的 bio 。

  这份 PR 新增了 5 个文件:

  • bjm.h、bjm.c: 后台任务管理器
  • fifo.c、fifo.h:先进先出队列
  • lazyfree.h:惰性删除功能的头文件(原来只有 lazyfree.c)

  除此之外,还删除了 bio.c 和 bio.h 两个文件,有多个文件均被改动。

  这份 PR 虽然还没有被正式合并,但 Redis 核心成员已经表示愿意推进它,他们打算发布下一个稳定版的 7.2 后,在 8.0 中考虑合并。

简述 Background Job Manager

  首先翻译一下作者原话,作者 PR 的动机如下:

  1. BIO(后台 I/O)用词不当。我猜这最初是一种在 AOF 文件上执行异步刷新的机制,但它后来演变成一种执行其他类型的后台活动的机制,比如惰性删除(lazyfree),这与 I/O 无关。
  2. BIO 没有很好的分层。BIO 应该是用于处理异步后台任务的低级通用程序。然而,此通用程序的设计要求“低级”代码具有“高级”应用程序的特定知识。
  3. BIO 不是很模块化。对于每种新类型的后台任务,必须更改 BIO 代码。为每种类型的任务创建一个新线程。为每个新任务添加新的定义和逻辑。
  4. 提升性能。BIO 创建了多个线程,但没有有效地使用它们。每种类型的任务都有一个唯一的线程。在一连串的惰性驱逐期间(内存淘汰),只有对应的 lazyfree 线程处于活动状态,而其他 BIO 线程处于空闲状态。而且在惰性 flush 期间,两个字典(主要和过期)按顺序处理而不是并行处理。
  5. 减少内存使用。BIO 使用 list(链表结构)来维护作业队列。如果有大量作业,新的 Fifo 将使用更少的内存(并且性能更高)。

  作者 PR 的内容主要如下:

  • Fifo - 提供一个简单的 FIFO 队列,它比使用 list (链表)的空间效率高 50% 以上,速度快 50% 以上。它作为一个独立的、模块化的、可重用的通用数据结构提供。
  • BJM - 提供了一个简单的即发即弃接口,让后台线程完成一些事情。它维护 固定/可配置 数量的线程(而不是每个“任务类型”一个)。这可以防止有太多活动的后台线程,还可以更快地处理大量相同作业类型的任务。

  接下来,我们看一下目前完成的 bjm 部分代码,但是要注意最终合并时的代码可能会发生变化。

  bjm.c 变量,结构体定义:

static const unsigned BJM_THREAD_STACK_SIZE = 4 * 1024 * 1024; // 后台线程栈大小
static const int INITIAL_FUNCTION_CAPACITY = 8; // functions 和 jobsByFunc 数组初始容量

static int functionsCount;
static int functionsCapacity;
static bjmJobFunc *functions;   // 任务回调函数指针数组

// A FIFO queue with a mutex to protect access
// 带互斥锁的 Fifo 队列
typedef struct {
    pthread_mutex_t mutex;
    Fifo *fifo;
} MutexFifo;

// A Joblist contains a specific function to be executed with a list of privdata
// 任务队列,包含一个特定的函数,该函数将通过一个私有数据队列来执行
typedef struct {
    bjmJobFunc func;        // 任务回调函数
    MutexFifo jobs;         // 任务私有数据队列,私有数据将作为参数提供给回调函数  
    redisAtomic long job_count;  // 任务数量,包括正在处理的任务,所以可能大于 jobs 长度 
} Joblist;

// This arrays hold a Joblist* for each known callback function.
// 以回调函数作索引的 Joblist* 数组
static Joblist **jobsByFunc;    // Array indexed by index in functions[]

// This FIFO queue hold Joblists from the array above.  Each time one of those
//  Joblists becomes non-empty, it gets added to the active queue.
// 活跃任务队列。每当一个任务队列变为非空时,它就会被添加到活动任务队列中。
static MutexFifo activeJoblists;

static redisAtomic long queuedJobCount;
static redisAtomic long processedJobCount;

static pthread_cond_t wakeup_cond;  // Triggered when jobs are submitted

static int threadCount = 0;
static pthread_t *threads;      // Array of threads

  bjm.c 三个主要的底层函数:

// Find the function's index.  Adds the function if it's a new one.
// 查找 func 在 functions 数组中的下标,如果 func 不存在则将它加入到数组尾部
static int getFuncIdx(bjmJobFunc func) {
    // It's expected that the function count is small, probably spanning only 1 or 2 cache lines.
    //  A simple linear search will be faster than a complex structure like hash.
    for (int i = 0;  i < functionsCount;  i++) {
        if (functions[i] == func) return i;
    }

    // At this point, we know that the function isn't in the list.  Insert at end.
    if (functionsCount == functionsCapacity) increaseFunctionCapacity();
    int idx = functionsCount++;
    functions[idx] = func;
    jobsByFunc[idx] = zmalloc(sizeof(Joblist));
    mutexFifoInit(&jobsByFunc[idx]->jobs);
    jobsByFunc[idx]->func = func;
    atomicSet(jobsByFunc[idx]->job_count, 0);
    return idx;
}


/* Pull one job from the active joblists.  Synchronously waits for a job if none available.
 *  privdata_ptr - returns the caller supplied privdata.
 *  joblist_ptr  - returns the joblist that the job was taken from.  This is needed by the caller
 *                 in order to (later) decrement the job_count.
 * Returns:
 *  Returns the bjmJobFunc to be called.
 */
// 从活跃任务队列中获取一个任务的私有数据和回调函数,如果没有可用的任务,则同步等待。
static bjmJobFunc waitForJob(void **privdata_ptr, Joblist **joblist_ptr) {
    bjmJobFunc func = NULL;

    mutexFifoLock(&activeJoblists);
    while (fifoLength(activeJoblists.fifo) == 0) {
        pthread_cond_wait(&wakeup_cond, &activeJoblists.mutex);
    }

    Joblist *joblist = fifoPeek(activeJoblists.fifo);
    func = joblist->func;
    *joblist_ptr = joblist;

    mutexFifoLock(&joblist->jobs);
    *privdata_ptr = fifoPop(joblist->jobs.fifo);

    if (fifoLength(joblist->jobs.fifo) == 0) {
        // No jobs left for this function
        fifoPop(activeJoblists.fifo);
    } else if (fifoLength(activeJoblists.fifo) > 1) {
        // Rotate the joblist for this function to the end
        fifoPop(activeJoblists.fifo);
        fifoPush(activeJoblists.fifo, joblist);
    }
    // Keep the lock on the individual joblist until it is properly handled in
    //  the activeJobLists.  Can't have the size changing.
    mutexFifoUnlock(&joblist->jobs);
    mutexFifoUnlock(&activeJoblists);

    return func;
}

// 创建负责获取和执行任务的后台线程
static void *pthreadFunction(void *arg) {
    int threadNum = (intptr_t)arg;

    const int MAX_THREAD_NAME = 16;
    char thread_name[MAX_THREAD_NAME];
    snprintf(thread_name, MAX_THREAD_NAME, "bjm thread %d", threadNum);
    redis_set_thread_title(thread_name);

    redisSetCpuAffinity(server.bio_cpulist);

    makeThreadKillable();

    /* Block SIGALRM so only the main thread will receive the watchdog signal. */
    sigset_t sigset;
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL)) {
        serverLog(LL_WARNING, "Warning: can't mask SIGALRM in BJM thread: %s", strerror(errno));
    }

    while (1) {
        void *privdata;
        Joblist *joblist;
        bjmJobFunc func = waitForJob(&privdata, &joblist);

        func(privdata);                     // Execute the callback
        atomicDecr(joblist->job_count, 1);   // Decrement count AFTER callback finishes
        atomicDecr(queuedJobCount, 1);
        atomicIncr(processedJobCount, 1);
    }

    return NULL;
}

  bjm 提供的 API:

/* Initialize BJM with the requested number of background threads.
 */
// 初始化后台任务管理器,可提供线程数
void bjmInit(int numThreads);

/* Register a job function which can process background jobs.  A handle is returned for submitting
 * jobs & gathering metrics.  This function is idempotent - submitting the same function again will
 * return the same handle.  Handle values will be > 0, so this pattern can be used:
 *
 *    static bjmJobFuncHandle myHandle;
 *    if (!myHandle) myHandle = bjmRegisterJobFunc(myFunc);
 *    bjmSubmitJob(myHandle, ...);
 *
 * This co-locates a static variable at the point of job submission, and avoids repeated
 * registration calls.
 */
// 注册后台任务回调函数
bjmJobFuncHandle bjmRegisterJobFunc(bjmJobFunc func);

/* Submit a job to BJM.  The provided function will be executed on a background thread.  privdata
 * will be provided as a parameter to the provided function.  For fairness, jobs with different
 * callback functions will be executed in round-robin fashion.  Since jobs are executed across
 * multiple threads, there is no guarantee as to ordering or exclusion between jobs.
 */
// 给后台任务管理器提交一个任务
void bjmSubmitJob(bjmJobFuncHandle funcHandle, void *privdata);


/* Kill all threads in an unclean way.  Non-recoverable.
 * Only used during collection of debug information.
 */
// 杀死所有后台线程,只在收集 debug 信息时使用
void bjmKillThreads(void);


/* Count the number of pending/active jobs for the given job function.
 * Note that this value is highly volatile as background threads are processing the jobs.
 */
// 获取某个回调函数所对应的后台任务数量
long bjmPendingJobsOfType(bjmJobFuncHandle funcHandle);


/* Provide metrics data for INFO
 */
// 提供 BJM 相关信息,目前有后台线程数,回调函数数量,排队任务数,正在执行任务数。
sds bjmCatInfo(sds info);

  接下来,我们再看看作者设计的 fifo 队列,以下是作者所编写的注释:

/* The FifoBlock contains up to 7 items (pointers).  When compared with adlist, this results in
 * roughly 60% memory reduction and 7x fewer memory allocations.  Memory reduction is guaranteed
 * with 5+ items in queue.
 *
 * In each block, there are 7 slots for item pointers (pointers to the caller's FIFO item).
 *  We need to keep track of the first & last slot used.  Contextually, we will only need
 *  a single index - either the first slot used or the last slot used.  Based on context,
 *  we can determine what is needed.
 *
 * Blocks are linked together in a chain.  If the list is empty, there are no blocks.
 *  For non-empty lists, we will either have a single block OR a chain of blocks.
 *
 * For a SINGLE BLOCK containing (for example) 4 items, the layout looks like this:
 *                 +--------+--------+--------+--------+--------+--------+--------+--------+
 *  SINGLE BLOCK:  | slot 0 | slot 1 | slot 2 | slot 3 | slot 4 | slot 5 | slot 6 | next/  | 
 *                 |  item  |  item  |  item  |  item  |   -    |   -    |   -    | lastIdx|
 *                 +--------+--------+--------+--------+--------+--------+--------+--------+
 *                                                ^
 *                                             lastIdx (3)
 *
 *  In single blocks, the items are always shifted so that the first item is in slot 0.
 *  We need to keep track of the lastIdx so that we will know where to push the next item.
 *  The last index is stored in the final 3 bits of the (unused) next pointer
 *
 * When MULTIPLE BLOCKS are chained together, items will be popped from the first block, and
 *  pushed onto the last block.  All blocks in the middle are full(*).  In the first block, we keep
 *  the firstIdx (so we know where to pop) ... on the last block, we keep lastIdx (so we know
 *  where to push).
 * 
 * (*) While blocks in the middle of a chain are generally full, the Fifo supports O(1) joining of
 *     two lists.  In this case, a block at the join point may not be full.  In this case, it will
 *     look like the FIRST BLOCK below, with the first index stored in the indexing bits.
 *
 * Example FIRST BLOCK with 2 items remaining:
 *                 +--------+--------+--------+--------+--------+--------+--------+--------+
 *  FIRST BLOCK:   | slot 0 | slot 1 | slot 2 | slot 3 | slot 4 | slot 5 | slot 6 | next/  | 
 *                 |   -    |   -    |   -    |   -    |   -    |  item  |  item  |firstIdx|
 *                 +--------+--------+--------+--------+--------+--------+--------+--------+
 *                                                                  ^
 *                                                              firstIdx (5)
 * Example LAST BLOCK with 3 items pushed so far:
 *                 +--------+--------+--------+--------+--------+--------+--------+--------+
 *  LAST BLOCK:    | slot 0 | slot 1 | slot 2 | slot 3 | slot 4 | slot 5 | slot 6 | next/  | 
 *                 |  item  |  item  |  item  |   -    |   -    |   -    |   -    | lastIdx|
 *                 +--------+--------+--------+--------+--------+--------+--------+--------+
 *                                        ^
 *                                    lastIdx (2)
 */
typedef struct FifoBlock {
    void *items[ITEMS_PER_BLOCK];
    union {
        /* The last 3 bits of a pointer to a block allocated by malloc must always be zero as a
         *  minimum of 8-byte alignment is required for all such blocks.  These bits are used as
         *  an index into the block indicating the first or last item in the block, depending on
         *  context.
         *
         * This UNION overlays a pointer with an integral value.  This allows us to look at the
         *  pointer OR the integer without casting - but they use the same memory.
         *
         * If there is MORE THAN ONE block in the chain, the first block has a pointer/index that 
         *  looks like this.  However, if there is only a single block, it looks like the LAST block.
         *   +-----------------------------------------------------------+
         *   |                 next pointer                   | firstIdx |
         *   |                  (61 bits)                     | (3 bits) |
         *   +-----------------------------------------------------------+
         *     * The next pointer is only valid after zeroing out the last 3 bits.
         *     * "lastIdx" is implied to be 6 (because there are additional blocks).
         *     * "firstIdx" represents the first filled index (0..6).  POP occurs here.
         *
         * Any blocks in the middle of the chain have a regular pointer like this:
         *   +-----------------------------------------------------------+
         *   |                 next pointer                   |    0*    |
         *   |                  (61 bits)                     | (3 bits) |
         *   +-----------------------------------------------------------+
         *     * The next pointer is valid as-is
         *     * "lastIdx" is implied to be 6 in all middle blocks.
         *     * "firstIdx" is implied to be 0 in all middle blocks.
         *     * NOTE: In middle blocks, the index bits(0) are really still the firstIdx value.
         *             When Fifo's are joined, the O(1) operation may result in a partially
         *             full middle block.  In this case, the items are "right-justified" and
         *             firstIdx indicates where the items start.
         *
         * The last (or only) block in the chain contains only the lastIndex, the pointer is unused.
         *   +-----------------------------------------------------------+
         *   |                      0                         | lastIdx  |
         *   |                  (61 bits)                     | (3 bits) |
         *   +-----------------------------------------------------------+
         *     * The next pointer is unused and guaranteed NULL.
         *     * "lastIdx" represents the last filled index (0..6).
         *     * "firstIdx" is implied to be zero on the last (or only) block.
         */
        uintptr_t last_or_first_idx;
        struct FifoBlock *next;
    } u;
} FifoBlock;

struct Fifo {
    long length;        // Total number of items in queue
    FifoBlock *first;
    FifoBlock *last;
};

  作者的 fifo 队列是按 block(块)为单位存储的,每个 block 通过指针连接形成一个单链表,目前每个 block 可以容纳 7 个元素,作者认为与原本的 adlist(双端列表)相比,大约能减少 60% 的内存占用。

  Fifo 结构体维护了元素数量,头节点(用于 peek,pop 操作)和尾节点(用于 push 操作)指针。

  FifoBlock 结构体维护了元素数组,末尾或首个元素索引,还有下一个 block 的指针。这个 last_or_first_idx 在当前 block 是头节点时,它是 first idx,即队列的第一个元素索引,pop 或者 peek 操作通过这个索引找到 Fifo 队列的首个元素;若当前 block 是尾节点,它是 last idx,即队列的末尾元素索引,push 操作通过这个索引找到新元素该加入的位置(lastidx + 1);若当前 block 是中间节点,last_or_first_idx 为 0 且不被使用。

  我们可以看到,last_or_first_idx 和 next 指针使用了 union,被分配在了同一个 8 byte 空间,索引使用了后 3 bit,只有在后 3 bit 全为 0 时,next 指针才有效。所以中间节点后 3 bit 全为 0,当 pop 将头节点元素取完后,需要移动到下一个 block 前,也是做了一个后 3 bit 清零操作: q->first->u.last_or_first_idx &= ~IDX_MASK; (IDX_MASK = 0x0007)。

总结

  最后我们来总结一下 BJM 对于 BIO 的改进方面:

  1. 用户可以根据需要调整出业务适合的后台线程数,从而提高业务处理效率,或者节约线程资源。而 BIO 固定了 3 个后台线程。
  2. 每个后台线程都支持执行任意类型的任务,提高了线程利用率。而 BIO 每个后台线程只负责一个对应的任务。
  3. BJM 使用 Fifo 存储任务,与 BIO 使用的 adlist 相比减少了内存占用,用作者的话来说能够减少大约 60 %。

  BJM 后续可能还会有改动,所以本文只对代码部分做了简单的介绍,接下来可能还会有一些代码细节上的改进,但是大体上应该是一致的。而且 BJM 对 Redis 8 应该是板上钉钉的事了,所以我认为还是值得拿出来给大家介绍一下的。

最后的最后,放一下二次元浓度比较高的个人网站链接,欢迎大家来踩🤣:金毛败犬の秘密空间

转载自:https://juejin.cn/post/7231858374828179513
评论
请登录