从一条Redis命令是如何执行了解事件驱动
大家在使用 Redis 的时候有没有想过一条命令是如何执行的呢?下面我带领大家了解一下 Redis 命令执行的原理。
Redis 命令执行流程
客户端链接
当 Redis server
接收到客户端的连接请求时,就会使⽤注册好的 acceptTcpHandler
函数进⾏处理。acceptTcpHandler
函数会接受客户端连接,并创建已连接套接字 cfd
。然后把刚刚创建好的已连接套接字 cfd
作为参数调用 acceptCommonHandler
函数。
acceptCommonHandler
函数会调用 createClient
函数来创建客户端。在 createClient
函数中调用 aeCreateFileEvent
来添加可读事件的回调函数。具体调用如下:
aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c)
可以看到监听的事件类型是 AE_READABLE
,注册的回调函数是 readQueryFromClient
,现在我们了解到当有读事件时会调用 readQueryFromClient
来处理。
读事件处理
当服务器接收到客户端的命令时会触发可读事件,调用 readQueryFromClient
函数。readQueryFromClient
函数首先会将命令写入输入缓冲区,然后从输入缓冲区中解析发送过来的命令,调用 processCommand
函数来执行命令,最后将返回给客户端的内容写入输出缓冲区。
写事件处理
Redis
事件驱动框架每次循环进入事件处理函数之前,也就是在调用 aeProcessEvents
函数前,会调用 beforeSleep
函数进行一些任务处理,其中会调用 handleClientsWithPendingWrites
函数,它会将 Redis
的输出缓冲区的数据写回客户端。我们看一下事件循环的主函数 aeMain
:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
// 调用 beforesleep 函数
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
handleClientsWithPendingWrites
会遍历每个代写回数据的客户端,并调用 writeToClient
写回数据。如果还有待写回数据,则会创建可写事件监听设置回调函数sendReplyToClient
, sendReplyToClient
这个函数也会调用 writeToClient
写回数据。我们来看一下 handleClientsWithPendingWrites
函数的基本流程:
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
// 获取待写回的客户端列表
listRewind(server.clients_pending_write,&li);
//遍历所有待写会客户端
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;
/* Try to write buffers to the client socket. */
// 将客户端输出缓冲区的数据写回
if (writeToClient(c->fd,c,0) == C_ERR) continue;
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
// 如果还有待写回数据
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
// 创建可写事件的监听并设置回调函数
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
事件驱动框架
Redis
事件驱动框架有两大类事件:IO
事件和时间事件。下面我们来学习一下他们相应的处理机制。
aeEventLoop 结构体与初始化
/* State of an event based program */
typedef struct aeEventLoop {
// 当前最大的 fd
int maxfd; /* highest file descriptor currently registered */
// fd的最大
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
// IO事件数组
aeFileEvent *events; /* Registered events */
// 已触发事件数组
aeFiredEvent *fired; /* Fired events */
// 记录时间事件的链表头
aeTimeEvent *timeEventHead;
int stop;
// 和API调⽤接⼝相关的数据
void *apidata; /* This is used for polling API specific data */
// 进入事件循环流程前执行的函数
aeBeforeSleepProc *beforesleep;
// 退出事件循环流程后执行的函数
aeBeforeSleepProc *aftersleep;
} aeEventLoop;
aeCreateEventLoop 函数的初始化操作
我们看一下 aeCreateEventLoop
函数的定义:
aeEventLoop *aeCreateEventLoop(int setsize);
可以看到入参只有 setSize
,在 sever
初始化时调用 aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
,可以看到 setSize
的值是由 server.maxclients
和 CONFIG_FDSET_INCR
这两个参数决定的,server.maxclients
的值可以通过 redis.conf
进行定义,默认值为 1000;而宏定义 CONFIG_FDSET_INCR
的值是 CONFIG_MIN_RESERVED_FDS
加上 96。
#define CONFIG_MIN_RESERVED_FDS 32
#define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)
那么这个参数有什么用呢,我们接下来看一下 aeCreateEventLoop
函数的执行流程。
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
// 分配内存空间
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
// 设置时间事件的链表头为NULL
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
// 调⽤aeApiCreate函数,去实际调⽤操作系统提供的IO多路复⽤函数
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
// 将所有⽹络IO事件对应⽂件描述符的掩码设置为AE_NONE
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
// 如果初始化失败释放内存
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
通过上面的代码分析可知,aeCreateEventLoop
首先会创建一个 aeEventLoop
结构体类型的变量 eventLoop
。然后为 eventLoop
的成员变量分配内存空间,并且为 eventLoop
的成员变量赋初值。第二步调用 aeApiCreate
函数去实际调⽤操作系统提供的 IO
多路复⽤函数,假设 Redis
运⾏在 Linux
操作系统上,并且 IO
多路复⽤机制是 epoll
,那么此时,aeApiCreate
函数就会调⽤ epoll_create
创建 epoll
实例,同时会创建 epoll_event
结构的数组,数组⼤⼩等于参数 setsize
。如下面代码所示:
typedef struct aeApiState {
// epoll实例的描述符
int epfd;
// 记录监听事件
struct epoll_event *events;
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// 将epoll_event数组保存在aeApiState结构体变量state中
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// 将epoll实例描述符保存在aeApiState结构体变量state中
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
// 把state变量赋值给eventLoop中的apidata
eventLoop->apidata = state;
return 0;
}
最后 aeCreateEventLoop
函数会把所有⽹络 IO
事件对应⽂件描述符的掩码初始化为 AE_NONE
,表⽰暂时不对任何事件进行监听。
IO 事件
IO 时间创建
首先看一下 aeCreateFileEvent
函数的定义:
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
aeCreateFileEvent
函数有五个参数,分别是循环事件结构体 *eventLoop
,IO
事件对应的文件描述符 fd
,事件类型掩码 mask
,事件处理回调函数 *proc
,以及事件私有数据 *clientData
。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// 根据事件的 fd 获取事件指针
aeFileEvent *fe = &eventLoop->events[fd];
// 添加监听的事件事件
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
// 设置回调函数
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
时间事件
时间事件定义
/* Time event structure */
typedef struct aeTimeEvent {
// 事件时间 id
long long id; /* time event identifier. */
// 事件到达的秒级时间戳
long when_sec; /* seconds */
// 事件到达的毫秒级时间戳
long when_ms; /* milliseconds */
// 事件触发时的函数
aeTimeProc *timeProc;
// 事件结束后的处理函数
aeEventFinalizerProc *finalizerProc;
// 事件相关的私有数据
void *clientData;
// 前向指针
struct aeTimeEvent *prev;
// 后向指针
struct aeTimeEvent *next;
} aeTimeEvent;
时间事件结构体中主要的变量,包括以秒记录和以毫秒记录的时间事件触发时的时间戳when_sec和 when_ms,以及时间事件触发后的处理函数timeProc。另外,在时间事件的结构体中,还包含了前向和 后向指针prev和*next,这表明时间事件是以链表的形式组织起来的。
时间事件创建
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
// 生成时间事件 id
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
// 分配内存
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
// 根据 milliseconds 参数计算所创建时间事件具体的触发时间戳
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}
具体的时间事件创建是在初始化时:
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
可以看到时间事件处理函数是serverCron。
时间事件触发
其实,时间事件的检测触发⽐较简单,事件驱动框架的aeMain函数会循环调⽤aeProcessEvents函数,来处 理各种事件。⽽aeProcessEvents函数在执⾏流程的最后,会调⽤processTimeEvents函数处理相应到时 的任务。
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
//从时间事件链表中取出事件
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
//获取当前时间
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
//调⽤注册的回调函数处理
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
小结
本文从介绍一条 Redis 命令开始,逐步介绍了事件驱动框架,包括 IO 事件和时间事件。
转载自:https://juejin.cn/post/7171765312172818468