likes
comments
collection
share

Redis中的Reactor模型

作者站长头像
站长
· 阅读数 80

Reactor模式

在服务端网络编程中,根据是否阻塞、进程数量、线程数量、IO是否复用等纬度,网络编程模型可以划分为以下十种类型。(下图来自《Linux多线程服务端编程》) Redis中的Reactor模型

方案0~4中,需要为每个客户端连接维护一个进程/线程,即使采用了prefork,线程池等方法,仍然处理不了需要大量连接的业务场景。

方案5~9中都采用了IO多路复用技术,借助操作系统的支持,服务端一个线程可以同时监听多个网络连接,一旦某个连接上有事件需要处理,就能通知线程执行相应的读写操作。Linux所提供的poll, select, epoll都属于IO复用技术,由于selectpoll在实现上的缺陷(如内核和用户空间内存拷贝问题),现在服务端变成大多使用epoll

Reactor: The reactor design pattern is an event handling pattern for handling service delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

由定义可以看出,Reactor模型本身是一个较为宽泛的定义,满足以下条件的都可以称为Reactor模型。

  • 事件驱动模式
  • 能够处理一个或多个输入
  • 将事件分发个对应的handlers

Reactor模型中并不指定:

  • 网络事件处理和业务处理在不在同一个线程?
  • 能不能有多个线程做业务处理?
  • 能不能由多个线程来处理网络事件?

这些问题的不同方案也就构成了不同的Reactor模型,对应上图的5~9方案。本文下面两章分别阐述Redis 6.0之前单线程IO Reactor模型的实现,和Redis 6.0的多线程IO Reactor模型的实现。

Redis 6.0之前 -- 单I/O线程

本章所涉及到的代码对应Redis 5.0.10,描述Redis是如何实现单线程IO的。在描述的过程中,我们首先介绍Redis Reactor的整体框架,然后介绍框架中的几个关键函数;最后介绍Redis处理命令的整个执行流程。

整体框架

Redis中的Reactor模型

  1. Reactor主循环对应aeMain函数,该函数在Redis退出前,会循环执行aeProcessEvents来处理事件。
  2. Reactor模型中需要处理三类事件,对应的处理函数分别为:
    • 读事件处理函数:readQueryFromClient
    • 写事件处理函数:sendReplyToClient
    • 处理连接事件函数:acceptTcpHandler
  3. aeCreateFileEvent函数用于向Dispatcher注册事件的处理函数。之后新事件发生时,由Reactor来触发对应的处理函数。
  • Redis在进程初始化中,在监听的端口上注册acceptTcpHandler函数,并用之建立新连接;
  • 在新连接建立时,把readQueryFromClient注册到新连接对应的fd
  • 当服务端执行完命令,需要向客户端回复消息时,把sendReplyToClient注册到对应的事件上。

关键组件

  1. 主循环:aeMainaeProcessEvents
    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
        // 
        while (!eventLoop->stop) {
            if (eventLoop->beforesleep != NULL)
                // 注意:往客户端发送数据的地方在这beforesleep中
                eventLoop->beforesleep(eventLoop);
            // 处理事件
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
            ...
            // 调用封装的IO复用函数,获得需要处理的事件
            numevents = aeApiPoll(eventLoop, tvp);
    
            for (j = 0; j < numevents; j++) {
                // 处理读事件
                if (!invert && fe->mask & mask & AE_READABLE) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
    
                // 处理写事件
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
            }
            ...
    }
    
  2. multiplexing:aeApiPollRedis支持epoll, selectkqueue几种不同的IO多路复用技术,这里我们只关注epollRedisaeProcessEvents函数中会调用aeApiPoll,找操作系统拿到新的事件。
    static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
        ...
        // 调用epoll_wait
        retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
                tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
        if (retval > 0) {
            int j;
    
            numevents = retval;
            for (j = 0; j < numevents; j++) {
                int mask = 0;
                // 标记事件的类型
                struct epoll_event *e = state->events+j;
                if (e->events & EPOLLIN) mask |= AE_READABLE;
                if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
                if (e->events & EPOLLERR) mask |= AE_WRITABLE;
                if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
                eventLoop->fired[j].fd = e->data.fd;
                eventLoop->fired[j].mask = mask;
            }
        }
        return numevents;
    }
    
  3. 事件抽象:aeFileEvent,Redis将网络事件抽象成一个结构体,并用aeCreateFileEvent函数绑定事件和处理函数。
    typedef struct aeFileEvent {
       int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
       aeFileProc *rfileProc; // 读事件的处理函数
       aeFileProc *wfileProc; // 写事件的处理函数
       void *clientData;
    } aeFileEvent; 
    
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
           aeFileProc *proc, void *clientData)
    {
       aeFileEvent *fe = &eventLoop->events[fd];
       ...
       // 设置关注的事件类型
       fe->mask |= mask;
       // 设置读写事件的处理函数
       if (mask & AE_READABLE) fe->rfileProc = proc;
       if (mask & AE_WRITABLE) fe->wfileProc = proc;
       fe->clientData = clientData;
       ...
    }
    

命令处理流程

Redis中的Reactor模型

  1. 在客户端和服务端建立连接时,服务端会为客户端生成一个client结构体,该结构体中会维护客户端和服务端双向通信的两个缓冲区,事件结构中aeFileEvent->clientData指向的内容就是client结构体。
    /* With multiplexing we need to take per-client state.
     * Clients are taken in a linked list. */
    typedef struct client {
        uint64_t id;            /* Client incremental unique ID. */
        int fd;                 /* Client socket. */
        redisDb *db;            /* Pointer to currently SELECTed DB. */
        sds querybuf;          // clent->server buffer, 最大长度为 PROTO_MAX_QUERYBUF_LEN 1G
        ...
    
        /* server->clent buffer, 最大16K*/
        int bufpos;
        char buf[PROTO_REPLY_CHUNK_BYTES]; 
    } client;
    
  2. Reactor收到来自客户端的数据后,会触发读事件,调用readQueryFromClient处理函数。readQueryFromClient会将来自客户端的数据读到client->querybuf中,之后processInputBufferAndReplicate直接处理buffer的数据。
    void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
        // 取出client结构体
        client *c = (client*) privdata;
        ...
        // 读数据到querybuf
        qblen = sdslen(c->querybuf);
        if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
        c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
        nread = read(fd, c->querybuf+qblen, readlen);
        ...
        // 处理请求
        processInputBufferAndReplicate(c);
    }
    
  3. processInputBufferAndReplicate会判断自身是否为master,如果是master则要负责同步命令到其他节点;如果不是master,则只要调用processInputBuffer执行请求。
    void processInputBufferAndReplicate(client *c) {
        if (!(c->flags & CLIENT_MASTER)) {
            processInputBuffer(c);
        } else {
            size_t prev_offset = c->reploff;
            processInputBuffer(c);
            size_t applied = c->reploff - prev_offset;
            if (applied) {
                replicationFeedSlavesFromMasterStream(server.slaves,
                        c->pending_querybuf, applied);
                sdsrange(c->pending_querybuf,applied,-1);
            }
        }
    }
    
  4. Redis哈希结构dict *commands维护了一份command->handler的映射,processCommand在处理命令时,现根据命令查找对应的handler,再去执行handler
    int processCommand(client *c) {
        // 查找命令
        c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
        ...
        // 校验权限
        if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
            ...
        // 判断能否执行命令,内存是否用完、存盘是否失败、当前节点是否为只读节点、从节点数量是否足够等
    
        // 执行命令
        if (c->flags & CLIENT_MULTI &&
            c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
            c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
        {
            // 延迟执行
            queueMultiCommand(c);
            addReply(c,shared.queued);
        } else {
            // 执行命令
            call(c,CMD_CALL_FULL);
            c->woff = server.master_repl_offset;
            if (listLength(server.ready_keys))
                handleClientsBlockedOnKeys();
        }
        return C_OK;
    }
    
  5. 服务端执行完请求后,会调用addReply函数,addReply并不会直接往客户端发送数据,只是把数据放到缓冲区,并把这个client加入server.clients_pending_write链表中。
    void addReply(client *c, robj *obj) {
        // 把 c 加入`server.clients_pending_write`链表
        if (prepareClientToWrite(c) != C_OK) return;
    
        // 把数据写入buffer
        if (sdsEncodedObject(obj)) {
            if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
                _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
        } 
        ...
    }
    
    int prepareClientToWrite(client *c) {
        ...
        // 判断是否在server.clients_pending_write中,如不在,则加入
        if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
        ...
    }
    
    void clientInstallWriteHandler(client *c) {
        if (!(c->flags & CLIENT_PENDING_WRITE) &&
            (c->replstate == REPL_STATE_NONE ||
             (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
        {
            c->flags |= CLIENT_PENDING_WRITE;
            // 真正加入链表的操作在这里
            listAddNodeHead(server.clients_pending_write,c);
        }
    }
    
  6. 往客户端发送数据的函数是handleClientsWithPendingWrites,主循环aeMain中会调用beforeSleep函数,beforeSleep会调用handleClientsWithPendingWrites
    // Redis每次进入主循环都会调用此函数
    void beforeSleep(struct aeEventLoop *eventLoop) {
        ... 
    
        /* Write the AOF buffer on disk */
        flushAppendOnlyFile(0);
    
        // 发送数据到客户端
        handleClientsWithPendingWrites();
    
        ...
    }
    
  7. 上面说到服务端写完数据时,会将有数据的client放入一个链表,handleClientsWithPendingWrites会遍历这个链表,往客户端发送数据。
    int handleClientsWithPendingWrites(void) {
        listIter li;
        listNode *ln;
        ...
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
            listDelNode(server.clients_pending_write,ln);
    
            ...
            // 往客户端发送数据
            if (writeToClient(c->fd,c,0) == C_ERR) continue;
    
            // 如果缓冲区中数据没发送完,继续监听该client的写事件,等到下次可写入时继续发送数据
            if (clientHasPendingReplies(c)) {
                ...
                if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                    sendReplyToClient, c) == AE_ERR)
                {
                        freeClientAsync(c);
                }
            }
        }
        return processed;
    }
    

总结

在单线程Reactor模型中,读数据、写数据、执行命令都在同一个线程中,整个框架十分简单清晰,且没有数据竞争问题,对两个buffer的操作不用加锁。

单线程的缺点也很明显,“不能同时做两件事”,处理网络IO时不能执行命令,执行命令时不能处理网络IO。能不能使用多线程提升处理效率?

Redis 6.0 -- 多I/O线程

整体框架

Redis中的Reactor模型

  1. 在多线程IO模型中,主Reactor收到事件后,不在本线程做收发数据的操作,会把有事件的client加到server.clients_pending_read链表中,后面再将server.clients_pending_read链表中的值分配给io_threads_list数组,io_threads_list数组中每个元素是一个链表,每个IO线程负责处理一个链表上的事件。
  2. 每个IO线程有一个主循环IOThreadMain,该循环会不停的处理io_threads_list[i]链表上的读写事件。
  3. 主线程会等所有IO线程处理完事件后,IO数据已经在client的缓冲区,由主线程来执行querybuf中的命令。
  4. 主线程执行完命令,并不直接发送数据给客户端,同样发要发送的数据放到缓冲区,并放入clients_pending_write中,之后再分配给不同的IO线程(放到io_threads_list[i])。
  5. 由主循环调用handleClientsWithPendingWritesUsingThreads后,各IO线程再处理io_threads_list[i]做对应的发送操作。

主线程与IO线程之间的协作

关键数据结构

// 这个数组里面每个值是一个原子变量,io_threads_pending[i]表示第i个IO线程上待处理的事件数量
threads_pending io_threads_pending[IO_THREADS_MAX_NUM];

// io_threads_list[i] 表示一个链表,链表上每个元素是一个client结构,表示对应的client上有待处理的事件
list *io_threads_list[IO_THREADS_MAX_NUM];

主线程通知IO线程干活

主线程在aeApiPoll拿出事件之后,再对应的处理函数里,并不做真正的数据读写,只是把有数据的client放到server.clients_pending_read链表上。

void readQueryFromClient(connection *conn) {
    ...
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    ...
}

int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
        io_threads_op == IO_THREADS_OP_IDLE)
    {
        // 把这个client加到server.clients_pending_read链表
        listAddNodeHead(server.clients_pending_read,c);
        c->pending_read_list_node = listFirst(server.clients_pending_read);
        return 1;
    } else {
        return 0;
    }
}

在主线程“处理完”事件之后,主循环会调用handleClientsWithPendingReadsUsingThreads,这个函数会将读写事件分配给IO线程。

int handleClientsWithPendingReadsUsingThreads(void) {
    ...
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        // 这里通过item_id对线程数量取余,决定当前的任务分配个哪一个IO线程
        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        // 这里会修改io_threads_pending数组里原子变量的值,通知IO线程干活
        setIOPendingCount(j, count);
    }

    // 主线程自己也会做一些IO的活
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        readQueryFromClient(c->conn);
    }
    listEmpty(io_threads_list[0]);

    // 等所有IO操作完成
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += getIOPendingCount(j);
        if (pending == 0) break;
    }

    ...
    while(listLength(server.clients_pending_read)) {
        ...
        // 处理命令
        if (processPendingCommandAndInputBuffer(c) == C_ERR) {
            /* If the client is no longer valid, we avoid
             * processing the client later. So we just go
             * to the next. */
            continue;
        }

        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            // 把发给客户端的数据放到server.clients_pending_write链表中,等待后续发给IO线程
            putClientInPendingWriteQueue(c);
    }
    // ...
    return processed;
}

IO线程通知主线程活干完了

IO线程会执行IOThreadMain循环,不断等待新事件(io_threads_pending[i]>0),有事件之后处理完,再将io_threads_pending[i]值设置为0。

void *IOThreadMain(void *myid) {
    
    while(1) {
        // 一直循环、等待,直到getIOPendingCount()>0
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        if (getIOPendingCount(id) == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        // 处理IO事件,
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        // 清空队列,设置pending值为0,表示自己干完了
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}

总结

Redis 6.0在引入多线程IO之后,整体的处理框架改动很小,主线程和IO线程通过原子变量、io_threads_list来通信交换数据,且主线程和IO线程保证不会同时操作io_threads_list链表,避免了使用锁。各个线程的主循环之间关系为:

Redis中的Reactor模型

执行网络IO和执行命令两部分是完全通过IOPendingCount隔离开的,这样的设计虽然避免了用锁,但也会拖慢整体的执行效率。假设有1个主线程,9个IO线程,aeApiPoll拿到了100个事件,平分给了10个线程,Redis要等待10个线程中最慢的那个线程处理完,才会开始处理命令。

转载自:https://juejin.cn/post/7124667316637270046
评论
请登录