Redis 源码分析 RedisServer 启动过程
Offer 驾到,掘友接招!我正在参与2022春招打卡活动,点击查看活动详情。
数据结构
在 Redis 的 server.h
源码中 redisServer 数据结构可以看出,其功能得有多复杂。因此在这里我会将其拆成一个个小块,有些内容会在后面单独进行详细分析。而且都做了详细的注释,在这里我就不贴出来。大致分为如下几个部分。
- 通用参数(General)
- 模块(Modules)
- 网络(Networking)
- 常见的命令回调函数
- 统计相关(stat)
- 配置信息(Configuration)
- AOF持久化相关(包括aof rewrite过程中,父子进程用于消除数据差异的管道)
- RDB持久化相关(包括RDB过程中,父子进程用于通信的管道)
- AOF或者主从复制下,命令传播相关
- 日志相关(logging)
- 主从复制(Replication (master)+Replication (slave))
- 主从复制的脚本缓存(Replication script cache)
- 主从同步相关(Synchronous replication)
- 系统限制(Limits)
- 阻塞客户端(Blocked clients)
- sort命令相关(Sort)
- 数据结构转换参数
- 时间缓存(time cache)
- 发布订阅(Pubsub)
- 集群(Cluster)
- Lazy free(表示删除过期键,是否启用后台线程异步删除)
- LUA脚本
- 延迟监控相关
- 服务端中互斥锁信息
- 系统硬件信息
启动过程
redis 服务端的启动是通过 server.c
文件中的 int main(int argc, char **argv)
方法完成的,代码如下:
下面我们一起来分析一下,redis-server
的启动过程。
通用设置
main 函数中,通用设置包括时区设置,hash 种子
// 时区设置
setlocale(LC_COLLATE,"");
tzset(); /* Populates 'timezone' global. */
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
srandom(time(NULL)^getpid());
gettimeofday(&tv,NULL);
init_genrand64(((long long) tv.tv_sec * 1000000 + tv.tv_usec) ^ getpid());
crc64_init();
- 生成 hash 种子
/* Install a new seed for the dictionary hash function: exactly
 * sizeof(dict_hash_function_seed) bytes are copied from 'seed' into the
 * dict_hash_function_seed buffer. */
void dictSetHashFunctionSeed(uint8_t *seed) {
    const size_t seed_bytes = sizeof dict_hash_function_seed;

    memcpy(dict_hash_function_seed, seed, seed_bytes);
}
/* The default hashing function uses SipHash implementation
* in siphash.c. */
uint64_t siphash(const uint8_t *in, const size_t inlen, const uint8_t *k);
uint64_t siphash_nocase(const uint8_t *in, const size_t inlen, const uint8_t *k);
/* Hash the 'len' bytes starting at 'key' with siphash(), keyed by the
 * current dict_hash_function_seed, and return the 64 bit result. */
uint64_t dictGenHashFunction(const void *key, int len) {
    uint64_t digest = siphash(key, len, dict_hash_function_seed);

    return digest;
}
模式选择
根据参数判断,是否是哨兵模式
server.sentinel_mode = checkForSentinelMode(argc,argv);
/* Tell whether the process was asked to run as a Sentinel.
 *
 * Returns 1 when the program name in argv[0] contains the substring
 * "redis-sentinel", or when any argument from argv[1] onwards is exactly
 * "--sentinel". Returns 0 otherwise. */
int checkForSentinelMode(int argc, char **argv) {
    int sentinel = (strstr(argv[0],"redis-sentinel") != NULL);

    /* Stop scanning as soon as the mode is known. */
    for (int idx = 1; !sentinel && idx < argc; idx++)
        sentinel = (strcmp(argv[idx],"--sentinel") == 0);
    return sentinel;
}
初始化服务端
服务端的初始化,主要是函数 initServerConfig
。 其主要职责就是初始化 redisServer
数据结构中的各个成员变量。
/* Assign built-in default values to the fields of the global 'server'
 * structure. The function takes no arguments and returns nothing; it only
 * writes to 'server' and to the R_* double constants, creates the command
 * tables, and finishes by calling initConfigValues(). */
void initServerConfig(void) {
int j;
updateCachedTime(1);
/* Run ID: random hex characters followed by a terminating NUL byte. */
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId();
clearReplicationId2();
server.hz = CONFIG_DEFAULT_HZ; /* Initialize it ASAP, even if it may get
updated later after loading the config.
This value may be used before the server
is initialized. */
server.timezone = getTimeZone(); /* Initialized by tzset(). */
server.configfile = NULL;
server.executable = NULL;
/* 64 when 'long' is 8 bytes wide on this build, 32 otherwise. */
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
/* Listening sockets: nothing is bound or open yet. */
server.bindaddr_count = 0;
server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
server.ipfd.count = 0;
server.tlsfd.count = 0;
server.sofd = -1;
server.active_expire_enabled = 1;
server.skip_checksum_validation = 0;
server.saveparams = NULL;
server.loading = 0;
server.loading_rdb_used_mem = 0;
server.logfile = zstrdup(CONFIG_DEFAULT_LOGFILE);
/* AOF fields: state is AOF_OFF and no file descriptor is open (-1). */
server.aof_state = AOF_OFF;
server.aof_rewrite_base_size = 0;
server.aof_rewrite_scheduled = 0;
server.aof_flush_sleep = 0;
server.aof_last_fsync = time(NULL);
atomicSet(server.aof_bio_fsync_status,C_OK);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
server.aof_lastbgrewrite_status = C_OK;
server.aof_delayed_fsync = 0;
server.aof_fd = -1;
server.aof_selected_db = -1; /* Make sure the first time will not match */
server.aof_flush_postponed_start = 0;
server.pidfile = NULL;
server.active_defrag_running = 0;
server.notify_keyspace_events = 0;
server.blocked_clients = 0;
memset(server.blocked_clients_by_type,0,
sizeof(server.blocked_clients_by_type));
server.shutdown_asap = 0;
server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.next_client_id = 1; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (1024*1024*2);
unsigned int lruclock = getLRUClock();
atomicSet(server.lruclock,lruclock);
/* Default save points: the list is reset, then three entries are added. */
resetServerSaveParams();
appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
/* Replication related */
server.masterauth = NULL;
server.masterhost = NULL;
server.masterport = 6379;
server.master = NULL;
server.cached_master = NULL;
server.master_initial_offset = -1;
server.repl_state = REPL_STATE_NONE;
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_transfer_s = NULL;
server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
server.repl_down_since = 0; /* Never connected, repl is down since EVER. */
server.master_repl_offset = 0;
/* Replication partial resync backlog */
server.repl_backlog = NULL;
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
server.repl_backlog_off = 0;
server.repl_no_slaves_since = time(NULL);
/* Failover related */
server.failover_end_time = 0;
server.force_failover = 0;
server.target_replica_host = NULL;
server.target_replica_port = 0;
server.failover_state = NO_FAILOVER;
/* Client output buffer limits */
for (j = 0; j < CLIENT_TYPE_OBUF_COUNT; j++)
server.client_obuf_limits[j] = clientBufferLimitsDefaults[j];
/* Linux OOM Score config */
for (j = 0; j < CONFIG_OOM_COUNT; j++)
server.oom_score_adj_values[j] = configOOMScoreAdjValuesDefaults[j];
/* Double constants initialization */
/* The divisions by R_Zero below are intentional: they are how the
 * infinity and NaN constants are produced. */
R_Zero = 0.0;
R_PosInf = 1.0/R_Zero;
R_NegInf = -1.0/R_Zero;
R_Nan = R_Zero/R_Zero;
/* Command table -- we initialize it here as it is part of the
 * initial configuration, since command names may be changed via
 * redis.conf using the rename-command directive. */
server.commands = dictCreate(&commandTableDictType,NULL);
server.orig_commands = dictCreate(&commandTableDictType,NULL);
populateCommandTable();
/* Keep direct pointers to a few commands, looked up once by name after
 * the table has been populated. */
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
server.zpopminCommand = lookupCommandByCString("zpopmin");
server.zpopmaxCommand = lookupCommandByCString("zpopmax");
server.sremCommand = lookupCommandByCString("srem");
server.execCommand = lookupCommandByCString("exec");
server.expireCommand = lookupCommandByCString("expire");
server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
server.lmoveCommand = lookupCommandByCString("lmove");
/* Debugging */
server.watchdog_period = 0;
/* By default we want scripts to be always replicated by effects
 * (single commands executed by the script), and not by sending the
 * script to the slave / AOF. This is the new way starting from
 * Redis 5. However it is possible to revert it via redis.conf. */
server.lua_always_replicate_commands = 1;
/* Client Pause related */
server.client_pause_type = CLIENT_PAUSE_OFF;
server.client_pause_end_time = 0;
/* NOTE(review): presumably applies the defaults of the generic config
 * table; its body is not shown here -- confirm in config.c. */
initConfigValues();
}
哨兵设置
根据前文所选择的模式,如果选择的是哨兵模式,这里就进入哨兵初始化流程。
是否需要 rdb/aof 校验
/* Check if we need to start in redis-check-rdb/aof mode. We just execute
* the program main. However the program is part of the Redis executable
* so that we can easily execute an RDB check on loading errors. */
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
else if (strstr(argv[0],"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);
解析命令行参数
解析命令行参数:如果指定了配置文件或通过命令行给出了配置项,那么就用它们覆盖默认的参数设置
if (argc >= 2) {
j = 1; /* First option to parse in argv[] */
sds options = sdsempty();
/* Handle special options --help and --version */
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
if (strcmp(argv[1], "--test-memory") == 0) {
if (argc == 3) {
memtest(atoi(argv[2]),50);
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}
/* Parse command line options
* Precedence wise, File, stdin, explicit options -- last config is the one that matters.
*
* First argument is the config file name? */
if (argv[1][0] != '-') {
/* Replace the config file in server.exec_argv with its absolute path. */
server.configfile = getAbsolutePath(argv[1]);
zfree(server.exec_argv[1]);
server.exec_argv[1] = zstrdup(server.configfile);
j = 2; // Skip this arg when parsing options
}
while(j < argc) {
/* Either first or last argument - Should we read config from stdin? */
if (argv[j][0] == '-' && argv[j][1] == '\0' && (j == 1 || j == argc-1)) {
config_from_stdin = 1;
}
/* All the other options are parsed and conceptually appended to the
* configuration file. For instance --port 6380 will generate the
* string "port 6380\n" to be parsed after the actual config file
* and stdin input are parsed (if they exist). */
else if (argv[j][0] == '-' && argv[j][1] == '-') {
/* Option name */
if (sdslen(options)) options = sdscat(options,"\n");
options = sdscat(options,argv[j]+2);
options = sdscat(options," ");
} else {
/* Option argument */
options = sdscatrepr(options,argv[j],strlen(argv[j]));
options = sdscat(options," ");
}
j++;
}
// 读取配置文件
loadServerConfig(server.configfile, config_from_stdin, options);
if (server.sentinel_mode) loadSentinelConfigFromQueue();
sdsfree(options);
}
读取配置文件
主要是读取 redis.conf
配置参数。
/* Append every line that can be read from 'fp' to the sds string 'config'
 * and return the resulting string (sdscat may reallocate it). */
static sds configAppendStream(sds config, FILE *fp) {
    char line[CONFIG_MAX_LINE+1];

    while (fgets(line,CONFIG_MAX_LINE+1,fp) != NULL)
        config = sdscat(config,line);
    return config;
}

/* Build the full configuration text and hand it to
 * loadServerConfigFromString().
 *
 * The text is assembled in this order: the content of 'filename' (when not
 * NULL), then whatever can be read from stdin (when 'config_from_stdin' is
 * non-zero), then a newline followed by 'options' (when not NULL).
 *
 * Both 'filename' and 'options' may be NULL, in which case they are treated
 * as empty, so the function can load just a file or just a string. If the
 * file cannot be opened a warning is logged and the process exits with
 * status 1. */
void loadServerConfig(char *filename, char config_from_stdin, char *options) {
    sds config = sdsempty();

    /* Configuration file content. */
    if (filename != NULL) {
        FILE *fp = fopen(filename,"r");

        if (fp == NULL) {
            serverLog(LL_WARNING,
                "Fatal error, can't open config file '%s': %s",
                filename, strerror(errno));
            exit(1);
        }
        config = configAppendStream(config,fp);
        fclose(fp);
    }

    /* Content piped through stdin. */
    if (config_from_stdin) {
        serverLog(LL_WARNING,"Reading config from stdin");
        config = configAppendStream(config,stdin);
    }

    /* Extra directives, separated from what precedes by a newline. */
    if (options != NULL) {
        config = sdscat(config,"\n");
        config = sdscat(config,options);
    }

    loadServerConfigFromString(config);
    sdsfree(config);
}
是否后台运行
int background = server.daemonize && !server.supervised;
// 是否是后台运行
if (background) daemonize();
// 执行后台运行的方法
/* Detach the process from the launching shell.
 *
 * The process forks: the parent exits with status 0 and the child goes on
 * as the daemon. If fork() fails no child exists, so the calling process
 * exits with status 1 instead of reporting success for a daemon that was
 * never started. The child then creates a new session with setsid() and
 * redirects stdin, stdout and stderr to /dev/null. */
void daemonize(void) {
    int fd;
    pid_t pid = fork();

    if (pid == -1) exit(1); /* fork failed: nothing was daemonized */
    if (pid != 0) exit(0); /* parent exits */
    setsid(); /* create a new session */

    /* Every output goes to /dev/null. If Redis is daemonized but
     * the 'logfile' is set to 'stdout' in the configuration file
     * it will not log at all. */
    if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        /* Only close fd when it is not one of the three standard
         * descriptors that were just redirected. */
        if (fd > STDERR_FILENO) close(fd);
    }
}
这里需要说一下(面试中经常被问到):Redis 中创建后台进程的实现比较简单,下面我们来看一个更完整、更“纯正”的后台(守护)进程实现:
/* Turn the calling process into a daemon using the double-fork sequence.
 *
 * pname:    identifier passed to openlog().
 * facility: syslog facility passed to openlog().
 *
 * Returns 0 on success in the surviving (second) child, -1 if a fork or
 * setsid() reports failure. The two intermediate parents leave through
 * _exit(0) and never return from this function.
 * NOTE(review): Fork(), Signal(), daemon_proc and MAXFD are defined outside
 * this excerpt -- confirm their exact semantics there. */
int
daemon_init(const char *pname, int facility)
{
int i;
pid_t pid;
if ( (pid = Fork()) < 0) /* create the first child */
return (-1);
else if (pid)
_exit(0); /* parent terminates */
/* The child keeps running. Being a child, it cannot be a process group
 * leader, which is what setsid() requires. */
/* child 1 continues... */
if (setsid() < 0) /* become session leader of a new session and leader of a new process group */
return (-1);
Signal(SIGHUP, SIG_IGN); /* ignore the hangup signal */
if ( (pid = Fork()) < 0) /* create the second child */
return (-1);
else if (pid) /* the first child (parent of the second) exits */
_exit(0); /* child 1 terminates */
/* The second child keeps running; it is no longer a session leader, so
 * it can never acquire a controlling terminal. */
/* child 2 continues... */
daemon_proc = 1; /* for err_XXX() functions; variable defined in error.c */
chdir("/"); /* change working directory to the root directory */
/* close off file descriptors */
for (i = 0; i < MAXFD; i++) /* close descriptors 0 .. MAXFD-1 */
close(i);
/* redirect stdin, stdout, and stderr to /dev/null */
open("/dev/null", O_RDONLY);
open("/dev/null", O_RDWR);
open("/dev/null", O_RDWR);
openlog(pname, LOG_PID, facility); /* open the connection to the system logger */
return (0); /* success */
}
- setsid(): 确保当前进程变成新会话的会话头进程以及新进程组的进程组头进程,从而不再受控制终端的控制。
- 忽略 SIGHUP 信号之后再次 fork,目的是确保本守护进程将来即使打开了一个终端设备,也不会自动获得控制终端。当没有控制终端的一个会话头进程打开一个终端设备时,该终端会自动成为这个会话头进程的控制终端。再次 fork 之后,新的子进程不再是会话头进程,从而不能自动获得控制终端。这里必须要忽略 SIGHUP 信号,因为会话头进程(即首次 fork 产生的子进程)终止时,其会话中的所有进程(即再次 fork 的子进程)都会收到 SIGHUP 信号。
- 将工作目录更改为根目录
- 关闭继承来的所有文件描述符(包括套接字)
- 重定向 stdin ,stdout 和 stderr ,否则他会输出到屏幕上。
初始化服务
信号处理
服务端打交道最多的是 SIGHUP 和 SIGPIPE,这两个信号的默认行为都是终止进程。而服务器随便就终止那是不行的,我们需要将其忽略。
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
针对其他的信号,比如 SIGINT 信号,我们对其进行捕获后由 redis 进行收尾工作,避免进程被暴力 kill
void setupSignalHandlers(void) {
struct sigaction act;
/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
* Otherwise, sa_handler is used. */
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = sigShutdownHandler;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGINT, &act, NULL);
sigemptyset(&act.sa_mask);
act.sa_flags = SA_NODEFER | SA_RESETHAND | SA_SIGINFO;
act.sa_sigaction = sigsegvHandler;
if(server.crashlog_enabled) {
sigaction(SIGSEGV, &act, NULL);
sigaction(SIGBUS, &act, NULL);
sigaction(SIGFPE, &act, NULL);
sigaction(SIGILL, &act, NULL);
sigaction(SIGABRT, &act, NULL);
}
return;
}
syslog 设置
syslog 是 linux 系统自带的,主要是为 daemon 进程提供日志服务。我们在前面讲过,daemon 进程无法打印到终端,那么如何方便地接收输出的日志呢?linux 提供了 syslog 服务,该服务支持打印各种级别的日志以及指定输出位置(本地或者远程均可)。
if (server.syslog_enabled) {
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
其他常用参数设置
/* Initialization after setting defaults from the config system. */
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
server.hz = server.config_hz;
server.pid = getpid();
server.in_fork_child = CHILD_TYPE_NONE;
server.main_thread_id = pthread_self();
server.current_client = NULL;
server.errors = raxNew();
server.fixed_time_expire = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
server.clients_timeout_table = raxNew();
server.replication_allowed = 1;
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.client_pause_type = 0;
server.paused_clients = listCreate();
server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size();
server.blocked_last_cron = 0;
server.blocking_op_nesting = 0;
创建共享对象
redis 非常注重内存消耗的,有些常用的对象,采用引用计数的方式进行复用
createSharedObjects();
/* Shared command responses */
shared.crlf = createObject(OBJ_STRING,sdsnew("\r\n"));
shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n"));
shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n"));
shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n"));
shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n"));
shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n"));
shared.pong = createObject(OBJ_STRING,sdsnew("+PONG\r\n"));
shared.queued = createObject(OBJ_STRING,sdsnew("+QUEUED\r\n"));
shared.emptyscan = createObject(OBJ_STRING,sdsnew("*2\r\n$1\r\n0\r\n*0\r\n"));
shared.space = createObject(OBJ_STRING,sdsnew(" "));
shared.colon = createObject(OBJ_STRING,sdsnew(":"));
shared.plus = createObject(OBJ_STRING,sdsnew("+"));
根据系统限制调整你打开的文件数
adjustOpenFilesLimit();
网络模型 reactor 初始化
redis 支持 Unix 和 tcp 两种模型,当服务端和客户端都在本机的时候, Unix 域套接字更快,因为不需要协议头解析等
const char *clk_msg = monotonicInit();
serverLog(LL_NOTICE, "monotonic clock: %s", clk_msg);
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,&server.ipfd) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
exit(1);
}
if (server.tls_port != 0 &&
listenToPort(server.tls_port,&server.tlsfd) == C_ERR) {
serverLog(LL_WARNING, "Failed listening on port %u (TLS), aborting.", server.tls_port);
exit(1);
}
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
anetCloexec(server.sofd);
}
LRU 策略中淘汰池(eviction pool)初始化
/* Allocate the eviction pool and publish it through EvictionPoolLRU.
 *
 * The pool holds EVPOOL_SIZE entries. Each entry starts with idle 0, a NULL
 * key, dbid 0 and a preallocated sds buffer of EVPOOL_CACHED_SDS_SIZE bytes
 * in 'cached'. */
void evictionPoolAlloc(void) {
    struct evictionPoolEntry *pool = zmalloc(sizeof(*pool)*EVPOOL_SIZE);

    for (int slot = 0; slot < EVPOOL_SIZE; slot++) {
        pool[slot].idle = 0;
        pool[slot].key = NULL;
        pool[slot].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE);
        pool[slot].dbid = 0;
    }
    EvictionPoolLRU = pool;
}
初始化 rdb 和 aof 信息
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
server.rdb_pipe_buff = NULL;
server.rdb_pipe_bufflen = 0;
server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
server.child_info_nread = 0;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
初始化状态信息
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.stat_current_cow_bytes = 0;
server.stat_current_cow_updated = 0;
server.stat_current_save_keys_processed = 0;
server.stat_current_save_keys_total = 0;
server.stat_rdb_cow_bytes = 0;
server.stat_aof_cow_bytes = 0;
server.stat_module_cow_bytes = 0;
server.stat_module_progress = 0;
for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
server.stat_clients_type_memory[j] = 0;
server.cron_malloc_stats.zmalloc_used = 0;
server.cron_malloc_stats.process_rss = 0;
server.cron_malloc_stats.allocator_allocated = 0;
server.cron_malloc_stats.allocator_active = 0;
server.cron_malloc_stats.allocator_resident = 0;
server.lastbgsave_status = C_OK;
server.aof_last_write_status = C_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
注册事件
网络编程中,包括三类事件:文件事件、定时事件和信号事件
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
serverPanic("Unrecoverable error creating TCP socket accept handler.");
}
if (createSocketAcceptHandler(&server.tlsfd, acceptTLSHandler) != C_OK) {
serverPanic("Unrecoverable error creating TLS socket accept handler.");
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
这里采用回调的方式来注册事件,并结合 IO 复用技术实现高效的网络模型。
- 复制初始化等等
- 一些 lua 虚拟机脚本添加
慢日志初始化
/* Set up the slow log state held in 'server': the entry id counter starts
 * at 0 and the log is a new empty list whose elements are released with
 * slowlogFreeEntry. Meant to be called exactly once, at server startup. */
void slowlogInit(void) {
    server.slowlog_entry_id = 0;

    server.slowlog = listCreate();
    listSetFreeMethod(server.slowlog,slowlogFreeEntry);
}
后台任务线程创建
这里后台线程主要包含三类
- 异步关闭文件描述符
- AOF 异步刷盘
- 过期键异步删除
/* Initialize the background I/O system and spawn its threads.
 *
 * For each of the BIO_NUM_OPS job types this sets up the mutex, the two
 * condition variables, the job list and the pending counter, then creates
 * one thread running bioProcessBackgroundJobs with the job type as its
 * argument. The thread ids are stored in bio_threads[]. If a thread cannot
 * be created a warning is logged and the process exits with status 1. */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    /* Double until the stack is at least REDIS_THREAD_STACK_SIZE bytes. */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }

    /* The attribute object is only needed while the threads are being
     * created; release it now that it is no longer used. */
    pthread_attr_destroy(&attr);
}
lua 脚本
其他
外部模块加载
/* Load every module queued in server.loadmodule_queue.
 *
 * The queue is walked from its head and each entry is passed to
 * moduleLoad() with its path and arguments. If any module fails to load
 * (moduleLoad() returns C_ERR) a warning naming the module path is logged
 * and the process exits with status 1. */
void moduleLoadFromQueue(void) {
    listIter li;
    listNode *ln;

    listRewind(server.loadmodule_queue,&li);
    while((ln = listNext(&li))) {
        struct moduleLoadQueueEntry *loadmod = ln->value;
        if (moduleLoad(loadmod->path,(void **)loadmod->argv,loadmod->argc)
            == C_ERR)
        {
            serverLog(LL_WARNING,
                "Can't load module from %s: server aborting",
                loadmod->path);
            exit(1);
        }
    }
}
磁盘加载数据
/* Startup helper that brings the dataset into memory from the AOF or from
 * the RDB file.
 *
 * When server.aof_state is AOF_ON only the append only file is loaded.
 * Otherwise the RDB file is loaded; a load failure is fatal (warning logged,
 * exit status 1) unless errno is ENOENT. After a successful RDB load, the
 * replication ID and offset stored in the file are restored when this
 * instance has a master host configured, or is a cluster node flagged as
 * slave, and the saved replication info is complete. */
void loadDataFromDisk(void) {
    long long start = ustime();

    if (server.aof_state == AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == C_OK)
            serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
        return;
    }

    rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
    errno = 0; /* Prevent a stale value from affecting error checking */
    if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) != C_OK) {
        /* errno is inspected right away, before any other call. */
        if (errno != ENOENT) {
            serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
        return;
    }

    serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
        (float)(ustime()-start)/1000000);

    int is_replica = server.masterhost ||
                     (server.cluster_enabled &&
                      nodeIsSlave(server.cluster->myself));
    /* Note that older implementations may save a repl_stream_db
     * of -1 inside the RDB file in a wrong way, see more
     * information in function rdbPopulateSaveInfo. */
    int repl_info_complete = rsi.repl_id_is_set &&
                             rsi.repl_offset != -1 &&
                             rsi.repl_stream_db != -1;

    /* Restore the replication ID / offset from the RDB file. */
    if (is_replica && repl_info_complete) {
        memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
        server.master_repl_offset = rsi.repl_offset;
        /* If we are a slave, create a cached master from this
         * information, in order to allow partial resynchronizations
         * with masters. */
        replicationCacheMasterUsingMyself();
        selectDb(server.cached_master,rsi.repl_stream_db);
    }
}
事件循环前准备
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
beforeSleep
beforeSleep 包括以下几个步骤:
- 运行 fast cycle 模式,进行过期处理
- 向所有从节点发送 ack 请求
- 解除因同步复制(WAIT 命令)而阻塞的客户端
- 处理阻塞的客户端
- 将 AOF 刷盘
- 处理挂起的请求
代码如下:
/* Callback registered on the event loop with aeSetBeforeSleepProc().
 *
 * eventLoop: the calling event loop; unused, the function works on the
 *            global 'server' state instead.
 *
 * It first updates the peak memory statistic. When re-entered from
 * processEventsWhileBlocked() it only runs a reduced set of handlers,
 * accumulates how many events they processed and returns. Otherwise it
 * runs the full sequence below, whose order matters: the module GIL
 * release must stay the very last step. */
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
/* Track the highest memory usage observed so far. */
size_t zmalloc_used = zmalloc_used_memory();
if (zmalloc_used > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used;
/* Just call a subset of vital functions in case we are re-entering
 * the event loop from processEventsWhileBlocked(). Note that in this
 * case we keep track of the number of events we are processing, since
 * processEventsWhileBlocked() wants to stop ASAP if there are no longer
 * events to handle. */
if (ProcessingEventsWhileBlocked) {
uint64_t processed = 0;
processed += handleClientsWithPendingReadsUsingThreads();
processed += tlsProcessPendingData();
processed += handleClientsWithPendingWrites();
processed += freeClientsInAsyncFreeQueue();
server.events_processed_while_blocked += processed;
return;
}
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
tlsProcessPendingData();
/* If tls still has pending unread data don't sleep at all. */
aeSetDontWait(server.el, tlsHasPendingData());
/* Call the Redis Cluster before sleep function. Note that this function
 * may change the state of Redis Cluster (from ok to fail or vice versa),
 * so it's a good idea to call it before serving the unblocked clients
 * later in this function. */
if (server.cluster_enabled) clusterBeforeSleep();
/* Run a fast expire cycle (the called function will return
 * ASAP if a fast cycle is not needed). */
if (server.active_expire_enabled && server.masterhost == NULL)
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
/* Unblock all the clients blocked for synchronous replication
 * in WAIT. */
if (listLength(server.clients_waiting_acks))
processClientsWaitingReplicas();
/* Check if there are clients unblocked by modules that implement
 * blocking commands. */
if (moduleCount()) moduleHandleBlockedClients();
/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients))
processUnblockedClients();
/* Send all the slaves an ACK request if at least one client blocked
 * during the previous event loop iteration. Note that we do this after
 * processUnblockedClients(), so if there are multiple pipelined WAITs
 * and the just unblocked WAIT gets blocked again, we don't have to wait
 * a server cron cycle in absence of other event loop events. See #6623.
 *
 * We also don't send the ACKs while clients are paused, since it can
 * increment the replication backlog, they'll be sent after the pause
 * if we are still the master. */
if (server.get_ack_from_slaves && !checkClientPauseTimeoutAndReturnIfPaused()) {
robj *argv[3];
argv[0] = shared.replconf;
argv[1] = shared.getack;
argv[2] = shared.special_asterick; /* Not used argument. */
replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
server.get_ack_from_slaves = 0;
}
/* We may have received updates from clients about their current offset. NOTE:
 * this can't be done where the ACK is received since failover will disconnect
 * our clients. */
updateFailoverStatus();
/* Send the invalidation messages to clients participating to the
 * client side caching protocol in broadcasting (BCAST) mode. */
trackingBroadcastInvalidationMessages();
/* Write the AOF buffer on disk */
if (server.aof_state == AOF_ON)
flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
/* Try to process blocked clients every once in while. Example: A module
 * calls RM_SignalKeyAsReady from within a timer callback (So we don't
 * visit processCommand() at all). */
handleClientsBlockedOnKeys();
/* Before we are going to sleep, let the threads access the dataset by
 * releasing the GIL. Redis main thread will not touch anything at this
 * time. */
if (moduleCount()) moduleReleaseGIL();
/* Do NOT add anything below moduleReleaseGIL !!! */
}
进入事件主循环
/* Run the event loop: clear the 'stop' flag, then keep calling
 * aeProcessEvents() for all event types, with both the before-sleep and
 * after-sleep callbacks enabled, until 'stop' becomes non-zero. */
void aeMain(aeEventLoop *eventLoop) {
    const int loop_flags = AE_ALL_EVENTS|
                           AE_CALL_BEFORE_SLEEP|
                           AE_CALL_AFTER_SLEEP;

    for (eventLoop->stop = 0; !eventLoop->stop; )
        aeProcessEvents(eventLoop, loop_flags);
}
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// 写:sendReplyToClient
// 读:readQueryFromClient
aeProcessEvents
中包含网络事件以及定时事件,这两类事件通过 IO 复用很好地集成在一起。
请求整体流程
参考与引用
- 《Redis 设计与实现》 黄健宏
- 图片来源于网络
转载自:https://juejin.cn/post/7072347362382839821