深入理解Spring(五)、事务源码分析
前言
深入理解Spring源码分为7小节,本小节为Spring源码第五小节,各个小节目录如下。
- 扫描过程
- bean创建过程
- 容器扩展
- AOP源码分析
5. 事务源码分析
- Spring JDBC源码分析
- Spring常用设计模式
本章来分析一下在使用JdbcTemplate时,加入@Transactional注解为何能实现自动回滚、自动提交,这章是建立在第四小节上的,因为核心实现依然是使用AOP。
这章应该和第六章一起说,但最终还是决定把他们分开,首先了解下JdbcTemplate,JdbcTemplate是Spring提供的一个类,用来执行SQL语句,他需要依赖DataSource,DataSource表示数据源,他是个接口,一般使用他的实现类DriverManagerDataSource,他包含的信息是数据库连接所需要的信息,要完成简单的CRUD,这两个就可以完成了,但是在使用事务的时候,还需要加入TransactionManager,这三个类,对于使用了@Transactional的情况缺一不可。
通常会我们会这样使用。
@Configuration
@EnableTransactionManagement
public class TransactionConfig {
@Bean
public DataSource dataSource() {
DriverManagerDataSource driverManagerDataSource = new DriverManagerDataSource("jdbc:mysql://localhost:3306/test");
driverManagerDataSource.setUsername("root");
driverManagerDataSource.setPassword("hxl..");
driverManagerDataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
return driverManagerDataSource;
}
@Bean
public DataSourceTransactionManager dataSourceTransactionManager(DataSource dataSource) {
DataSourceTransactionManager dataSourceTransactionManager = new DataSourceTransactionManager(dataSource);
return dataSourceTransactionManager;
}
@Bean
public JdbcTemplate jdbcTemplate() {
return new JdbcTemplate(dataSource());
}
}
这其中还需要一个重要的东西ThreadLocal,这也是实现commit、rollback的关键,因为如果想rollback,最起码是在同一个Connection下把,举个例子,下面代码想实现rollback,那么Spring肯定在上层捕获了异常,接着会调用Connection的rollback()方法,但问题是上层代码是怎么知道下层的Connection的呢?
@Transactional()
public void test() {
jdbcTemplate.update("UPDATE test set name =111 where id =1");
throw new RuntimeException();
}
这里明确的说,并不是通过参数传递的,而是通过ThreadLocal,Spring在进入目标方法前,自己会创建Connection,并放入ThreadLocal,下层代码中的jdbcTemplate对象会从ThreadLocal里面拿这个Connection,如果发现没有,那么自己会创建。那么上面那句疑问句就可以反过来说,是下层代码知道上层代码所创建的Connection。但也可以反过来,因为在两个地方创建Connection,都会绑定到ThreadLocal中,正确的流程是上层创建。
用于管理ThreadLocal的是TransactionSynchronizationManager,可以通过下面代码获取,resourceMap就保存着当前Connection,他的key是上面我们向容器添加的数据源(DataSource)。
@Transactional(noRollbackForClassName = {"org.springframework.test.TestException"})
public void print() {
jdbcTemplate.update("UPDATE test set name =111 where id =1");
Map<Object, Object> resourceMap = TransactionSynchronizationManager.getResourceMap();
throw new RuntimeException();
}
@EnableTransactionManagement
我们正式开始分析源码,首先是@EnableTransactionManagement注解,他会向容器Import一个TransactionManagementConfigurationSelector类,他继承ImportSelector,所以他可能还会向容器导入多个bean,具体实现可以参考第二章。
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
}
在这里一共会导入两个,ProxyTransactionManagementConfiguration、InfrastructureAdvisorAutoProxyCreator。
但ProxyTransactionManagementConfiguration还定义了三个关键bean,也会添加到容器,如下。
BeanFactoryTransactionAttributeSourceAdvisor
TransactionAttributeSource
TransactionInterceptor
第一个类是通知器,包含下面两个对象,而TransactionInterceptor是方法拦截,所有标有@Transactional注解的方法会先进入到这里,开启事务、创建Connection都在这个拦截下。
TransactionAttributeSource用来获取方法上的@Transactional信息
InfrastructureAdvisorAutoProxyCreator
InfrastructureAdvisorAutoProxyCreator实现了BeanPostProcessor,主要用于对标有@Transactional的方法所在的类生成代理对象。
@Override
public Object postProcessAfterInitialization(@Nullable Object bean, String beanName) {
if (bean != null) {
Object cacheKey = getCacheKey(bean.getClass(), beanName);
if (this.earlyProxyReferences.remove(cacheKey) != bean) {
return wrapIfNecessary(bean, beanName, cacheKey);//创建代理对象,如果这个bean有被代理的资格
}
}
return bean;
}
这里所说的代理资格,是判断这个类上是否标有@Transactional注解,或者类中方法是否有@Transactional注解,首先同上一章一样,会获取所有Advisor的实现,恰好的是,在上面通过@Import导入了一个Advisor的实现类BeanFactoryTransactionAttributeSourceAdvisor,这里单独演示下他的作用。
这段代码用来判断UserTest的test方法上有没有@Transactional注解,或者类上,有的话就返回true,用于是否为这个对象生成代理,这里面的逻辑实现也就是可以把@Transactional标在方法上、类上、接口上的关键。
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
advisor.setTransactionAttributeSource(new AnnotationTransactionAttributeSource());
MethodMatcher methodMatcher = advisor.getPointcut().getMethodMatcher();
boolean test = methodMatcher.matches(UserTest.class.getDeclaredMethod("test"), UserTest.class);
System.out.println(test);
具体逻辑就不看了,很简单。
TransactionInterceptor(事务拦截)
在来看TransactionInterceptor,标有@Transactional的方法会进入到这里,具体实现在他的invokeWithinTransaction()方法下。
首先判断事务管理器是否是ReactiveTransactionManager的子类,一般情况下都不是,这个事务管理器被称为反应式,不太了解。
这个事务管理器是我们手动向容器添加的,可以自行实现,一般使用DataSourceTransactionManager。
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
}
然后会进入到这里,创建事务有关信息。
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
其他我们就不分析了,主要分析关键点,也就是创建Connection并且设置不自动提交,并绑定Connection到ThreadLocal,为JdbcTemplate做准备,由下面的doBegin完成。
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
/**
* 如果Connection不存在,创建新的Connection
*/
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
/**
* 设置不自动提交
*/
con.setAutoCommit(false);
}
prepareTransactionalConnection(con, definition);
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
/**
* 在ThreadLocal中绑定这个Connection
*/
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
}
}
调用原方法
接下来的这句用于调用我们所写的方法。
Object retVal;
try {
//调用我们的方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
//抛出异常后试着回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
假如我们执行了这段sql。
jdbcTemplate.update("UPDATE test set name =111 where id =1");
那么想都不用想,首先肯定是获取Connection,会尝试先从TransactionSynchronizationManager中获取,因为前面已经创建了Connection,所以这里直接返回,否则创建新的连接。
public static Connection doGetConnection(DataSource dataSource) throws SQLException {
ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(dataSource);
if (conHolder != null && (conHolder.hasConnection() || conHolder.isSynchronizedWithTransaction())) {
conHolder.requested();
if (!conHolder.hasConnection()) {
conHolder.setConnection(fetchConnection(dataSource));
}
return conHolder.getConnection();
}
/**
*新的连接
*/
Connection con = fetchConnection(dataSource);
return con;
}
尝试回滚
调用我们的方法时被包裹在try中,如果我们的方法抛出异常,那么会进入下面这个方法进行回滚,但也不是任何异常都会回滚,回不回滚取决于你配置的信息。
completeTransactionAfterThrowing(txInfo, ex);
回到Transactional注解上,有这么两个值。
@Target({ElementType.TYPE, ElementType
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Transactional {
Class<? extends Throwable>[] rollbackFor() default {};
Class<? extends Throwable>[] noRollbackFor() default {};
}
rollbackFor用于异常是这类的时候进行回滚
noRollbackFor用于异常不是这类的时候进行回滚。
默认情况下,也就是都不配置,回滚的条件如下,即是RuntimeException的子类或者Error的子类。
return (ex instanceof RuntimeException || ex instanceof Error);
提交
最后进行commit,完成一套流程
protected void commitTrajavansactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
事务是个复杂的过程,除了回滚、提交,还有其他的东西要管理,比如传播行为,传播行为是当事务方法被另一个事务方法调用时,是在现有事务中运行,还是能开启一个新事务,并在自己的事务中运行,Spring定义了七种传播行为,默认是Propagation.REQUIRED,表示在当前事务中运行,这些信息以后的篇幅中详细说。
转载自:https://juejin.cn/post/7062986347048337444