Eureka源码11-Server端(定时清除过期Client)
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
0. 环境
- eureka版本:1.10.11
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4 测试代码:github.com/hsfxuebao/s…
1. 方法入口
@Configuration
// Step1:引入 EurekaServerInitializerConfiguration 配置类
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
// ......
}
@Configuration
public class EurekaServerInitializerConfiguration
implements ServletContextAware, SmartLifecycle, Ordered {
// ......
// EurekaServerInitializerConfiguration 配置类实现了 SmartLifecycle 接口
// IOC 容器初始化即将结束时,回调生命周期 start() 方法
@Override
public void start() {
new Thread(() -> {
try {
// TODO: is this class even needed now?
// Step2:Eureka Server 上下文初始化
eurekaServerBootstrap.contextInitialized(
EurekaServerInitializerConfiguration.this.servletContext);
// ....
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}
// ......
}
public class EurekaServerBootstrap {
// ......
public void contextInitialized(ServletContext context) {
try {
initEurekaEnvironment();
// todo 初始化 Eureka Server 上下文
initEurekaServerContext();
context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
}
catch (Throwable e) {
// ......
}
}
protected void initEurekaServerContext() throws Exception {
// ......
// todo 打开交通
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
// ......
}
// ......
}
// InstanceRegistry.class
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// todo 调用父类打开交通方法
// count:服务端启动时同步集群节点注册表的实例数,不能为0,如果为0默认赋值1
super.openForTraffic(applicationInfoManager,
count == 0 ? this.defaultOpenForTrafficCount : count);
}
2. super.openForTraffic()
/**
* 打开交通,Server端定时清理过期的Client
* @param applicationInfoManager
* @param count 服务端启动时同步集群节点注册表的实例数,不能为0,如果为0默认赋值1
*/
// PeerAwareInstanceRegistryImpl.class
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
// Renewals happen every 30 seconds and for a minute it should be a factor of 2.
// 预期收到心跳续租的实例数赋值
this.expectedNumberOfClientsSendingRenews = count;
// todo 更新预期每分钟收到心跳续租请求数
updateRenewsPerMinThreshold();
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold);
// 记录服务端启动时间
this.startupTime = System.currentTimeMillis();
if (count > 0) {
// 设置 peerInstancesTransferEmptyOnStartup = false
// 表示服务端启动时同步集群节点注册表的实例数不为空,判断是否允许客户端拉取注册表时提到过
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
// 设置服务端实例状态为 UP
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
// todo 调用父类方法
super.postInit();
}
2.1 updateRenewsPerMinThreshold()
// AbstractInstanceRegistry.class
protected void updateRenewsPerMinThreshold() {
// 客户端数量 * (60 / 心跳间隔)* 自我保护开启的阈值因子)
// = (客户端数量 * 每个客户端每分钟发送心跳的数量 * 阈值因子)
// = (所有客户端每分钟发送的心跳数量 * 阈值因子)
// = 当前Server开启自我保护机制的每分钟最小心跳数量
this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
2.2 super.postInit()
// AbstractInstanceRegistry.class
protected void postInit() {
// todo 统计最近一分钟处理的心跳续租数的定时任务
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
// 取消定期清理过期实例任务
evictionTaskRef.get().cancel();
}
// 启动新的定期清理过期实例任务
// 固定时间重复执行,默认一分钟后开始,每分钟执行一次
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
3. 定期清理过期实例任务
class EvictionTask extends TimerTask {
// 最近一次执行清理任务时间
private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
// todo 获取补偿时间
// 因时间偏斜或GC,导致任务实际执行时间超过指定的间隔时间(默认一分钟)
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
// todo 处理过期实例
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
...
}
3.1 计算补偿时间
// AbstractInstanceRegistry.EvictionTask.class
long getCompensationTimeMs() {
// 获取当前时间
long currNanos = getCurrentTimeNano();
// 获取最近一次执行任务时间
// 并赋值 lastExecutionNanosRef 为当前时间,给下一次执行任务时使用
long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
if (lastNanos == 0l) {
return 0l;
}
// 计算最近一次任务的实际执行时间 elapsedMs = 当前任务执行时间 - 最近一次任务执行时间
long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
// 计算最近一个任务执行的超时时间 compensationTime = 最近一次任务的实际执行时间 - 设定的任务执行间隔时间
long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
// 如果超时时间大于0,则作为补偿时间返回
// 如果超时时间小于等于0,则表示没有超时,返回0
return compensationTime <= 0l ? 0l : compensationTime;
}
long getCurrentTimeNano() { // for testing
// 返回当前时间(纳秒)
return System.nanoTime();
}
3.2 处理过期实例
// AbstractInstanceRegistry.class
public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
// todo 自我保护
if (!isLeaseExpirationEnabled()) {
// 如果服务端允许自我保护且 最近一分钟处理心跳续租请求数 小于等于 预期每分钟收到心跳续租请求数
// 则开启自我保护机制,不再清理过期实例
// 配置文件可以配置关闭自我保护机制
logger.debug("DS: lease expiration is currently disabled.");
return;
}
// We collect first all expired items, to evict them in random order. For large eviction sets,
// if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
// the impact should be evenly distributed across all applications.
// 新建一个过期实例租约信息列表
List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
if (leaseMap != null) {
for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
Lease<InstanceInfo> lease = leaseEntry.getValue();
// 遍历服务端注册表,判断每个实例是否过期
// 如果过期,则将相应实例租约信息添加到 expiredLeases
// todo isExpired() 判断实例过期方法
if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
expiredLeases.add(lease);
}
}
}
}
// To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
// triggering self-preservation. Without that we would wipe out full registry.
// 获取服务端注册表的实例数
int registrySize = (int) getLocalRegistrySize();
// 计算注册实例数阈值,默认 registrySizeThreshold = 注册实例数 * 0.85
int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
// 计算清理实例限制数,evictionLimit = 注册实例数 - 注册实例数阈值
int evictionLimit = registrySize - registrySizeThreshold;
// 清理实例限制数 和 过期实例数 取最小值作为实际需要清理的实例数
// Eureka 这样设计是为了保证可用性和分区容错性,避免一次性清理大量过期实例
int toEvict = Math.min(expiredLeases.size(), evictionLimit);
if (toEvict > 0) {
logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
Random random = new Random(System.currentTimeMillis());
for (int i = 0; i < toEvict; i++) {
// Pick a random item (Knuth shuffle algorithm)
// 从过期实例中随机选择下架
// Knuth 洗牌算法
int next = i + random.nextInt(expiredLeases.size() - i);
Collections.swap(expiredLeases, i, next);
Lease<InstanceInfo> lease = expiredLeases.get(i);
String appName = lease.getHolder().getAppName();
String id = lease.getHolder().getId();
EXPIRED.increment();
logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
// todo 下架实例,并且标识非同步复制集群节点
internalCancel(appName, id, false);
}
}
}
3.2.1 判断过期方法
// Lease.class
public boolean isExpired(long additionalLeaseMs) {
// evictionTimestamp:实例下架时间,当客户端下架时记录
// lastUpdateTimestamp:续租到期时间,当客户端注册或心跳续租时记录
// duration:续租时间,如果客户端注册时未指定,默认90s
// additionalLeaseMs:补偿时间
// 如果 实例下架时间大于0 或 (当前时间 大于 续租到期时间 + 续租时间 + 补偿时间),则表示已过期
return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
}
4. 下架实例
internalCancel()
这个方法在Eureka源码8-Server端(处理服务续约和服务下架请求)第2小节有详细说明。
5. 自我保护renewsLastMin
renewsLastMin
是服务端统计最后一分钟处理的心跳续租数的定时任务:
- 服务端启动时开启任务,关闭时停止任务
- 服务端处理客户端实例心跳续租时+1
- 服务端处理客户端实例下架时-1
- 服务端的定期清理过期实例任务中被用来判断是否开启自我保护
在 AbstractInstanceRegistry
类中初始化:
// AbstractInstanceRegistry.class
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
// ......
this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
// ......
}
// 除了这里使用这个计数任务工具类
// 还有 PeerAwareInstanceRegistryImpl.numberOfReplicationsLastMin 字段也使用该工具类
// numberOfReplicationsLastMin:服务端统计最后一分钟同步复制给集群节点的操作数
public class MeasuredRate {
// 最近一分钟(上一分钟)的计数
// getCount() 返回该计数
private final AtomicLong lastBucket = new AtomicLong(0);
// 当前一分钟正在统计的计数
private final AtomicLong currentBucket = new AtomicLong(0);
// ......
public synchronized void start() {
if (!isActive) {
// 开启任务
// 固定时间重复执行,一分钟后开始,没分钟执行一次
timer.schedule(new TimerTask() {
@Override
public void run() {
try {
// Zero out the current bucket.
// 每当到了任务刚刚开始的时候
// 记录上一分钟的计数
// 清零当前一分钟正在统计的计数
lastBucket.set(currentBucket.getAndSet(0));
} catch (Throwable e) {
logger.error("Cannot reset the Measured Rate", e);
}
}
}, sampleInterval, sampleInterval);
isActive = true;
}
}
...
}
6. 总结
-
定期清理过期实例任务:服务端启动时开启,固定时间重复执行,默认一分钟后开始,每分钟执行一次
-
清理过期实例流程:
- 首先,服务端判断是否开启自动保护,如果开启则不清理过期实例,如果不开启则继续处理
- 然后,从本地注册表中取出所有过期实例
- 接着,计算出清理实例限制数,从清理实例限制数和过期实例取出最小值(避免一次性清理大量过期实例,保证可用性和分区容错性)
- 最后,上面取出的最小值最为实例下架数,随机下架过期实例
参考文章
eureka-0.10.11源码(注释) springcloud-source-study学习github地址 Eureka源码解析 SpringCloud技术栈系列文章 Eureka 源码解析
转载自:https://juejin.cn/post/7157892419093004324