图解+源码讲解 ShardingSphere 为什么不支持 Nacos
图解+源码讲解 ShardingSphere 为什么不支持 Nacos
如果把生活比喻为创作的意境,那么阅读就像阳光 —— 池莉
明文规定
邮件明文规定,下面是负责 ShardingSphere 团队给出明确表示,所以ShardingSphere 在4.x版本的时候支持的 Nacos 只是单独的配置作用,并没有起到监听的效果,也就是说配置中心的配置进行改变之后,并不会进行动态的更新,服务本地是没有任何效果的已经发邮件明文规定不支持 Nacos和Apollo 这两种配置中心了,现在只支持 Zookeeper 和 etcd 两种配置中心的方式,官网也明确表明了其他的配置中心可以通过 spi 的方式进行接入了
从源码分析启动流程初始化配置
1. 获取配置中心配置流程
OrchestrationShardingDataSource,进入了治理分片数据源方法,这个方法进行了几乎所有信息的加载流程,比如配置监听器,获取数据源、获取分片治理规则、获取属性等等
public OrchestrationShardingDataSource(final OrchestrationConfiguration orchestrationConfig) throws SQLException {
// ShardingConstant.LOGIC_SCHEMA_NAME 的值是 logic_db
super(new ShardingOrchestrationFacade(orchestrationConfig, Collections.singletonList(ShardingConstant.LOGIC_SCHEMA_NAME)));
// 获取上面创建好的配置中心
ConfigurationService configService = getShardingOrchestrationFacade().getConfigService();
// 获取主从规则
MasterSlaveRuleConfiguration masterSlaveRuleConfig = configService.loadMasterSlaveRuleConfiguration(ShardingConstant.LOGIC_SCHEMA_NAME);
Preconditions.checkState(!Strings.isNullOrEmpty(masterSlaveRuleConfig.getMasterDataSourceName()), "No available master slave rule configuration to load.");
dataSource = new MasterSlaveDataSource(DataSourceConverter.getDataSourceMap(configService.loadDataSourceConfigurations(ShardingConstant.LOGIC_SCHEMA_NAME)),
new OrchestrationMasterSlaveRule(masterSlaveRuleConfig), configService.loadProperties());
initShardingOrchestrationFacade();
}
spring.shardingsphere.orchestration.name=demo_spring_boot_ds_ms
spring.shardingsphere.orchestration.overwrite=false
spring.shardingsphere.orchestration.registry.type=nacos
spring.shardingsphere.orchestration.registry.server-lists=172.16.117.152:8848
spring.shardingsphere.orchestration.registry.namespace=public
spring.shardingsphere.orchestration.registry.props.group=DEFAULT_GROUP
spring.shardingsphere.orchestration.registry.props.timeout=30000
orchestrationConfig 这些都是我们在配置文件中配置的消息
2. 创建分片治理 ShardingOrchestrationFacade
public ShardingOrchestrationFacade(final OrchestrationConfiguration orchestrationConfig, final Collection<String> shardingSchemaNames) {
// 创建注册中心实例对象
regCenter = new RegistryCenterServiceLoader().load(orchestrationConfig.getRegCenterConfig());
// 本地是否覆盖注册中心的配置 false 就是不覆盖
isOverwrite = orchestrationConfig.isOverwrite();// false
// 创建了一个配置服务,名字是 demo_spring_boot_ds_ms,注册中心就是上面创建好的
configService = new ConfigurationService(orchestrationConfig.getName(), regCenter);
// 创建了一个状态服务
stateService = new StateService(orchestrationConfig.getName(), regCenter);
// 创建一个分片治理监听管理器
listenerManager = shardingSchemaNames.isEmpty() ?
new ShardingOrchestrationListenerManager(orchestrationConfig.getName(),
regCenter, configService.getAllShardingSchemaNames())
: new ShardingOrchestrationListenerManager(orchestrationConfig.getName(),
regCenter, shardingSchemaNames);
}
3. 创建配置中心实例对象 Nacos
通过反射创建的 Nacos 访问对象 Class.forName("com.alibaba.nacos.client.config.NacosConfigService")
@Override
public void init(final RegistryCenterConfiguration config) {
try {
Properties properties = new Properties();
// config.getServerLists() 172.16.117.152:8848 单机的
properties.put("serverAddr", config.getServerLists());
properties.put("namespace", null == config.getNamespace() ? "" :
config.getNamespace());
configService = NacosFactory.createConfigService(properties);
} catch (final NacosException ex) {
log.debug("exception for: {}", ex.toString());
}
}
4. 创建分片治理监听管理器
创建了配置监听管理器和状态监听管理器,我们先看一下配置监听管理器
public ShardingOrchestrationListenerManager(final String name,
final RegistryCenter regCenter, final Collection<String> shardingSchemaNames) {
// 配置监听管理器
configurationChangedListenerManager =
new ConfigurationChangedListenerManager(name, regCenter, shardingSchemaNames);
// 状态监听管理器
stateChangedListenerManager = new StateChangedListenerManager(name, regCenter);
}
5. 配置改变监听管理器
public ConfigurationChangedListenerManager(final String name, final RegistryCenter regCenter, final Collection<String> shardingSchemaNames) {
// schema改变监听器
schemaChangedListener = new SchemaChangedListener(name, regCenter, shardingSchemaNames);
// 属性改变监听器
propertiesChangedListener = new PropertiesChangedListener(name, regCenter);
authenticationChangedListener = new AuthenticationChangedListener(name, regCenter);
}
1. schema 监听器
new ConfigurationNode(name).getSchemaPath() 这个值是 demo_spring_boot_ds_ms/config/schema这个值是监听的 key
public SchemaChangedListener(final String name, final RegistryCenter regCenter,
final Collection<String> shardingSchemaNames) {
// demo_spring_boot_ds_ms/config/schema
super(regCenter, new ConfigurationNode(name).getSchemaPath());
configurationService = new ConfigurationService(name, regCenter);
configurationNode = new ConfigurationNode(name);
existedSchemaNames.addAll(shardingSchemaNames);
}
// spuer 里面的方法
@ConstructorProperties({"regCenter", "watchKey"})
public PostShardingOrchestrationEventListener(RegistryCenter regCenter, String watchKey) {
this.regCenter = regCenter;
this.watchKey = watchKey;
}
2. properties 监听器
new ConfigurationNode(name).getPropsPath() 这个值是 /demo_spring_boot_ds_ms/config/props这个值监听属性配置的 key
public PropertiesChangedListener(final String name, final RegistryCenter regCenter) {
// demo_spring_boot_ds_ms/config/props,
super(regCenter, new ConfigurationNode(name).getPropsPath());
}
// spuer 里面的方法
@ConstructorProperties({"regCenter", "watchKey"})
public PostShardingOrchestrationEventListener(RegistryCenter regCenter, String watchKey) {
this.regCenter = regCenter;
this.watchKey = watchKey;
}
6. 状态改变监听管理器
实例状态改变监听器和数据源状态改变监听器
public StateChangedListenerManager(final String name, final RegistryCenter regCenter) {
// 实例状态改变监听器
instanceStateChangedListener = new InstanceStateChangedListener(name, regCenter);
// 数据源状态改变监听器
dataSourceStateChangedListener = new DataSourceStateChangedListener(name, regCenter);
}
1. 实例状态改变监听器
newStateNode(name).getInstancesNodeFullPath(OrchestrationInstance.getInstance().getInstanceId()) 这个值是 demo_spring_boot_ds_ms/state/instances/192.168.60.1@740140@4b384e1d-2d95-42bf-b597-6fc458a6eb71这个值监听属性配置的 key
public InstanceStateChangedListener(final String name, final RegistryCenter regCenter) {
// demo_spring_boot_ds_ms/state/
// instances/192.168.60.1@740140@4b384e1d-2d95-42bf-b597-6fc458a6eb71
super(regCenter, new StateNode(name).
getInstancesNodeFullPath(OrchestrationInstance.getInstance().getInstanceId()));
}
2. 数据源状态改变监听器
new StateNode(name).getDataSourcesNodeFullRootPath() 这个值是 demo_spring_boot_ds_ms/state/ datasources这个值监听属性配置的 key
public DataSourceStateChangedListener(final String name,
final RegistryCenter regCenter) {
//demo_spring_boot_ds_ms/state/datasources
super(regCenter, new StateNode(name).getDataSourcesNodeFullRootPath());
stateNode = new StateNode(name);
}
7. 获取主从规则
public MasterSlaveRuleConfiguration loadMasterSlaveRuleConfiguration(final String shardingSchemaName) {
return new MasterSlaveRuleConfigurationYamlSwapper().
swap(
YamlEngine.unmarshal(
regCenter.getDirectly(configNode.getRulePath(shardingSchemaName)),
YamlMasterSlaveRuleConfiguration.class));
}
1. 从配置中心获取配置好的规则
利用 nacos 客户端通过dataId 和group 进行配置获取,此时获取的是主从规则
@Override
// key = /demo_spring_boot_ds_ms/config/schema/logic_db/rule
public String getDirectly(final String key) {
try {
// dataId = .demo_spring_boot_ds_ms.config.schema.logic_db.rule
String dataId = key.replace("/", ".");
String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
long timeoutMs = Long.parseLong(properties.getProperty("timeout", "3000"));
return configService.getConfig(dataId, group, timeoutMs);
} catch (final NacosException ex) {
log.debug("exception for: {}", ex.toString());
return null;
}
}
1. 获取结果
8. 获取配置中心的数据源信息
方法还是和之前的那个获取规则的方法是一样的不过就是key是有变化的,变成了.demo_spring_boot_ds_ms /config/schema/logic_db/datasource 这个样子的
@Override
// /demo_spring_boot_ds_ms/config/schema/logic_db/datasource
public String getDirectly(final String key) {
try {
String dataId = key.replace("/", ".");
String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
long timeoutMs = Long.parseLong(properties.getProperty("timeout", "3000"));
return configService.getConfig(dataId, group, timeoutMs);
} catch (final NacosException ex) {
log.debug("exception for: {}", ex.toString());
return null;
}
}
9. 获取配置中心的属性信息
和上面获取主从规则,数据源信息一样的,不过key就变成了 .demo_spring_boot_ds_ms.config.props
@Override
// /demo_spring_boot_ds_ms/config/schema/logic_db/datasource
public String getDirectly(final String key) {
try {
String dataId = key.replace("/", ".");
String group = properties.getProperty("group", "SHARDING_SPHERE_DEFAULT_GROUP");
long timeoutMs = Long.parseLong(properties.getProperty("timeout", "3000"));
return configService.getConfig(dataId, group, timeoutMs);
} catch (final NacosException ex) {
log.debug("exception for: {}", ex.toString());
return null;
}
}
10. 初始化 initShardingOrchestrationFacade()
这里面主要关注的就是进行监听器的初始化了,进行具体的配置中心监听器设置,设置更新和删除方法,这里面的更新包括新增
protected final void initShardingOrchestrationFacade() {
shardingOrchestrationFacade.init();
......
}
public void initListeners() {
configurationChangedListenerManager.initListeners();
......
}
/**
* Initialize all configuration changed listeners.
* configurationChangedListenerManager
*/
public void initListeners() {
// 进行具体的配置中心监听器设置,设置更新和删除方法,这里面的更新包括新增
schemaChangedListener.watch(ChangedType.UPDATED, ChangedType.DELETED);
propertiesChangedListener.watch(ChangedType.UPDATED);
authenticationChangedListener.watch(ChangedType.UPDATED);
}
为什么不支持 Nacos
1. 配置中心的配置是
2. 监听器中监听的key是
demo_spring_boot_ds_ms/config/schema
3. 为什么监听不到
因为根本就没有对上面配置的dataId进行监听所以根本就不支持动态监听,不过官网也确实说了不支持 我们配置中心配置的逻辑是 为什么要这么配置是因为在获取配置中心配置的时候指定的路径是这个样子的private static final String RULE_NODE = "rule"; 监听器配置的是这个 demo_spring_boot_ds_ms/config/schema所以监听器配置的和配置中心配置的不一样所以监听不到
转载自:https://juejin.cn/post/7087065807389196325