likes
comments
collection
share

深入Redis技术内幕:一条命令是如何执行的

作者站长头像
站长
· 阅读数 22

Redis 是一个高性能的、功能丰富且灵活的开源键值存储系统,其源码采用了许多优秀的编码实践和设计模式。通过学习 Redis 源码,可以深入了解 Redis 的内部工作原理和实现细节。从中学习到很多优秀的编程技巧、设计原则和架构思想,提升自己的编码能力,帮助我们更好地理解 Redis 的各种功能、性能特点和设计思想。

服务器启动主流程

每个Redis实例在内存中表现为一个redisServer结构,Redis的所有核心功能都和这个结构息息相关

// server.h
/* Global server state: every Redis instance is represented in memory by one
 * redisServer structure. Fields replaced by "..." are elided in this excerpt. */
struct redisServer {
    ...
    redisDb *db;
    dict *commands;             /* Command table */
    dict *orig_commands;        /* Command table before command renaming. */
    aeEventLoop *el;            /* Event loop: created in initServer(), run by aeMain(). */
    _Atomic unsigned int lruclock; /* Clock for LRU eviction */
    ...
    dict *moduleapi;            /* Exported core APIs dictionary for modules. */
    dict *sharedapi;            /* Like moduleapi but containing the APIs that
                                   modules share with each other. */
    list *loadmodule_queue;     /* List of modules to load at startup. */
    ...
};

整个main函数的启动流程可以看做是对redisServer的构建过程

// server.c
/* Server entry point (abridged). The startup sequence progressively builds up
 * the global `server` structure; "..." marks code elided from the excerpt. */
int main(int argc, char **argv) {
    ...
    initServerConfig(); // 1: config data structures, command dictionary, defaults
    ...
    moduleInitModulesSystem(); // 2: module dictionaries (modules, module APIs)
    ...
    if (argc >= 2) {
        ...
        loadServerConfig(server.configfile, config_from_stdin, options); // 3: config file overrides defaults
    }
    ...
    initServer(); // 4: lists, shared objects, networking, redisDb
    ...
    if (!server.sentinel_mode) {
        ...
        moduleLoadFromQueue(); // 5: load the modules listed in the configuration
        ...
        InitServerLast(); // 6: initialize the I/O threads
        loadDataFromDisk(); // 7: load data from the AOF or RDB file
        ...
    }
    aeMain(server.el); // 8: run the event loop
    aeDeleteEventLoop(server.el); /* reached only once aeMain() returns */
    return 0;
}
  1. initServerConfig初始化服务器配置相关数据结构,包括命令字典、默认配置项等
  2. moduleInitModulesSystem初始化模块相关数据结构,比如模块字典、模块API字典等
  3. loadServerConfig如果启动参数中指定了配置文件,将解析、加载这些配置,覆盖默认配置项
  4. initServer初始化一些关键数据结构,比如redisServer维护的一些链表结构、共享对象、网络模块、redisDb等
  5. moduleLoadFromQueue加载配置文件中配置的模块动态链接库、执行模块初始化
  6. InitServerLast主要工作是初始化Redis6.0增加的IO线程功能
  7. loadDataFromDisk从AOF或者RDB文件中加载数据到内存
  8. aeMain启动EventLoop,开始处理网络事件和周期性任务

网络与EventLoop

Redis使用一个单线程的事件循环来处理网络事件和周期性任务。EventLoop负责监听socket上的事件,如连接、读取和写入,并调用相应的回调函数进行处理。它基于操作系统提供的I/O多路复用机制(如select、epoll)来实现高效的事件监听。

// ae.h
/* State of one event loop instance: registered file events, fired events,
 * time events and the before/after-sleep hooks. */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    aeFileEvent *events; /* Registered events, indexed by file descriptor */
    aeFiredEvent *fired; /* Fired events; each entry carries the ready fd */
    aeTimeEvent *timeEventHead; /* presumably the head of the time-event list -- TODO confirm */
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep; /* Hook run right before aeApiPoll() */
    aeBeforeSleepProc *aftersleep; /* Hook run right after aeApiPoll() returns */
    int flags;
} aeEventLoop;

文件事件(aeFileEvent)

文件事件是Redis对底层I/O事件的抽象表示。当一个socket上发生读写事件时,EventLoop会将该事件包装成文件事件,文件事件包含了socket描述符(数组下标)、事件类型(mask)和相应的回调函数(读或写)。

// ae.h
/* One registered file event. The owning descriptor is implicit: it is the
 * index of this entry inside aeEventLoop.events. */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc; /* Callback invoked when the fd fires AE_READABLE */
    aeFileProc *wfileProc; /* Callback invoked when the fd fires AE_WRITABLE */
    void *clientData; /* Opaque pointer handed back to both callbacks */
} aeFileEvent;

多路复用API

Redis将IO多路复用抽象为以下一组接口函数,在EventLoop中通过这些API管理文件事件

/* Multiplexing backend interface used by the event loop to manage file
 * events; each ae_*.c backend provides its own implementation. */
int aeApiCreate(aeEventLoop *eventLoop)
int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask)
int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)

Redis实现了多种IO多路复用,根据编译环境选择其中性能最好一种;

// ae.c
/* Include the best multiplexing layer supported by this system.
 * The following should be ordered by performances, descending. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        /* Fallback used when none of the HAVE_* macros above is defined. */
        #include "ae_select.c"
        #endif
    #endif
#endif

处理文件事件

服务器启动后,EventLoop开始循环处理文件事件和周期性任务,代码简化后如下所示:

/* Simplified body of the event loop: run the before-sleep hook, poll for
 * ready descriptors, dispatch read/write callbacks, then run time events.
 * NOTE(review): `fd` and `mask` are not declared in this excerpt; presumably
 * they are taken from eventLoop->fired[j] in the full source. */
while (1) {
    shortest = aeSearchNearestTimer(eventLoop);
    tvp = ...
    eventLoop->beforesleep(eventLoop); // 1
    numevents = aeApiPoll(eventLoop, tvp); // 2
    eventLoop->aftersleep(eventLoop);
    for (j = 0; j < numevents; j++) {
        aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
        if (fe->mask & mask & AE_READABLE) {
            fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 3
        }
        if (fe->mask & mask & AE_WRITABLE) {
            fe->wfileProc(eventLoop,fd,fe->clientData,mask); // 4
        }
    }
    processTimeEvents(eventLoop); // handle time events: serverCron
}
  1. beforeSleep 从redis6.0开始,增加了IO线程辅助数据读写,这里主要是将读写操作分派到IO线程和一些周期性工作;
  2. 调用IO多路复用的实现函数,等待事件就绪;比如select、epoll_wait等;
  3. 处理读事件,分两种情况,服务端socket:就是我们启动时监听在6379端口的socket,它的rfileProc为acceptTcpHandler函数;客户端socket:连接到Redis服务器的客户端,其rfileProc为readQueryFromClient,这个函数从客户端socket接收数据,将数据读入客户端读缓冲区,根据Redis的请求协议,解析命令名称、参数和选项,解析完成后,将命令请求转换为一个命令对象,调用相应的命令处理函数来执行实际的命令逻辑
  4. 处理写事件:一般情况下会在beforeSleep中直接调用writeToClient写数据到客户端,当writeToClient无法写出更多数据时(比如socket写缓冲区满了),才会安装写处理器为sendReplyToClient函数(对writeToClient的简单包装)

网络功能初始化

网络大部分功能在服务器启动时的initServer函数中完成初始化:

// server.c
/* Networking-related part of initServer() (abridged): create the event loop,
 * open the listening sockets, and register the accept handler, the serverCron
 * timer and the before/after-sleep hooks. */
void initServer(void) {
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); // 1
    if (server.port != 0 && listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) exit(1); // 2
    if (server.unixsocket != NULL) {
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm, server.tcp_backlog); // 2
    }
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL); // create the time event that periodically schedules serverCron
    for (j = 0; j < server.ipfd_count; j++) {
        aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL); // 3
    }
    aeSetBeforeSleepProc(server.el,beforeSleep); // 4
    aeSetAfterSleepProc(server.el,afterSleep); // 4
}
  1. 创建EventLoop,分配内存,初始化数据结构;
  2. listenToPort分别调用socket、bind、listen三个系统调用,创建一个服务端socket,监听在6379端口,等待接收客户端发送的连接请求;如果我们的Redis和应用部署在同一台机器上,可以启用Unix域协议(一种进程间通信方式,比TCP/IP协议栈更高效,有兴趣的同学可以参考《Unix网络编程》第15章关于Unix域协议的内容),则调用到anetUnixServer,其代码和listenToPort类似;
  3. 将2中创建的socket注册到EventLoop,并安装读处理器为acceptTcpHandler,当新连接就绪时,会触发读事件,调用到acceptTcpHandler函数;
  4. 设置EventLoop的beforesleep和aftersleep处理函数;

处理客户端连接

服务端socket变为可读时,表示有新连接接入,触发对读处理器acceptTcpHandler的调用

/* Read handler of the listening TCP socket (abridged; local declarations are
 * elided). Each iteration accepts one connection on `fd`, wraps the new
 * descriptor in a connection object and hands it to acceptCommonHandler(). */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 1
        conn = connCreateAcceptedSocket(cfd); // 2
        acceptCommonHandler(conn,0,cip); // 3
    }
}
  1. 调用accept系统调用,从就绪队列(已完成TCP三次握手)中获取一个连接文件描述符cfd
  2. 将cfd封装为一个connection结构,其实就是为cfd绑定一组操作函数(读、写等), 这些函数在CT_Socket结构中通过函数指针指定具体实现,这是C语言中一种常见的实现面向对象编程的方法;
// connection.c
/* Operation table for socket-backed connections: every member is a function
 * pointer selecting the concrete implementation of one connection operation. */
ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler, /* reached through connSetReadHandler() */
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};

acceptCommonHandler(上面的步骤3)会调用createClient为客户端连接创建一个client结构,作为这个连接在服务器的表示;createClient中会调用connSetReadHandler,然后调用conn->type->set_read_handler也就是CT_Socket.set_read_handler;最终调用到connSocketSetReadHandler,将cfd注册到EventLoop,设置读处理器为readQueryFromClient,之后接收到该客户端发送的数据时,会触发读事件,从而执行readQueryFromClient函数;

// server.h
/* Server-side representation of one client connection (abridged). */
typedef struct client {
    ...
    connection *conn;       /* Connection this client was created for. */
    int resp;               /* RESP protocol version. Can be 2 or 3. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    ...
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    list *reply;            /* List of reply objects to send to the client. */
    ...
    char buf[PROTO_REPLY_CHUNK_BYTES]; /* Fixed output buffer, tried before the reply list. */
} client;

// networking.c
/* Allocate a client for `conn` (abridged). When a connection is given, it is
 * made non-blocking, TCP_NODELAY and optional keepalive are configured, and
 * readQueryFromClient is installed as its read handler. */
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient); // install readQueryFromClient as the read handler
        connSetPrivateData(conn, c); // lets the read handler recover the client via connGetPrivateData()
    }
    ...
}

// connection.c
// CT_Socket.set_read_handler
/* Install, replace or remove (func == NULL) the read callback of `conn`.
 * Removing the callback unregisters the AE_READABLE file event; installing one
 * registers the connection type's ae_handler for AE_READABLE on conn->fd.
 * Returns C_OK on success (including the no-change case) and C_ERR when the
 * file event cannot be created. */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    /* Nothing to do when the requested handler is already in place. */
    if (conn->read_handler == func) {
        return C_OK;
    }

    conn->read_handler = func;

    if (!func) {
        aeDeleteFileEvent(server.el, conn->fd, AE_READABLE);
        return C_OK;
    }

    if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE,
                          conn->type->ae_handler, conn) == AE_ERR) {
        return C_ERR;
    }
    return C_OK;
}

命令执行

Redis命令

Redis命令在内存中表示为一个redisCommand结构,命令的执行其实就是执行proc函数指针指向的函数

// server.h
/* Signature shared by all command implementations. */
typedef void redisCommandProc(client *c);
/* In-memory description of one command; executing the command means calling
 * the function that `proc` points to. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;
    char *sflags;   /* Flags as string representation, one char per flag. */
    uint64_t flags; /* The actual flags, obtained from the 'sflags' field. */
    ...
};

命令字典初始化

Redis内置的命令定义在一个全局数组redisCommandTable中

// server.c
/* Static table of the built-in commands (abridged). populateCommandTable()
 * copies every entry into the server.commands dictionary at startup. */
struct redisCommand redisCommandTable[] = {
    {"module",moduleCommand,-2,
     "admin no-script",
     0,NULL,0,0,0,0,0,0},

    {"get",getCommand,2,
     "read-only fast @string",
     0,NULL,1,1,1,0,0,0},
    ...
}

服务器启动过程中,在initServerConfig函数中将静态命令表中的命令填充到server.commands命令字典中,执行客户端请求的命令时,会通过命令名称从字典中查找对应的命令;除此之外,自定义模块中也可以通过模块API创建自定义命令,这些命令会在模块加载时注册到server.commands命令字典中

// server.c
/* Command-table part of initServerConfig() (abridged): create both command
 * dictionaries, then fill them from the static redisCommandTable. */
void initServerConfig(void) {
    ...
		server.commands = dictCreate(&commandTableDictType,NULL);
    server.orig_commands = dictCreate(&commandTableDictType,NULL);
    populateCommandTable();
    ...
}
/* Register every entry of redisCommandTable in both server.commands and
 * server.orig_commands (abridged). Each dictionary gets its own sds copy of
 * the command name as key, while both values point at the same table entry. */
void populateCommandTable(void) {
    ...
    for (j = 0; j < numcommands; j++) {
        struct redisCommand *c = redisCommandTable+j;
        ...
        retval1 = dictAdd(server.commands, sdsnew(c->name), c);
        retval2 = dictAdd(server.orig_commands, sdsnew(c->name), c);
    }
}

命令读取、解析、执行

  1. 客户端通过网络发送命令,触发可读事件,此时读处理器为readQueryFromClient
  2. readQueryFromClient将网络数据读取到客户端查询缓冲区client->querybuf
  3. 然后调用processInputBuffer将查询缓存区中的字节数据根据RESP协议(Redis序列化协议是一个简单的文本协议,可以参考官网相关内容)解码为Redis命令参数client->argc和client->argv;
  4. 最终调用processCommandAndResetClient->processCommand从命令字典中查找到目标命令,执行命令的proc指向的函数;
// server.c
/* Look up and run the command held in c->argv (abridged). Inside a MULTI
 * transaction, every command except EXEC, DISCARD, MULTI and WATCH is queued
 * instead of executed. */
int processCommand(client *c) {
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); // look the command up by name in the command dictionary (module commands included)
    ... 长达200多行的各种校验逻辑
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        queueMultiCommand(c); // commands after MULTI are only queued; they run together once EXEC arrives
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL); // execute the command, i.e. call c->cmd->proc(c)
        ...
    }
    return C_OK;
}

响应客户端

客户端输出缓冲区

Redis中有一系列addReplyXXX的函数,用于向客户端返回命令执行结果、错误信息等,这些函数会将数据写入一个16kb的客户端输出缓冲区client->buf,如果输出缓冲区空间不足,则写入一个输出缓冲队列client->reply,这些缓冲的数据会在合适的时机发送到客户端

/* Queue the content of `obj` as reply data for client `c`.
 *
 * String-encoded objects are sent as they are; integer-encoded objects are
 * first rendered as decimal text. The bytes go to the client's fixed output
 * buffer when possible, otherwise they are appended to the reply list. Any
 * other encoding is a programming error and triggers a panic. Nothing is
 * queued when prepareClientToWrite() does not return C_OK. */
void addReply(client *c, robj *obj) {
    char digits[32];
    const char *payload;
    size_t payload_len;

    if (prepareClientToWrite(c) != C_OK) {
        return;
    }

    if (sdsEncodedObject(obj)) {
        payload = obj->ptr;
        payload_len = sdslen(obj->ptr);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        payload_len = ll2string(digits, sizeof(digits), (long)obj->ptr);
        payload = digits;
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
        return;
    }

    /* Try the fixed output buffer first, fall back to the reply list. */
    if (_addReplyToBuffer(c, payload, payload_len) != C_OK) {
        _addReplyProtoToList(c, payload, payload_len);
    }
}

发送数据到客户端

入口函数为writeToClient,这里的connWrite会调用到CT_Socket.write,通过write系统调用将数据发送到客户端

// networking.c
/* Send pending reply data to the client through connWrite() (abridged).
 * Once nothing is left to send, the write handler is removed (if one was
 * installed) and a client flagged CLIENT_CLOSE_AFTER_REPLY is freed
 * asynchronously, in which case C_ERR is returned. */
int writeToClient(client *c, int handler_installed) {
    ...
    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen); // flush the fixed output buffer
            ...
        } else {
            o = listNodeValue(listFirst(c->reply));
            ...
            // flush the reply list: large replies are split into chunks kept in a queue
            nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen); 
            ...
        }
        ...
    }
    ...
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        if (handler_installed) connSetWriteHandler(c->conn, NULL);
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClientAsync(c);
            return C_ERR;
        }
    }
    return C_OK;
}

Redis自定义模块

Redis可以通过自定义模块来扩展其功能,在Redis服务器中添加新的命令、数据类型、钩子函数和其他功能。模块相关的功能可以参考Redis官网文档,这里分析一下模块是如何被加载生效的。

模块加载

模块加载的核心函数是 moduleLoad:

  1. 通过dlopen库函数加载动态链接库;
  2. 通过dlsym库函数查找动态链接库符号为RedisModule_OnLoad的函数,使用onload保存该函数地址;
  3. 调用自定义的RedisModule_OnLoad函数,初始化模块;
// module.c
/* Load the module shared library at `path` (abridged): dlopen() the library,
 * resolve its RedisModule_OnLoad symbol, call it with the module arguments and
 * register the module in the `modules` dictionary. Returns C_ERR when the
 * library cannot be opened, C_OK after registration. */
int moduleLoad(const char *path, void **module_argv, int module_argc) {
    int (*onload)(void *, void **, int);
    void *handle;
    RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
    ...
    handle = dlopen(path,RTLD_NOW|RTLD_LOCAL); // 1
    if (handle == NULL) {
        serverLog(LL_WARNING, "Module %s failed to load: %s", path, dlerror());
        return C_ERR;
    }
    onload = (int (*)(void *, void **, int))(unsigned long) dlsym(handle,"RedisModule_OnLoad"); // 2
    ...
    if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) { // 3
        ...
    }
    /* Redis module loaded! Register it. */
    dictAdd(modules,ctx.module->name,ctx.module);
    ...
    return C_OK;
}

模块API初始化

在上面自定义模块的代码中,我们首先调用了RedisModule_Init函数,执行模块相关API的初始化,这个步骤很有意思,它的API加载过程是这样的:RedisModule_GetApi -> RedisModuleCtx.getapifuncptr -> RM_GetApi,在函数RM_GetApi中将自定义模块中相关API的函数指针指向Redis中的具体实现函数,这样在自定义模块中就可以调用这些API了,这些实现函数在服务器启动时通过 main -> moduleInitModulesSystem -> moduleRegisterCoreAPI 注册到server.moduleapi字典,API函数实现在module.c文件中,命名方式为RM_XXX,比如RM_Alloc、RM_Free等等。

// redismodule.h
/* Bind the module-side function pointer RedisModule_<name> to the core
 * implementation registered under the string "RedisModule_<name>". */
#define REDISMODULE_GET_API(name) \
    RedisModule_GetApi("RedisModule_" #name, ((void **)&RedisModule_ ## name))
...
REDISMODULE_API void * (*RedisModule_Alloc)(size_t bytes) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_Realloc)(void *ptr, size_t bytes) REDISMODULE_ATTR;
...
/* Module-side API bootstrap (abridged): fetch the API resolver from the first
 * pointer-sized slot of the context, then resolve each API function pointer. */
static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int apiver) {
    void *getapifuncptr = ((void**)ctx)[0];
    RedisModule_GetApi = (int (*)(const char *, void *)) (unsigned long)getapifuncptr;
    REDISMODULE_GET_API(Alloc); 
    // after macro expansion: RedisModule_GetApi("RedisModule_Alloc", ((void **)&RedisModule_Alloc));
    REDISMODULE_GET_API(Calloc);
    ...
}

// module.c
/* Initializer for a RedisModuleCtx: the first member is set to the address of
 * RM_GetApi, the remaining members are zeroed. */
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, NULL, {0}}

struct RedisModuleCtx {
    void *getapifuncptr; // first member: RedisModule_Init() reads it as ((void**)ctx)[0]
    ...
}

/* Resolve the core API registered under `funcname` in server.moduleapi.
 *
 * targetPtrPtr is a pointer to the module-side function pointer (for example
 * the address of RedisModule_Alloc); on success that function pointer is set
 * to the core implementation and REDISMODULE_OK is returned. When no API is
 * registered under that name, REDISMODULE_ERR is returned and the target is
 * left untouched. */
int RM_GetApi(const char *funcname, void **targetPtrPtr) {
    dictEntry *entry = dictFind(server.moduleapi, funcname);

    if (entry) {
        *targetPtrPtr = dictGetVal(entry);
        return REDISMODULE_OK;
    }
    return REDISMODULE_ERR;
}

自定义命令

模块初始化后我们通过调用RedisModule_CreateCommand创建了一个自定义命令FakeGPT.chat,这个API的实现函数为 RM_CreateCommand,如下所示,这里将向命令字典中增加一个执行cmdfunc函数的新命令。

// module.c
/* Register a module-defined command called `name` (abridged). Fails with
 * REDISMODULE_ERR when a command with that name already exists. Otherwise a
 * proxy holding `cmdfunc` is allocated together with a redisCommand whose proc
 * is RedisModuleCommandDispatcher, and the command is added to
 * server.commands. */
int RM_CreateCommand(RedisModuleCtx *ctx, const char *name, RedisModuleCmdFunc cmdfunc, const char *strflags, int firstkey, int lastkey, int keystep) {
    ...
    struct redisCommand *rediscmd;
    RedisModuleCommandProxy *cp;
    sds cmdname = sdsnew(name);
    if (lookupCommand(cmdname) != NULL) {
        sdsfree(cmdname);
        return REDISMODULE_ERR;
    }
    cp = zmalloc(sizeof(*cp));
    ...
    cp->func = cmdfunc;
    cp->rediscmd = zmalloc(sizeof(*rediscmd));
    cp->rediscmd->name = cmdname;
    cp->rediscmd->proc = RedisModuleCommandDispatcher; // proxy function: it ends up calling cp->func with a RedisModuleCtx
    ...
    dictAdd(server.commands,sdsdup(cmdname),cp->rediscmd); // register the new command
    ...
    return REDISMODULE_OK;
}

IO线程

Redis是单线程还是多线程?你是否被问过这个问题。

Redis在6.0版本新增了可选的IO线程功能,辅助读、写网络数据,但命令仍然在主线程中执行;可以这么说:Redis6.0改多线程了,但又没有完全“多”。

其实准确的说,Redis在6.0版本之前也是多线程的,类似bgsave、bgrewriteaof之类的命令都会通过fork系统调用启用子进程处理IO任务(每个进程至少有个主线程,自然也算是多线程了)。

IO线程初始化

main函数的最后几行会调用到 InitServerLast函数,这个函数会调用initThreadedIO初始化IO线程,请注意io_threads_pending前面那么大的几个字_Atomic

/* Per-thread count of pending jobs. Written by the main thread to hand work to
 * an I/O thread and reset to 0 by that thread when it is done, hence _Atomic. */
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM];
/* Create the per-thread job lists and spawn the additional I/O threads.
 * Nothing is spawned when server.io_threads_num is 1. Exits the process when a
 * thread cannot be created. */
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */
    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;
    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        io_threads_pending[i] = 0;
        /* The mutex is taken before the thread exists, so the new thread
         * blocks on it in IOThreadMain() once it finds no pending work. */
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { 
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

IO线程处理读、写任务

IO线程的“run”方法是 IOThreadMain函数,每个线程都有一个自己的任务队列io_threads_list[id]和一个原子的计数器io_threads_pending[id](这里有兴趣的小伙伴可以想一想为什么只需要声明io_threads_pending为_Atomic,参考cppreference)。

每个线程通过自旋检测io_threads_pending[id]等待主线程向自己的任务队列分配任务,这种实现方式很!粗!暴!

/* Body of an I/O thread (abridged). The thread spins on its pending counter
 * until the main thread assigns work, processes every client on its own list
 * as a read or a write depending on io_threads_op, then empties the list and
 * resets the counter to 0 to signal completion. */
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    ...
    while(1) {
        /* Wait for start */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }
        /* Give the main thread a chance to stop this thread. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }
				...
        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0); // main write entry point
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn); // main read entry point
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;
    }
}

主线程任务分派

主线程通过EventLoop的beforesleep将读、写事件分派到IO线程,其中读事件分派逻辑如下,写处理同理:

  1. 将server.clients_pending_read待处理列表的读任务平均分配到所有处理线程(包括主线程);
  2. 更新IO线程的待处理任务数,这个操作结果对IO线程是可见的,会触发IO线程执行任务(参考IO线程处理读、写任务);
  3. 主线程也会处理一部分任务;
  4. 主线程自旋等待所有IO线程执行完毕;
  5. 此时所有的网络数据已经读取解析完毕,主线程执行命令,然后从待处理列表中删除;
// networking.c
/* Fan the clients waiting in server.clients_pending_read out to the I/O
 * threads (abridged). Steps: (1) distribute the clients round-robin over all
 * thread lists, the main thread's included; (2) publish each I/O thread's job
 * count; (3) process the main thread's own share; (4) spin until every I/O
 * thread reports 0 pending jobs; (5) process the commands on the main thread
 * while removing the clients from the pending list.
 * Returns the number of clients handled, or 0 when threaded reads are not
 * active/enabled or nothing is pending. */
int handleClientsWithPendingReadsUsingThreads(void) {
    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
    int processed = listLength(server.clients_pending_read);
    if (processed == 0) return 0;
    ...
    listRewind(server.clients_pending_read,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c); // 1
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_READ;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count; // 2
    }

    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn); // 3
    }
    listEmpty(io_threads_list[0]);

    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break; // 4
    }

    /* Run the list of clients again to process the new buffers. */
    while(listLength(server.clients_pending_read)) {
        ln = listFirst(server.clients_pending_read);
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_READ;
        listDelNode(server.clients_pending_read,ln);
        ...
        if (processPendingCommandsAndResetClient(c) == C_ERR) { // 5
            continue;
        }
        ...
    }

    /* Update processed count on server */
    server.stat_io_reads_processed += processed;

    return processed;
}

开启IO线程后的读-解析-执行

开启IO线程后,readQueryFromClient不会立即执行读-解析流程,而是将这个待处理的客户端添加到server.clients_pending_read队列,等待后续主线程分派执行读-解析流程,同时标记客户端为CLIENT_PENDING_READ

// networking.c
/* Read handler for client connections (abridged). The client is recovered
 * from the connection's private data; when postponeClientRead() queues it for
 * a later threaded read, the function returns without reading anything. */
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    ...
}

/* Decide whether the read for client `c` should be deferred to the threaded
 * I/O path.
 *
 * Returns 1 after flagging the client CLIENT_PENDING_READ and pushing it on
 * the head of server.clients_pending_read; returns 0 when the caller should
 * perform the read itself. */
int postponeClientRead(client *c) {
    /* Threaded reads must be both active and enabled. */
    if (!server.io_threads_active || !server.io_threads_do_reads) {
        return 0;
    }

    /* Never defer while clients are paused or while
     * ProcessingEventsWhileBlocked is set. */
    if (clientsArePaused() || ProcessingEventsWhileBlocked) {
        return 0;
    }

    /* Master/slave clients and clients that are already queued are excluded. */
    if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)) {
        return 0;
    }

    c->flags |= CLIENT_PENDING_READ;
    listAddNodeHead(server.clients_pending_read, c);
    return 1;
}

同理,在解析完命令后也不会真正地执行命令,此时c->flags & CLIENT_PENDING_READ为true:

// networking.c
/* Parse the commands accumulated in the client's query buffer (abridged).
 * A fully parsed command is executed right away unless the client is flagged
 * CLIENT_PENDING_READ, in which case it is only marked CLIENT_PENDING_COMMAND
 * and parsing stops. */
void processInputBuffer(client *c) {
    while(c->qb_pos < sdslen(c->querybuf)) {
        ... 解析命令
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                return;
            }
        }
    }
		...
}

总结时刻

命令执行

吧啦吧啦说了这么一大堆,那么一条命令到底是怎么执行的呢?总结一下命令的整个过程如下:

  1. 客户端连接到Redis服务器,Redis将这个连接注册到EventLoop,监听读事件,读处理器为readQueryFromClient
  2. 客户端随后发送一条命令FakeGPT.chat 在吗?到Redis服务器,EventLoop触发读事件,调用readQueryFromClient
  3. readQueryFromClient读取、解析网络数据,然后从server.commands字典中查找名为FakeGPT.chat的命令(这个命令在模块加载时被添加到server.commands字典中),最终执行到我们自定义模块的FakeGPT_chat函数
  4. FakeGPT_chat函数中通过模块API向client输出缓冲区client->buf写入响应结果
  5. 随后EventLoop在beforesleep中调用writeToClient将client输出缓冲区中的数据发送到远程客户端

至此,整个命令的执行流程就结束了。