9、Nacos 配置服务客户端源码分析(二)
上篇介绍了客户端获取配置信息的流程,包括注册监听,回调通知,不过当时并没有深入细节。没看过的小伙伴可以点击这里 8、Nacos 配置服务客户端源码分析(一)。本篇将全面分析里面的核心点,深入细节。
CacheData
首先需要讲解到的就是CacheData
。所有的配置信息和监听器都存放在CacheData
中,包括一些操作方法。所以它不仅仅是一个存储的配置信息的实体,也包括了操作,获取,更新这个信息的功能。以下通过代码分析一下它的私有变量。
public class CacheData {
// 线程池的最大线程数
static final int CONCURRENCY = 5;
// 线程工厂,表明创建的是后台线程并赋予名字
static ThreadFactory internalNotifierFactory = r -> {
Thread t = new Thread(r);
t.setName("nacos.client.cachedata.internal.notifier");
t.setDaemon(true);
return t;
};
// 是否初始化快照
static boolean initSnapshot;
// 静态块,获取属性值
static {
initSnapshot = NacosClientProperties.PROTOTYPE.getBoolean("nacos.cache.data.init.snapshot", true);
LOGGER.info("nacos.cache.data.init.snapshot = {} ", initSnapshot);
}
// 内部的通知线程池
static final ThreadPoolExecutor INTERNAL_NOTIFIER = new ThreadPoolExecutor(0, CONCURRENCY, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), internalNotifierFactory);
// 名称
private final String name;
// 过滤器链
private final ConfigFilterChainManager configFilterChainManager;
// dataId
public final String dataId;
// group
public final String group;
// tenant
public final String tenant;
// 监听器
private final CopyOnWriteArrayList<ManagerListenerWrap> listeners;
// 配置信息的md5值
private volatile String md5;
// 是否使用本地缓存
private volatile boolean isUseLocalConfig = false;
// 上次配置修改时间,long类型
private volatile long localConfigLastModified;
// 配置信息
private volatile String content;
// 加密数据的key
private volatile String encryptedDataKey;
// 修改时间
private volatile AtomicLong lastModifiedTs = new AtomicLong(0);
// taskId
private int taskId;
// 是否初始化
private volatile boolean isInitializing = true;
// 是否和服务端同步
private volatile boolean isSyncWithServer = false;
// 是否丢失
private volatile boolean isDiscard = false;
// type
private String type;
}
CacheData
的一些方法和成员变量的解释说明会在调用的时候再具体说明,这里只是需要有个印象,它不仅仅是对配置信息的保存,也包含了一些其他的状态位和操作的方法和变量。
ClientWorker
对CacheData
有了初步印象后,我们就开始重点分析一下ClientWorker
。先看它的构造方法。
public ClientWorker(final ConfigFilterChainManager configFilterChainManager, ServerListManager serverListManager,
final NacosClientProperties properties) throws NacosException {
this.configFilterChainManager = configFilterChainManager;
init(properties);
// 创建一个用于配置服务端的Rpc通信客户端
agent = new ConfigRpcTransportClient(properties, serverListManager);
int count = ThreadUtils.getSuitableThreadCount(THREAD_MULTIPLE);
ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(Math.max(count, MIN_THREAD_NUM), r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.Worker");
t.setDaemon(true);
return t;
});
// 配置了一个异步处理线程池
agent.setExecutor(executorService);
// 调用start方法
agent.start();
}
public void start() throws NacosException {
// 认证服务,主要是通过Secret Key和Access Key做认证用
securityProxy.login(this.properties);
this.executor.scheduleWithFixedDelay(() -> securityProxy.login(properties), 0,
this.securityInfoRefreshIntervalMills, TimeUnit.MILLISECONDS);
// 内部启动
startInternal();
}
public void startInternal() {
executor.schedule(() -> {
while (!executor.isShutdown() && !executor.isTerminated()) {
try {
listenExecutebell.poll(5L, TimeUnit.SECONDS);
if (executor.isShutdown() || executor.isTerminated()) {
continue;
}
// 执行配置监听
executeConfigListen();
} catch (Throwable e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
在创建ClientWorker
的时候构造方法中做了如下操作:
- 初始化了本地变量和参数
- 创建了一个远程通信的客户端
- 配置了一个定时线程池,并且对需要认证的服务端做了认证
- 执行配置的监听,如果没有监听器的变动,则等待5s处理一次
继续看executeConfigListen()
方法。
public void executeConfigListen() {
// 存放含有listen的cacheData
Map<String, List<CacheData>> listenCachesMap = new HashMap<>(16);
// 存放不含有listen的cacheData
Map<String, List<CacheData>> removeListenCachesMap = new HashMap<>(16);
long now = System.currentTimeMillis();
// 当前时间减去上次全量同步时间是否大于5分钟
boolean needAllSync = now - lastAllSyncTime >= ALL_SYNC_INTERNAL;
for (CacheData cache : cacheMap.get().values()) {
synchronized (cache) {
// 是否和服务端一致
if (cache.isSyncWithServer()) {
// 一致则检查md5值,若md5值和上一个不一样,则说明变动了,需要通知监听器
cache.checkListenerMd5();
// 是否到全量同步时间了,未到则直接跳过
if (!needAllSync) {
continue;
}
}
if (!cache.isDiscard()) {
// 非丢弃型,即新增,放入listenCachesMap
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = listenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<>();
listenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
} else if (cache.isDiscard()) {
// 丢弃型,即删除, 放入removeListenCachesMap
if (!cache.isUseLocalConfigInfo()) {
List<CacheData> cacheDatas = removeListenCachesMap.get(String.valueOf(cache.getTaskId()));
if (cacheDatas == null) {
cacheDatas = new LinkedList<>();
removeListenCachesMap.put(String.valueOf(cache.getTaskId()), cacheDatas);
}
cacheDatas.add(cache);
}
}
}
}
// 此时,如果需要和服务端数据同步,则listenCachesMap和removeListenCachesMap存放了本地数据,需要和服务端对比
boolean hasChangedKeys = false;
// 新增的处理
if (!listenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : listenCachesMap.entrySet()) {
String taskId = entry.getKey();
Map<String, Long> timestampMap = new HashMap<>(listenCachesMap.size() * 2);
List<CacheData> listenCaches = entry.getValue();
for (CacheData cacheData : listenCaches) {
timestampMap.put(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant),
cacheData.getLastModifiedTs().longValue());
}
// 构建新增数据的请求参数,此请求用于远程和本地对比,发现变动了会进行通知
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(listenCaches);
// 配置需要新增或更新监听数据
configChangeListenRequest.setListen(true);
try {
// 获取一个rpc的客户端
RpcClient rpcClient = ensureRpcClient(taskId);
ConfigChangeBatchListenResponse configChangeBatchListenResponse = (ConfigChangeBatchListenResponse) requestProxy(
rpcClient, configChangeListenRequest);
if (configChangeBatchListenResponse.isSuccess()) {
Set<String> changeKeys = new HashSet<>();
//handle changed keys,notify listener
if (!CollectionUtils.isEmpty(configChangeBatchListenResponse.getChangedConfigs())) {
// 有变动的数据
hasChangedKeys = true;
for (ConfigChangeBatchListenResponse.ConfigContext changeConfig : configChangeBatchListenResponse
.getChangedConfigs()) {
String changeKey = GroupKey
.getKeyTenant(changeConfig.getDataId(), changeConfig.getGroup(),
changeConfig.getTenant());
changeKeys.add(changeKey);
// 刷新配置并通知变动
refreshContentAndCheck(changeKey);
}
}
//handler content configs
for (CacheData cacheData : listenCaches) {
String groupKey = GroupKey
.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.getTenant());
if (!changeKeys.contains(groupKey)) {
//sync:cache data md5 = server md5 && cache data md5 = all listeners md5.
synchronized (cacheData) {
if (!cacheData.getListeners().isEmpty()) {
Long previousTimesStamp = timestampMap.get(groupKey);
if (previousTimesStamp != null && !cacheData.getLastModifiedTs()
.compareAndSet(previousTimesStamp, System.currentTimeMillis())) {
continue;
}
// 缓存数据没有变动,设置为和服务器同步
cacheData.setSyncWithServer(true);
}
}
}
// 设置为需要初始化
cacheData.setInitializing(false);
}
}
} catch (Exception e) {
LOGGER.error("Async listen config change error ", e);
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
}
// 需要删除的数据不为空
if (!removeListenCachesMap.isEmpty()) {
for (Map.Entry<String, List<CacheData>> entry : removeListenCachesMap.entrySet()) {
String taskId = entry.getKey();
List<CacheData> removeListenCaches = entry.getValue();
ConfigBatchListenRequest configChangeListenRequest = buildConfigRequest(removeListenCaches);
// 配置需要删除
configChangeListenRequest.setListen(false);
try {
// 获取rpc客户端
RpcClient rpcClient = ensureRpcClient(taskId);
// 通知服务端移除数据
boolean removeSuccess = unListenConfigChange(rpcClient, configChangeListenRequest);
if (removeSuccess) {
for (CacheData cacheData : removeListenCaches) {
synchronized (cacheData) {
if (cacheData.isDiscard()) {
// 移除缓存
ClientWorker.this
.removeCache(cacheData.dataId, cacheData.group, cacheData.tenant);
}
}
}
}
} catch (Exception e) {
LOGGER.error("async remove listen config change error ", e);
}
try {
Thread.sleep(50L);
} catch (InterruptedException interruptedException) {
//ignore
}
}
}
if (needAllSync) {
// 更新同步时间
lastAllSyncTime = now;
}
//If has changed keys,notify re sync md5.
if (hasChangedKeys) {
// 服务端告知了有数据变动,则需要再同步一次
notifyListenConfig();
}
}
上述一大段代码主要是描述的针对配置的变动的处理。触发点有两个
- 本地的数据的变动,比如添加或者移除监听
- 定时轮询5s检查一次本地配置(本地在3种情况下有变化,分别是第一次新增监听、接收了新的配置和监听被移除),如果本地一直没变化,5min去全量同步一次服务端中的配置信息。
其主要是将本地的配置文件一个个去和服务端对比,发现变动了获取服务端最新的配置信息,并通知响应的监听器。这里有个几个重点方法,分别介绍下
第一个就是cache.checkListenerMd5()
// cacheData的校验方法
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
// 内容变动了,直接通知
safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
}
}
}
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
// 获取到监听器
final Listener listener = listenerWrap.listener;
if (listenerWrap.inNotifying) {
LOGGER.warn(
"[{}] [notify-currentSkip] dataId={}, group={}, md5={}, listener={}, listener is not finish yet,will try next time.",
name, dataId, group, md5, listener);
return;
}
// 定义一个通知任务
Runnable job = () -> {
long start = System.currentTimeMillis();
ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader();
ClassLoader appClassLoader = listener.getClass().getClassLoader();
try {
if (listener instanceof AbstractSharedListener) {
// 扩展点,像spring cloud alibaba就用到,创建了NacosContextRefresher
AbstractSharedListener adapter = (AbstractSharedListener) listener;
adapter.fillContext(dataId, group);
LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5);
}
// Before executing the callback, set the thread classloader to the classloader of
// the specific webapp to avoid exceptions or misuses when calling the spi interface in
// the callback method (this problem occurs only in multi-application deployment).
Thread.currentThread().setContextClassLoader(appClassLoader);
ConfigResponse cr = new ConfigResponse();
cr.setDataId(dataId);
cr.setGroup(group);
cr.setContent(content);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
String contentTmp = cr.getContent();
listenerWrap.inNotifying = true;
// 回调通知,也就是通知变动的内容
listener.receiveConfigInfo(contentTmp);
// compare lastContent and content
if (listener instanceof AbstractConfigChangeListener) {
// 扩展点,告知配置内容的变动
Map<String, ConfigChangeItem> data = ConfigChangeHandler.getInstance()
.parseChangeData(listenerWrap.lastContent, contentTmp, type);
ConfigChangeEvent event = new ConfigChangeEvent(data);
((AbstractConfigChangeListener) listener).receiveConfigChange(event);
listenerWrap.lastContent = contentTmp;
}
// 赋值最新的md5
listenerWrap.lastCallMd5 = md5;
} catch (NacosException ex) {
// 省略部分代码...
} catch (Throwable t) {
// 省略部分代码...
} finally {
listenerWrap.inNotifying = false;
Thread.currentThread().setContextClassLoader(myClassLoader);
}
};
final long startNotify = System.currentTimeMillis();
try {
if (null != listener.getExecutor()) {
// 设置了自己监听器的异步线程池,就用自己的执行回调job
listener.getExecutor().execute(job);
} else {
try {
// 没有设置,用内置的线程池回调
INTERNAL_NOTIFIER.submit(job);
} catch (RejectedExecutionException rejectedExecutionException) {
// 省略部分代码...
job.run();
} catch (Throwable throwable) {
// 省略部分代码...
job.run();
}
}
} catch (Throwable t) {
// 省略部分代码...
}
final long finishNotify = System.currentTimeMillis();
// 省略部分代码...
}
这个方法是比对内容的md5值,发现不一致了就会进行回调通知。当然其也做了一个扩展点的预留。俗称钩子方法。可以继承后抽象类后,自己对通知变动的一个扩展。然后它还采用了异步通知的方式。尽可能的不阻塞流程。因为回调的处理存在不确定性,为了不阻塞整个通知回调流程,采用了异步的方法。
第二个是RpcClient rpcClient = ensureRpcClient(taskId)
// 获取一个rpcClient
private RpcClient ensureRpcClient(String taskId) throws NacosException {
synchronized (ClientWorker.this) {
Map<String, String> labels = getLabels();
Map<String, String> newLabels = new HashMap<>(labels);
newLabels.put("taskId", taskId);
// 通过工厂创建一个客户端,这里创建了个GrpcSdkClient
RpcClient rpcClient = RpcClientFactory
.createClient(uuid + "_config-" + taskId, getConnectionType(), newLabels);
if (rpcClient.isWaitInitiated()) {
// 初始化网络请求处理器
initRpcClientHandler(rpcClient);
rpcClient.setTenant(getTenant());
rpcClient.clientAbilities(initAbilities());
// 启动客户端
rpcClient.start();
}
return rpcClient;
}
}
先分析下initRpcClientHandler(rpcClient)
private void initRpcClientHandler(final RpcClient rpcClientInner) {
// 注册了服务端调用客户端的处理方法,注意不是客户端请求,而是客户端接受服务端的请求,因为Grpc是可以双向请求的
rpcClientInner.registerServerRequestHandler((request) -> {
if (request instanceof ConfigChangeNotifyRequest) {
// 服务端推送的请求
ConfigChangeNotifyRequest configChangeNotifyRequest = (ConfigChangeNotifyRequest) request;
LOGGER.info("[{}] [server-push] config changed. dataId={}, group={},tenant={}",
rpcClientInner.getName(), configChangeNotifyRequest.getDataId(),
configChangeNotifyRequest.getGroup(), configChangeNotifyRequest.getTenant());
String groupKey = GroupKey
.getKeyTenant(configChangeNotifyRequest.getDataId(), configChangeNotifyRequest.getGroup(),
configChangeNotifyRequest.getTenant());
CacheData cacheData = cacheMap.get().get(groupKey);
if (cacheData != null) {
synchronized (cacheData) {
// 推送了数据后,设置最后的更新事件,并且表明和服务端不同步,通知处理
cacheData.getLastModifiedTs().set(System.currentTimeMillis());
cacheData.setSyncWithServer(false);
notifyListenConfig();
}
}
return new ConfigChangeNotifyResponse();
}
return null;
});
rpcClientInner.registerServerRequestHandler((request) -> {
if (request instanceof ClientConfigMetricRequest) {
// 指标通知处理
ClientConfigMetricResponse response = new ClientConfigMetricResponse();
response.setMetrics(getMetrics(((ClientConfigMetricRequest) request).getMetricsKeys()));
return response;
}
return null;
});
// 处理连接和断开事件,因为能感应到断开了,所以不需要心跳检测
rpcClientInner.registerConnectionListener(new ConnectionEventListener() {
@Override
public void onConnected() {
LOGGER.info("[{}] Connected,notify listen context...", rpcClientInner.getName());
notifyListenConfig();
}
@Override
public void onDisConnect() {
String taskId = rpcClientInner.getLabels().get("taskId");
LOGGER.info("[{}] DisConnected,clear listen context...", rpcClientInner.getName());
Collection<CacheData> values = cacheMap.get().values();
for (CacheData cacheData : values) {
if (StringUtils.isNotBlank(taskId)) {
if (Integer.valueOf(taskId).equals(cacheData.getTaskId())) {
cacheData.setSyncWithServer(false);
}
} else {
cacheData.setSyncWithServer(false);
}
}
}
});
rpcClientInner.serverListFactory(new ServerListFactory() {
@Override
public String genNextServer() {
return ConfigRpcTransportClient.super.serverListManager.getNextServerAddr();
}
@Override
public String getCurrentServer() {
return ConfigRpcTransportClient.super.serverListManager.getCurrentServerAddr();
}
@Override
public List<String> getServerList() {
return ConfigRpcTransportClient.super.serverListManager.getServerUrls();
}
});
// 增加订阅服务列表变动事件
NotifyCenter.registerSubscriber(new Subscriber<ServerlistChangeEvent>() {
@Override
public void onEvent(ServerlistChangeEvent event) {
rpcClientInner.onServerListChange();
}
@Override
public Class<? extends Event> subscribeType() {
return ServerlistChangeEvent.class;
}
});
}
在初始化网络处理器的时候需要知道的是Grpc
的双向处理,客户端注册了服务端请求到客户端,客户端的处理方法。并且可以对连接和断开事件做出响应。
再分析下rpcClient.start()
public final void start() throws NacosException {
// 设置状态为启动状态
boolean success = rpcClientStatus.compareAndSet(RpcClientStatus.INITIALIZED, RpcClientStatus.STARTING);
if (!success) {
return;
}
// 创建线程池
clientEventExecutor = new ScheduledThreadPoolExecutor(2, r -> {
Thread t = new Thread(r);
t.setName("com.alibaba.nacos.client.remote.worker");
t.setDaemon(true);
return t;
});
// connection event consumer.
clientEventExecutor.submit(() -> {
while (!clientEventExecutor.isTerminated() && !clientEventExecutor.isShutdown()) {
ConnectionEvent take;
try {
// 处理连接事件的阻塞队列
take = eventLinkedBlockingQueue.take();
if (take.isConnected()) {
notifyConnected();
} else if (take.isDisConnected()) {
notifyDisConnected();
}
} catch (Throwable e) {
// Do nothing
}
}
});
clientEventExecutor.submit(() -> {
while (true) {
try {
if (isShutdown()) {
break;
}
// 处理重新连接
ReconnectContext reconnectContext = reconnectionSignal
.poll(rpcClientConfig.connectionKeepAlive(), TimeUnit.MILLISECONDS);
// 省略部分代码...
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
} catch (Throwable throwable) {
// Do nothing
}
}
});
// 省略部分代码...
// 注册连接重置处理器
registerServerRequestHandler(new ConnectResetRequestHandler());
// 注册网络请求处理器
registerServerRequestHandler(request -> {
if (request instanceof ClientDetectionRequest) {
return new ClientDetectionResponse();
}
return null;
});
}
start
方法是开启了线程池,对连接进行处理,包括连接,断开,重连等事件的处理。Grpc
虽然是双向通信,提升性能,但是由此也更加复杂,需要管理连接,对连接事件做处理。所以,技术并没有银弹,有的是取舍。当你选择一项技术,在享受其便利性的同时也得付出一定的代价,只是这个代价是否能够接受。能否处理好其带来的影响。
总结
本篇介绍了两个核心类CacheData
和ClientWorker
的一些核心方法和说明。希望帮助大家更深一步理解获取配置信息,通知,回调的机制、流程。下篇开始,我们从服务端的角度,分析下服务端是怎么处理客户端请求。
转载自:https://juejin.cn/post/7213307113111306299