Eureka Source Code 6 - Registry Synchronization Mechanism
Feel free to follow github.com/hsfxuebao. I hope this helps; if you find it useful, please give it a Star.
0. Environment
- Eureka version: 1.10.11
- Spring Cloud: 2020.0.2
- Spring Boot: 2.4.4
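Continuing from the previous article, Eureka Source Code 5 - Server Side (section 3.1 of the startup flow), this article analyzes the registry synchronization mechanism behind registry.syncUp().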
1. Method entry point
@Configuration
// Imports the EurekaServerInitializerConfiguration configuration class
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
// ......
}
@Configuration
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
// ......
// EurekaServerInitializerConfiguration implements the SmartLifecycle interface,
// so the IoC container calls its lifecycle start() method as initialization is about to finish
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
// Initialize the Eureka Server context
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
// ....
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
// ......
}
public class EurekaServerBootstrap {
// ......
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
// Initialize the Eureka Server context
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
// ......
}
}
protected void initEurekaServerContext() throws Exception {
// ......
// Sync the registry from the peer nodes
int registryCount = this.registry.syncUp();
// ......
}
// ......
}
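To make the call chain above concrete, here is a dependency-free sketch in plain Java. All names (`LifecycleSketch`, `start`, `contextInitialized`, `registryCount`) are hypothetical stand-ins, not the Spring Cloud classes, and the sync result is simulated. It shows the design point that matters here: `start()` runs the context initialization, which includes the possibly slow `syncUp()`, on a separate thread, so the lifecycle callback returns at once and failures are only logged.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for EurekaServerInitializerConfiguration.start()
// and EurekaServerBootstrap.contextInitialized(); the sync result is simulated.
class LifecycleSketch {
    // Number of instances the simulated syncUp() copied; -1 until initialization has run.
    static final AtomicInteger registryCount = new AtomicInteger(-1);

    // Stands in for contextInitialized() -> initEurekaServerContext() -> registry.syncUp().
    static void contextInitialized(int simulatedSyncResult) {
        registryCount.set(simulatedSyncResult);
    }

    // Mirrors start(): the work runs on a new thread and exceptions are only logged.
    static Thread start(int simulatedSyncResult) {
        Thread initThread = new Thread(() -> {
            try {
                contextInitialized(simulatedSyncResult);
            } catch (Exception ex) {
                System.err.println("Could not initialize Eureka servlet context: " + ex);
            }
        });
        initThread.start();
        // The original returns nothing; the thread is returned here only so callers can join it.
        return initThread;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = start(3);
        t.join();
        System.out.println("registryCount = " + registryCount.get());
    }
}
```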
2. Syncing the registry from the peer nodes
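// PeerAwareInstanceRegistryImpl.class
public int syncUp() {
// Copy entire entry from neighboring DS node
// Counts the instances synced into the local registry
int count = 0;
// By default up to 5 attempts, 30 seconds apart; once an attempt registers at least one instance (count > 0), no further attempts are made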
for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
if (i > 0) {
try {
Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
} catch (InterruptedException e) {
logger.warn("Interrupted during registry transfer..");
break;
}
}
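// At server startup, read the instance information the local client has fetched from the peer nodes' registries
Applications apps = eurekaClient.getApplications();
// Iterate over the applications
for (Application app : apps.getRegisteredApplications()) {
// Iterate over each application's instances
for (InstanceInfo instance : app.getInstances()) {
try {
if (isRegisterable(instance)) {
// Register the instance into the local registry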
register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
count++;
}
} catch (Throwable t) {
logger.error("During DS init copy", t);
}
}
}
}
return count;
}
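The retry loop in `syncUp()` can be isolated as a minimal sketch, assuming instances are plain string IDs and that a hypothetical `fetchAttempt` function returns what the local client cache holds on each attempt. The loop condition `count == 0` means it stops after the first attempt that registers anything, and the configured number is the total number of attempts, with a sleep before every attempt except the first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.Predicate;

// Hypothetical, simplified model of the syncUp() retry loop.
class SyncUpSketch {
    /**
     * @param attempts       stands in for serverConfig.getRegistrySyncRetries()
     * @param waitMs         stands in for serverConfig.getRegistrySyncRetryWaitMs()
     * @param fetchAttempt   returns the instance IDs in the local client cache on attempt i
     * @param isRegisterable stands in for isRegisterable(instance)
     * @param localRegistry  receives every registered instance ID
     * @return number of instances copied into localRegistry
     */
    static int syncUp(int attempts, long waitMs, IntFunction<List<String>> fetchAttempt,
                      Predicate<String> isRegisterable, List<String> localRegistry) {
        int count = 0;
        // Stop when attempts are exhausted or as soon as one attempt copied anything.
        for (int i = 0; i < attempts && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs); // pause before every attempt except the first
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // the sketch restores the flag; the original only logs
                    break;
                }
            }
            for (String instanceId : fetchAttempt.apply(i)) {
                if (isRegisterable.test(instanceId)) {
                    localRegistry.add(instanceId); // stands in for register(instance, duration, true)
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        List<String> registry = new ArrayList<>();
        // The first attempt sees an empty cache, the second sees two instances.
        int copied = syncUp(5, 10L, i -> i == 0 ? List.of() : List.of("APP-A:1", "APP-B:1"),
                id -> true, registry);
        System.out.println(copied + " " + registry);
    }
}
```

If every attempt comes back empty, the startup thread therefore sleeps (attempts - 1) × waitMs in total, e.g. 4 × 30 s = 2 minutes with the values in the comment above.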
2.1 eurekaClient.getApplications()
// DiscoveryClient.class
public class DiscoveryClient implements EurekaClient {
// ......
@Override
public Applications getApplications() {
return localRegionApps.get();
}
// ......
}
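2.1.1 Where localRegionApps comes from
This was covered in the client registry-fetch part of Eureka Source Code 3 - Client Startup Entry (Registration, Renewal, Scheduled Tasks).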
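Because `getApplications()` only reads `localRegionApps`, which in `DiscoveryClient` is an `AtomicReference<Applications>`, `syncUp()` never contacts the peers itself; it copies whatever the embedded client last fetched. The sketch below models that with a hypothetical `LocalRegionAppsSketch` holding a list of app names instead of `Applications`. The whole snapshot is swapped atomically, so a reader sees either the old or the new registry, never a half-updated one. This is presumably also why the retry loop sleeps between attempts: it gives the client's periodic fetch a chance to fill the cache.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of DiscoveryClient.localRegionApps: a list of app names replaces Applications.
class LocalRegionAppsSketch {
    private final AtomicReference<List<String>> localRegionApps = new AtomicReference<>(List.of());

    // Called by the (simulated) periodic registry fetch: publishes a new immutable snapshot in one step.
    void onFetch(List<String> fetchedApps) {
        localRegionApps.set(List.copyOf(fetchedApps));
    }

    // Mirrors getApplications(): reads the latest snapshot, no network call.
    List<String> getApplications() {
        return localRegionApps.get();
    }

    public static void main(String[] args) {
        LocalRegionAppsSketch client = new LocalRegionAppsSketch();
        System.out.println(client.getApplications()); // empty before the first fetch
        client.onFetch(List.of("APP-A", "APP-B"));
        System.out.println(client.getApplications());
    }
}
```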
3. Registering instances into the local registry
// AbstractInstanceRegistry.class
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
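// Acquire the read lock
read.lock();
// Look up the application's lease map in the local registry by app name
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
// No lease map for this application yet: create one and add it to the local registry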
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// Look up the instance lease in the application's lease map
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
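// If a local lease exists and its lastDirtyTimestamp is newer than the registrant's,
// keep the existing local instance info,
// so that an outdated instance record does not overwrite a newer one in the local registry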
registrant = existingLease.getHolder();
}
} else {
// The lease does not exist and hence it is a new registration
// The instance is being registered into the local registry for the first time
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to register it, increase the number of clients sending renews
// Increment the expected number of clients sending renewals by 1
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
// Update the expected number of renewals per minute (see 3.1)
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// Put the instance lease into the application's lease map
gMap.put(registrant.getId(), lease);
// Add the instance to the recently registered queue
synchronized (recentRegisteredQueue) {
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
}
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
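// A registrant whose overridden status is not UNKNOWN is not registering for the first time and its status has been modified, so it must be cached in overriddenInstanceStatusMap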
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
// Apply the cached overridden status
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
// Derive the instance status from the overridden status using the override rules (see 3.2)
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
// Set the instance status
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// If the lease is registered with UP status, set lease service up timestamp
if (InstanceStatus.UP.equals(registrant.getStatus())) {
// If the status is UP, record the service-up timestamp in the lease
lease.serviceUp();
}
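// Set the action type
registrant.setActionType(ActionType.ADDED);
// Add the change to the recently changed queue
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
// Set the instance's last-updated timestamp
registrant.setLastUpdatedTimestamp();
// Invalidate the affected caches: the response cache used when serving client registry fetches (analyzed later)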
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
// Release the read lock
read.unlock();
}
}
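The core of `register()` is a two-level map plus the dirty-timestamp guard. The following sketch keeps only those two pieces; it assumes a hypothetical `RegistrySketch.Instance` in place of `Lease<InstanceInfo>` and omits locking, queues, lease durations and status overrides. It uses `computeIfAbsent` instead of the original `get` + `putIfAbsent` pair; both ensure that concurrent first registrations of the same application end up sharing one inner map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the two-level registry used by register().
// Locks, queues, lease durations and status overrides are deliberately omitted.
class RegistrySketch {
    // Stand-in for Lease<InstanceInfo>.
    static final class Instance {
        final String appName;
        final String id;
        final long lastDirtyTimestamp;
        final String payload;

        Instance(String appName, String id, long lastDirtyTimestamp, String payload) {
            this.appName = appName;
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
            this.payload = payload;
        }
    }

    // appName -> (instanceId -> instance), like registry -> gMap in AbstractInstanceRegistry.
    final ConcurrentHashMap<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();

    // Returns the instance that actually ended up stored.
    Instance register(Instance registrant) {
        // Create the per-application map atomically on first use.
        Map<String, Instance> gMap = registry.computeIfAbsent(registrant.appName, k -> new ConcurrentHashMap<>());
        Instance existing = gMap.get(registrant.id);
        // Strict '>': on equal timestamps the incoming copy wins, as in the original.
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing; // keep the newer local copy
        }
        gMap.put(registrant.id, registrant);
        return registrant;
    }

    public static void main(String[] args) {
        RegistrySketch r = new RegistrySketch();
        r.register(new Instance("APP", "host-1", 200L, "newer"));
        Instance stored = r.register(new Instance("APP", "host-1", 100L, "stale"));
        System.out.println(stored.payload); // prints "newer"
    }
}
```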
3.1 updateRenewsPerMinThreshold()
// AbstractInstanceRegistry.class
// Expected number of renewals per minute
protected volatile int numberOfRenewsPerMinThreshold;
// Expected number of clients sending renewals
protected volatile int expectedNumberOfClientsSendingRenews;
protected void updateRenewsPerMinThreshold() {
// By default: expected renewals per minute = expected clients sending renewals * (60 / 30) * 0.85
// Related to the server's self-preservation mechanism; analyzed later
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
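A worked instance of the formula, as a minimal sketch (the class `RenewThresholdSketch` and its parameter names are illustrative). With the defaults from the comment above, a 30-second renewal interval and a 0.85 threshold, 10 expected clients give 10 × 2 × 0.85 = 17 renewals per minute.

```java
// Minimal sketch of the arithmetic in updateRenewsPerMinThreshold(); names are illustrative.
class RenewThresholdSketch {
    static int renewsPerMinThreshold(int expectedClients, int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        // Each client renews 60 / interval times per minute; the threshold is a fraction of the total.
        return (int) (expectedClients * (60.0 / renewalIntervalSeconds) * renewalPercentThreshold);
    }

    public static void main(String[] args) {
        // Defaults: 30 s interval, 0.85 -> 10 clients * 2 renewals/min * 0.85
        System.out.println(renewsPerMinThreshold(10, 30, 0.85)); // prints 17
    }
}
```

The cast to `int` truncates, so a single client yields (int) 1.7 = 1.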
3.2 getOverriddenInstanceStatus()
// AbstractInstanceRegistry.class
protected InstanceInfo.InstanceStatus getOverriddenInstanceStatus(InstanceInfo r,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// Get the override rule (see 3.2.1)
InstanceStatusOverrideRule rule = getInstanceInfoOverrideRule();
logger.debug("Processing override status using rule: {}", rule);
// Apply the rule to compute the status (analyzed below)
return rule.apply(r, existingLease, isReplication).status();
}
3.2.1 getInstanceInfoOverrideRule()
// PeerAwareInstanceRegistryImpl.class
private final InstanceStatusOverrideRule instanceStatusOverrideRule;
public PeerAwareInstanceRegistryImpl(
EurekaServerConfig serverConfig,
EurekaClientConfig clientConfig,
ServerCodecs serverCodecs,
EurekaClient eurekaClient
) {
// ......
// The constructor installs three rules
this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(),
new OverrideExistsRule(overriddenInstanceStatusMap), new LeaseExistsRule());
}
protected InstanceStatusOverrideRule getInstanceInfoOverrideRule() {
return this.instanceStatusOverrideRule;
}
3.2.2 rule.apply()
// FirstMatchWinsCompositeRule.class
private final InstanceStatusOverrideRule[] rules;
private final InstanceStatusOverrideRule defaultRule;
public FirstMatchWinsCompositeRule(InstanceStatusOverrideRule... rules) {
this.rules = rules;
// The constructor also sets the default rule
this.defaultRule = new AlwaysMatchInstanceStatusRule();
// ......
}
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// Apply the three rules above in order
for (int i = 0; i < this.rules.length; ++i) {
StatusOverrideResult result = this.rules[i].apply(instanceInfo, existingLease, isReplication);
if (result.matches()) {
return result;
}
}
// If none of the three rules matched, apply the default rule
return defaultRule.apply(instanceInfo, existingLease, isReplication);
}
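`FirstMatchWinsCompositeRule` is an ordered chain of rules with a guaranteed fallback. Below is a generic sketch of the same pattern, assuming `Optional.empty()` as the stand-in for `StatusOverrideResult.NO_MATCH`; the class name `FirstMatchWinsSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Generic sketch of the first-match-wins pattern behind FirstMatchWinsCompositeRule.
// Optional.empty() plays the role of StatusOverrideResult.NO_MATCH.
class FirstMatchWinsSketch<I, R> {
    private final List<Function<I, Optional<R>>> rules;
    private final Function<I, R> defaultRule; // always produces a result, like AlwaysMatchInstanceStatusRule

    FirstMatchWinsSketch(List<Function<I, Optional<R>>> rules, Function<I, R> defaultRule) {
        this.rules = List.copyOf(rules);
        this.defaultRule = defaultRule;
    }

    R apply(I input) {
        for (Function<I, Optional<R>> rule : rules) {
            Optional<R> result = rule.apply(input);
            if (result.isPresent()) {
                return result.get(); // first match decides; later rules are not consulted
            }
        }
        return defaultRule.apply(input); // nothing matched: fall back
    }

    public static void main(String[] args) {
        List<Function<Integer, Optional<String>>> rules = new ArrayList<>();
        rules.add(x -> x < 0 ? Optional.of("negative") : Optional.empty());
        rules.add(x -> x == 0 ? Optional.of("zero") : Optional.empty());
        FirstMatchWinsSketch<Integer, String> chain = new FirstMatchWinsSketch<>(rules, x -> "positive");
        System.out.println(chain.apply(-5) + " " + chain.apply(0) + " " + chain.apply(7));
    }
}
```

Because evaluation stops at the first match, the order in which the constructor in 3.2.1 passes the rules is part of the semantics.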
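3.2.2.1 Rule matching explained
- DownOrStartingRule
- OverrideExistsRule
- LeaseExistsRule
- AlwaysMatchInstanceStatusRule
The four rules are tried in this order until one matches; the default rule is the fallback.
Rule matching is only invoked when the server handles registration and heartbeat-renewal requests: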
public class DownOrStartingRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(DownOrStartingRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// ReplicationInstance is DOWN or STARTING - believe that, but when the instance says UP, question that
// The client instance sends STARTING or DOWN (because of heartbeat failures), then we accept what
// the client says. The same is the case with replica as well.
// The OUT_OF_SERVICE from the client or replica needs to be confirmed as well since the service may be
// currently in SERVICE
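// If the client instance reports DOWN or STARTING, it is trusted and that status is returned directly
// If it reports UP or OUT_OF_SERVICE, it is not trusted: the instance's status on the server may have been changed through Actuator, so the next rule must check whether its overridden status was also changed on the server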
if ((!InstanceInfo.InstanceStatus.UP.equals(instanceInfo.getStatus()))
&& (!InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(instanceInfo.getStatus()))) {
logger.debug("Trusting the instance status {} from replica or instance for instance {}",
instanceInfo.getStatus(), instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
return StatusOverrideResult.NO_MATCH;
}
@Override
public String toString() {
return DownOrStartingRule.class.getName();
}
}
public class OverrideExistsRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(OverrideExistsRule.class);
private Map<String, InstanceInfo.InstanceStatus> statusOverrides;
public OverrideExistsRule(Map<String, InstanceInfo.InstanceStatus> statusOverrides) {
this.statusOverrides = statusOverrides;
}
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo, Lease<InstanceInfo> existingLease, boolean isReplication) {
InstanceInfo.InstanceStatus overridden = statusOverrides.get(instanceInfo.getId());
// If there are instance specific overrides, then they win - otherwise the ASG status
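// When this rule runs, the instance status is UP or OUT_OF_SERVICE
// Logic: check whether the server-side overriddenInstanceStatusMap holds an overriddenStatus for the instance
// If so, it is trusted and returned directly
// If not, the next rule decides
// This works because changing an instance's status through Actuator caches it in overriddenInstanceStatusMap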
if (overridden != null) {
logger.debug("The instance specific override for instance {} and the value is {}",
instanceInfo.getId(), overridden.name());
return StatusOverrideResult.matchingStatus(overridden);
}
return StatusOverrideResult.NO_MATCH;
}
@Override
public String toString() {
return OverrideExistsRule.class.getName();
}
}
public class LeaseExistsRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(LeaseExistsRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// This is for backward compatibility until all applications have ASG
// names, otherwise while starting up
// the client status may override status replicated from other servers
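// When this rule runs, the instance status is UP or OUT_OF_SERVICE
// Logic: unless this is a peer replication, look up the instance's status in the local registry
// If that status exists and is UP or OUT_OF_SERVICE, return it directly
// If it exists but is neither UP nor OUT_OF_SERVICE, the next rule decides
// If it does not exist, the next rule decides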
if (!isReplication) {
InstanceInfo.InstanceStatus existingStatus = null;
if (existingLease != null) {
existingStatus = existingLease.getHolder().getStatus();
}
// Allow server to have its way when the status is UP or OUT_OF_SERVICE
if ((existingStatus != null)
&& (InstanceInfo.InstanceStatus.OUT_OF_SERVICE.equals(existingStatus)
|| InstanceInfo.InstanceStatus.UP.equals(existingStatus))) {
logger.debug("There is already an existing lease with status {} for instance {}",
existingLease.getHolder().getStatus().name(),
existingLease.getHolder().getId());
return StatusOverrideResult.matchingStatus(existingLease.getHolder().getStatus());
}
}
return StatusOverrideResult.NO_MATCH;
}
@Override
public String toString() {
return LeaseExistsRule.class.getName();
}
}
public class AlwaysMatchInstanceStatusRule implements InstanceStatusOverrideRule {
private static final Logger logger = LoggerFactory.getLogger(AlwaysMatchInstanceStatusRule.class);
@Override
public StatusOverrideResult apply(InstanceInfo instanceInfo,
Lease<InstanceInfo> existingLease,
boolean isReplication) {
// Simply return the status reported by the client instance
logger.debug("Returning the default instance status {} for instance {}", instanceInfo.getStatus(),
instanceInfo.getId());
return StatusOverrideResult.matchingStatus(instanceInfo.getStatus());
}
@Override
public String toString() {
return AlwaysMatchInstanceStatusRule.class.getName();
}
}
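3.2.3 Summary:
- DownOrStartingRule: if the status sent by the client instance is DOWN or STARTING, it is trusted; otherwise it is not, and the next rule is tried. The reason is that the instance's status in the server-side registry can be changed through Actuator, so the status the server maintains is not necessarily the client instance's real status.
- OverrideExistsRule: if the server's overriddenInstanceStatusMap holds an overriddenStatus for the client instance, it is trusted; otherwise the next rule is tried. Changing a client instance's status in the server-side registry through Actuator caches it in overriddenInstanceStatusMap.
- LeaseExistsRule: if the request is not a peer replication, and the server already holds a local status for the client instance that is UP or OUT_OF_SERVICE, that status is trusted; otherwise the next rule is tried.
- AlwaysMatchInstanceStatusRule: the default rule; it simply returns the status reported by the client instance.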
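The four rules can be condensed into a single decision function. This is a simplified re-implementation for illustration only: the `Status` enum, the plain `Map` standing in for `overriddenInstanceStatusMap`, and passing the existing lease's status as a nullable value are all assumptions of the sketch, not Eureka's types.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified re-implementation of the four status-override rules, in the order
// FirstMatchWinsCompositeRule applies them. Types are stand-ins, not Eureka's.
class OverrideRulesSketch {
    enum Status { UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN }

    /**
     * @param reported      status carried by the incoming instance info
     * @param overrides     stand-in for overriddenInstanceStatusMap (instanceId -> status)
     * @param existing      status held by the existing lease, or null if there is none
     * @param isReplication true for requests replicated from a peer (syncUp() passes true)
     */
    static Status resolve(String instanceId, Status reported, Map<String, Status> overrides,
                          Status existing, boolean isReplication) {
        // 1. DownOrStartingRule: any status other than UP / OUT_OF_SERVICE is trusted as reported.
        if (reported != Status.UP && reported != Status.OUT_OF_SERVICE) {
            return reported;
        }
        // 2. OverrideExistsRule: a status override cached on the server wins.
        Status overridden = overrides.get(instanceId);
        if (overridden != null) {
            return overridden;
        }
        // 3. LeaseExistsRule: for non-replicated requests, keep an existing UP / OUT_OF_SERVICE status.
        if (!isReplication && (existing == Status.UP || existing == Status.OUT_OF_SERVICE)) {
            return existing;
        }
        // 4. AlwaysMatchInstanceStatusRule: fall back to the reported status.
        return reported;
    }

    public static void main(String[] args) {
        Map<String, Status> overrides = new HashMap<>();
        overrides.put("host-1", Status.OUT_OF_SERVICE);
        // The instance says UP, but an override is cached on the server: the override wins.
        System.out.println(resolve("host-1", Status.UP, overrides, Status.UP, false)); // prints OUT_OF_SERVICE
    }
}
```

During `syncUp()`, `register()` is called with `isReplication = true`, so LeaseExistsRule never matches on that path; an UP or OUT_OF_SERVICE instance without a server-side override keeps the status it arrived with.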
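4. Summary
Note: this summary covers the registry sync at server startup; parts of the analysis above also apply to other cases.
- First, take all the instance information the local client has fetched from the peer nodes, and register each instance into the local registry
- During registration, create the application lease map (if absent) and the instance lease, and store them in the local registry
- For a new registration, update the expected number of renewals per minute, which is used by the server's self-preservation mechanism
- Next, add the instance to the recently registered queue and the recently changed queue, and determine and record its status via the override rules
- Finally, invalidate the relevant cache (the response cache the server uses when handling registry fetch requests)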
Reposted from: https://juejin.cn/post/7154912732658008100