Eureka源码9-Server端(处理Client状态修改和删除overridden状态请求)
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
0. 环境
- eureka版本:1.10.11
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4 测试demo代码:github.com/hsfxuebao/s…
1. 处理删除状态请求
删除deleteStatusOverride请求: DELETE请求 path:"apps/" + appName + '/' + id + "/status" 详见Eureka源码4-服务离线源码(状态变更)解析中第3.1节
服务端处理客户端变更状态和删除状态请求的方法都在 InstanceResource
类:
// isReplication:是否是集群节点同步复制
// newStatusValue:客户端发起删除状态时,这里为 null
// lastDirtyTimestamp:最新修改时间戳(脏)
@DELETE
@Path("status")
public Response deleteStatusUpdate(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("value") String newStatusValue,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
try {
// 查询本地注册表中的服务实例信息,如果查询不到返回404
if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
logger.warn("Instance not found: {}/{}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// 客户端发过来的 newStatusValue = null ,所以 newStatus = UNKNOWN
InstanceStatus newStatus = newStatusValue == null ? InstanceStatus.UNKNOWN : InstanceStatus.valueOf(newStatusValue);
// todo 处理删除状态
boolean isSuccess = registry.deleteStatusOverride(app.getName(), id,
newStatus, lastDirtyTimestamp, "true".equals(isReplication));
if (isSuccess) {
logger.info("Status override removed: {} - {}", app.getName(), id);
// 处理成功返回200
return Response.ok().build();
} else {
logger.warn("Unable to remove status override: {} - {}", app.getName(), id);
// 处理失败返回500
return Response.serverError().build();
}
} catch (Throwable e) {
logger.error("Error removing instance's {} status override", id);
// 处理异常返回500
return Response.serverError().build();
}
}
1.1 处理删除状态
@Override
public boolean deleteStatusOverride(String appName, String id,
InstanceStatus newStatus,
String lastDirtyTimestamp,
boolean isReplication) {
// todo 调用父类的deleteStatusOverride方法
if (super.deleteStatusOverride(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
// todo 处理成功后同步复制给集群节点
replicateToPeers(Action.DeleteStatusOverride, appName, id, null, null, isReplication);
return true;
}
return false;
}
1.2 父类处理删除状态方法
// AbstractInstanceRegistry.class
public boolean deleteStatusOverride(String appName, String id,
InstanceStatus newStatus,
String lastDirtyTimestamp,
boolean isReplication) {
// 打开读锁
read.lock();
try {
STATUS_OVERRIDE_DELETE.increment(isReplication);
// 从注册表获取 实例列表
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
// 找到对应的实例
lease = gMap.get(id);
}
if (lease == null) {
// 如果获取不到,返回 false
return false;
} else {
// 刷新续租过期时间
// 本地收到客户端删除状态请求,表明客户端还存活着,所以刷新续租过期时间
lease.renew();
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
// 将执行的Client的overriddenStatus从overriddenInstanceStatusMap中删除
// 获取覆盖状态,并从 overriddenInstanceStatusMap 中删除
InstanceStatus currentOverride = overriddenInstanceStatusMap.remove(id);
if (currentOverride != null && info != null) {
// 修改注册表中的该Client状态为UNKOWN
info.setOverriddenStatus(InstanceStatus.UNKNOWN);
// todo 如果提交的是CANCEL_OVERRIDE 则newStatus为UNKOWN
// 设置实例信息的状态,但不标记 dirty
info.setStatusWithoutDirty(newStatus);
long replicaDirtyTimestamp = 0;
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.parseLong(lastDirtyTimestamp);
}
// If the replication's dirty timestamp is more than the existing one, just update
// it to the replica's.
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
// 如果 客户端实例的最新修改时间戳(脏) 大于 本地注册表中相应实例信息的最新修改时间戳(脏)
// 则把本地的更新为客户端的
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
// 设置行为类型为变更
info.setActionType(ActionType.MODIFIED);
// 将本次修改写入到recentlyChangedQueue缓存
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 设置本地相应实例信息的最新修改时间戳
info.setLastUpdatedTimestamp();
// 让相应缓存失效
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
// 关闭读锁
read.unlock();
}
}
1.3 同步复制给集群节点
同步给其他集群,不在赘述
1.4 处理删除overridden状态请求与下架请求对比
- 处理删除overridden状态请求完成的主要任务:
- 从缓存map中删除指定client的overriddenStatus
修改注册表中该client的overriddenStatus为UNKNOWN
修改注册表中该client的status为UNKNOWN
- 将本次操作记录到recentlyChangedQueue
- 修改注册表中该client的lastUpdatedTimestamp
- 处理下架请求完成的主要任务:
将该client从注册表中删除
- 从缓存map中删除指定client的overriddenStatus
- 将本次操作记录到recentlyChangedQueue
- 修改注册表中该client的lastUpdatedTimestamp
2. 处理变更状态请求
变更状态请求:PUT请求,path为 :"apps/" + appName + '/' + id + "/status" 详见Eureka源码4-服务离线源码(状态变更)解析中第3.1节
// InstanceResource.class
@PUT
@Path("status")
public Response statusUpdate(
@QueryParam("value") String newStatus,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
try {
if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
logger.warn("Instance not found: {}/{}", app.getName(), id);
// 查询本地注册表中的实例信息,如果查询不到返回404
return Response.status(Status.NOT_FOUND).build();
}
// todo 处理变更状态
boolean isSuccess = registry.statusUpdate(app.getName(), id,
InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
"true".equals(isReplication));
if (isSuccess) {
logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus);
// 处理成功返回200
return Response.ok().build();
} else {
logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus);
// 处理成功返回500
return Response.serverError().build();
}
} catch (Throwable e) {
logger.error("Error updating instance {} for status {}", id,
newStatus);
// 处理异常返回500
return Response.serverError().build();
}
}
2.1 处理变更状态
// PeerAwareInstanceRegistryImpl.class
@Override
public boolean statusUpdate(final String appName, final String id,
final InstanceStatus newStatus, String lastDirtyTimestamp,
final boolean isReplication) {
// todo 先调用父类的statusUpdate方法
if (super.statusUpdate(appName, id, newStatus, lastDirtyTimestamp, isReplication)) {
// todo 处理成功后同步复制给集群节点
replicateToPeers(Action.StatusUpdate, appName, id, null, newStatus, isReplication);
return true;
}
return false;
}
2.2 父类处理变更状态方法
// AbstractInstanceRegistry.class
public boolean statusUpdate(String appName, String id,
InstanceStatus newStatus, String lastDirtyTimestamp,
boolean isReplication) {
// 打开读锁
read.lock();
try {
STATUS_UPDATE.increment(isReplication);
// 从注册表中找到 实例列表
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
// 找到对应的实例
lease = gMap.get(id);
}
if (lease == null) {
return false;
} else {
// 刷新续租过期时间
// 本地收到客户端变更状态请求,表明客户端还存活着,所以刷新续租过期时间
lease.renew();
InstanceInfo info = lease.getHolder();
// Lease is always created with its instance info object.
// This log statement is provided as a safeguard, in case this invariant is violated.
if (info == null) {
logger.error("Found Lease without a holder for instance id {}", id);
}
if ((info != null) && !(info.getStatus().equals(newStatus))) {
// Mark service as UP if needed
// 若更新为服务上线状态,当本地相关实例信息不为空,且状态和客户端请求变更的状态不一致
if (InstanceStatus.UP.equals(newStatus)) {
// 如果状态要变更为 UP ,且实例第一次启动,则记录启动时间
lease.serviceUp();
}
// This is NAC overridden status
// 保存变更的状态到 overriddenInstanceStatusMap
overriddenInstanceStatusMap.put(id, newStatus);
// Set it for transfer of overridden status to replica on
// replica start up
// 实例信息设置覆盖状态
info.setOverriddenStatus(newStatus);
long replicaDirtyTimestamp = 0;
// 设置实例信息的状态,但不记录脏时间戳
info.setStatusWithoutDirty(newStatus);
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.parseLong(lastDirtyTimestamp);
}
// If the replication's dirty timestamp is more than the existing one, just update
// it to the replica's.
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
// 如果 客户端实例的最新修改时间戳(脏) 大于 本地注册表中相应实例信息的最新修改时间戳(脏)
// 则把本地的更新为客户端的
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
// 设置行为类型为变更
info.setActionType(ActionType.MODIFIED);
// 本次修改记录到recentlyChangedQueue中
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// 设置本地相应实例信息的最新修改时间戳
info.setLastUpdatedTimestamp();
// 让相应缓存失效
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
// 关闭读锁
read.unlock();
}
}
2.3 replicateToPeers()
同步给其他集群,不在赘述
参考文章
eureka-0.10.11源码(注释) springcloud-source-study学习github地址 Eureka源码解析 SpringCloud技术栈系列文章 Eureka 源码解析
转载自:https://juejin.cn/post/7156554844714008606