Eureka源码13-EurekaServer扩展(读写锁)
欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈
0. 环境
- eureka版本:1.10.11
- Spring Cloud : 2020.0.2
- Spring Boot :2.4.4
1. 读写锁
1.1 位置
// AbstractInstanceRegistry.class
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
// 最近变更队列
private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 读锁
private final Lock read = readWriteLock.readLock();
// 写锁
private final Lock write = readWriteLock.writeLock();
...
}
1.2 锁机制
- 读锁之间不互斥,写锁之间互斥,读写锁之间互斥
- 当一个线程获取读锁时,另一个线程能获取读锁但是不能获取写锁;当一个线程获取写锁,另一个线程读锁或写锁都不能获取。这个和 InnoDB 的读写锁是一样的
1.3 调用方
read
:处理客户端注册、下架、状态变更、删除状态write
:处理客户端拉取增量注册表
1.4 分析
-
在上述处理的相关代码中,可以发现共同点就是操作了
recentlyChangedQueue
最近变更队列,读写锁就是服务于这个队列 -
对于
recentlyChangedQueue
队列,写的场景很多,读的场景只有拉取增量注册表。当多个场景多线程对于recentlyChangedQueue
进行写操作时,recentlyChangedQueue
的数据结构是线程安全的,所以不用加写锁,但是加读锁和为了配合拉取增量注册表的读场景,保证对recentlyChangedQueue
写操作时无法读操作,读操作时不能写操作,保证拉取增量注册表时获取的recentlyChangedQueue
没变更,recentlyChangedQueue
变更时不给拉取增量注册表
2. invalidateCache()
2.1 源码解析
// AbstractInstanceRegistry.class
/**
* 在服务注册、 服务续约、服务下线的时候,都有说过一个事情,
* 就是更新了完了注册表,就会去删除对应的缓存
*/
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
// todo invalidate cache 响应缓存失效
responseCache.invalidate(appName, vipAddress, secureVipAddress);
}
// ResponseCacheImpl.class
@Override
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
for (Key.KeyType type : Key.KeyType.values()) {
for (Version v : Version.values()) {
// 指定服务名、全量、增量相关的缓存失效
// 缓存 key 中没有远程 region
// todo 调用invalidate 方法清除
invalidate(
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
);
if (null != vipAddress) {
invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
}
if (null != secureVipAddress) {
invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
}
}
}
}
// ResponseCacheImpl.class
public void invalidate(Key... keys) {
for (Key key : keys) {
logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
// “无远程 regin 缓存 key ”的读写缓存失效
// todo 清理对应key的所有缓存
readWriteCacheMap.invalidate(key);
// 从 regionSpecificKeys 根据 “无远程 regin 缓存 key ” 取出 “有远程 regin 缓存 key ”
Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
for (Key keysWithRegion : keysWithRegions) {
logger.debug("Invalidating the response cache key : {} {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
// “有远程 regin 缓存 key ”的读写缓存失效
readWriteCacheMap.invalidate(keysWithRegion);
}
}
}
}
3. 关于读/写锁的问题
3.1 问题的发现
我们发现了三个问题:
-
读/写锁是反着加的
,为什么? 整体来说,是为了保证共享集合recentlyChangedQueue的迭代稳定性。方法名 操作的共享集合 读/写操作 添加的锁 register() registry、recentlyChangedQueue 写 读 statusUpdate() registry、recentlyChangedQueue 写 读 internalCancel() registry、recentlyChangedQueue 写 读 deleteStatusOverride() registry、recentlyChangedQueue 写 读 renew() registry 写 无 全量下载方法 registry 读 无 增量下载方法 recentlyChangedQueue 读 写 -
同样都是写操作,
为什么处理续约请求的方法中却没有加锁
? -
同样都是读操作,
为什么全量下载方法中却没有加锁
?
3.2 问题解答
加锁方法的特征
所有对recentlyChangedQueue
共享集合操作的方法都添加了锁。而没有对其进行操作的方法,没有加
锁。
为什么写操作要添加读锁?
若对这些写操作添加写锁,是否可以呢?写锁是排它锁
。若要为这些对recentlyChangedQueue
进行的
写操作添加写锁,则意味着当有一个写操作发生时,对recentlyChangedQueue的
所有其它读/写操作,
均会发生排队等待(阻塞)
,会导致效率低下。
而若要添加读锁,则会使所有对recentlyChangedQueue执行的写操作实现并行,提高了执行效率
。不过,这些写操作会引发线程安全问题吗?不会。因为recentlyChangedQueue是JUC的队列,是线程安全的
。
需要注意,虽然我们的关注点一直都在recentlyChangedQueue
上,但从代码角度来说,也为registry
的
写操作添加了读锁。不会影响registry的并行效率吗?不会。因为读锁是共享锁。
为什么读操作添加写锁?
为了保证对共享集合recentlyChangedQueue
的读/写操作的互斥。不过,该方式会导致读操作效率的低
下,因为读操作无法实现并行读取,只能排队读取。因为写锁是排它的。
读写锁反加应用场景
写操作相对于读操作更加频繁的场景。
续约操作能否添加写锁
不能。因为续约操作是一个发生频率非常高的写操作,若为其添加了写锁,则意味着其它client的续约 处理无法实现并行,发生排队等待。因为写锁是排它锁。
续约操作能否添加读锁
不能。因为添加读锁的目的是为了与写锁操作实现互斥。在上述所有方法中,对registry的所有操作中 均没有添加写锁,所以这里的写操作也无需添加读锁。
如果不加锁会怎样
若不对recentlyChangedQueue
的操作加锁,可能会存在同时对recentlyChangedQueue
进行读写操作的
情况。可能会引发对recentlyChangedQueue
的迭代稳定性问题。
为什么全量下载没有添加写锁
若为其添加了写锁,则必须会导致某client在读取期间,其它client的续约请求处理被阻塞的情况
参考文章
eureka-0.10.11源码(注释) springcloud-source-study学习github地址 Eureka源码解析 SpringCloud技术栈系列文章 Eureka 源码解析
转载自:https://juejin.cn/post/7158793880576475166