10、Nacos 配置服务服务端源码分析(一)
上两篇介绍了Nacos
配置服务客户端的源码,分别是8、Nacos 配置服务客户端源码分析(一)和9、Nacos 配置服务客户端源码分析(二),从本篇开始,让我们站在服务端的角度来分析一下配置服务的服务端源码。
我们先回忆一下是如何使用配置服务的。一般情况下,是通过Nacos
提供的web控制台登录。然后通过界面新增配置信息。后续客户端只要配置了对应的NameSpace
,group
,dataId
就可以在客户端获取到对应的配置信息。既然这样,服务端肯定会存储我们在web控制台配置的配置信息。我们也可以从这个最直观的入口出发,一步步来分析服务端的流程。
现在就不带大家配置,直接开门见山,告诉大家这个web控制台配置的入口在com.alibaba.nacos.config.server.controller.ConfigController#publishConfig
。
@PostMapping
@TpsControl(pointName = "ConfigPublish")
@Secured(action = ActionTypes.WRITE, signType = SignType.CONFIG)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId,
@RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException {
// 内容加密
Pair<String, String> pair = EncryptionHandler.encryptHandler(dataId, content);
content = pair.getSecond();
// 参数检查
ParamUtils.checkTenant(tenant);
ParamUtils.checkParam(dataId, group, "datumId", content);
ParamUtils.checkParam(tag);
// 构造配置信息
ConfigForm configForm = new ConfigForm();
configForm.setDataId(dataId);
configForm.setGroup(group);
configForm.setNamespaceId(tenant);
configForm.setContent(content);
configForm.setTag(tag);
configForm.setAppName(appName);
configForm.setSrcUser(srcUser);
configForm.setConfigTags(configTags);
configForm.setDesc(desc);
configForm.setUse(use);
configForm.setEffect(effect);
configForm.setType(type);
configForm.setSchema(schema);
if (StringUtils.isBlank(srcUser)) {
configForm.setSrcUser(RequestUtil.getSrcUserName(request));
}
if (!ConfigType.isValidType(type)) {
configForm.setType(ConfigType.getDefaultType().getType());
}
// 构造请求对象
ConfigRequestInfo configRequestInfo = new ConfigRequestInfo();
configRequestInfo.setSrcIp(RequestUtil.getRemoteIp(request));
configRequestInfo.setRequestIpApp(RequestUtil.getAppName(request));
configRequestInfo.setBetaIps(request.getHeader("betaIps"));
// 获取加密key
String encryptedDataKey = pair.getFirst();
// 发布配置
return configOperationService.publishConfig(configForm, configRequestInfo, encryptedDataKey);
}
这段代码逻辑很简单:
- 加密处理
- 参数检查
- 构造配置信息
- 构造请求对象
- 发布配置
加密说明
这个加密处理,我对比了Nacos 2.0
版本,是新增的。之前也有版本基线说2.2进行了插件化的处理。这个加密就是使用了插件化处理。这里详细先分析下加密。主要还是分析下插件化的思想。看看是如何使用插件或者扩展来进行加解密的。
// 静态方法
public static Pair<String, String> encryptHandler(String dataId, String content) {
// 检查是否需要加密
if (!checkCipher(dataId)) {
return Pair.with("", content);
}
Optional<String> algorithmName = parseAlgorithmName(dataId);
// 获取加密的处理类
Optional<EncryptionPluginService> optional = algorithmName
.flatMap(EncryptionPluginManager.instance()::findEncryptionService);
if (!optional.isPresent()) {
// 获取不到,还是走非加密型
LOGGER.warn("[EncryptionHandler] [encryptHandler] No encryption program with the corresponding name found");
return Pair.with("", content);
}
EncryptionPluginService encryptionPluginService = optional.get();
// 根据扩展的插件类,获取密钥
String secretKey = encryptionPluginService.generateSecretKey();
// 利用密钥加密
String encryptContent = encryptionPluginService.encrypt(secretKey, content);
return Pair.with(encryptionPluginService.encryptSecretKey(secretKey), encryptContent);
}
private static boolean checkCipher(String dataId) {
// 判断dataId的前缀,如果以cipher-开头就是需要加密的
return dataId.startsWith(PREFIX) && !PREFIX.equals(dataId);
}
加密逻辑也很简单:
- 判断是否需要处理加密
- 需要的话,去插件里面获取
- 获取不到打日志,返回非加密处理(相当于兜底,没有就默认不加密处理了)
- 获取到加密插件,利用插件获取秘钥,然后再加密
我们继续往下看这段EncryptionPluginManager.instance()::findEncryptionService
这个EncryptionPluginManager.instance()
直接获取实例,说明这个是个单例,然后再获取内部服务
private EncryptionPluginManager() {
// 经典的构造方法初始化
loadInitial();
}
private void loadInitial() {
// 通过NacosServiceLoader扩展机制,获取处理类的集合
Collection<EncryptionPluginService> encryptionPluginServices = NacosServiceLoader.load(
EncryptionPluginService.class);
for (EncryptionPluginService encryptionPluginService : encryptionPluginServices) {
if (StringUtils.isBlank(encryptionPluginService.algorithmName())) {
LOGGER.warn("[EncryptionPluginManager] Load EncryptionPluginService({}) algorithmName(null/empty) fail."
+ " Please Add algorithmName to resolve.", encryptionPluginService.getClass());
continue;
}
// 放入集合
ENCRYPTION_SPI_MAP.put(encryptionPluginService.algorithmName(), encryptionPluginService);
LOGGER.info("[EncryptionPluginManager] Load EncryptionPluginService({}) algorithmName({}) successfully.",
encryptionPluginService.getClass(), encryptionPluginService.algorithmName());
}
}
因为是单例,所以获取单例的时候会进行初始化。初始化的时候会根据自己写的扩展机制,获取EncryptionPluginService
,然后再进行反射初始化。NacosServiceLoader
就不细看了,之前文章已经分析过了。
重要的还是这种插件化的思想,它仅仅依赖于原生JDK
的SPI
机制,可以按需扩展和定制:
- 提供给插件化的接口,由第三方去实现(自定义功能)
- 在初始化的时候,
Nacos
去加载处理类
public Boolean publishConfig(ConfigForm configForm, ConfigRequestInfo configRequestInfo, String encryptedDataKey)
throws NacosException {
// 配置信息转map
Map<String, Object> configAdvanceInfo = getConfigAdvanceInfo(configForm);
// 检查参数
ParamUtils.checkParam(configAdvanceInfo);
if (AggrWhitelist.isAggrDataId(configForm.getDataId())) {
LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", configRequestInfo.getSrcIp(),
configForm.getDataId(), configForm.getGroup());
throw new NacosApiException(HttpStatus.FORBIDDEN.value(), ErrorCode.INVALID_DATA_ID,
"dataId:" + configForm.getDataId() + " is aggr");
}
final Timestamp time = TimeUtils.getCurrentTime();
ConfigInfo configInfo = new ConfigInfo(configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),
configForm.getAppName(), configForm.getContent());
configInfo.setType(configForm.getType());
configInfo.setEncryptedDataKey(encryptedDataKey);
if (StringUtils.isBlank(configRequestInfo.getBetaIps())) {
if (StringUtils.isBlank(configForm.getTag())) {
// 分析这段代码就行,默认非beta测试,不加tag
// 插入或者更新配置
configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(), configForm.getSrcUser(),
configInfo, time, configAdvanceInfo, false);
// 发布数据变动事件
ConfigChangePublisher.notifyConfigChange(
new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(),
configForm.getNamespaceId(), time.getTime()));
} else {
// 省略部分代码...
}
} else {
// beta publish
// 省略部分代码...
}
// 日志跟踪
ConfigTraceService.logPersistenceEvent(configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(),
configRequestInfo.getRequestIpApp(), time.getTime(), InetUtils.getSelfIP(),
ConfigTraceService.PERSISTENCE_EVENT_PUB, configForm.getContent());
return true;
}
这里面有一些判断逻辑和分支走向,我们分析下最常用了,即正常发布,没有tag类型。其实就做了两件事
configInfoPersistService.insertOrUpdate(configRequestInfo.getSrcIp(), configForm.getSrcUser(), configInfo, time, configAdvanceInfo, false);
ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, configForm.getDataId(), configForm.getGroup(), configForm.getNamespaceId(), time.getTime()));
数据库操作
数据库操作分为了内置数据库和外置数据库,我们通常使用外置数据mysql
数据库。也就是ExternalConfigInfoPersistServiceImpl
分析。
public void insertOrUpdate(String srcIp, String srcUser, ConfigInfo configInfo, Timestamp time,
Map<String, Object> configAdvanceInfo, boolean notify) {
try {
addConfigInfo(srcIp, srcUser, configInfo, time, configAdvanceInfo, notify);
} catch (DataIntegrityViolationException ive) { // Unique constraint conflict
updateConfigInfo(configInfo, srcIp, srcUser, time, configAdvanceInfo, notify);
}
}
这里没有直接判断是新增还是更新,而且依赖数据库唯一性做检查,重复了就做更新。我们看下新增的方法
public void addConfigInfo(final String srcIp, final String srcUser, final ConfigInfo configInfo,
final Timestamp time, final Map<String, Object> configAdvanceInfo, final boolean notify) {
boolean result = tjt.execute(status -> {
try {
// 自动插入,返回主键id
long configId = addConfigInfoAtomic(-1, srcIp, srcUser, configInfo, time, configAdvanceInfo);
String configTags = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("config_tags");
// 新增tag管理
addConfigTagsRelation(configId, configTags, configInfo.getDataId(), configInfo.getGroup(),
configInfo.getTenant());
// 插入历史数据
historyConfigInfoPersistService.insertConfigHistoryAtomic(0, configInfo, srcIp, srcUser, time, "I");
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
throw e;
}
return Boolean.TRUE;
});
}
// 新增方法
public long addConfigInfoAtomic(final long configId, final String srcIp, final String srcUser,
final ConfigInfo configInfo, final Timestamp time, Map<String, Object> configAdvanceInfo) {
final String appNameTmp =
StringUtils.isBlank(configInfo.getAppName()) ? StringUtils.EMPTY : configInfo.getAppName();
final String tenantTmp =
StringUtils.isBlank(configInfo.getTenant()) ? StringUtils.EMPTY : configInfo.getTenant();
final String desc = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("desc");
final String use = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("use");
final String effect = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("effect");
final String type = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("type");
final String schema = configAdvanceInfo == null ? null : (String) configAdvanceInfo.get("schema");
final String encryptedDataKey =
configInfo.getEncryptedDataKey() == null ? StringUtils.EMPTY : configInfo.getEncryptedDataKey();
final String md5Tmp = MD5Utils.md5Hex(configInfo.getContent(), Constants.ENCODE);
KeyHolder keyHolder = new GeneratedKeyHolder();
ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
TableConstant.CONFIG_INFO);
final String sql = configInfoMapper.insert(
Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "src_ip", "src_user",
"gmt_create", "gmt_modified", "c_desc", "c_use", "effect", "type", "c_schema",
"encrypted_data_key"));
String[] returnGeneratedKeys = configInfoMapper.getPrimaryKeyGeneratedKeys();
try {
jt.update(new PreparedStatementCreator() {
@Override
public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
PreparedStatement ps = connection.prepareStatement(sql, returnGeneratedKeys);
ps.setString(1, configInfo.getDataId());
ps.setString(2, configInfo.getGroup());
ps.setString(3, tenantTmp);
ps.setString(4, appNameTmp);
ps.setString(5, configInfo.getContent());
ps.setString(6, md5Tmp);
ps.setString(7, srcIp);
ps.setString(8, srcUser);
ps.setTimestamp(9, time);
ps.setTimestamp(10, time);
ps.setString(11, desc);
ps.setString(12, use);
ps.setString(13, effect);
ps.setString(14, type);
ps.setString(15, schema);
ps.setString(16, encryptedDataKey);
return ps;
}
}, keyHolder);
Number nu = keyHolder.getKey();
if (nu == null) {
throw new IllegalArgumentException("insert config_info fail");
}
return nu.longValue();
} catch (CannotGetJdbcConnectionException e) {
LogUtil.FATAL_LOG.error("[db-error] " + e, e);
throw e;
}
}
这个方法似乎就是jdbcTemplate
执行一句sql
。但是这应该只是一种数据库的新增,如果换了库应该怎么处理呢,重新写一套?
这里提供的解决方案依然是通过插件化的形式去处理。这里面有两句很重要的代码:
// 根据数据库类型找到处理的mapper类
ConfigInfoMapper configInfoMapper = mapperManager.findMapper(dataSourceService.getDataSourceType(),
TableConstant.CONFIG_INFO);
// 将参数转换成对应数据库类型的sql语句
final String sql = configInfoMapper.insert(
Arrays.asList("data_id", "group_id", "tenant_id", "app_name", "content", "md5", "src_ip", "src_user",
"gmt_create", "gmt_modified", "c_desc", "c_use", "effect", "type", "c_schema",
"encrypted_data_key"));
我们先看第一行代码怎么处理的
public <R extends Mapper> R findMapper(String dataSource, String tableName) {
LOGGER.info("[MapperManager] findMapper dataSource: {}, tableName: {}", dataSource, tableName);
if (StringUtils.isBlank(dataSource) || StringUtils.isBlank(tableName)) {
throw new NacosRuntimeException(FIND_DATASOURCE_ERROR_CODE, "dataSource or tableName is null");
}
// 从SPI缓存中获取
Map<String, Mapper> tableMapper = MAPPER_SPI_MAP.get(dataSource);
if (Objects.isNull(tableMapper)) {
throw new NacosRuntimeException(FIND_DATASOURCE_ERROR_CODE,
"[MapperManager] Failed to find the datasource,dataSource:" + dataSource);
}
Mapper mapper = tableMapper.get(tableName);
if (Objects.isNull(mapper)) {
throw new NacosRuntimeException(FIND_TABLE_ERROR_CODE,
"[MapperManager] Failed to find the table ,tableName:" + tableName);
}
if (dataSourceLogEnable) {
MapperProxy mapperProxy = new MapperProxy();
return (R) mapperProxy.createProxy(mapper);
}
return (R) mapper;
}
而这个MAPPER_SPI_MAP
初始化也和之前EncryptionPluginService
的一样。在单例的构造方法中加载
private MapperManager() {
loadInitial();
}
public void loadInitial() {
Collection<Mapper> mappers = NacosServiceLoader.load(Mapper.class);
for (Mapper mapper : mappers) {
Map<String, Mapper> mapperMap = MAPPER_SPI_MAP.getOrDefault(mapper.getDataSource(), new HashMap<>(16));
mapperMap.put(mapper.getTableName(), mapper);
MAPPER_SPI_MAP.put(mapper.getDataSource(), mapperMap);
LOGGER.info("[MapperManager] Load Mapper({}) datasource({}) tableName({}) successfully.",
mapper.getClass(), mapper.getDataSource(), mapper.getTableName());
}
}
我们也可以看到Nacos
源码加载的Mapper
插件。
第二行的configInfoMapper.insert
就可以根据插件的扩展,通过不同的实现类去处理了。这样就能完美的解决数据库类型中差异化sql
语句。
总结
本篇重点介绍了两个通过Nacos
插件化的例子。分析了配置中心的配置信息实际在数据库中也存储了一份。只是数据库可以使用内置数据库,也可以使用外置数据库。本篇还留了一部分内容没去分析,就是ConfigChangePublisher.notifyConfigChange
部分的内容,这个内容有很多,也很重要。我们留到下篇去说明,敬请期待。
转载自:https://juejin.cn/post/7215151665931124796