Spring 事务源码执行流程
引用:developer.aliyun.com/article/389…
Spring 事务源码中文注释:在gitee仓库的:gitee.com/old_yogurt/… 的 spring-tx 工程中做了。
在
spring-tx
工程中,每个包中的package-info
都对一个包按类的类型、功能做的划分总结。注解方式demo:在 a-transactional-learning 工程中的
com.transactional.annotation
包中。配置文件方式demo:在 a-transactional-learning 工程中的
com.transactional.xml
包中。
Spring 事务源码执行流程
一、Spring管理事务的由来
传统的Jdbc事务我们需要获取Jdbc连接,然后将事务自动提交关闭,然后创建事务,手动commit事务。
下面是伪代码:
try{
con.setAutoCommit(false);
PreparedStatement ps = con.prepareStatement("insert into money(account,money) value('1001002',100)");
ps.execute();
con.commit(); // 提交事务
}catch(SQLException e){
con.rollback(); // 回滚事务
}
这样就使得,我们需要自己来将Jdbc连接自动提交关闭,然后手动commit,并且需要我们对异常处理,对事务进行回滚。
这里就有两个问题:
- 1、业务代码都要嵌套在try catch模板代码中 ;
- 2、接触了底层Connection的事务功能,当使用其他一些框架时,我们更多的不会直接与Connection打交道,如:使用Mybatis时,就不容易获取Connection。
其实就和 Mybatis 这样的ORM框架一样,封装了对数据库底层的操作,使得程序员直接编写业务SQL映射即可;至于数据库连接和JDBC源和数据库打交道的部分交由Mybatis处理。
所以,针对事务的处理,Spring 编写了 spring-tx
工程,针对事务的处理做的封装,使得我们程序员在开发的过程中,只用关注业务逻辑的编写,而无需关心Spring对于事务的处理。
在 spring-tx
工程中:
-
针对问题一:
- 1、将事务处理部分的公共代码封装成模板以便复用。如同
JdbcTemplate对jdbc
操作的模板代码的封装,这便引出了下文TransactionTemplate
的模板代码。- 说实话,如果我们使用普通代码并没有走
TransactionTemplate
类,而是走:事务拦截器 TransactionInterceptor 的org.springframework.transaction.interceptor.TransactionInterceptor#invoke
方法里面的 invokeWithinTransaction()这一步;下面我们也是走该 事务拦截器,因为 事务拦截器实现的MethodInterceptor
会被spring-aop
所拦截。
- 说实话,如果我们使用普通代码并没有走
- 2、然后,真正将业务业务代码和事务代码完全分离的,还是利用的
spring aop
技术,通过AOP代理将事务逻辑植入业务逻辑中,这样就可以做到分离。
- 1、将事务处理部分的公共代码封装成模板以便复用。如同
-
针对问题二:
-
对于和JDBC的Connection打交道,那就需要一个统一的接口来完成事务的提交和回滚等功能,即接口
PlatformTransactionManager
。然后该接口下面有子类,对应了不同的管理器类型,如:
- 如果是使用
jdbc
,则使用DataSourceTransactionManager
子类来完成事务的提交和回滚;(后面的源码流程,我们就用的这个管理器进行分析) - 若果是使用
hibernate
,则使用HibernateTransactionManager
来完成事务的提交和回滚。
- 如果是使用
-
二、注册事务解析器
实例代码:
本人的 gitee 仓库的:gitee.com/old_yogurt/… 中:
注解方式demo:在 a-transactional-learning 工程中的
com.transactional.annotation
包中。配置文件方式demo:在 a-transactional-learning 工程中的
com.transactional.annotation
包中。
2.1 注解方式解析器注册
首先,我们知道,注解方式使用的注解:@EnableTransactionManagement
跟踪注解我们可以找到的:
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
/**
* 其实这里意思是:
* 在我们配置 @EnableTransactionManagement注解的时候,
* 配置属性 mode,有两个枚举:PROXY、ASPECTJ
* 我们选择事务拦截器的处理方式是使用 JDK代理方式(PROXY),还是使用ASPECTJ方式(ASPECTJ);
* 下面这个方法,就是根据不同的枚举注册相应的配置类。
*/
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
// JDK代理,注册 AutoProxyRegistrar,ProxyTransactionManagementConfiguration
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
//
case ASPECTJ:
return new String[] {
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME};
default:
return null;
}
}
}
首先,该类继承了 AdviceModeImportSelector
类,这个类的 <>
中对应的泛型,必须是注解类型,EnableTransactionManagement
符合;
我们从该类中的唯一方法,往上找调用该方法的类
TransactionManagementConfigurationSelector#selectImports
->被 AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata) 调用
-> 被 org.springframework.context.annotation.ConfigurationClassParser#processImports 调用
-> 其实来到ConfigurationClassParser类,我们就应该知道,该类是注解配置类(@Configuration)的解析类
以 PROXY
方式为例,我们可以看到,这两个里面的 case
返回的数据是 对应类的名字:
注册的:AutoProxyRegistrar
、ProxyTransactionManagementConfiguration
-
AutoProxyRegistrar
- 如果看过AOP源码的比较熟悉,该类中的唯一一个方法
org.springframework.context.annotation.AutoProxyRegistrar#registerBeanDefinitions
又一步AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
最终注册还是AnnotationAwareAspectJAutoProxyCreator
类,就是AOP的APC类型的类,并且将对应的代理类型属性设置进去。
- 如果看过AOP源码的比较熟悉,该类中的唯一一个方法
-
ProxyTransactionManagementConfiguration
- 代理方式的事务管理器配置类;
- 进入该方法,我们可以看到,该类直接以 注解的方式,往
Spring IOC
中注册事务基础类,如:- BeanFactoryTransactionAttributeSourceAdvisor:事务通知
- AnnotationTransactionAttributeSource:注册 注解方式的 事务源信息
- TransactionInterceptor:事务拦截器;就是我们前面说的问题一;
- 该类还是
AbstractTransactionManagementConfiguration
的子类,子类注册,父类也会注册:- 在该父类中会注册:
- 我们最终要的 事务管理器
PlatformTransactionManager
,就是我们前面说的问题二; - 事务监听器:TransactionalEventListenerFactory
- 我们最终要的 事务管理器
- 在该父类中会注册:
就这么简单,这里我们只需要关注,我们在这里注册了那些类就行。
2.2 配置文件方式解析器注册
我们可以从配置文件中的 transaction-manager
节点出发找注册类:
/*
* NamespaceHandler允许使用XML或注解配置声明性事务管理。
* 这个名称空间处理程序是Spring事务管理工具的核心功能,它提供了两种声明式管理事务的方法。
* 一种方法使用XML中使用<tx:advice>元素定义的事务语义,另一种方法将注解与<tx:annotation-driven>元素结合使用。
* 这两种方法在Spring参考手册中有很大程度的详细说明。
*/
public class TxNamespaceHandler extends NamespaceHandlerSupport {
/**
* 这里只是注册下面三种节点的解析器
*/
@Override
public void init() {
// <tx:advice>标签解析流程
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
// 这个也是注解方式
// 只是说,我们在业务代码编写事务流程,然后再在xml中配置:
// <tx:annotation-driven transaction-manager="transactionManager"/> 开启事务;
// 当然,也可以基于注解方式开始事务 @EnableTransactionManagement,那就是注解注册流程了,最终肯定有公共的部分,和我们开启aop一毛一样。
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
// JTA (Java Transaction Manager): 是基于Java规范,XA分布式事务方案在Java上的实现.
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
}
我们可以看到,注册节点名,对应的就是配置文件中的 事务的根节点,对应的解析器会遍历解析,将里面的节点信息以及节点上的属性信息给解析出来,构建实体。
其实 我们可以 TxNamespaceHandler
所继承了的类是:NamespaceHandlerSupport
,其实继承该类,都是为了解析XML
方式的命名空间节点;我们可以看该类的子类:(有我们比较熟悉的:AopNamespaceHandler
)
-
TxAdviceBeanDefinitionParser:
-
该类里面,我们可以很多事务的属性信息:
// 下面这些都是 <tx:advice/>标签的属性、子标签、子标签中的属性 信息。 // 方法节点 private static final String METHOD_ELEMENT = "method"; // 方法名节点 属性 name private static final String METHOD_NAME_ATTRIBUTE = "name"; // 属性节点 private static final String ATTRIBUTES_ELEMENT = "attributes"; // 方法名节点 属性 超时时间 private static final String TIMEOUT_ATTRIBUTE = "timeout"; // 方法名节点 属性 设置事务是否只读 private static final String READ_ONLY_ATTRIBUTE = "read-only"; // 方法名节点 属性 事务的传播行为 private static final String PROPAGATION_ATTRIBUTE = "propagation"; // 方法名节点 属性 事务的隔离级别 private static final String ISOLATION_ATTRIBUTE = "isolation"; // 方法名节点 属性 事务回滚 private static final String ROLLBACK_FOR_ATTRIBUTE = "rollback-for"; // 方法名节点 属性 事务不回滚 private static final String NO_ROLLBACK_FOR_ATTRIBUTE = "no-rollback-for";
-
-
AnnotationDrivenBeanDefinitionParser
- 配置文件方式 的注解事务解析器
- 这里里面的
parse
方法,又回到了两种 代理方式处理事务 一种:JDK代理方式,一种:ASPECTJ方式。 - 可以看到事务管理器,事务拦截器,事务属性等信息的注册。
- 这里里面的
- 配置文件方式 的注解事务解析器
-
JtaTransactionManagerBeanDefinitionParser
- JTA事务;想了解可以找找我在
spring-tx
工程中,对于这一块的源码注释,主流程中我们不关注。
- JTA事务;想了解可以找找我在
2.3 总结
不论是注解方式解析器注册还是配置文件方式解析器注册;我们可以看到二者最终都有注册相同类型的的bean;如:事务管理器,事务拦截器,事务属性源,事务增强器等。
三、事务源码执行流程
这里,强烈建议先看 spring-aop
源码执行流程(文章开头标注了本人另外一个关于spring-aop
源码的文章),因为 spring-tx
事务对于 aop
来说,仅仅是一个方法拦截器,对一个加上了 @transactional
注解的普通方法拦截增强。
写这篇文章,本人已经把 spring-aop
的源码都摸了个遍,包括源码执行流程。
所以这里直接来到事务增强器 TransactionInterceptor
;唯一一个方法:invoke(MethodInvocation invocation)
,该方法在 AOP
执行链中会被调用。( TransactionInterceptor
的invoke(MethodInvocation invocation)
方法为什么会被 执行链调用呢?我们第二章讲过,两种注册方式都会将TransactionInterceptor
注册到Spring 容器中,然后AOP
会在 容器中找到实现了MethodInterceptor
接口的类,然后加入到执行器链中,最终根据选择的代理方式 ASPECTJ
或 PROXY
,执行TransactionInterceptor
的invoke(MethodInvocation invocation)
方法**,然后这里就是对普通方法利用事务增强**。 )
/*
* AOP Alliance MethodInterceptor,用于使用公共Spring事务基础架构(PlatformTransactionManager)进行声明性事务管理。
* 派生自TransactionSpectSupport类,该类包含与Spring的底层事务API的集成。
* TransactionInterceptor只需按照正确的顺序调用相关的超类方法,例如invokeWithinTransaction。
* TransactionInterceptor是线程安全的。
*/
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// TransactionAttributeSource应该被传递给目标类和方法,后者可能来自接口。
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 适配 TransactionAspectSupport的 invokeWithinTransaction方法
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}
}
这里贴一张AOP
执行事务拦截器的图;看到这个方法,看过AOP
源码的同学应该很熟悉这里;
我们可以看到interceptorOrInterceptionAdvice
类型就是 TransactionInterceptor
;这样下一步的执行就被 TransactionInterceptor
拦截,执行代理方法: TransactionInterceptor#invoke(MethodInvocation invocation)
这样,我们找到了事务源码执行流程的入口;执行流程大致分为下面几步:
- 1、获取事务方法上事务属性;
- 2、获取对应类型的事务管理器;
- 3、主要是创建事务,根据事务管理器和事务属性创建事务。就是 begin阶段;
- 4、执行事务增强的方法, 是的!! 目标方法;
- 5、提交事务,commit阶段。
进入:org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction
方法中
public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
/**
* @param method 被调用的方法
* @param targetClass 被调用的方法所在的类
* @param invocation 用于处理目标方法调用的回调
* @return 方法的返回值(如果有)
* @throws Throwable 从目标调用传播
*
* 用于基于 around-advice-based子类的常规委托,委托给该类上的其他几个模板方法。
* 能够处理CallbackPreferringPlatformTransactionManager以及常规PlatformTransactionManager实现。
*/
@Nullable
protected Object invokeWithinTransaction(Method method,
@Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// 如果事务属性为null,则该方法是非事务性的,那就以非事务属性运行。
TransactionAttributeSource tas = getTransactionAttributeSource();
// 1、获取事务方法上事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 2、获取对应类型的事务管理器
// 可以是 Jms,Jta,DataSource,Hibernate ...
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 获取 事务方法名(id) ,也即:连接点。
// ege:方法名 com.transactional.xml.service.XmlTransactionalServiceImpl.xmlTransactionalMethod_update
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 事务属性为 null 或者 事务管理器不是 CallbackPreferringPlatformTransactionManager
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// 使用getTransaction和提交/回滚调用进行标准事务划分。
// 3、主要是创建事务, 根据事务管理器和事务属性创建事务。
// 就是 begin阶段
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal;
try {
// 下面是一个通知:调用链中的下一个拦截器。
// 这通常会导致调用目标对象。
// 4、执行事务增强的方法??? 是的!! 目标方法
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 目标方法调用发生异常,对异常进行处理
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
// 清理事务信息 / 重置当前线程(ThreadLocal)的TransactionInfo。
cleanupTransactionInfo(txInfo);
}
// 5、★在方法后置返回后,提交事务
// commit阶段
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 事务属性为null && 事务管理器是 CallbackPreferringPlatformTransactionManager
// 可以看到 下面的 lambda表达式,只是执行了 被事务拦截器所拦截的方法,并没有开启事务。
else {
// 这里省略一万字...
}
}
}
3.1 获取事务方法上事务属性
// 如果事务属性为null,则该方法是非事务性的,那就以非事务属性运行。
TransactionAttributeSource tas = getTransactionAttributeSource();
// 1、获取事务方法上事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
这里我们可以看到两个类:
一个:TransactionAttributeSource:这里的类型是 子类:AnnotationTransactionAttributeSource
一个:TransactionAttribute:这里的类型是 子类:RuleBasedTransactionAttribute
UML图:
-
第一个图:目的就是获取事务的属性源
-
TransactionAttributeSource:该类只有一个方法:
// 利用给定的方法和类,获取 TransactionAttribute 事务属性 TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass);
-
再介绍一个子类 AnnotationTransactionAttributeSource ,该类是注解方式事务方法,解析注解方式事务属性之后得到的一个事务源信息;我们在
getTransactionAttributeSource();
这一步获取了该事务源对象。
-
-
第二个图:事务的属性信息UML图;下面梳理一下该关系图
-
TransactionDefinition:定义事务属性常量,如:隔离级别,传播行为,超时的常量定义。
-
DefaultTransactionDefinition:子类默认实现,这个才是作为一个 对象,记录当前事务的属性信息的一个实体类(事务源);
-
TransactionAttribute:继承了TransactionDefinition,该接口新增 定义 rollback回滚规范;
- DefaultTransactionAttribute:继承了 DefaultTransactionDefinition (也就说继承了DefaultTransactionDefinition 作为事务属性的实体);并且实现了 TransactionAttribute 接口(也就要去定义回滚的规范);
- RuleBasedTransactionAttribute:继承了DefaultTransactionAttribute,也就是(DefaultTransactionDefinition 和 TransactionAttribute 对于事务的属性定义、回滚规范都有了,但该类具体还是对 回滚规范粒度降低:通过应用大量回滚规则(正负规则),计算给定异常是否应导致事务回滚。)
- DefaultTransactionAttribute:继承了 DefaultTransactionDefinition (也就说继承了DefaultTransactionDefinition 作为事务属性的实体);并且实现了 TransactionAttribute 接口(也就要去定义回滚的规范);
-
-
3.2 获取对应类型的事务管理器
// 2、获取对应类型的事务管理器
// 可以是 Jms,Jta,DataSource,Hibernate ...
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
下面方法大致流程就是:
先通过缓存map中取获取当前事务配置的事务管理器,如果没有,就从 beanFactory
中找,这就是我们第二节说到的,注册的事务管理器,现在用到了。
然后,在我们编写spring事务的时候,不轮是xml方式,还是注解方式都是需要配置事务管理器,这里我自己的demo中,用的是 DataSourceTransactionManager
@Nullable
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
// Do not attempt to lookup tx manager if no tx attributes are set
// 如果未设置tx属性,请勿尝试查找tx manager
if (txAttr == null || this.beanFactory == null) {
return getTransactionManager();
}
String qualifier = txAttr.getQualifier(); // qualifier:与当前事务相关联的限定符值
if (StringUtils.hasText(qualifier)) { // 通过与当前事务相关联的限定符值 获取对应的事务管理器
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
}
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
}
else {
// 这里获取的是 DataSourceTransactionManager
PlatformTransactionManager defaultTransactionManager = getTransactionManager();
if (defaultTransactionManager == null) { // 如果是null,往里走
// 默认的是null ,继续往里走
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
if (defaultTransactionManager == null) {
// 从当期的beanFactory中获取
defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
this.transactionManagerCache.putIfAbsent(
DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); // put,缓存到 transactionManagerCache
}
}
return defaultTransactionManager;
}
}
讲到事务管理器,我们大致介绍一下:
-
PlatformTransactionManager是super接口:里面有三个方法:
- getTransaction():就是 事务的begin,开启事务 - commit():提交事务 - rollback():回滚事务
-
AbstractPlatformTransactionManager
-
事务管理器的抽象基类,定义了下面一些事务处理工作:
- 1、确定是否存在现有事务;
- 2、应用适当的传播行为;
- 3、必要时暂停并恢复事务;
- 4、在提交时检查仅回滚标志;
- 5、对回滚应用适当的修改(仅限实际回滚或设置回滚);
- 6、触发已注册的同步回调(如果事务同步处于活动状态)。
-
子类:
-
像 JDBC、Hibernate、JPA 这些具体的事务管理器则需要提供 模板方法的具体实现:begin,supend,commit,rollback等;
因为具体的事务管理器类型封装了 Connection 。
-
-
-
3.3 创建事务
进入方法之前,需要记住一点:这个方式是 事务的 begin
阶段
// 3、主要是创建事务, 根据事务管理器和事务属性创建事务。
// 就是 begin阶段
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
进入:createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr,
final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name. 如果未指定名称,请将方法标识应用为事务名称。
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) { // 事务属性不为空
if (tm != null) { // 事务管理器不为空
// ★★★★★ 主要就是开启事务, begin ★★★★★
status = tm.getTransaction(txAttr);
}
else { // 如果走 else 说明当前方法,既没有事务属性设置,也没有配置事务管理器,那就不用创建事务
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 这里构建的TransactionInfo 表示事务状态处于准备好了。
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
核心流程:创建事务,进入:tm.getTransaction(txAttr);
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
// 返回相应的数据源事务对象 ,
// ege: DataSourceTransactionObject;为什么这里我只是举这个例子呢,因为我用的是mysql数据库,如果是别的场景,也可能是:jms,jpa这些。
Object transaction = doGetTransaction();
// Cache debug flag to avoid repeated checks. debug操作下,正常肯定不会走这里。
boolean debugEnabled = logger.isDebugEnabled();
if (definition == null) {
// Use defaults if no transaction definition given. 如果没有给出事务定义,则使用默认值。
definition = new DefaultTransactionDefinition();
}
// 1. 在该方法之前已经有事务存在。
// 如果有,那么,根据事务的传播行为,是将该事务挂起;还是加入到当前事务;还是以嵌套事务方式,由事务的传播行为决定。
if (isExistingTransaction(transaction)) {
// 找到现有事务->检查传播行为以了解行为方式。
return handleExistingTransaction(definition, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
// 2. 未找到现有事务->检查传播行为以了解如何继续。
// PROPAGATION_MANDATORY
// 1、A 没有事务,而B 事务传播特性定义为:MANDATORY
// 那么会抛出No existing transaction found for transaction marked with propagation ‘mandatory’;
// 而且需要注意的是:A Method()有数据库操作是正常执行的,不会回滚(因为本身就没有事务),B Method()也不会进行回滚;
// 其实应该是B都没有执行,因为在调用B的时候,spring事务会检查到B的注解上面的的参数是 MANDATORY就会抛出相应异常)。
// 2、A 开启事务,B 事务传播行为:MANDATORY
// 由于A开启事务,B会加入到A开始的事务中运行
// 3、A 开启事务并抛出异常,B 事务传播行为:MANDATORY
// A method 和 B method()执行的操作被回滚。
// 4、A 开启事务,B 事务传播行为:MANDATORY,抛出异常,A不捕获B的异常
// A method 和 B method()执行的操作被回滚。
// 5、A 开启事务,B 事务传播行为:MANDATORY,抛出异常,A捕获B的异常
// A method 和 B method()执行的操作被回滚,并且抛出异常 Transaction rolled back because it has been marked as rollback-only
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// PROPAGATION_REQUIRED :
// 表示当前方法必须在一个具有事务的上下文中运行,如果有加入当前事务,如果没有开启一个新的事务。
// 1、A method() 不开启事务, B method() 事务传播特性:REQUIRED
// 那么 B method 开启事务。
// 2、A method() 开启事务, B method() 事务传播行为:REQUIRED
// 那么B method()会加入到 A的事务中!
// 3、A method() 开启事务, B method() 事务传播行为:REQUIRED
// 3.1 如果 B method() 抛出异常,B method()和A method()都会回滚。
// 3.2 如果 A method 捕获了 B method()抛出的异常,则会出现异常Transaction rolled back because it has been marked as rollback-only。
// 抛出的一次也说明了 rollback-only 只能回滚,所以 A,B都回滚。
//
// PROPAGATION_REQUIRES_NEW:
// 表示当前方法需要运行在一个新的事务中,并且 上一个事务会被挂起。
//
// PROPAGATION_NESTED
// 如果存在当前事务,则在嵌套事务中执行,如果不存在当前事务,那么就新开启一个事务。
// 1、 A method 不存在事务,B事务传播行为NESTED
// 结果:则B method()运行在一个新的事务中。
// 2、 A method()存在事务,B事务传播行为NESTED
// 结果:B method()运行在嵌套事务中,(这就是上面补充说道的,在同一个事务,只是有保存点。)
// 3、 A 作为外层事务,B 作为内层事务;A 发生异常
// 结果:A,B都回滚;
// 4、A 作为外层事务,B 作为内层事务;B 发生异常,A不捕获
// 结果:A,B都回滚;
// 5、A 作为外层事务,B 作为内层事务;B 发生异常,A捕获
// 结果:B method发生异常,B回滚不影响A事务的提交。(也就是B的回滚,只是回滚到了上一个保存点)
// PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED 来到这里是要开启新事务的!!!
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 这里置为null,就是无需挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 利用上面的这些参数构建 DefaultTransactionStatus对象 并返回
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true,
newSynchronization, debugEnabled, suspendedResources);
// 开启事务;
// 利用:对应的事务管理器 transaction,事务属性 definition 开启事务。
doBegin(transaction, definition);
// 就是将当期事务的状态和属性信息和线程绑定,做到同步,保证线程安全
prepareSynchronization(status, definition);
// 构建 DefaultTransactionStatus对象 并返回
return status;
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
// 创建“空”事务:没有实际事务,但可能是同步。
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + definition);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 同步
return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
}
}
}
-
大致流程:
-
1、
Object transaction = doGetTransaction();
获取事务源对象 -
2、根据事务源判断是否已经存在事务,以及当前方法所设置的事务属性(隔离级别,传播行为)做出相应的处理:
-
3.2.1、在该方法之前已经有事务存在
// 1. 在该方法之前已经有事务存在。 // 如果有,那么,根据事务的传播行为,是将该事务挂起;还是加入到当前事务;还是以嵌套事务方式,由事务的传播行为决定。 if (isExistingTransaction(transaction)) { // 找到现有事务->检查传播行为以了解行为方式。 return handleExistingTransaction(definition, transaction, debugEnabled); }
-
3.2.2、在该方法之前不存在事务:
-
如果传播行为是:
PROPAGATION_MANDATORY
必须要事务中运行,并且是加入已经存在的事务;如果不存在,就抛出异常。
-
如果传播行为是:PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED
则需要开启新事物。
// 开启事务; // 利用:对应的事务管理器 transaction,事务属性 definition 开启事务。 doBegin(transaction, definition); // 就是将当期事务的状态和属性信息和线程绑定,做到同步,保证线程安全 prepareSynchronization(status, definition);
-
其他的传播行为:
就不用开启事务了。
-
-
-
重点说下 2.1 handleExistingTransaction(definition, transaction, debugEnabled);
和 2.2 的doBegin(transaction, definition);
3.2.1 handleExistingTransaction(definition, transaction, debugEnabled);
- 大致流程:(首先进入该方法,代表着当前已经存在事务,然后根据传播行为,做出相应的处理)
- 如果当前事务方法传播行为为 PROPAGATION_NEVER:不在事务中运行,如果存在事务,直接抛出异常;
- 如果当前事务方法传播行为为 PROPAGATION_NOT_SUPPORTED:也不在事务中运行,如果当前有事务,则当前方法挂起,先执行事务,后面再执行它;
- 如果当前事务方法传播行为为 PROPAGATION_REQUIRES_NEW:表示当前方法需要运行在一个新的事务中,并且 上一个事务会被挂起;
- 如果当前事务方法传播行为为 PROPAGATION_NESTED: 如果存在当前事务,则在嵌套事务中执行,如果不存在当前事务,那么就新开启一个事务;
- 如果当前事务方法传播行为为 PROPAGATION_SUPPORTS,PROPAGATION_REQUIRED :这种都会将当前方法加入到上一个方法所在的事务中去。
/**
* 为现有事务创建 TransactionStatus
* 注意:进入到该方法,说明 在当前方法之前已经存在事务了。
*/
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled)
throws TransactionException {
// PROPAGATION_NEVER :
// 表示当方法务不应该在一个事务中运行,如果存在一个事务,则抛出异常;
// 1、A 不开启事务,B 传播行为为:NEVER
// 结果:A,B方法都没有在事务中运行
// 2、A 开启事务,B 传播行为为:NEVER
// 结果:抛出异常 IllegalTransactionStateException: Existing transaction found for transaction marked with propagation ‘never’
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
// 进入到if ,就是 2这种情况
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED:
// 表示该方法不应该在一个事务中运行。如果有一个事务正在运行,他将在运行期被挂起,直到这个事务提交或者回滚才恢复执行。
// 1、 A 不开启事务,B 事务传播行为:NOT_SUPPORTED
// 结果:B 没有开启事务
// 2、 A method开启事务,B,事务传播行为:NOT_SUPPORTED
// 结果:A method 是在事务中运行的,并且事务是活动状态。
// B method 则是等 A事务提交结束,以非事务的状态执行。
// 3、 A 开启事务并且A 抛出异常;B 事务传播行为:NOT_SUPPORTED
// 结果:这种属于,A method是在事务中,A 异常会回滚,B由于不在事务中执行,所以会被执行。
// 验证:表a没有插入数据库,表b中对应的数据被删除。
// 4、A 开启事务;B 事务传播行为:NOT_SUPPORTED;B 抛出异常,A不捕获
// 结果:A 的insert操作回滚,B 的delete的操作正常执行
// 5、A 开启事务;B 事务传播行为:NOT_SUPPORTED;B 抛出异常,不捕获
// A的insert操作正常执行,B的delete的操作也正常执行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 进入到if,就可以: 2,3,4,5 这几种情况。
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 把当前事务挂起
Object suspendedResources = suspend(transaction);
// 当前事务是否支持同步
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 构建并返回当前事务的状态对象:TransactionStatus
return prepareTransactionStatus(definition, null, false,
newSynchronization, debugEnabled, suspendedResources);
}
// 表示当前方法需要运行在一个新的事务中,并且 上一个事务会被挂起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 1、挂起上一个事务
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 返回此事务管理器是否应激活线程绑定事务同步支持。
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 构建 TransactionStatus
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization,
debugEnabled, suspendedResources);
// 2、创建新的事务,让当前方法运行在新创建的事务中
doBegin(transaction, definition);
prepareSynchronization(status, definition);
// 构建并返回当前事务的状态对象:TransactionStatus
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED
// 如果存在当前事务,则在嵌套事务中执行,如果不存在当前事务,那么就新开启一个事务。
// 1、A method 不存在事务,B事务传播行为NESTED
// 结果:则B method()运行在一个新的事务中。
// 2、A method()存在事务,B事务传播行为NESTED
// 结果:B method()运行在嵌套事务中,(这就是上面补充说道的,在同一个事务,只是有保存点。)
// 3、A 作为外层事务,B 作为内层事务;A 发生异常
// 结果:A,B都回滚;
// 4、A 作为外层事务,B 作为内层事务;B 发生异常,A不捕获
// 结果:A,B都回滚;
// 5、A 作为外层事务,B 作为内层事务;B 发生异常,A捕获
// 结果:B method发生异常,B回滚不影响A事务的提交。(也就是B的回滚,只是回滚到了上一个保存点)
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) { // 事务管理器不支持保存点的创建:就是可能有些数据库引擎不支持保存点(嵌套事务)
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) { // 对于嵌套事务需要使用保存点
// 在现有Spring托管事务中创建保存点,通过TransactionStatus实现的SavepointManager API。
// 通常使用JDBC 3.0 保存点。从不激活Spring同步。
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false,
debugEnabled, null);
// 创建保存点
status.createAndHoldSavepoint();
// 构建并返回当前事务的状态对象:TransactionStatus
return status;
}
else { // 来到这里应该是,事务管理器允许创建保存点,但是没有创建成功;那也就是说,当前没有事务;就创建一个新的事务
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
// 通过嵌套的 begin和commit/rollback 调用嵌套事务。
// 通常仅适用于JTA:此处可能会激活Spring同步,对于预先存在的JTA 事务。
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization,
debugEnabled, null);
// 创建新事务
doBegin(transaction, definition);
// 同步
prepareSynchronization(status, definition);
// 构建并返回当前事务的状态对象:TransactionStatus
return status;
}
}
// 事务的传播行为可能是这两种: PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
// SUPPORTS:
// 1、A 不开启事务,B 事务传播行为:SUPPORTS
// 不会以事务方式运行
// 2、A 开启事务,B 事务传播行为:SUPPORTS
// B 会在A开启的事务中运行
// 3、A 开启事务;A抛出异常,B 事务传播行为:SUPPORTS
// A method,B method 都会回滚
// 4、A 开启事务;B 事务传播行为SUPPORTS,抛出异常,A捕获B的异常
// A method()和B method()都会被回滚,并抛出 UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
// 5、A 开启事务;B 事务传播行为SUPPORTS,抛出异常,A不捕获B的异常
// A method()和B method()都会被回滚
// 4.1 A 不开启事务,B 事务传播
// REQUIRED:
// 表示当前方法必须在一个具有事务的上下文中运行,如果有加入当前事务,如果没有开启一个新的事务。
// 1、A method() 不开启事务, B method() 事务传播特性:REQUIRED
// 那么 B method 开启事务。
// 2、A method() 开启事务, B method() 事务传播行为:REQUIRED
// 那么B method()会加入到 A的事务中!
// 3、A method() 开启事务, B method() 事务传播行为:REQUIRED
// 3.1 如果 B method() 抛出异常,B method()和A method()都会回滚。
// 3.2 如果 A method 捕获了 B method()抛出的异常,则会出现异常Transaction rolled back because it has been marked as rollback-only。
// 抛出的一次也说明了 rollback-only 只能回滚,所以 A,B都回滚。
// 因为 在来到本方法,本身就说了,在该方法之前,就已经存在事务;
// 所以,PROPAGATION_SUPPORTS,PROPAGATION_REQUIRED 这种都会将当前方法加入到上一个方法所在的事务中去
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 构建并返回当前事务的状态对象:TransactionStatus
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
3.2.2 doBegin(transaction, definition);
这里就是数据库的 begin
阶段方法!!!为事务设置:超时时间,隔离级别这些。
在这里对事务管理器(DataSourceTransactionManager)说明一下:事务的 begin 、commit、rollback、dataSource,connection,这些都在这里对 JDBC进行了封装。
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
protected abstract void doBegin(Object transaction, TransactionDefinition definition)
throws TransactionException;
}
// 下面有很多事务开始的实现类,都在对应的事务管理器中:
// DataSourceTransactionManager,JmsTransactionManager,JpaTransactionManager,JtaTransactionManager等
// 我们这里用的是 DataSourceTransactionManager
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
implements ResourceTransactionManager, InitializingBean {
/**
* 开启事务。
* @param transaction 对应的事务管理器 transaction
* @param definition 事务属性: 传播行为,隔离解绑,是否只读标志,超时时间,事务名称
*/
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
// 数据源事务对象
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 这个就是看当前事务是否绑定了线程,然后根据相应的数据源创建连接。
if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 将资源标记为与事务同步。
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
// 获取连接
con = txObject.getConnectionHolder().getConnection();
// 获取 事务的隔离级别,并且将事务隔离基本设置到 con 数据源连接属性中
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 设置事务的隔离级别
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
// 如有必要,切换到手动提交。这在一些JDBC驱动程序中非常昂贵,
// 所以我们不想不必要地这样做(例如,如果我们明确已将连接池配置为已设置)。
if (con.getAutoCommit()) { // 事务如果是自动提交
// 将自动提交标志设置为 true
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
con.setAutoCommit(false);
}
// 如果设置了 事务的只读属性,那么这里设置只读属性
prepareTransactionalConnection(con, definition);
// 将当前事务状态设置为 true (即:活跃状态)
txObject.getConnectionHolder().setTransactionActive(true);
// 设置事务的超时时间
int timeout = determineTimeout(definition);
// 如果 自定义了超时时间(就是事务属性设置了超时时间),那么这里给资源连接持有者设置对应的超时时间
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread. 将连接持有者绑定到线程上。
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
}
3.4 执行目标方法
try {
// 下面是一个通知:调用链中的下一个拦截器。
// 这通常会导致调用目标对象。
// 4、执行事务增强的方法??? 是的!! 目标方法
retVal = invocation.proceedWithInvocation();
}
@FunctionalInterface
public interface InvocationCallback {
@Nullable
Object proceedWithInvocation() throws Throwable;
}
这里会回到:
org.springframework.aop.framework.ReflectiveMethodInvocation#proceed
方法,来到 invokeJoinpoint()
;
往下追:org.springframework.aop.framework.ReflectiveMethodInvocation#invokeJoinpoint
接着追: AopUtils.invokeJoinpointUsingReflection(this.target, this.method, this.arguments);
该方法里面就会用利用反射执行目标方法。
即:执行目标方法 (注意:还没有提交事务)
3.5 commit(提交事务)
// 5、★在方法后置返回后,提交事务
// commit阶段
commitTransactionAfterReturning(txInfo);
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
if (logger.isTraceEnabled()) {
logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
}
// 事务管理器 commit事务
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
下面的 commit
过程大致几种情况:
- 1、如果当前事务已经完成,就抛出异常,不再执行;
- 2、如果当前事务的属性是出现异常只能回滚,这里回滚;
- 3、如果全局事务标记为仅回滚,但事务代码请求commit;回滚;
- 4、除上述情况外,就
commit
。
这里面我们主要看 processCommit
方法 和 processRollback
方法。
public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
/**
* 此 事务执行或事务回滚的 commit阶段?? 处理参与现有事务和编程回滚请求的情况。仅限isRollback、doCommit和rollback。
* @see org.springframework.transaction.TransactionStatus#isRollbackOnly()
* @see #doCommit
* @see #rollback
*/
@Override
public final void commit(TransactionStatus status) throws TransactionException {
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
//1、事务已完成-不要在每个事务中多次调用提交或回滚
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
//2、如果事务的属性是,出现异常只能回滚,这里回滚。
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 这里回滚过程; 出现异常时的回滚
processRollback(defStatus, false);
return;
}
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
// 全局事务标记为仅回滚,但事务代码请求commit
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 这里回滚过程; 为出现异常时的回滚
processRollback(defStatus, true);
return;
}
// 提交事务过程
processCommit(defStatus);
}
}
3.5.1 processCommit
/**
* Process an actual commit.
* Rollback-only flags have already been checked and applied.
* 处理实际的提交。已检查并应用仅回滚标志。
*
* 该方法就是 提交事务的 最底层实现方法!!!!!
*
* @param status 表示事务的对象
* @throws TransactionException in case of commit failure
*/
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
prepareForCommit(status); // 空方法,相当于一个钩子方法,由子类实现 commit前的处理
triggerBeforeCommit(status); // 触发 beforeCommit回调
triggerBeforeCompletion(status);// 触发 beforeCompletion回调
beforeCompletionInvoked = true; // 上面步骤完成,前置完成标志 置为 true
// 当前事务,如果有保存点 (走这个if,事务的传播行为可能是:NESTED,因为只有它,事务才有所谓的保存点)
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
// 为什么要释放该保存点呢,因为都到这里,说明当前已经存在事务;
// 然后将当前方法加入到事务中,这里事务上一个保存点,下面commit的时候 执行当前方法的事务,会创建新的保存点。
status.releaseHeldSavepoint();
}
// 当前方法是在一个新的事务中执行的,走这个if ,然后执行 doCommit() 提交事务
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
// 将异常回滚标志置为 当前事务所配置的属性
unexpectedRollback = status.isGlobalRollbackOnly();
// commit !!!!!
// 调用 doCommit(status) 的方法只有这一个位置!!!
doCommit(status);
}
// isFailEarlyOnGlobalRollbackOnly() : 如果事务被全局标记为仅回滚,则返回是否提前失败。
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// Throw UnexpectedRollbackException if we have a global rollback-only
// marker but still didn't get a corresponding exception from commit.
// 如果我们有一个仅全局回滚标记,但仍然没有从提交中获得相应的异常,则抛出UnexpectedRollbackException。
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
// 事务自动回滚,因为它已被标记为仅回滚
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
// 当前方法可以处理的异常
catch (UnexpectedRollbackException ex) {
// can only be caused by doCommit 只能由 doCommit() 引起
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
// 被 spring 事务定义的异常,来这里处理
catch (TransactionException ex) {
// can only be caused by doCommit 只能由 doCommit() 引起
if (isRollbackOnCommitFailure()) { // 这里是,如果commit已经被标记为失败了回滚时
// 回滚由 commit 引发的异常
doRollbackOnCommitException(status, ex);
}
else { // 这里是 没有别标记为确定的回滚方式,即:未知的异常 的处理方式
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
// 最终的兜底: 运行时异常和错误,回滚。
catch (RuntimeException | Error ex) {
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// 回滚由 commit 引发的异常
doRollbackOnCommitException(status, ex);
throw ex;
}
// Trigger afterCommit callbacks, with an exception thrown there
// propagated to callers but the transaction still considered as committed.
try {
triggerAfterCommit(status);
}
finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}
该方法里面的 doCommit(status);
就是调用org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit
执行 commit
。
没什么好说的~
先介绍两个知识点:
1、该方法我们主要看四个钩子方法:也是事务 前后阶段可以做的事情:
- beforeCommit:在事务提交之前,例如:可以将事务性O/R映射会话刷新到数据库。(事务提交前的节点,准备工作)
- afterCommit:在事务提交后调用。可以在主事务成功提交后立即执行进一步的操作。例如:可以在成功提交主事务之后进行的进一步操作,如:确认消息或电子邮件。(在这个阶段,是事务提交之后执行的阶段,事务已经成功提交到数据库)
- beforeCompletion:在事务提交/回滚之前调用。可以在事务完成之前执行资源清理(如:dataSource,Connection和当前线程解绑)。(在这个阶段,事务已经执行完所有的操作,但还没有提交到数据库)
- afterCompletion:在事务提交/回滚后调用。可以在事务完成后执行资源清理。(真正事务完成之后的尾声节点,对资源处理)
2、事务监听器:
代码太多,可以看我写的 注解demo包中的
com.transactional.eventlistener.TransactionalEventListenerConfig
类这里写一些大致的伪代码:
事务监听器可以作用的四个阶段:(可以看
org.springframework.transaction.event.TransactionPhase
枚举类的类型)
TransactionPhase.BEFORE_COMMIT
TransactionPhase.AFTER_COMMIT
TransactionPhase.AFTER_ROLLBACK
TransactionPhase.AFTER_COMPLETION
事务监听器的执行类:(各个阶段的执行,可以看该类中对应的方法)
org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter
所以,在
commit
阶段,事务监听可以作用的阶段:
TransactionPhase.BEFORE_COMMIT
→beforeCommit
TransactionPhase.AFTER_COMMIT
→afterCommit
TransactionPhase.AFTER_COMPLETION
→afterCompletion
@Component public class TransactionalEventListenerConfig { /** * 普通事件监听器 */ @EventListener public void listen(String str) { System.out.println("-----普通事件 : "+ "thread-name : " + Thread.currentThread().getName()+" , event : " +str); } /** * 事务回滚后 监听器 * @param str 发布的事件,要求是String及其兼容类型 */ @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK) public void afterRollback(String str) { System.out.println("-----AFTER_ROLLBACK : "+ "thread-name : " + Thread.currentThread().getName()+" , event : " +str); } /** * 事务提交前 监听器 */ @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT) public void beforeCommit(String str) { System.out.println("-----BEFORE_COMMIT : "+ "thread-name : " + Thread.currentThread().getName()+" , event : " +str); } /** * 事务提交后 监听器 * @param str 发布的事件,要求是String及其兼容类型 */ @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void afterCommit(String str) { System.out.println("-----AFTER_COMMIT : "+ "thread-name : " + Thread.currentThread().getName()+" , event : " +str); } /** * 事务完成后 监听器 */ @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION) public void afterCompletion(String str) { System.out.println("-----AFTER_COMPLETION : "+ "thread-name : " + Thread.currentThread().getName()+" , event : " +str); } }
同时,还要在 service实现类的方法中 : 加上监听器
@Service public class AnnotationTransactionalServiceImpl implements AnnotationTransactionalService{ @Autowired private ApplicationEventPublisher applicationEventPublisher; @Transactional @Override public int annotationTransactionalMethod_update(Money money) { // 在事务中 发布事件 applicationEventPublisher.publishEvent("注解方式 事务,插入数据流水"); int flag = moneyMapper.insert(money); return flag; } }
这四个方法分别对阶段:
-
1、
triggerBeforeCommit(status);
→beforeCommit
设置了该阶段的监听器方法,就是在这里执行。
-
2、
triggerBeforeCompletion(status);
→beforeCompletion
首先,监听器中没有这个阶段的处理,我们进入方法中:
protected final void triggerBeforeCompletion(DefaultTransactionStatus status) { if (status.isNewSynchronization()) { if (status.isDebug()) { logger.trace("Triggering beforeCompletion synchronization"); } // 事务完成前的 同步回调 TransactionSynchronizationUtils.triggerBeforeCompletion(); } }
进入:
TransactionSynchronizationUtils.triggerBeforeCompletion();
public static void triggerBeforeCompletion() { for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) { try { synchronization.beforeCompletion(); } catch (Throwable tsex) { logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex); } } }
TransactionSynchronizationManager.getSynchronizations()
里面的列表有4个监听器方法,但是没有beforeCompletion
的处理,只有:-
org.mybatis.spring.SqlSessionUtils.SqlSessionSynchronization#beforeCompletion
-
org.springframework.jdbc.datasource.DataSourceUtils.ConnectionSynchronization#beforeCompletion
这两个类的方法里面有, 对应列表中索引 4,5下标。
我们可以在这两个方法里面都打上断点,发现都会进入;其作用就是上面介绍的:可以在事务完成之前执行资源清理(如:dataSource,Connection和当前线程解绑)。
-
-
3、
triggerAfterCommit(status);
→afterCommit
上面截图的
TransactionSynchronizationManager.getSynchronizations()
列表类型中,没有一个实现了afterCommit
方法,所以,这里暂不做处理。 -
4、
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
和cleanupAfterCompletion(status);
→afterCompletion
-
两个都是
afterCompletion
阶段;-
4.1
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
该方法会完成:AFTER_COMMIT,AFTER_COMPLETION两个阶段的处理;
/** * completionStatus 有三种状态: * 0 : STATUS_COMMITTED 正确提交时的完成状态 * 1 : STATUS_ROLLED_BACK 正确回滚时的完成状态 * 2 : STATUS_UNKNOWN 启发式混合完成或系统错误情况下的完成状态 */ private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) { if (status.isNewSynchronization()) { //这说明,只有一个事务方法时,才有这里的 后置处理 // 获取 当前线程中 事务同步列表 List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations(); TransactionSynchronizationManager.clearSynchronization(); if (!status.hasTransaction() || status.isNewTransaction()) { // 这里说明是一个新开启的事务 if (status.isDebug()) { logger.trace("Triggering afterCompletion synchronization"); } // 当前范围没有事务或新事务 -> 立即调用afterCompletion回调 // 根据 completionStatus的类型,进行后置的处理 invokeAfterCompletion(synchronizations, completionStatus); } else if (!synchronizations.isEmpty()) { // 事务同步管理器列表list中不为null // 我们参与的、在这个Spring事务管理器范围之外控制的现有事务 -> 尝试向现有(JTA)事务注册完成后回调。 registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations); } } } // -> 进入到该方法 protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, int completionStatus) { TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus); }
public abstract class TransactionSynchronizationUtils { public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations, int completionStatus) { if (synchronizations != null) { for (TransactionSynchronization synchronization : synchronizations) { try { synchronization.afterCompletion(completionStatus); } catch (Throwable tsex) { logger.error("TransactionSynchronization.afterCompletion threw exception", tsex); } } } } }
synchronizations
还是上面的 断点调试时候的6种类型。org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter.TransactionSynchronizationEventAdapter#afterCompletion
监听器类型的4种情况,上面的for
循环遍历,都会进入到下面的 afterCompletion中,有两种情况被被执行:-
1、AFTER_COMMIT;
-
2、AFTER_COMPLETION
@Override public void afterCompletion(int status) { if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) { processEvent(); // AFTER_COMMIT } else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) { processEvent(); // AFTER_ROLLBACK } else if (this.phase == TransactionPhase.AFTER_COMPLETION) { processEvent(); // AFTER_COMPLETION } }
-
-
4.2
cleanupAfterCompletion(status);
- 作用:
- 1、清除当前事务事务的属性信息:事务名,事务隔离级别,同步信息,活跃状态等;
- 2、清除当前线程持有的Connection
- 3、如果当前的事务方法有将其他的事务挂起,这里进行恢复。
- 作用:
-
-
OK,processCommit方法完成!!!
3.5.2 processRollback
不在细讲,看看注释就行,大致和 commit
处理流程差不多。
/**
* 处理实际回滚。已检查完成标志。
* 该方法就是 回滚的 最底层实现方法!!!!!
*
* @param status object representing the transaction
* @throws TransactionException in case of rollback failure
*/
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
boolean unexpectedRollback = unexpected;
try {
// 回滚前的 回调接口
triggerBeforeCompletion(status);
// 是否是保存点,即,当前方法引起的异常,是否存在保存点,即:嵌套事务回滚的这种场景
// 如果有,就走这个 if
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
// 对保存点进行回滚
status.rollbackToHeldSavepoint();
}
// 如果是一个新事务,那就是直接对整个事务进行回滚。
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 回滚
doRollback(status);
}
else { // 分布式事务场景下的 本地分支事务??? 处理?
// Participating in larger transaction
if (status.hasTransaction()) { // 本地分支存在事务的场景下,回滚
// 如果本地事务状态是 只能回滚和 全局事务已经标志失败
// 那么,就对该分支进行回滚
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 回滚
doSetRollbackOnly(status);
}
// 当前方法 不是事务,那就让 调用该方法的事务方法决定,这里只打印状态信息
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting
transaction originator decide on rollback");
}
}
}
else { // 非事务
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// Unexpected rollback only matters here if we're asked to fail early
// 只有当我们被要求提前失败时,意外的回滚才会起作用
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) { // “|” 是按位的意思,属于运位算符。
// 回滚过程中,出现异常
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 回滚完成之后的 同步处理
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// Raise UnexpectedRollbackException if we had a global rollback-only marker
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 完成之后的 清除同步
cleanupAfterCompletion(status);
}
}
四、总结
至此,事务的源码流程就走完了,还是比较简单的。 复杂的是,整个事务处理过程,对于异常类型的处理,回滚,各种类型事务管理器的适配。
从里面我们可以学到:事务管理器的各种实现策略;监听事务事件的异步处理;还有最最大的AOP应用就是事务的实现!
转载自:https://juejin.cn/post/7389913087481413632