一文搞懂EureKa原理想要搞懂Eureka服务端/客户端的逻辑实现,看这篇文章就够了。。。从源码层面分析Eureka
关注公众号 不爱总结的麦穗 将不定期推送技术好文
Eureka Server 服务注册中心,提供服务发现并实现负载均衡和故障转移。它由两个组件组成:Eureka服务器和Eureka客户端。
Eureka Server
- 以 REST API 的形式为服务实例提供了注册、管理和查询等操作。同时,Eureka Server 也为我们提供了可视化的监控页面,可以直观地看到各个 Eureka Server 当前的运行状态和所有已注册服务的情况。
Eureka Client
- 客户端将自身服务注册到Eureka,从而使服务消费方能够找到
- 从Eureka服务器获取注册服务列表,从而能够消费服务
Eureka架构
从图上标注出来的地方,我们可以看到Eureka Server 和 Eureka Client通信过程:
- Register :服务注册
Eureka客户端向Eureka Server注册时,它提供自身的元数据,比如IP地址、端口等
- Renew:服务续约
Eureka客户端会每隔30秒发送一次心跳来续约。通过续约来告知Eureka Server该客户端仍然存在。
- Get Registries:获取注册列表信息
Eureka客户端从服务器获取注册表信息,将其缓存到本地。客户端会使用该信息查找其他服务,从而进行远程调用。该注册列表信息定期(每30秒)更新一次。
- Cancel:服务下线
Eureka客户端在程序关闭时向Eureka服务器发送取消请求。
- Make Remote Call:
从eureka client到eureka client,远程调用
- 服务剔除:
Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。
源码解析
接下来,从源码层面来分析上述过程。
服务注册
当Eureka Client 启动时EurekaClientAutoConfiguration
配置类生效,会注入Bean CloudEurekaClient,然后调用父类DiscoveryClient的构造方法。最终通过RestTemplateEurekaHttpClient#register
方法发出REST请求。
Eureka Server ApplicationResource#addInstance
在收到请求后,最终通过AbstractInstanceRegistry#register
方法完成了客户端注册的主要逻辑。
- AbstractInstanceRegistry#register
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
read.lock();
try {
// 根据微服务名称从注册表 `registry` 中获取注册的 `Map<String, Lease<InstanceInfo>>`
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);
if (gMap == null) {
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
// 通过实例id,从map中获取服务实例对应的租约
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
// Retain the last dirty timestamp without overwriting it, if there is already a lease
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
// this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
// InstanceInfo instead of the server local copy.
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
" than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
registrant = existingLease.getHolder();
}
} else {
// 新注册的服务实例
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// 预期续约客户端数量+1
this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
// 自我保护阀值计算 2*(60/30)*0.85
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 通过服务实例信息和默认的90s租期构建Lease 租约信息
Lease<InstanceInfo> lease = new Lease<>(registrant, leaseDuration);
if (existingLease != null) {
// 更新服务启动时间
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
// 将封装了服务实例信息的Lease对象,放到gMap里面去
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(new Pair<Long, String>(
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// This is where the initial state transfer of overridden status happens
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
+ "overrides", registrant.getOverriddenStatus(), registrant.getId());
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}
// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);
// 启动状态 设置服务启动时间
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
registrant.setActionType(ActionType.ADDED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 删除缓存
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
logger.info("Registered instance {}/{} with status {} (replication={})",
registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
} finally {
read.unlock();
}
}
Eureka Server会把客户端的实列信息保存在自己的注册表中(双层Map结构)。
服务续约
当Eureka Client 启动时,会开启一个心跳任务,每隔30s向服务端发送一次心跳请求。
- DiscoveryClient#initScheduledTasks
private void initScheduledTasks() {
// 部分代码省略
// 构建一个心跳续约的定时任务,每30s(默认)执行一次
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
// 心跳续约逻辑
new HeartbeatThread()
);
scheduler.schedule(
heartbeatTask,
renewalIntervalInSecs, TimeUnit.SECONDS);
// 部分代码省略
Eureka Server InstanceResource#renewLease
方法接收到请求进行续约
- InstanceResource#renewLease
@PUT
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 调用renew进行续约
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);
// 部分代码省略
}
- AbstractInstanceRegistry#renew
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// 根据服务名字获取实例信息
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
// 服务实例不存在,直接返回续约失败
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
// 获取服务实例的基本信息
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
// 获取服务实例的运行状态
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
// 如果运行状态未知,也返回续约失败
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
overriddenInstanceStatus.name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
}
}
renewsLastMin.increment();
// 更新服务端最后一次收到心跳请求的时间
leaseToRenew.renew();
return true;
}
}
服务续约,就是更新服务端最后一次收到心跳请求的时间。
获取注册列表信息
当Eureka Client 启动时,会开启一个缓存刷新任务,每隔30s向服务端发送一次请求
- DiscoveryClient#initScheduledTasks
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) {
// 构建一个缓存刷新的定时任务,每30s(默认)执行一次
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
// 缓存刷新逻辑
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 部分代码省略
}
- DiscoveryClient#fetchRegistry
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
// If the delta is disabled or if it is the first time, get all
// applications
Applications applications = getApplications();
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
logger.info("Application is null : {}", (applications == null));
logger.info("Registered Applications size is zero : {}",
(applications.getRegisteredApplications().size() == 0));
logger.info("Application version is -1: {}", (applications.getVersion() == -1));
// 全量获取服务列表
getAndStoreFullRegistry();
} else {
// 增量获取服务列表
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
logger.info(PREFIX + "{} - was unable to refresh its cache! This periodic background refresh will be retried in {} seconds. status = {} stacktrace = {}",
appPathIdentifier, clientConfig.getRegistryFetchIntervalSeconds(), e.getMessage(), ExceptionUtils.getStackTrace(e));
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
Eureka Server ApplicationsResource接收到请求后,从缓存中返回服务列表。
- ApplicationsResource#getContainers 全量获取服务列表
Key cacheKey = new Key(Key.EntityType.Application,
// 全量
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
- ApplicationsResource#getContainerDifferential 增量获取服务列表
Key cacheKey = new Key(Key.EntityType.Application,
// 增量
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
通过源码可以看到,这两个方法都是从缓存中获取服务列表。区别在于获取缓存的key。
那Eureka Server是怎样把客户端的注册信息存到缓存的呢?
Eureka缓存机制
Eureka Server 为了提供响应效率,提供了三层的缓存结构,将 Eureka Client 所需要的注册信息,直接存储在缓存结构中。
- 第一层缓存:readOnlyCacheMap,定时从 readWriteCacheMap 同步数据(默认时间为 30 秒)。定时器通过和 readWriteCacheMap 的值做对比,如果数据不一致,则以 readWriteCacheMap 的数据为准。
- 第二层缓存:readWriteCacheMap,主要同步于存储层。当获取缓存时判断缓存中是否没有数据,如果不存在此数据,则通过 CacheLoader 的 load 方法去加载,加载成功之后将数据放入缓存,同时返回数据。缓存过期时间,默认为 180 秒。
- 第三层缓存:registry注册信息表,当服务下线、过期、注册、状态变更,都会来清除readWriteCacheMap缓存中的数据。
服务下线
当Eureka Client 服务关闭的时候会取消本机的各种定时任务,给服务端发送请求告知自己下线。
- DiscoveryClient#shutdown
@PreDestroy
@Override
public synchronized void shutdown() {
// 部分代码省略
// 取消定时任务
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
// 向服务端发送请求告知自己下线
unregister();
}
// 部分代码省略
}
}
Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。
服务剔除
Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒)的服务剔除。
- AbstractInstanceRegistry#postInit
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
//剔除逻辑
evictionTaskRef.set(new EvictionTask());
创建服务剔除定时任务,60s(默认)
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}
转载自:https://juejin.cn/post/7407782285460144163