8、Nacos 配置服务客户端源码分析(一)
Nacos
除了服务注册中心,还集成了配置中心。本篇就从Nacos
的配置中心的客户端源码入手。分析Nacos
客户端是如何获取到Nacos
服务的配置文件的。
分析源码的第一步是找到入口,官方再次给了我们一个注册中心的样例代码,在源码的nacos-example
工程中。先看下这部分的代码。
public static void main(String[] args) throws NacosException, InterruptedException {
String serverAddr = "localhost";
String dataId = "test";
String group = "DEFAULT_GROUP";
Properties properties = new Properties();
properties.put("serverAddr", serverAddr);
// 通过工厂模式创建了一个ConfigService服务
ConfigService configService = NacosFactory.createConfigService(properties);
// 获取配置信息
String content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
// 添加监听器,监听对应dataId,group的配置信息
configService.addListener(dataId, group, new Listener() {
@Override
public void receiveConfigInfo(String configInfo) {
System.out.println("receive:" + configInfo);
}
@Override
public Executor getExecutor() {
return null;
}
});
// 发布配置信息
boolean isPublishOk = configService.publishConfig(dataId, group, "content");
System.out.println(isPublishOk);
Thread.sleep(3000);
// 获取配置
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
// 移除配置
boolean isRemoveOk = configService.removeConfig(dataId, group);
System.out.println(isRemoveOk);
Thread.sleep(3000);
// 获取配置
content = configService.getConfig(dataId, group, 5000);
System.out.println(content);
Thread.sleep(300000);
}
NacosConfigService
创建
创建ConfigService
实现比较简单,就是通过反射创建了NacosConfigService
。只是要注意下创建NacosConfigService
时会调用它的构造方法,做一些初始化的操作。
public NacosConfigService(Properties properties) throws NacosException {
final NacosClientProperties clientProperties = NacosClientProperties.PROTOTYPE.derive(properties);
ValidatorUtils.checkInitParam(clientProperties);
// 初始化Namespace
initNamespace(clientProperties);
// 创建了一个配置过滤器链,可以采用SPI扩展机制加载对应的过滤器实现类
this.configFilterChainManager = new ConfigFilterChainManager(clientProperties.asProperties());
ServerListManager serverListManager = new ServerListManager(clientProperties);
// 创建了一个服务管理器,内含一个定时轮询线程池,每隔30s拉取一次服务
serverListManager.start();
// 创建了一个客户端工作者,包含了一个代理对象
this.worker = new ClientWorker(this.configFilterChainManager, serverListManager, clientProperties);
// will be deleted in 2.0 later versions
agent = new ServerHttpAgent(serverListManager);
}
在下一个篇章将重点介绍一下ClientWorker
,这个是个核心,所有和服务端的交互都通过它。因为本篇主要讲解客户端注册的流程框架,先不深入细节。这里只需要知道这里进行了创建。
获取配置信息
创建ConfigService
后就可以通过接口获取配置信息了。
String content = configService.getConfig(dataId, group, 5000);
public String getConfig(String dataId, String group, long timeoutMs) throws NacosException {
return getConfigInner(namespace, dataId, group, timeoutMs);
}
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException {
group = blank2defaultGroup(group);
// 检查参数
ParamUtils.checkKeyParam(dataId, group);
ConfigResponse cr = new ConfigResponse();
// 设置配置信息
cr.setDataId(dataId);
cr.setTenant(tenant);
cr.setGroup(group);
// We first try to use local failover content if exists.
// A config content for failover is not created by client program automatically,
// but is maintained by user.
// This is designed for certain scenario like client emergency reboot,
// changing config needed in the same time, while nacos server is down.
// 这里有个失败转移的配置。如果能读到失败转移的配置信息,则直接返回了。原因的话英文注释写的很清楚了
// 优先使用失败转移,设计的目的是当server挂后,又需要修改配置,就可以读本地目录
String content = LocalConfigInfoProcessor.getFailover(worker.getAgentName(), dataId, group, tenant);
if (content != null) {
LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}",
worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
cr.setContent(content);
String encryptedDataKey = LocalEncryptedDataKeyProcessor
.getEncryptDataKeyFailover(agent.getName(), dataId, group, tenant);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
try {
// 通过客户端远程拉取配置信息
ConfigResponse response = worker.getServerConfig(dataId, group, tenant, timeoutMs, false);
cr.setContent(response.getContent());
cr.setEncryptedDataKey(response.getEncryptedDataKey());
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
} catch (NacosException ioe) {
if (NacosException.NO_RIGHT == ioe.getErrCode()) {
throw ioe;
}
LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}",
worker.getAgentName(), dataId, group, tenant, ioe.toString());
}
// 非鉴权失败的异常的,可以从本地快照中获取配置,如果有的话
content = LocalConfigInfoProcessor.getSnapshot(worker.getAgentName(), dataId, group, tenant);
if (content != null) {
LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}",
worker.getAgentName(), dataId, group, tenant, ContentUtils.truncateContent(content));
}
cr.setContent(content);
String encryptedDataKey = LocalEncryptedDataKeyProcessor
.getEncryptDataKeySnapshot(agent.getName(), dataId, group, tenant);
cr.setEncryptedDataKey(encryptedDataKey);
configFilterChainManager.doFilter(null, cr);
content = cr.getContent();
return content;
}
获取配置信息经历了三个步骤:
- 从本地失败转移的文件夹中获取配置,这个是手工添加的,程序不会自动处理,是针对一些特定情况下,比如服务挂了还需要修改本地的配置的情况。给了一次本地修改处理的方式,也算是预留了一个备案,防止一些极端情况。但是这个得了解其固定的目录和拉取的配置,再处理,用完了要及时删除,否则会一直拉取本地的文件,毕竟是优先处理这部分的逻辑
- 去服务端拉取,这个就是正常逻辑,获取服务端存储的配置信息
- 对于出现了比如超时的情况,在有本地快照的情况的,从本地快照拉取配置,不至于偶尔超时了就配置没了
监听配置
监听采用的回调的思想,当服务端通知的时候,调用回调方法。
@Override
public void addListener(String dataId, String group, Listener listener) throws NacosException {
worker.addTenantListeners(dataId, group, Arrays.asList(listener));
}
public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners)
throws NacosException {
group = blank2defaultGroup(group);
String tenant = agent.getTenant();
// 根据dataId,group和listeners获取一个cacheData
CacheData cache = addCacheDataIfAbsent(dataId, group, tenant);
synchronized (cache) {
for (Listener listener : listeners) {
cache.addListener(listener);
}
// 非丢弃,删除类型
cache.setDiscard(false);
// 未同步到服务端
cache.setSyncWithServer(false);
// 处理配置类
agent.notifyListenConfig();
}
}
private final BlockingQueue<Object> listenExecutebell = new ArrayBlockingQueue<>(1);
@Override
public void notifyListenConfig() {
// listenExecutebell是一个阻塞队列,放入bellItem,即一个触发条件,相当于生产者
listenExecutebell.offer(bellItem);
}
@Override
public void startInternal() {
// 线程池在阻塞等到信号的到来
executor.schedule(() -> {
while (!executor.isShutdown() && !executor.isTerminated()) {
try {
// 获取到listenExecutebell.offer(bellItem)的信号
listenExecutebell.poll(5L, TimeUnit.SECONDS);
if (executor.isShutdown() || executor.isTerminated()) {
continue;
}
// 触发执行监听
executeConfigListen();
} catch (Throwable e) {
LOGGER.error("[ rpc listen execute ] [rpc listen] exception", e);
}
}
}, 0L, TimeUnit.MILLISECONDS);
}
@Override
public void executeConfigListen() {
// 省略其他代码...
for (CacheData cache : cacheMap.get().values()) {
synchronized (cache) {
//check local listeners consistent.
if (cache.isSyncWithServer()) {
// 检查监听的md5值
cache.checkListenerMd5();
if (!needAllSync) {
continue;
}
}
// 省略其他代码...
}
}
// 省略其他代码...
}
void checkListenerMd5() {
for (ManagerListenerWrap wrap : listeners) {
if (!md5.equals(wrap.lastCallMd5)) {
// 通知变动的配置信息
safeNotifyListener(dataId, group, content, type, md5, encryptedDataKey, wrap);
}
}
}
private void safeNotifyListener(final String dataId, final String group, final String content, final String type,
final String md5, final String encryptedDataKey, final ManagerListenerWrap listenerWrap) {
// 省略其他代码...
Runnable job = () -> {
// 省略其他代码...
// 触发回调
listener.receiveConfigInfo(contentTmp);
// 省略其他代码...
};
final long startNotify = System.currentTimeMillis();
try {
// 监听器配置了异步执行器,就用配置的执行
if (null != listener.getExecutor()) {
listener.getExecutor().execute(job);
} else {
try {
// 内部线程池执行
INTERNAL_NOTIFIER.submit(job);
} catch (RejectedExecutionException rejectedExecutionException) {
// 省略其他代码...
job.run();
} catch (Throwable throwable) {
// 省略其他代码...
job.run();
}
}
} catch (Throwable t) {
// 省略其他代码...
}
// 省略其他代码...
}
监听配置是在cacheData中配置上监听器,等待触发条件后,进行本地的内容和远程内容的比对,如果不一致,调用监听器上的回调逻辑,完成配置的更新通知。
Spring-Cloud-alibaba-nacos-config
的处理
经过对以上的分析,我们再看一看Spring-Cloud-alibaba-nacos-config
配置的代码是如何完成配置的更新的(这里用的是spring-cloud-alibaba 2.2.0.RELEASE
版本的代码)。
public class NacosConfigAutoConfiguration {
// 省略其他代码...
@Bean
public NacosContextRefresher nacosContextRefresher(
NacosConfigManager nacosConfigManager,
NacosRefreshHistory nacosRefreshHistory) {
// 创建了一个nacos上下文刷新器
return new NacosContextRefresher(nacosConfigManager, nacosRefreshHistory);
}
}
public class NacosContextRefresher
implements ApplicationListener<ApplicationReadyEvent>, ApplicationContextAware {
// 省略其他代码...
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
if (this.ready.compareAndSet(false, true)) {
// 注册nacos监听
this.registerNacosListenersForApplications();
}
}
// 省略其他代码...
private void registerNacosListenersForApplications() {
if (isRefreshEnabled()) {
for (NacosPropertySource propertySource : NacosPropertySourceRepository
.getAll()) {
if (!propertySource.isRefreshable()) {
continue;
}
String dataId = propertySource.getDataId();
// 注册nacos监听
registerNacosListener(propertySource.getGroup(), dataId);
}
}
}
private void registerNacosListener(final String groupKey, final String dataKey) {
String key = NacosPropertySourceRepository.getMapKey(dataKey, groupKey);
// 创建监听
Listener listener = listenerMap.computeIfAbsent(key,
lst -> new AbstractSharedListener() {
@Override
public void innerReceive(String dataId, String group,
String configInfo) {
refreshCountIncrement();
nacosRefreshHistory.addRefreshRecord(dataId, group, configInfo);
// 当有配置信息变动的时候,通知spring cloud 刷新事件
applicationContext.publishEvent(
new RefreshEvent(this, null, "Refresh Nacos config"));
if (log.isDebugEnabled()) {
log.debug(String.format(
"Refresh Nacos config group=%s,dataId=%s,configInfo=%s",
group, dataId, configInfo));
}
}
});
try {
// 很熟悉,对不对,绑定监听
configService.addListener(dataKey, groupKey, listener);
}
catch (NacosException e) {
log.warn(String.format(
"register fail for nacos listener ,dataId=[%s],group=[%s]", dataKey,
groupKey), e);
}
}
}
其代码主要逻辑如下:
- 在
spring.factories
中配置自动配置类NacosConfigAutoConfiguration
NacosConfigAutoConfiguration
创建NacosContextRefresher
NacosContextRefresher
监听ApplicationReadyEvent
事件- 触发
ApplicationReadyEvent
事件后注册监听,创建监听配置的dataId
,group
对应的配置信息的监听类 configService
绑定监听器,如果监听到配置信息的变化,则发布RefreshEvent
spring cloud
监听到RefreshEvent
后刷新配置,刷新spring容器
总结
本篇从ConfigExample
入手,主要介绍了配置服务是如何初始化,获取配置信息的。并且分析了监听器的作用已经简单分析了spring-cloud-alibaba-nacos-config
的部分源码,了解了nacos
是如何结合spring-cloud
。下篇将重点介绍ClientWorker
和cacheData
。敬请期待。
转载自:https://juejin.cn/post/7212937418960683065