Eureka源码10-Server端(处理全量下载和增量下载请求)
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
0. 环境
- eureka版本:1.10.11
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4 测试代码:github.com/hsfxuebao/s…
1. 处理客户端拉取全量注册表
拉取全量注册表的请求为:GET请求,path为:apps/ 详见Eureka源码3-Client启动入口(注册,续约,定时任务)中第4.1.1小节
服务端接收客户端拉取全量注册表的请求在 ApplicationsResource
类中的 getContainers()
方法:
// ApplicationsResource.class
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
// 当 regionsStr 不为空字符串的时候 ,isRemoteRegionRequested = true
// 表示需要拉取的注册表包含远程 region 的
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
// todo 判断服务端是否允许访问注册表
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
// 不允许返回403,拒绝请求
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
/** 定义缓存 key
* Key.EntityType.Application:Application 表示拉取注册表
* ResponseCacheImpl.ALL_APPS:ALL_APPS 表示全量拉取
* keyType:key 的类型,默认为 json
* CurrentRequestVersion.get():请求的版本号
* EurekaAccept.fromString(eurekaAccept):返回数据格式,默认为 full
* regions:远程 region 列表,不为空表示要从这些远程 region 拉取注册表
*/
// 先是构建一个key对象,注意ALL_APPS 这,说明是获取所有的注册实例信息
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
// 判断是否经过压缩
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
// acceptEncoding = gzip 表示响应体里的数据需要压缩后返回
// todo 从 responseCache 响应缓存中获取注册表
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
// 没有经过压缩,调用responseCache.get(cacheKey)
response = Response.ok(responseCache.get(cacheKey))
.build();
}
CurrentRequestVersion.remove();
return response;
}
1.1 registry.shouldAllowAccess()
public boolean shouldAllowAccess(boolean remoteRegionRequired) {
if (this.peerInstancesTransferEmptyOnStartup) {
// peerInstancesTransferEmptyOnStartup = true 时,表示当前服务端是集群节点中第一个启动的
// 这个时候注册表为空,不允许访问
// 要在一定时间后才能访问,默认当前服务端启动5分钟后
if (!(System.currentTimeMillis() > this.startupTime + serverConfig.getWaitTimeInMsWhenSyncEmpty())) {
return false;
}
}
if (remoteRegionRequired) {
// 如果需要从远程 region 拉取注册表,本地所有注册了的远程 region 只要有一个还没准备好,那么不允许访问
for (RemoteRegionRegistry remoteRegionRegistry : this.regionNameVSRemoteRegistry.values()) {
if (!remoteRegionRegistry.isReadyForServingData()) {
return false;
}
}
}
return true;
}
1.2 responseCache.getGZIP()
// ResponseCacheImpl.class
public byte[] getGZIP(Key key) {
// 从缓存中获取注册表信息
// shouldUseReadOnlyResponseCache:表示是否允许使用只读缓存,默认为 true
Value payload = getValue(key, shouldUseReadOnlyResponseCache);
if (payload == null) {
return null;
}
// 返回压缩过的数据
return payload.getGzipped();
}
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
// 如果允许从只读缓存中获取注册表信息
// 先从只读缓存中获取,如果只读缓存中为空,再从读写缓存中获取并保存到只读缓存中
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
// 如果不允许从只读缓存中获取注册表信息,那么直接从读写缓存中获取
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}
1.3 ResponseCache 缓存机制
分析前简单熟悉下前置知识LoadingCache
:
-
Guava 提供的本地缓存类,在多线程的场景下保证只有一个线程会去加载缓存数据
-
能够指定监听缓存移除时的监听器和实现缓存过期方法,当缓存过期时触发过期方法
-
能够实现加载方法,当缓存中获取不到指定缓存项时触发加载方法
-
有三种基于指定时间清理或刷新缓存项的方式
expireAfterAccess
: 当缓存项在指定的时间段内没有被读或写就会被移除expireAfterWrite
:当缓存项在指定的时间段内没有被刷新就会被移除refreshAfterWrite
:缓存项写入后超过指定时间不会被移除,在获取缓存项时如果发现缓存项最近一次写入超过了指定的时间,则触发加载方法刷新缓存项
-
能够指定缓存初始化大小和最大容量,当超过最大容量时利用 LRU 算法移除缓存项
initialCapacity
:指定初始化大小maximumSize
:指定最大容量
接下来,我们看一下ResponseCache
的代码:
public class ResponseCacheImpl implements ResponseCache {
...
// 只读缓存
private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
// 读写缓存
private final LoadingCache<Key, Value> readWriteCacheMap;
// 判断是否使用只读缓存
private final boolean shouldUseReadOnlyResponseCache;
...
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
......
// todo 这个ReadWrite 缓存是使用的google guava包里面的CacheLoader缓存。
// 如果从readWriteCacheMap中获取不到,会调用CacheLoader的load方法加载
// 构建读写缓存,指定了缓存的初始大小(默认1000)、过期时间(默认180s),实现了缓存过期方法、加载方法
this.readWriteCacheMap =
CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
// 指定监听缓存移除时的监听器
.removalListener(new RemovalListener<Key, Value>() {
@Override
// 当监听到缓存移除时,执行过期方法
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
// 这里的 removedKey 指的是上面的缓存 key
// 如果过期缓存的缓存 key 里的远程 regions 不为空
// 则移除 regionSpecificKeys 相应的值(下面加载方法会放入)
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() {
@Override
// 如果读写缓存中获取不到指定缓存,则触发加载方法
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
// 如果缓存 key 里的远程 regions 不为空
// 则以“移除远程 regions 的缓存 key ”为 key ,“未移除远程 regions 的缓存 key ” 为 value ,放入 regionSpecificKeys 中
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
// todo 加载指定缓存 key 的 value(注册表)
Value value = generatePayload(key);
return value;
}
});
if (shouldUseReadOnlyResponseCache) {
// 如果允许使用只读缓存,则开启一个定时任务,处理只读缓存和读写缓存之间的数据同步
// 固定时间重复执行的定时任务,默认30s后开始,每30s执行一次
// getCacheUpdateTask():缓存同步定时任务
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}
// ......
}
1.3.1 readOnlyCacheMap 和 readWriteCacheMap
-
readOnlyCacheMap
:只读缓存在服务端中添加或更新缓存项通过 put 方法,时机是:- 在获取响应缓存中的注册表信息时,如果允许使用 readOnlyCacheMap ,在 readOnlyCacheMap 中获取不到相应缓存项,则从 readWriteCacheMap 获取后添加进 readOnlyCacheMap
- 执行缓存同步定时任务时,如果存在缓存项在 readOnlyCacheMap 和 readWriteCacheMap 中不一致,则将 readWriteCacheMap 中的更新到 readOnlyCacheMap
-
readWriteCacheMap
:读写缓存在服务端中添加或更新缓存项通过 get 方法,时机是:- 执行缓存同步定时任务时,如果从 readWriteCacheMap 获取不到相应缓存项,则触发加载方法,调用 generatePayload() 方法进行获取后添加
- 在获取响应缓存中的注册表信息时,如果从 readWriteCacheMap 获取不到相应缓存项,则触发加载方法,调用 generatePayload() 方法进行获取后添加
-
readWriteCacheMap
:读写缓存在服务端中清理缓存项通过invalidate()
方法,时机是:处理客户端注册、下架、更改状态、删除状态
1.3.2 缓存同步定时任务
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
// 遍历只读缓存的缓存 key
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
// 设置版本号
CurrentRequestVersion.set(key.getVersion());
// 从readWrite 缓存中获取该key对应的value
Value cacheValue = readWriteCacheMap.get(key);
// 从readOnly中获取该key对应缓存的值
Value currentCacheValue = readOnlyCacheMap.get(key);
// 如果两个value不一致的话,说明出现了缓存不一致的情况,这个时候就会更新readOnly里面的缓存
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}
1.3.3 获取注册表
// ResponseCacheImpl.class
private Value generatePayload(Key key) {
Stopwatch tracer = null;
try {
String payload;
switch (key.getEntityType()) {
// 主要关注该类型
case Application:
boolean isRemoteRegionRequested = key.hasRegions();
// todo 获取全量注册表
if (ALL_APPS.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeAllAppsWithRemoteRegionTimer.start();
// todo 需要获取的注册表,包含本地服务端的和客户端请求指定的远程 regions 的
// 调用 getApplicationsFromMultipleRegions() 方法获取
// getPayLoad():返回结果前,转换格式,默认 json
payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeAllAppsTimer.start();
// todo 获取注册表中所有的实例信息
payload = getPayLoad(key, registry.getApplications());
}
// todo 获取增量注册表
} else if (ALL_APPS_DELTA.equals(key.getName())) {
if (isRemoteRegionRequested) {
tracer = serializeDeltaAppsWithRemoteRegionTimer.start();
versionDeltaWithRegions.incrementAndGet();
versionDeltaWithRegionsLegacy.incrementAndGet();
// todo 调用 getApplicationDeltasFromMultipleRegions() 方法获取
payload = getPayLoad(key,
registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
} else {
tracer = serializeDeltaAppsTimer.start();
versionDelta.incrementAndGet();
versionDeltaLegacy.incrementAndGet();
// todo 获取注册表,调用 getApplicationDeltas() 方法获取
payload = getPayLoad(key, registry.getApplicationDeltas());
}
} else {
tracer = serializeOneApptimer.start();
payload = getPayLoad(key, registry.getApplication(key.getName()));
}
break;
case VIP:
case SVIP:
tracer = serializeViptimer.start();
payload = getPayLoad(key, getApplicationsForVip(key, registry));
break;
default:
logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
payload = "";
break;
}
return new Value(payload);
} finally {
if (tracer != null) {
tracer.stop();
}
}
}
registry.getApplicationsFromMultipleRegions()
// AbstractInstanceRegistry.class
public Applications getApplicationsFromMultipleRegions(String[] remoteRegions) {
boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;
logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",
includeRemoteRegion, remoteRegions);
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();
} else {
GET_ALL_CACHE_MISS.increment();
}
// 新建一个注册表对象
// 该对象既是返回给客户端请求的注册表,也是 readWriteCacheMap 只读缓存中的 value
Applications apps = new Applications();
apps.setVersion(1L);
// todo 遍历 注册表registry
for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) {
Application app = null;
if (entry.getValue() != null) {
for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) {
Lease<InstanceInfo> lease = stringLeaseEntry.getValue();
if (app == null) {
// 从相应实例的租约信息中取出服务名,新建一个服务信息对象 app
// 第一进入当前 for 循环时候触发
app = new Application(lease.getHolder().getAppName());
}
// 相应实例的租约信息转换成实例信息,添加到 app
app.addInstance(decorateInstanceInfo(lease));
}
}
if (app != null) {
// 服务实例信息添加到 apps
apps.addApplication(app);
}
}
// 远程region
if (includeRemoteRegion) {
// 遍历需要获取的远程 regions
for (String remoteRegion : remoteRegions) {
// 获取已经注册到本地的远程 region 信息
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
// 获取注册表信息
Applications remoteApps = remoteRegistry.getApplications();
for (Application application : remoteApps.getRegisteredApplications()) {
// 判断是否允许该服务从注册表获取
// 白名单机制
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
logger.info("Application {} fetched from the remote region {}",
application.getName(), remoteRegion);
// 根据实例名从 apps 中获取服务信息
Application appInstanceTillNow = apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
// 如果上面处理本地注册表后, apps 没有相应服务信息,而注册到本地的远程 regions 注册表中有
// 则新建一个服务信息添加到 apps
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
// 实例信息添加到服务信息
appInstanceTillNow.addInstance(instanceInfo);
}
} else {
logger.debug("Application {} not fetched from the remote region {} as there exists a "
+ "whitelist and this app is not in the whitelist.",
application.getName(), remoteRegion);
}
}
} else {
logger.warn("No remote registry available for the remote region {}", remoteRegion);
}
}
}
apps.setAppsHashCode(apps.getReconcileHashCode());
return apps;
}
registry.getApplications()
// AbstractInstanceRegistry.class
public Applications getApplications() {
boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
if (disableTransparentFallback) {
return getApplicationsFromLocalRegionOnly();
} else {
return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled.
}
}
public Applications getApplicationsFromLocalRegionOnly() {
// getApplicationsFromMultipleRegions() 方法上面已分析
// 这里入参 EMPTY_STR_ARRAY 表示只获取本地注册表
return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY);
}
public Applications getApplicationsFromAllRemoteRegions() {
// getApplicationsFromMultipleRegions() 方法上面已分析
// 这里入参 allKnownRemoteRegions 表示获取服务端本地的注册表和所有注册到本地的远程 region 的注册表
return getApplicationsFromMultipleRegions(allKnownRemoteRegions);
}
2. 处理客户端拉取增量注册表
增量拉取注册表的请求: GET请求 path为: apps/delta 详见Eureka源码3-Client启动入口(注册,续约,定时任务)中第4.1.2小节
服务端接收客户端拉取增量注册表的请求在 ApplicationsResource
类中的 getContainers()
方法:
// ApplicationsResource.class
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
// 代码逻辑只有下面两点和全量的不同,其他基本一样
...
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
// 如果本地服务端配置文件配置了禁止拉取增量注册表或不允许访问注册表
// 则返回403,拒绝请求
return Response.status(Status.FORBIDDEN).build();
}
...
// 定义缓存 key
// ResponseCacheImpl.ALL_APPS_DELTA:表示增量拉取
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
...
}
接下里,我们接着上面1.3.3.小节,分析增量拉取的代码:
2.1 getApplicationDeltasFromMultipleRegions()
// AbstractInstanceRegistry.class
public Applications getApplicationDeltasFromMultipleRegions(String[] remoteRegions) {
// null 表示需要拉取所有注册到本地的远程 region 注册表
if (null == remoteRegions) {
remoteRegions = allKnownRemoteRegions; // null means all remote regions.
}
boolean includeRemoteRegion = remoteRegions.length != 0;
if (includeRemoteRegion) {
GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS_DELTA.increment();
} else {
GET_ALL_CACHE_MISS_DELTA.increment();
}
// 新建一个注册表对象
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDeltaWithRegions().get());
// 新建一个服务实例 map
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
// 开启写锁
write.lock();
try {
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is :{}", this.recentlyChangedQueue.size());
// 遍历最近变更队列
while (iter.hasNext()) {
// 获取实例租约信息
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
// 获取实例租约信息
InstanceInfo instanceInfo = lease.getHolder();
logger.debug("The instance id {} is found with status {} and actiontype {}",
instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
Application app = applicationInstancesMap.get(instanceInfo.getAppName());
if (app == null) {
// applicationInstancesMap 中获取不到服务信息,则新建一个添加进去
// 当服务信息第一次新建时触发
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
// 服务信息添加到 apps
apps.addApplication(app);
}
// 相应实例的租约信息转换成实例信息,添加到 app
app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
}
// 远程region
if (includeRemoteRegion) {
// 遍历需要获取的远程 regions
for (String remoteRegion : remoteRegions) {
// 获取已经注册到本地的远程 region 信息
RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion);
if (null != remoteRegistry) {
// 获取已经注册到本地的远程 region 信息
Applications remoteAppsDelta = remoteRegistry.getApplicationDeltas();
if (null != remoteAppsDelta) {
for (Application application : remoteAppsDelta.getRegisteredApplications()) {
// 判断是否允许该服务从注册表获取
// 白名单机制
if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) {
// 根据实例名从 apps 中获取服务信息
Application appInstanceTillNow =
apps.getRegisteredApplications(application.getName());
if (appInstanceTillNow == null) {
// 如果上面处理本地注册表后, apps 没有相应服务信息,而注册到本地的远程 regions 注册表中有
// 则新建一个服务信息添加到 apps
appInstanceTillNow = new Application(application.getName());
apps.addApplication(appInstanceTillNow);
}
for (InstanceInfo instanceInfo : application.getInstances()) {
// 实例信息添加到服务信息
appInstanceTillNow.addInstance(new InstanceInfo(instanceInfo));
}
}
}
}
}
}
}
// 对本地全量注册表(包含注册到本地的远程 region 的)计算 hashCode 值,返回给客户端
// 客户端在收到服务端返回的注册表更新后,也对自己的注册表计算 hashCode 值
// 如果两个 hashCode 值不一致表示两端注册表不一致,客户端再发起全量拉取注册表请求
Applications allApps = getApplicationsFromMultipleRegions(remoteRegions);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
2.2 registry.getApplicationDeltas()
// AbstractInstanceRegistry.class
@Deprecated
public Applications getApplicationDeltas() {
GET_ALL_CACHE_MISS_DELTA.increment();
// 新建一个注册表对象
Applications apps = new Applications();
apps.setVersion(responseCache.getVersionDelta().get());
Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
// 开启写锁
write.lock();
try {
// 遍历recentlyChangedQueue 队列(服务注册、续约、下线 的时候都会被塞到这最近改变队列中,这个队列只会保留最近三分钟的数据)
Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
logger.debug("The number of elements in the delta queue is : {}",
this.recentlyChangedQueue.size());
// 遍历最近更改队列
while (iter.hasNext()) {
// 获取实例租约信息
Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
// 获取实例信息
InstanceInfo instanceInfo = lease.getHolder();
logger.debug(
"The instance id {} is found with status {} and actiontype {}",
instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
Application app = applicationInstancesMap.get(instanceInfo
.getAppName());
if (app == null) {
// applicationInstancesMap 中获取不到服务信息,则新建一个添加进去
// 当服务信息第一次新建的时触发
app = new Application(instanceInfo.getAppName());
applicationInstancesMap.put(instanceInfo.getAppName(), app);
// 服务信息添加到 apps
apps.addApplication(app);
}
// 相应实例的租约信息转换成实例信息,添加到 app
app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
}
boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();
if (!disableTransparentFallback) {
// 全量获取本地注册表信息,不包含远程 regions 的
Applications allAppsInLocalRegion = getApplications(false);
// 遍历所有注册到本地的远程 regions 信息
for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
// 获取注册表增量信息
Applications applications = remoteRegistry.getApplicationDeltas();
// 遍历注册表增量信息
for (Application application : applications.getRegisteredApplications()) {
Application appInLocalRegistry =
allAppsInLocalRegion.getRegisteredApplications(application.getName());
if (appInLocalRegistry == null) {
// 如果 注册到本地的远程 region 增量注册表的某个服务实例 在 本地注册表信息 中存在
// 则该服务实例信息添加到 apps
apps.addApplication(application);
}
}
}
}
// 获取全量注册表实例信息(包含注册到本地的远程 region 的),生成一个hashcode,这个hashcode主要是带给客户端,
// 客户端收到响应后,先将最近变更是实例信息更新到自己的本地注册表中,
// 然后对自己本地注册表实例信息生成一个hashcode ,与
// Server响应回来的那个hashcode做比较,如果一样的话,说明本地注册表与Eureka Server的一样,
// 如果不一样的话,说明出现了差异,就会全量拉取一次注册表信息放到本地注册表中。
Applications allApps = getApplications(!disableTransparentFallback);
apps.setAppsHashCode(allApps.getReconcileHashCode());
return apps;
} finally {
write.unlock();
}
}
3. 总结
3.1 主要流程
- 首先,当服务端收到客户端拉取注册表请求时,判断是否允许访问注册表,如果不允许的话返回403拒绝请求,如果允许的话继续处理
- 然后,定义缓存 key ,根据缓存 key 从响应缓存(只读缓存、读写缓存)中获取注册表,压缩后返回给客户端
3.2 获取注册表
- 全量:
- 从本地注册表中获取,如果有需要的话,还要从注册到本地的远程 region 注册表中获取,前后进行合并
- 增量:
- 从本地最近变更队列中获取,如果有需要的话,还要从注册到本地的远程 region 增量注册表中获取,前后进行合并
3.3 几个问题
-
问题1:
readWriteCacheMap
中的数据从哪里来?- 在
ResponseCacheImpl
构造器中创建并初始化了这个读写缓存map。
- 在
-
问题2 :
readOnlyCacheMap
的数据来自于readWriteCacheMap
,但readWriteCacheMap
中的数据若发生了变更,那么readOnlyCacheMap
中的数据一定也需要发生变化,那么readOnlyCacheMap
中的数据在最里发生的变更?- 在
ResponseCacheImpl
构造器中定义并开启了一个定时任务,用于定时从readWriteCacheMap
中更 新readOnlyCacheMap
中的数据。
- 在
-
问题3:为什么不直接从
readWriteCacheMap
中获取数据,而是从readOnlyCacheMap
获取?即这种方案的好处是什么?- 为了保证对
readWriteCacheMap
的迭代稳定性。即将读写进行了分离,分离到了两个共享集合。但这种解决方案存在一个很严重的弊端:读、写两个集合的数据无法保证强一致性,即只能做到最终一致性。所以这种方案的应用场景是,对数据的实时性要求不是很高,对数据是否是最新数据要求不高
- 为了保证对
参考文章
eureka-0.10.11源码(注释) springcloud-source-study学习github地址 Eureka源码解析 SpringCloud技术栈系列文章 Eureka 源码解析
转载自:https://juejin.cn/post/7157604129878048799