Seata Source Code (1): Initialization
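1. Core operations
  1. Initialize GlobalTransactionScanner
  2. GlobalTransactionScanner identifies the classes that enable global transactions and adds the matching interceptor
  3. Initialize the TM client and the RM client
  4. Register the destroy hook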
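2. Core class structure
3. Auto-configuration
GlobalTransactionScanner is instantiated automatically through Spring Boot's auto-configuration mechanism (the SPI-style lookup in spring.factories).
Which auto-configuration class takes effect depends on the library that is pulled in:
groupId | artifactId | Auto-configuration class |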
---|---|---|
io.seata | seata-spring-boot-starter | SeataAutoConfiguration |
com.alibaba.cloud | spring-cloud-alibaba-seata | GlobalTransactionAutoConfiguration |
3.1 seata-spring-boot-starter
3.1.1 Entry point
SeataAutoConfiguration.java
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}
3.1.2 SPI configuration
The spring.factories file of the seata-spring-boot-starter module registers SeataAutoConfiguration, which is what enables the auto-configuration.
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataPropertiesAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration
3.1.3 Initializing the SeataProperties argument
The GlobalTransactionScanner constructor takes its arguments from seataProperties:
GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
SeataPropertiesAutoConfiguration.java
The @AutoConfigureBefore annotation makes this configuration apply before SeataAutoConfiguration, so the properties are initialized first.
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
@ComponentScan(basePackages = "io.seata.spring.boot.autoconfigure.properties")
@AutoConfigureBefore({SeataAutoConfiguration.class, SeataDataSourceAutoConfiguration.class})
public class SeataPropertiesAutoConfiguration {
    static {
        PROPERTY_BEAN_MAP.put(SEATA_PREFIX, SeataProperties.class);
3.2 spring-cloud-alibaba-seata
3.2.1 Entry point
GlobalTransactionAutoConfiguration.java
This class initializes SeataProperties in the same step, but it does not set up a FailureHandler:
@Configuration
@EnableConfigurationProperties({SeataProperties.class})
public class GlobalTransactionAutoConfiguration {
    private final ApplicationContext applicationContext;
    private final SeataProperties seataProperties;

    public GlobalTransactionAutoConfiguration(ApplicationContext applicationContext, SeataProperties seataProperties) {
        this.applicationContext = applicationContext;
        this.seataProperties = seataProperties;
    }

    @Bean
    public GlobalTransactionScanner globalTransactionScanner() {
        String applicationName = this.applicationContext.getEnvironment().getProperty("spring.application.name");
        String txServiceGroup = this.seataProperties.getTxServiceGroup();
        if (StringUtils.isEmpty(txServiceGroup)) {
            txServiceGroup = applicationName + "-fescar-service-group";
            this.seataProperties.setTxServiceGroup(txServiceGroup);
        }
        return new GlobalTransactionScanner(applicationName, txServiceGroup);
    }
}
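The fallback for the transaction service group in the code above can be isolated into a small helper. This is only a sketch: the class and method names (TxServiceGroupDefaults, resolve) are made up for illustration, and only the "-fescar-service-group" suffix is taken from the quoted code.

```java
// Hypothetical helper that mirrors the fallback in globalTransactionScanner();
// it is not part of spring-cloud-alibaba-seata.
class TxServiceGroupDefaults {

    // Suffix taken from the quoted GlobalTransactionAutoConfiguration code.
    static final String DEFAULT_SUFFIX = "-fescar-service-group";

    // Returns the configured group, or "<applicationName>-fescar-service-group"
    // when no group is configured (null or empty string).
    static String resolve(String applicationName, String configuredGroup) {
        if (configuredGroup == null || configuredGroup.isEmpty()) {
            return applicationName + DEFAULT_SUFFIX;
        }
        return configuredGroup;
    }

    public static void main(String[] args) {
        System.out.println(resolve("order-service", null));          // prints order-service-fescar-service-group
        System.out.println(resolve("order-service", "my_tx_group")); // prints my_tx_group
    }
}
```

The practical consequence is that an application which sets no group explicitly ends up with a group name derived from spring.application.name.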
3.2.2 SPI configuration
The spring.factories file of the spring-cloud-alibaba-seata module registers GlobalTransactionAutoConfiguration, which is what enables the auto-configuration.
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.seata.rest.SeataRestTemplateAutoConfiguration,\
com.alibaba.cloud.seata.web.SeataHandlerInterceptorConfiguration,\
com.alibaba.cloud.seata.GlobalTransactionAutoConfiguration,\
com.alibaba.cloud.seata.feign.SeataFeignClientAutoConfiguration,\
com.alibaba.cloud.seata.feign.hystrix.SeataHystrixAutoConfiguration
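4. Core class GlobalTransactionScanner
Main responsibilities:
  1. Implements InitializingBean to run the bean's initialization logic
  2. Implements ConfigurationChangeListener to watch Seata configuration parameters and react to changes
  3. Implements DisposableBean to perform destroy handling
  4. Implements ApplicationContextAware so that the ApplicationContext is injected
  5. Extends AbstractAutoProxyCreator and overrides its methods to add the transaction interceptors
4.1 Initialization
  1. Register the configuration change listener (when global transactions are disabled)
  2. Initialize TM and RM (when global transactions are enabled)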
@Override
public void afterPropertiesSet() {
    // The flag comes from the service.disableGlobalTransaction configuration parameter
    if (disableGlobalTransaction) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global transaction is disabled.");
        }
        // Register a listener for service.disableGlobalTransaction (the scanner itself);
        // if the parameter changes later, the TM and RM clients are initialized dynamically
        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
            (ConfigurationChangeListener)this);
        // With service.disableGlobalTransaction=true the TM and RM clients are not initialized here
        return;
    }
    // Initialize the TM and RM clients unless that has already happened
    if (initialized.compareAndSet(false, true)) {
        initClient();
    }
}
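The initialized.compareAndSet(false, true) guard is what keeps initClient() from running twice, even if several threads reach it at the same time. A minimal sketch of that guard, with no Seata dependency: InitOnceDemo, initIfNeeded and runConcurrently are hypothetical names, and initClient() here only counts calls.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the scanner's init guard; not Seata code.
class InitOnceDemo {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private final AtomicInteger initCalls = new AtomicInteger();

    // Same shape as the guard in afterPropertiesSet(): only the caller that
    // flips the flag from false to true runs the initialization.
    void initIfNeeded() {
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    // Placeholder for the real client setup; it only counts invocations.
    private void initClient() {
        initCalls.incrementAndGet();
    }

    int initCalls() {
        return initCalls.get();
    }

    // Releases all threads at once and reports how often initClient() ran.
    static int runConcurrently(int threads) {
        InitOnceDemo demo = new InitOnceDemo();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                    demo.initIfNeeded();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo.initCalls();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(8)); // prints 1
    }
}
```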
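4.2 Listening for parameter changes
The listener watches for the switch from global transactions disabled to global transactions enabled; once that happens, it initializes the RM and TM.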
@Override
public void onChangeEvent(ConfigurationChangeEvent event) {
    if (ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION.equals(event.getDataId())) {
        disableGlobalTransaction = Boolean.parseBoolean(event.getNewValue().trim());
        // Watch the service.disableGlobalTransaction parameter: if global transactions are now enabled
        // and initialization has not happened yet, initialize
        if (!disableGlobalTransaction && initialized.compareAndSet(false, true)) {
            // Note: disableGlobalTransaction was already overwritten above, so the "old value" logged here is the new one
            LOGGER.info("{} config changed, old value:{}, new value:{}", ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                disableGlobalTransaction, event.getNewValue());
            initClient();
        }
    }
}
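Taken together, 4.1 and 4.2 describe a "disabled at startup, initialized on first enable" flow. The sketch below models that flow without any Seata classes. LazyEnableDemo and its methods are hypothetical; the key name is the one mentioned in the comments of the quoted code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, dependency-free model of the "disabled at startup, enabled later" flow.
class LazyEnableDemo {
    // Key name taken from the comments in the quoted code.
    static final String KEY = "service.disableGlobalTransaction";

    private volatile boolean disabled;
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private int initCalls;

    // Counterpart of afterPropertiesSet(): initialize right away only when enabled.
    LazyEnableDemo(boolean disabledAtStartup) {
        this.disabled = disabledAtStartup;
        if (!disabled) {
            initOnce();
        }
    }

    // Counterpart of onChangeEvent(...): ignore other keys, parse the new
    // value, and initialize only on the first switch to "enabled".
    void onChange(String key, String newValue) {
        if (!KEY.equals(key)) {
            return;
        }
        disabled = Boolean.parseBoolean(newValue.trim());
        if (!disabled) {
            initOnce();
        }
    }

    private void initOnce() {
        if (initialized.compareAndSet(false, true)) {
            initCalls++; // placeholder for initClient()
        }
    }

    boolean isDisabled() {
        return disabled;
    }

    int initCalls() {
        return initCalls;
    }

    public static void main(String[] args) {
        LazyEnableDemo scanner = new LazyEnableDemo(true);
        System.out.println(scanner.initCalls()); // prints 0
        scanner.onChange(KEY, " false ");
        System.out.println(scanner.initCalls()); // prints 1
        scanner.onChange(KEY, "true");
        scanner.onChange(KEY, "false");
        System.out.println(scanner.initCalls()); // prints 1
    }
}
```

As in the quoted handler, switching the flag back to disabled only updates the flag in this sketch; nothing is torn down, and a later re-enable does not initialize a second time.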
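4.3 Adding interceptors
4.3.1 Creating the interceptor
Depending on the bean, either the TCC-mode interceptor or the AT-mode interceptor is added: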
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)interceptor);
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }
                if (globalTransactionalInterceptor == null) {
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    // Listen for changes to the service.disableGlobalTransaction configuration parameter
                    ConfigurationCache.addConfigListener(
                        ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }
            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
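The general idea of wrapIfNecessary, wrapping a bean with an interceptor at most once per bean name, can be tried out without Spring. The sketch below swaps in JDK dynamic proxies for Spring AOP, so it is a simplified stand-in and not how Seata builds its proxies. WrapOnceDemo, OrderService and the logging interceptor are all hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model using JDK dynamic proxies instead of Spring AOP.
class WrapOnceDemo {
    interface OrderService {
        String create(String item);
    }

    static class OrderServiceImpl implements OrderService {
        @Override
        public String create(String item) {
            return "created:" + item;
        }
    }

    private final Set<String> proxied = new HashSet<>();
    final List<String> log = new ArrayList<>();

    // Wraps the bean at most once per bean name, like the PROXYED_SET check.
    OrderService wrapIfNecessary(OrderService bean, String beanName) {
        synchronized (proxied) {
            if (proxied.contains(beanName)) {
                return bean;
            }
            // Stand-in for the transaction interceptor: record entry and exit.
            InvocationHandler interceptor = (proxy, method, args) -> {
                log.add("begin " + method.getName());
                try {
                    return method.invoke(bean, args);
                } finally {
                    log.add("end " + method.getName());
                }
            };
            proxied.add(beanName);
            return (OrderService) Proxy.newProxyInstance(
                OrderService.class.getClassLoader(),
                new Class<?>[] {OrderService.class},
                interceptor);
        }
    }

    public static void main(String[] args) {
        WrapOnceDemo demo = new WrapOnceDemo();
        OrderService service = demo.wrapIfNecessary(new OrderServiceImpl(), "orderService");
        System.out.println(service.create("book")); // prints created:book
        System.out.println(demo.log);               // prints [begin create, end create]
        System.out.println(demo.wrapIfNecessary(service, "orderService") == service); // prints true
    }
}
```

The quoted Seata method has an extra branch that this sketch leaves out: when the bean is already an AOP proxy, it adds the advisor to the existing proxy instead of wrapping it again.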
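4.3.2 Checking for transaction annotations
GlobalTransactional is supported at both class level and method level; GlobalLock is supported at method level only.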
private boolean existsAnnotation(Class<?>[] classes) {
    if (CollectionUtils.isNotEmpty(classes)) {
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            // GlobalTransactional annotation on the class
            GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
            if (trxAnno != null) {
                return true;
            }
            Method[] methods = clazz.getMethods();
            for (Method method : methods) {
                // GlobalTransactional annotation on a method
                trxAnno = method.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                // GlobalLock annotation on a method
                GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                if (lockAnno != null) {
                    return true;
                }
            }
        }
    }
    return false;
}
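The same reflection check can be run against a few toy classes to see which cases it catches. In this sketch the annotations Tx and Lock are hypothetical stand-ins for GlobalTransactional and GlobalLock, and the sample classes exist only for the demonstration. Because Class.getMethods() returns public methods only, an annotation on a private method is not detected by this kind of scan.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical re-creation of the scan; Tx and Lock stand in for the Seata annotations.
class AnnotationScanDemo {
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.TYPE, ElementType.METHOD})
    @interface Tx {}

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Lock {}

    static class Plain {
        public void run() {}
    }

    @Tx
    static class ClassLevel {
        public void run() {}
    }

    static class MethodLevel {
        @Lock
        public void run() {}
    }

    static class PrivateOnly {
        @Tx
        private void run() {}
    }

    // True if any class carries Tx, or any public method carries Tx or Lock.
    static boolean existsAnnotation(Class<?>... classes) {
        for (Class<?> clazz : classes) {
            if (clazz == null) {
                continue;
            }
            if (clazz.getAnnotation(Tx.class) != null) {
                return true;
            }
            // getMethods() covers public methods only, including inherited ones.
            for (Method method : clazz.getMethods()) {
                if (method.getAnnotation(Tx.class) != null
                    || method.getAnnotation(Lock.class) != null) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(existsAnnotation(Plain.class));       // prints false
        System.out.println(existsAnnotation(ClassLevel.class));  // prints true
        System.out.println(existsAnnotation(MethodLevel.class)); // prints true
        System.out.println(existsAnnotation(PrivateOnly.class)); // prints false
    }
}
```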
4.3.3 Returning the interceptor
The scanner overrides the AbstractAutoProxyCreator method that supplies the interceptor:
@Override
protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource)
    throws BeansException {
    return new Object[]{interceptor};
}
4.4 Adding destroy handling
4.4.1 Implementing the DisposableBean interface
The scanner implements the DisposableBean interface and calls ShutdownHook to perform the destroy:
@Override
public void destroy() {
    // Destroy is delegated to ShutdownHook in one place, although the scanner could do it itself
    ShutdownHook.getInstance().destroyAll();
}
4.4.2 Registering the shutdown hook
The shutdown hook is registered during initialization:
private void registerSpringShutdownHook() {
    if (applicationContext instanceof ConfigurableApplicationContext) {
        ((ConfigurableApplicationContext) applicationContext).registerShutdownHook();
        // Remove the JVM shutdown hook that ShutdownHook registered with the Runtime
        ShutdownHook.removeRuntimeShutdownHook();
    }
    // The parent class of the TM and RM clients implements Disposable's destroy method;
    // ShutdownHook iterates over every registered Disposable and calls destroy
    ShutdownHook.getInstance().addDisposable(TmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
    ShutdownHook.getInstance().addDisposable(RmNettyRemotingClient.getInstance(applicationId, txServiceGroup));
}
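The pattern in 4.4, one registry that collects disposable resources and destroys them all from a single entry point, can be sketched with the JDK alone. ShutdownRegistryDemo and its Disposable interface are hypothetical and only modeled on the calls shown above. The real ShutdownHook class may differ, for example in ordering or thread handling. Removing the JVM hook, as registerSpringShutdownHook() does, presumably avoids destroying the clients through two separate paths once the Spring context takes over shutdown.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical registry modeled on the ShutdownHook usage above; not Seata's class.
class ShutdownRegistryDemo {
    interface Disposable {
        void destroy();
    }

    private final List<Disposable> disposables = new ArrayList<>();
    private boolean destroyed;

    synchronized void addDisposable(Disposable disposable) {
        disposables.add(disposable);
    }

    // Destroys every registered resource in registration order, at most once.
    synchronized void destroyAll() {
        if (destroyed) {
            return;
        }
        destroyed = true;
        for (Disposable disposable : disposables) {
            disposable.destroy();
        }
    }

    public static void main(String[] args) {
        ShutdownRegistryDemo registry = new ShutdownRegistryDemo();
        List<String> closed = new ArrayList<>();
        registry.addDisposable(() -> closed.add("tm-client"));
        registry.addDisposable(() -> closed.add("rm-client"));

        // Register a JVM-level hook, then withdraw it again, as the framework-managed
        // destroy() path is expected to take over.
        Thread jvmHook = new Thread(registry::destroyAll);
        Runtime.getRuntime().addShutdownHook(jvmHook);
        System.out.println(Runtime.getRuntime().removeShutdownHook(jvmHook)); // prints true

        registry.destroyAll();
        registry.destroyAll(); // second call is a no-op
        System.out.println(closed); // prints [tm-client, rm-client]
    }
}
```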
Reposted from: https://juejin.cn/post/7288597387799658530