likes
comments
collection
share

从 Redis Sentinel 服务监控源码探索 Redis 高可用的秘密:监控全流程解析

作者站长头像
站长
· 阅读数 12

Redis Sentinel(哨兵)是一个用于监控和管理 Redis 主从复制和高可用性的组件。它允许你确保 Redis 在主节点发生故障或不可用的情况下能够自动切换到备用节点。

环境准备

我们的目标是分析哨兵的服务监控流程源码,所以只需要搭建一个最基础的哨兵来监控一个 Redis 服务即可,下面是一个最基础的哨兵配置文件:

# 哨兵端口
port 26379

dir /tmp

daemonize no 

# 需要监控的 Redis master,quorum 为 1
sentinel monitor mymaster 127.0.0.1 6379 1

在源码中,与哨兵最相关的两个文件是 src\server.csrc\sentinel.c 分别对应服务和哨兵。

相关数据类型

一个哨兵服务对应一些关键的数据类型,这些类型中有许多成员用来实现不同的功能。

  • redisServer:包含了 Redis 实例的主要信息和配置,如端口、数据库以及哨兵配置等信息。
  • sentinelState:代表一个 Sentinel 进程的主要状态,包括当前哨兵 ID、已经监视的主服务器等信息。
  • sentinelRedisInstance:表示 Sentinel 所监视的一个 Redis 实例的信息,包括实例的 IP、端口、实例对应的主/从节点等。

为了保持简单,这里只列出一些跟文章主题相关的成员。

// file: src/server.h
struct redisServer {
    // 事件循环相关
    aeEventLoop *el; 
    // 当前服务的监听端口
    int port; 
    // 哨兵配置
    struct sentinelConfig *sentinel_config; 
    ...
}

// file: src/sentinel.c
struct sentinelState {
    // 当前哨兵 ID
    char myid[CONFIG_RUN_ID_SIZE+1]; 
    // 存储哨兵 master 实例的字典
    // key 为哨兵的名称,value 为哨兵实例 sentinelRedisInstance
    dict *masters; 
}

struct sentinelRedisInstance {
    // 当前实例的一些标记,如实例类型(master/slave)
    int flags;
    // 实例名称
    char *name;
    // 监控的 redis master 服务地址
    sentinelAddr *addr;
    // 上次发布 hello 消息的时间(毫秒)
    mstime_t last_pub_time;
    // 主观下线时间
    mstime_t s_down_since_time;
    // 客观下线时间
    mstime_t o_down_since_time;
    
    /* Master 相关成员 */
    // 监控在同一个 master 上的其他哨兵
    dict *sentinels;
    // 从节点
    dict *slaves;
    // 认定集群需要故障恢复的确认数量
    unsigned int quorum;
    
    /* Slave 相关成员 */
    // 当前从节点所属的 master 节点
    struct sentinelRedisInstance *master;
    ...
}

初始化 sentinel

Redis 的主程序支持服务器和哨兵两种模式,即两种模式都是同一个程序入口,入口函数中根据参数来判断是服务模式还是哨兵模式,例如当执行 redis-sentinelredis-server --sentinel 时表示是以哨兵模式运行。

当以哨兵模式启动服务时,会先对哨兵相关的数据类型和数据结构进行初始化,然后将哨兵配置文件加载进当前服务,最后启动服务。

int main(int argc, char **argv) {
    ...
    // 检查是否为哨兵模式启动
    // 当执行 redis-sentinel 或 --sentinel 时,sentinel_mode 为 1
    server.sentinel_mode = checkForSentinelMode(argc,argv, exec_name);
    
    // 哨兵模式的初始化
    if (server.sentinel_mode) {
        // 指定当前服务的默认端口
        initSentinelConfig();
        // 初始化 sentinelState 结构体中的一些成员和数据结构
        initSentinel();
    }
    
    if (argc >= 2) {
        ...
        // 加载哨兵配置
        if (server.sentinel_mode) loadSentinelConfigFromQueue();
    }
    // 检查哨兵配置是否正确加载完成以及是否拥有访问权限
    // 因为哨兵运行过程中会动态地修改配置文件
    if (server.sentinel_mode) sentinelCheckConfigFile();

    ...
    // 初始化并启动服务
    initServer();
    
    return 0;
}

loadSentinelConfigFromQueue() 会遍历所有哨兵配置,并调用sentinelHandleConfiguration() 逐个处理。

void loadSentinelConfigFromQueue(void) {
    const char *err = NULL;
    listIter li;
    listNode *ln;
    int linenum = 0;
    sds line = NULL;
    unsigned int j;

    /* if there is no sentinel_config entry, we can return immediately */
    if (server.sentinel_config == NULL) return;
    
    for (j = 0; j < sizeof(sentinel_configs) / sizeof(sentinel_configs[0]); j++) {
        listRewind(sentinel_configs[j],&li);
        // 将当前 sentinel 的配置加载到 sentinel 实例上和更新 sentinelStat 状态
        // 比如:
        // monitor <name> <host> <port> <quorum>
        // auth-pass <name> <password>
        // 如果是 monitor 配置则会先创建一个 sentinel 实例
        while((ln = listNext(&li))) {
            struct sentinelLoadQueueEntry *entry = ln->value;
            // 处理配置
            err = sentinelHandleConfiguration(entry->argv,entry->argc);
            ...
        }
    }

    /* free sentinel_config when config loading is finished */
    freeSentinelConfig();
    return;
}

sentinelHandleConfiguration() 负责解析不同的哨兵配置指令,对于最基本 monitor 配置来说,它会创建一个哨兵实例。

const char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;

    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        // 创建 sentinel 实例,当前角色是 master
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            return sentinelCheckCreateInstanceErrors(SRI_MASTER);
        }
    }
    ...
}

createSentinelRedisInstance() 创建一个 sentinelRedisInstance 实例,包含当前主服务器、从服务器字典等信息。对于 master 实例,会加入到 sentinelState.masters 表中,slave 和其他 sentinel 加入 master 下对应的字典中。

sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    ...
    dict *table = NULL;
    
    ...
    // 根据当前 sentinel 实例的角色加入到对应的表中
    if (flags & SRI_MASTER) table = sentinel.masters; 
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;
    
    ...
    
    // 分配 sentinel 实例
    ri = zmalloc(sizeof(*ri));
    // 监控同一个 master 的其它 sentinel 实例
    ri->sentinels = dictCreate(&instancesDictType);
    // 当前 master
    ri->master = master;
    // 初始化 slaves
    ri->slaves = dictCreate(&instancesDictType);
    // 加入 dict
    dictAdd(table, ri->name, ri);
    ..
    return ri;
}

启动服务

initServer() 用于初始化 Redis 服务器实例,包括设置事件循环、创建时间事件、初始化服务器配置等。

Redis 的哨兵监控是通过后台定时任务执行的,在服务器初始化时,通过aeCreateTimeEvent() 创建 serverCron 时间事件,该时间事件函数会作为后台任务调度器被定时调用。除了哨兵监控外,一些重要的后台任务也会在这里面执行,比如清除过期键、AOF 文件重写、内存不足时淘汰数据等等。

void initServer(void) {
    ...
    // 创建定时任务,定期执行 serverCron 回调函数
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
}

对于哨兵模式的定时任务,serverCron 时间事件会调用 sentinelTimer() 来处理哨兵相关的任务。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ...
    if (server.sentinel_mode) sentinelTimer();
}

sentinelTimer() 是哨兵定时监控任务调度的入口函数,它调用了多个不同的哨兵处理函数。

void sentinelTimer(void) {
    // 检查并处理 TITL 状态
    sentinelCheckTiltCondition();
    // 处理所有 sentinelRedisInstance 实例任务
    sentinelHandleDictOfRedisInstances(sentinel.masters);
    // 执行处于 pending 状态的脚本
    sentinelRunPendingScripts();
    // 检查已经终止的脚本
    sentinelCollectTerminatedScripts();
    // kill 掉已经执行超时的脚本
    sentinelKillTimedoutScripts();

    // 随机修改 Redis 服务器定时器频率,错开多个哨兵的定时任务的执行时间
    server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ;
}

我们主要看 sentinelHandleDictOfRedisInstances() 函数,它递归地对所有 sentinelRedisInstance 实例调用 sentinelHandleRedisInstance(),即每一种类型的 sentinelRedisInstance 实例(master、slave、sentinel)都会执行相同的任务。

void sentinelHandleDictOfRedisInstances(dict *instances) {
    dictIterator *di;
    dictEntry *de;
    sentinelRedisInstance *switch_to_promoted = NULL;

    /* There are a number of things we need to perform against every master. */
    di = dictGetIterator(instances);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        
        // 实例处理函数
        sentinelHandleRedisInstance(ri);
        if (ri->flags & SRI_MASTER) {
            // 当前为 master 实例,继续处理它的 slaves 和 sentinels
            sentinelHandleDictOfRedisInstances(ri->slaves);
            sentinelHandleDictOfRedisInstances(ri->sentinels);
            if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
                switch_to_promoted = ri;
            }
        }
    }
    ...
}

sentinelHandleRedisInstance() 是哨兵模式中最核心的函数,它会对一个特定的 sentinelRedisInstance 实例执行以下操作:

  • 周期性地向实例指向的服务发送 PING、INFO 命令以及 Hello 消息。
  • 检查主、客观下线情况。
  • master 下线时执行故障恢复(failover)。
  • ......
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* 所有类型实例执行 */
    // 尝试重连已经断开的实例
    sentinelReconnectInstance(ri);
    // 发送周期性任务
    sentinelSendPeriodicCommands(ri);

    /* ============== ACTING HALF ============= */
    ...

    /* 所有类型实例执行 */
    // 检查服务主观下线情况
    sentinelCheckSubjectivelyDown(ri);

    /* 仅 master 类型实例执行 */
    if (ri->flags & SRI_MASTER) {
        // 检查服务客观下线情况
        sentinelCheckObjectivelyDown(ri);
        // 判断是否满足故障恢复条件
        if (sentinelStartFailoverIfNeeded(ri))
            // 向其他哨兵询问 master 下线情况,以满足故障恢复所需的 quorum 数量
           sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        // 故障恢复状态机
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

服务监控

sentinelSendPeriodicCommands() 是哨兵实现监控的基础,它会对 Redis 服务进行定期探活,并记录一些状态供后续使用,监控包含发送 INFO、PING 命令以及 Hello 消息三种方式。

redisAsyncCommand() 是 Redis 客户端 hiredis 提供的一个异步命令执行的方法,它异步地向 Redis 服务发送命令,并在回调函数中处理异步命令的结果,我们主要关心回调函数中的处理逻辑就好。

void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;
    
    // 向 masters 和 slaves 发送 INFO 命令
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "%s",
            sentinelInstanceMapCommand(ri,"INFO"));
        if (retval == C_OK) ri->link->pending_commands++;
    }
    
    // 向所有相关 redis 实例发送 PING 命令
    if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }
    
    // 向所有相关 redis 实例发送 hello 消息
    if ((now - ri->last_pub_time) > sentinel_publish_period) {
        sentinelSendHello(ri);
    }
}

INFO 处理

Redis 的 INFO 命令会返回服务的具体信息和一些相关的统计,在回调中会根据 INFO 返回值更新当前实例的一些信息(如当前实例的 Role、IP 等)以及向哨兵配置文件中写入一些必要的配置。

void sentinelInfoReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    // 执行中命令队列计数 -1
    link->pending_commands--;
    // INFO 命令结果
    r = reply;

    // 解析 INFO 命令结果字符串并更新实例上的相关信息等
    if (r->type == REDIS_REPLY_STRING || r->type == REDIS_REPLY_VERB)
        sentinelRefreshInstanceInfo(ri,r->str);
}

PING 处理

对于我们的场景来说,PING 命令的处理比较简单,当成功返回时(如 PONG),则将被监控服务的最后一次可达时间记录为当前时间,这也是后续判断主观下线的依据之一,除此之外还有一些其他属性的更新。

void sentinelPingReplyCallback(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;

    if (r->type == REDIS_REPLY_STATUS ||
        r->type == REDIS_REPLY_ERROR) {
        // 当 PING 返回 PONG、LOADING、MASTERDOWN 时表示服务可用
        if (strncmp(r->str,"PONG",4) == 0 ||
            strncmp(r->str,"LOADING",7) == 0 ||
            strncmp(r->str,"MASTERDOWN",10) == 0)
        {
            // 记录被监控服务器最后一次可达的时间
            link->last_avail_time = mstime();
            // 上一次向服务器发送 PING 命令的时间
            // 标记为0表示已经成功对方已经成功接收到命令并可达
            link->act_ping_time = 0;

            if (ri->flags & SRI_MASTER_REBOOT && strncmp(r->str,"PONG",4) == 0)
                ri->flags &= ~SRI_MASTER_REBOOT;

        } 
        ...
    }
    // 记录最后一次收到被监控服务器回复的时间
    link->last_pong_time = mstime();
}

Hello 消息

通过 Pub/Sub 向被监控服务器发送 Hello 消息,这个消息是为了广播当前哨兵所指向的 master 信息,同时也告诉服务器当前哨兵的存在,一个 Hello 消息包含了当前哨兵和其所指向的 master 的一些信息,如 IP 和端口,这里我们简单看一下。

int sentinelSendHello(sentinelRedisInstance *ri) {
    char ip[NET_IP_STR_LEN];
    char payload[NET_IP_STR_LEN+1024];
    int retval;
    char *announce_ip;
    int announce_port;
    // 获取当前 master
    sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
    sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);

    if (ri->link->disconnected) return C_ERR;

    // 格式化 hello 消息,最终格式为:
    // sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
    // master_name,master_ip,master_port,master_config_epoch.
    snprintf(payload,sizeof(payload),
        "%s,%d,%s,%llu," /* Info about this sentinel. */
        "%s,%s,%d,%llu", /* Info about current master. */
        announce_ip, announce_port, sentinel.myid,
        (unsigned long long) sentinel.current_epoch,
        /* --- */
        master->name,announceSentinelAddr(master_addr),master_addr->port,
        (unsigned long long) master->config_epoch);
        
    // 使用 PUBLISH 命令发布一个消息,服务启动时会订阅对应的 Hello channel
    // 在回调函数中会将最近一次发送消息的时间记录为当前时间
    retval = redisAsyncCommand(ri->link->cc,
        sentinelPublishReplyCallback, ri, "%s %s %s",
        sentinelInstanceMapCommand(ri,"PUBLISH"),
        SENTINEL_HELLO_CHANNEL,payload);
    if (retval != C_OK) return C_ERR;
    
    ri->link->pending_commands++;
    return C_OK;
}

主观下线

哨兵会根据三个指标来判断实例是否下线,其中最简单的一项就是判断上次 PING 命令的响应时间是否已经超过设定时间阈值。如果认为哨兵已经下线,则标记对应的状态并触发一个主观下线事件。

void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
    mstime_t elapsed = 0;

    // 计算上次 PING 的响应时间与当前的时间差
    if (ri->link->act_ping_time)
        elapsed = mstime() - ri->link->act_ping_time;
    else if (ri->link->disconnected)
        elapsed = mstime() - ri->link->last_avail_time;
        
    ...
    
    // 满足三种条件之一,则认为实例已经下线
    // 1、上次 PING 响应的时间间隔已经超过设定的时间阈值
    // 2、主节点角色变为从节点并且在一段时间内没有恢复为主节点,这段时间是主观下线阈值加上信息广播周期的两倍
    // 3、主节点的重启时间超出阈值
    if (elapsed > ri->down_after_period ||
        (ri->flags & SRI_MASTER &&
         ri->role_reported == SRI_SLAVE &&
         mstime() - ri->role_reported_time >
          (ri->down_after_period+sentinel_info_period*2)) ||
          (ri->flags & SRI_MASTER_REBOOT && 
           mstime()-ri->master_reboot_since_time > ri->master_reboot_down_after_period))
    {
        // 防止重复触发主观下线事件
        if ((ri->flags & SRI_S_DOWN) == 0) {
            // 触发主观下线事件,并将指向的 Redis 实例标记为主观下线
            sentinelEvent(LL_WARNING,"+sdown",ri,"%@");
            // 维护主观下线的两个状态
            ri->s_down_since_time = mstime();
            ri->flags |= SRI_S_DOWN;
        }
    } else {
        // 实例已正常在线,如果之前处于主观下线状态,则触发一个 -sdown 事件
        if (ri->flags & SRI_S_DOWN) {
            sentinelEvent(LL_WARNING,"-sdown",ri,"%@");
            ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT);
        }
    }
}

客观下线

客观下线主要就是收集其他哨兵的状态,如果认为 master 下线的哨兵数量达到了配置文件中设定的 quorum,则将 master 实例标记为客观下线,并触发事件。

void sentinelCheckObjectivelyDown(sentinelRedisInstance *master) {
    dictIterator *di;
    dictEntry *de;
    unsigned int quorum = 0, odown = 0;

    // 首先确保实例已经主观下线
    if (master->flags & SRI_S_DOWN) {
        // 当前实例的 quorum
        quorum = 1;
        // 遍历监控在该实例上的其他哨兵
        di = dictGetIterator(master->sentinels);
        while((de = dictNext(di)) != NULL) {
            sentinelRedisInstance *ri = dictGetVal(de);
            // 其他哨兵是否认为 master 已下线,累加 quorum
            if (ri->flags & SRI_MASTER_DOWN) quorum++;
        }
        dictReleaseIterator(di);
        // quorum 达到配置中的阈值,认为实例已经主观下线
        if (quorum >= master->quorum) odown = 1;
    }

    // 触发主观下线相关的事件
    if (odown) {
        if ((master->flags & SRI_O_DOWN) == 0) {
            sentinelEvent(LL_WARNING,"+odown",master,"%@ #quorum %d/%d",
                quorum, master->quorum);
            master->flags |= SRI_O_DOWN;
            master->o_down_since_time = mstime();
        }
    } else {
        if (master->flags & SRI_O_DOWN) {
            sentinelEvent(LL_WARNING,"-odown",master,"%@");
            master->flags &= ~SRI_O_DOWN;
        }
    }
}

询问 master 状态

哨兵实例的 SRI_MASTER_DOWN 状态是向其他哨兵询问得来的,在检查完客观下线状态后会向其他哨兵询问 master 状态。所谓询问就是向其他哨兵实例发送一个 SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid> 命令来获取一个状态。

void sentinelAskMasterStateToOtherSentinels(sentinelRedisInstance *master, int flags) {
    dictIterator *di;
    dictEntry *de;

    // 遍历所有哨兵实例
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);
        ...
        // 发送命令
        // SENTINEL IS-MASTER-DOWN-BY-ADDR <ip> <port> <current-epoch> <runid>
        retval = redisAsyncCommand(ri->link->cc,
                    sentinelReceiveIsMasterDownReply, ri,
                    "%s is-master-down-by-addr %s %s %llu %s",
                    sentinelInstanceMapCommand(ri,"SENTINEL"),
                    announceSentinelAddr(master->addr), port,
                    sentinel.current_epoch,
                    (master->failover_state > SENTINEL_FAILOVER_STATE_NONE) ?
                    sentinel.myid : "*");
        if (retval == C_OK) ri->link->pending_commands++;
    }
    dictReleaseIterator(di);
}

发送完成后,其他哨兵会接收到这个命令,进入到命令处理分支。当前哨兵判断该 master 实例是否处于主观下线状态,然后将这个状态返回。

void sentinelCommand(client *c) {
    ...
    // is-master-down-by-addr 命令分支
    if (!strcasecmp(c->argv[1]->ptr,"is-master-down-by-addr")) {
        ...
        // 获取 master 实例
         ri = getSentinelRedisInstanceByAddrAndRunID(sentinel.masters,
            c->argv[2]->ptr,port,NULL);

        // 如果当前哨兵已经认为主观下线,则回复下线状态
        if (!sentinel.tilt && ri && (ri->flags & SRI_S_DOWN) &&
                                    (ri->flags & SRI_MASTER))
            isdown = 1;
            
        ...
        addReply(c, isdown ? shared.cone : shared.czero);
    }
}

发送方哨兵在命令回调函数中获取状态并更新哨兵实例的 SRI_MASTER_DOWN 状态,用于下一次客观下线检查。

void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = privdata;
    instanceLink *link = c->data;
    redisReply *r;

    if (!reply || !link) return;
    link->pending_commands--;
    r = reply;

    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER)
    {
        ri->last_master_down_reply_time = mstime();
        // 这里获取到其他哨兵回复的下线状态,更新对应哨兵实例状态
        // 根据这个状态在客观下线检查中计算 quorum
        if (r->element[0]->integer == 1) {
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            ri->flags &= ~SRI_MASTER_DOWN;
        }
        ...
    }
}

总结

一个哨兵服务监控的流程就是通过后台定时任务不断地对 Redis 实例进行一些探活和通信,如果通过 quorum 决策已经确定了 master 客观下线,后续就会进行 failover(故障恢复)。为了篇幅简洁之后会另开一篇来讲述故障恢复的处理。