likes
comments
collection
share

Eureka源码13-EurekaServer扩展(读写锁)

作者站长头像
站长
· 阅读数 18

欢迎大家关注 github.com/hsfxuebao ,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

0. 环境

  • eureka版本:1.10.11
  • Spring Cloud : 2020.0.2
  • Spring Boot :2.4.4

测试代码:github.com/hsfxuebao/s…

1. 读写锁

1.1 位置

// AbstractInstanceRegistry.class
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    
    // 最近变更队列
    private ConcurrentLinkedQueue<RecentlyChangedItem> recentlyChangedQueue = new ConcurrentLinkedQueue<RecentlyChangedItem>();
    
    private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    // 读锁
    private final Lock read = readWriteLock.readLock();
    // 写锁
    private final Lock write = readWriteLock.writeLock();
    ...   
}

1.2 锁机制

  • 读锁之间不互斥,写锁之间互斥,读写锁之间互斥
  • 当一个线程获取读锁时,另一个线程能获取读锁但是不能获取写锁;当一个线程获取写锁,另一个线程读锁或写锁都不能获取。这个和 InnoDB 的读写锁是一样的

1.3 调用方

  • read:处理客户端注册、下架、状态变更、删除状态
  • write:处理客户端拉取增量注册表

1.4 分析

  • 在上述处理的相关代码中,可以发现共同点就是操作了 recentlyChangedQueue 最近变更队列,读写锁就是服务于这个队列

  • 对于 recentlyChangedQueue 队列,写的场景很多,读的场景只有拉取增量注册表。当多个场景多线程对于 recentlyChangedQueue 进行写操作时,recentlyChangedQueue 的数据结构是线程安全的,所以不用加写锁,但是加读锁和为了配合拉取增量注册表的读场景,保证对 recentlyChangedQueue 写操作时无法读操作,读操作时不能写操作,保证拉取增量注册表时获取的 recentlyChangedQueue 没变更,recentlyChangedQueue 变更时不给拉取增量注册表

2. invalidateCache()

2.1 源码解析

// AbstractInstanceRegistry.class
/**
 * 在服务注册、 服务续约、服务下线的时候,都有说过一个事情,
 * 就是更新了完了注册表,就会去删除对应的缓存
 */
private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
    // todo invalidate cache 响应缓存失效
    responseCache.invalidate(appName, vipAddress, secureVipAddress);
}

// ResponseCacheImpl.class
@Override
public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) {
    for (Key.KeyType type : Key.KeyType.values()) {
        for (Version v : Version.values()) {
            // 指定服务名、全量、增量相关的缓存失效
            // 缓存 key 中没有远程 region
            // todo 调用invalidate 方法清除
            invalidate(
                    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),
                    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),
                    new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)
            );
            if (null != vipAddress) {
                invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));
            }
            if (null != secureVipAddress) {
                invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));
            }
        }
    }
}

// ResponseCacheImpl.class
public void invalidate(Key... keys) {
    for (Key key : keys) {
        logger.debug("Invalidating the response cache key : {} {} {} {}, {}",
                key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());

        // “无远程 regin 缓存 key ”的读写缓存失效
        // todo 清理对应key的所有缓存
        readWriteCacheMap.invalidate(key);
        // 从 regionSpecificKeys 根据 “无远程 regin 缓存 key ” 取出 “有远程 regin 缓存 key ”
        Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
        if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
            for (Key keysWithRegion : keysWithRegions) {
                logger.debug("Invalidating the response cache key : {} {} {} {} {}",
                        key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
                // “有远程 regin 缓存 key ”的读写缓存失效
                readWriteCacheMap.invalidate(keysWithRegion);
            }
        }
    }
}

3. 关于读/写锁的问题

3.1 问题的发现

我们发现了三个问题:

  • 读/写锁是反着加的,为什么? 整体来说,是为了保证共享集合recentlyChangedQueue的迭代稳定性。

    方法名操作的共享集合读/写操作添加的锁
    register()registry、recentlyChangedQueue
    statusUpdate()registry、recentlyChangedQueue
    internalCancel()registry、recentlyChangedQueue
    deleteStatusOverride()registry、recentlyChangedQueue
    renew()registry
    全量下载方法registry
    增量下载方法recentlyChangedQueue
  • 同样都是写操作,为什么处理续约请求的方法中却没有加锁?

  • 同样都是读操作,为什么全量下载方法中却没有加锁

3.2 问题解答

加锁方法的特征

所有对recentlyChangedQueue共享集合操作的方法都添加了锁。而没有对其进行操作的方法,没有加 锁。

为什么写操作要添加读锁?

若对这些写操作添加写锁,是否可以呢?写锁是排它锁。若要为这些对recentlyChangedQueue进行的 写操作添加写锁,则意味着当有一个写操作发生时,对recentlyChangedQueue的所有其它读/写操作, 均会发生排队等待(阻塞),会导致效率低下。

而若要添加读锁,则会使所有对recentlyChangedQueue执行的写操作实现并行,提高了执行效率。不过,这些写操作会引发线程安全问题吗?不会。因为recentlyChangedQueue是JUC的队列,是线程安全的

需要注意,虽然我们的关注点一直都在recentlyChangedQueue上,但从代码角度来说,也为registry的 写操作添加了读锁。不会影响registry的并行效率吗?不会。因为读锁是共享锁。

为什么读操作添加写锁?

为了保证对共享集合recentlyChangedQueue的读/写操作的互斥。不过,该方式会导致读操作效率的低 下,因为读操作无法实现并行读取,只能排队读取。因为写锁是排它的。

读写锁反加应用场景

写操作相对于读操作更加频繁的场景。

续约操作能否添加写锁

不能。因为续约操作是一个发生频率非常高的写操作,若为其添加了写锁,则意味着其它client的续约 处理无法实现并行,发生排队等待。因为写锁是排它锁。

续约操作能否添加读锁

不能。因为添加读锁的目的是为了与写锁操作实现互斥。在上述所有方法中,对registry的所有操作中 均没有添加写锁,所以这里的写操作也无需添加读锁。

如果不加锁会怎样

若不对recentlyChangedQueue的操作加锁,可能会存在同时对recentlyChangedQueue进行读写操作的 情况。可能会引发对recentlyChangedQueue的迭代稳定性问题。

为什么全量下载没有添加写锁

若为其添加了写锁,则必须会导致某client在读取期间,其它client的续约请求处理被阻塞的情况

参考文章

eureka-0.10.11源码(注释) springcloud-source-study学习github地址 Eureka源码解析 SpringCloud技术栈系列文章 Eureka 源码解析