源码看 Redis Sentinel 如何实现故障恢复(failover)
哨兵的大部份工作都是在后台定时任务里执行的,包括监控、检查下线、故障恢复之类的逻辑。
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
/* ========== MONITORING HALF ============ */
/* 所有类型实例执行 */
// 尝试重连已经断开的实例
sentinelReconnectInstance(ri);
// 发送周期性任务
sentinelSendPeriodicCommands(ri);
/* ============== ACTING HALF ============= */
...
/* 所有类型实例执行 */
// 检查服务主观下线情况
sentinelCheckSubjectivelyDown(ri);
/* 仅 master 类型实例执行 */
if (ri->flags & SRI_MASTER) {
// 检查服务客观下线情况
sentinelCheckObjectivelyDown(ri);
// 判断是否满足故障恢复条件
if (sentinelStartFailoverIfNeeded(ri))
// 向其他哨兵询问 master 下线情况,确保 master 处于客观下线
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
// 故障恢复状态机
sentinelFailoverStateMachine(ri);
// 再次询问,确保 master 的状态信息在整个 Sentinel 系统中得到更新和同步
sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
}
}
这里我们假设 master 已经被确定客观下线,然后sentinelStartFailoverIfNeeded() 方法会检查是否达到了故障恢复的条件,包括 客观下线、已经处于 failover 状态等,如果满足条件就进入 failover 流程。
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {
// master 需要处于客观下线状态
if (!(master->flags & SRI_O_DOWN)) return 0;
// master 已经在 failover 过程中
if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;
/* Last failover attempt started too little time ago? */
if (mstime() - master->failover_start_time <
master->failover_timeout*2)
{
...
}
sentinelStartFailover(master);
return 1;
}
sentinelStartFailover() 会记录一些 failover 所需的状态给后续流程使用。
void sentinelStartFailover(sentinelRedisInstance *master) {
serverAssert(master->flags & SRI_MASTER);
// 更新 failover 状态等待开始
master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;
// 标记 master 实例处理中状态
master->flags |= SRI_FAILOVER_IN_PROGRESS;
master->failover_epoch = ++sentinel.current_epoch;
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
// 触发事件
sentinelEvent(LL_WARNING,"+try-failover",master,"%@");
// 记录时间
master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
master->failover_state_change_time = mstime();
}
failover 状态机
故障恢复的整个流程都是在 sentinelFailoverStateMachine() 状态机中完成的,每一个状态对应一个处理函数,处理函数中变更为另一个状态,然后等待定时任务下一次调度时执行。
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_MASTER);
if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;
switch(ri->failover_state) {
// 故障恢复前置工作
case SENTINEL_FAILOVER_STATE_WAIT_START:
sentinelFailoverWaitStart(ri);
break;
// 选择合适的 slave
case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
sentinelFailoverSelectSlave(ri);
break;
// 向被选到的 slave 发送 slaveof 命令,使其变成 master
case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
sentinelFailoverSendSlaveOfNoOne(ri);
break;
// 等待目标 slave 变成 master
case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
sentinelFailoverWaitPromotion(ri);
break;
// 向现有的 slave 发送 slaveof 命令,更新它们所属的 master
case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
sentinelFailoverReconfNextSlave(ri);
break;
}
}
下面就来看看这个状态机的流转以及整个故障恢复是如何完成的。
WAIT_START
该状态为等待 failover 的启动并做一些前置工作。它尝试通过投票来确定哪个 Sentinel 被选为本次故障恢复的 leader 哨兵,这个 leader 哨兵会负责协调故障恢复过程。当前 Sentinel 拥有执行权时,会将状态机更新到下一个状态 SENTINEL_FAILOVER_STATE_SELECT_SLAVE。
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
char *leader;
int isleader;
// 投票决定主导本次故障恢复的 leader 哨兵
leader = sentinelGetLeader(ri, ri->failover_epoch);
isleader = leader && strcasecmp(leader,sentinel.myid) == 0;
// 当前哨兵不是 leader 且没有强制执行的标志时
// 表示无权执行 failover,直接返回
if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
...
return;
}
...
// 更新到 SENTINEL_FAILOVER_STATE_SELECT_SLAVE 状态并发布事件
ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
ri->failover_state_change_time = mstime();
sentinelEvent(LL_WARNING,"+failover-state-select-slave",ri,"%@");
}
SELECT_SLAVE
该状态选择一个 slave 来升级为新的 master。如果没有选择到合适的 slave 则会触发一个警告事件并取消故障切换,并将实例状态恢复,等待下一次调度。成功选择到 slave 后即更新到下一个状态 SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE。
void sentinelFailoverSelectSlave(sentinelRedisInstance *ri) {
// 选择一个 slave
sentinelRedisInstance *slave = sentinelSelectSlave(ri);
if (slave == NULL) {
// 没有选择到合适的 slave 则取消故障恢复并恢复实例状态等待下一次调度
sentinelEvent(LL_WARNING,"-failover-abort-no-good-slave",ri,"%@");
sentinelAbortFailover(ri);
} else {
sentinelEvent(LL_WARNING,"+selected-slave",slave,"%@");
// 记录被选中的 slave
slave->flags |= SRI_PROMOTED;
ri->promoted_slave = slave;
// 更新到 SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE 状态
ri->failover_state = SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE;
ri->failover_state_change_time = mstime();
sentinelEvent(LL_NOTICE,"+failover-state-send-slaveof-noone",
slave, "%@");
}
}
选择 slave
sentinelSelectSlave() 会逐个检查 slave 服务器,排除掉一些不合适的如被判定下线等,符合条件的 slaves 会使用特定的排序方法 compareSlavesForPromotion 来选择出优先级最高的。
sentinelRedisInstance *sentinelSelectSlave(sentinelRedisInstance *master) {
sentinelRedisInstance **instance =
zmalloc(sizeof(instance[0])*dictSize(master->slaves));
sentinelRedisInstance *selected = NULL;
int instances = 0;
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
// 排除一些不合适的 slave,如已经判定下线、断连、优先级为0等
if (slave->flags & (SRI_S_DOWN|SRI_O_DOWN)) continue;
if (slave->link->disconnected) continue;
if (slave->slave_priority == 0) continue;
...
// 加入 instance 数组
instance[instances++] = slave;
}
...
if (instances) {
// 使用特定的排序方法选出优先级最高的 slave
qsort(instance,instances,sizeof(sentinelRedisInstance*),
compareSlavesForPromotion);
selected = instance[0];
}
return selected;
}
这些满足条件 slaves 会根据特定规则按优先级大小倒序排序,排序后的第一个 slave 就是新的 master 服务器。排序的规则如下:
- 配置的优先级(
slave_priority)较小的排在前面。如果两个 slave 配置了相同的优先级,则按下一规则比较。 - 较大复制偏移量的 slave 排在前面,复制偏移量越大表示处理了更多来自 master 的数据。如果相同则进入下一规则。
- 如果优先级和偏移量都相同,则按运行 ID 排序,较小的运行 ID 排在前面。运行 ID 为 NULL 时会被认为比其它运行 ID 都要大。
int compareSlavesForPromotion(const void *a, const void *b) {
sentinelRedisInstance **sa = (sentinelRedisInstance **)a,
**sb = (sentinelRedisInstance **)b;
char *sa_runid, *sb_runid;
/*
* 说明:
* 当结果返回正数时,表示第一个元素应排在第二个元素之后
* 当结果返回负数时,表示第一个元素应排在第二个元素之前
*/
// 比较优先级,较小的排在前面
if ((*sa)->slave_priority != (*sb)->slave_priority)
return (*sa)->slave_priority - (*sb)->slave_priority;
// 优先级相同,比较复制偏移量,较大的排在前面
if ((*sa)->slave_repl_offset > (*sb)->slave_repl_offset) {
return -1; /* a < b */
} else if ((*sa)->slave_repl_offset < (*sb)->slave_repl_offset) {
return 1; /* a > b */
}
// 优先级和偏移量相同,比较 runid,较小的排在前面
// 为 NULL 时会被认为比其它 runid 都要大
sa_runid = (*sa)->runid;
sb_runid = (*sb)->runid;
if (sa_runid == NULL && sb_runid == NULL) return 0;
else if (sa_runid == NULL) return 1; /* a > b */
else if (sb_runid == NULL) return -1; /* a < b */
return strcasecmp(sa_runid, sb_runid);
}
SEND_SLAVEOF_NOONE
该状态的主要目标是将选定的 salve 升级为新的 master,如果 slave 已经断开连接,会等待下次调度时重试,直到超时则触发一个警告事件,并取消故障恢复。正常情况下则会向该从服务器发送 slaveof no one 命令,将其升级为新的 master,然后更新到下一状态 SENTINEL_FAILOVER_STATE_WAIT_PROMOTION。
void sentinelFailoverSendSlaveOfNoOne(sentinelRedisInstance *ri) {
int retval;
// 如果当前选中的 salve 已经断开连接,则等到下次调度时重试
// 直到超时取消故障恢复流程
if (ri->promoted_slave->link->disconnected) {
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
return;
}
// 向选中的 slave 发送 slaveof no one 命令,使其成为 master
retval = sentinelSendSlaveOf(ri->promoted_slave,NULL);
if (retval != C_OK) return;
sentinelEvent(LL_NOTICE, "+failover-state-wait-promotion",
ri->promoted_slave,"%@");
// 变更到下一状态
ri->failover_state = SENTINEL_FAILOVER_STATE_WAIT_PROMOTION;
ri->failover_state_change_time = mstime();
}
发送 slaveof 命令
这里就比较直白了,主要就是向指定的 Redis 实例发送命令,包括:
MULTI开启事务。SLAVEOF更新当前 slave 所属的 master,如果是NO ONE则将当前 slave 升级为新的 master。CONFIG REWRITE更新目标 redis 服务配置(redis.conf)。CLIENT KILL关闭目标 redis 上的客户端连接,因为该 redis 服务的配置已经发生改变(如从 slave 变成 master),如果不断开客户端并重连,会导致客户端继续向旧的 master 服务写入数据,从而造成数据不一致。EXEC执行事务。
由于不需要直接对这些命令的返回值做额外处理,所以统一使用了 sentinelDiscardReplyCallback 回调函数丢弃返回值。
int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) {
char portstr[32];
const char *host;
int retval;
// 设置命令参数,如果是 NO ONE,那么会将目标 redis 实例会变为 master
if (!addr) {
host = "NO";
memcpy(portstr,"ONE",4);
} else {
host = announceSentinelAddr(addr);
ll2string(portstr,sizeof(portstr),addr->port);
}
// 开启事务
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"MULTI"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;
// 执行 slaveof
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"SLAVEOF"),
host, portstr);
if (retval == C_ERR) return retval;
ri->link->pending_commands++;
// 更新目标 redis 服务配置
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s REWRITE",
sentinelInstanceMapCommand(ri,"CONFIG"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;
// 关闭目标 redis 上的客户端连接
for (int type = 0; type < 2; type++) {
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
sentinelInstanceMapCommand(ri,"CLIENT"),
type == 0 ? "normal" : "pubsub");
if (retval == C_ERR) return retval;
ri->link->pending_commands++;
}
// 执行事务
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"EXEC"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;
return C_OK;
}
WAIT_PROMOTION
除了检测 failover 是否超时,这个状态并没有直接的逻辑处理,它只是用于等待 slave 升级为新的 master。
void sentinelFailoverWaitPromotion(sentinelRedisInstance *ri) {
// 超时则取消 failover
if (mstime() - ri->failover_state_change_time > ri->failover_timeout) {
sentinelEvent(LL_WARNING,"-failover-abort-slave-timeout",ri,"%@");
sentinelAbortFailover(ri);
}
}
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
...
// 处理 slave 向 master 转变的逻辑
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
// 这里符合我们之前正在进行的 failover 的所有条件
// 更新状态机到下一状态
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
{
...
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
ri->master->failover_state_change_time = mstime();
...
}
}
...
}
RECONF_SLAVES
到了这个状态表示新的 master 已经可用,需要重新配置其他 slave,它会向其他的 slave 发送 slaveof <new master> 命令将它们所属的 master 配置为这个新的 master。
void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
...
while(in_progress < master->parallel_syncs &&
(de = dictNext(di)) != NULL)
{
sentinelRedisInstance *slave = dictGetVal(de);
...
// 向每个满足条件的 slave 发送 slaveof <new master> 命令
retval = sentinelSendSlaveOf(slave,master->promoted_slave->addr);
if (retval == C_OK) {
// 记录响应的状态并触发事件
slave->flags |= SRI_RECONF_SENT;
slave->slave_reconf_sent_time = mstime();
sentinelEvent(LL_NOTICE,"+slave-reconf-sent",slave,"%@");
in_progress++;
}
}
...
// 检查 slave 已经全部配置完成
sentinelFailoverDetectEnd(master);
}
最后会检查一下 slave 是否已经全部完成配置以及一些其他情况,如果存在超时的情况,则会重新向未完成配置的 slave 发送 slaveof 命令,直至整个故障恢复全部完成。
void sentinelFailoverDetectEnd(sentinelRedisInstance *master) {
...
if (timeout) {
dictIterator *di;
dictEntry *de;
di = dictGetIterator(master->slaves);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *slave = dictGetVal(de);
int retval;
if (slave->flags & (SRI_PROMOTED|SRI_RECONF_DONE|SRI_RECONF_SENT)) continue;
if (slave->link->disconnected) continue;
// 重新向未完成的 slave 发送 slaveof 命令
retval = sentinelSendSlaveOf(slave,master->promoted_slave->addr);
if (retval == C_OK) {
sentinelEvent(LL_NOTICE,"+slave-reconf-sent-be",slave,"%@");
slave->flags |= SRI_RECONF_SENT;
}
}
dictReleaseIterator(di);
}
}
总结
至此我们大致了解 Redis 哨兵故障恢复的核心流程,最后再简要地总结一下这个过程:
- 在所有哨兵中选择一个 leader 主导本次故障恢复。
- 选择一个合适的 slave 作为新的 master。
- 向选中的 slave 发送
slaveof no one命令使其成为 master。 - 等待该 slave 完成向 master 的角色转变。
- 新的 master 产生后,向其他 slave 发送
slaveof <new master>命令重新配置它们所属的 master 节点。
转载自:https://juejin.cn/post/7281196961751203901