likes
comments
collection
share

Spring 事务源码执行流程

作者站长头像
站长
· 阅读数 26

引用:developer.aliyun.com/article/389…

Spring 事务源码中文注释:在gitee仓库的:gitee.com/old_yogurt/…spring-tx 工程中做了。

spring-tx工程中,每个包中的 package-info都对一个包按类的类型、功能做的划分总结。

注解方式demo:在 a-transactional-learning 工程中的 com.transactional.annotation包中。

配置文件方式demo:在 a-transactional-learning 工程中的 com.transactional.xml包中。

Spring 事务源码执行流程

一、Spring管理事务的由来

传统的Jdbc事务我们需要获取Jdbc连接,然后将事务自动提交关闭,然后创建事务,手动commit事务。

下面是伪代码:

try{
    con.setAutoCommit(false);
    PreparedStatement ps = con.prepareStatement("insert into money(account,money) value('1001002',100)"); 
    ps.execute();
    con.commit();  // 提交事务
}catch(SQLException e){
    con.rollback(); // 回滚事务
}

这样就使得,我们需要自己来将Jdbc连接自动提交关闭,然后手动commit,并且需要我们对异常处理,对事务进行回滚。

这里就有两个问题:

  • 1、业务代码都要嵌套在try catch模板代码中 ;
  • 2、接触了底层Connection的事务功能,当使用其他一些框架时,我们更多的不会直接与Connection打交道,如:使用Mybatis时,就不容易获取Connection。

​ 其实就和 Mybatis 这样的ORM框架一样,封装了对数据库底层的操作,使得程序员直接编写业务SQL映射即可;至于数据库连接和JDBC源和数据库打交道的部分交由Mybatis处理。

​ 所以,针对事务的处理,Spring 编写了 spring-tx工程,针对事务的处理做的封装,使得我们程序员在开发的过程中,只用关注业务逻辑的编写,而无需关心Spring对于事务的处理。

spring-tx工程中:

  • 针对问题一:

    • 1、将事务处理部分的公共代码封装成模板以便复用。如同 JdbcTemplate对jdbc 操作的模板代码的封装,这便引出了下文 TransactionTemplate 的模板代码。
      • 说实话,如果我们使用普通代码并没有走 TransactionTemplate类,而是走:事务拦截器 TransactionInterceptor 的org.springframework.transaction.interceptor.TransactionInterceptor#invoke方法里面的 invokeWithinTransaction()这一步;下面我们也是走该 事务拦截器,因为 事务拦截器实现的 MethodInterceptor会被 spring-aop所拦截。
    • 2、然后,真正将业务业务代码和事务代码完全分离的,还是利用的 spring aop技术,通过AOP代理将事务逻辑植入业务逻辑中,这样就可以做到分离。
  • 针对问题二:

    • 对于和JDBC的Connection打交道,那就需要一个统一的接口来完成事务的提交和回滚等功能,即接口PlatformTransactionManager

      然后该接口下面有子类,对应了不同的管理器类型,如:

      • 如果是使用 jdbc,则使用 DataSourceTransactionManager 子类来完成事务的提交和回滚;(后面的源码流程,我们就用的这个管理器进行分析)
      • 若果是使用hibernate ,则使用 HibernateTransactionManager来完成事务的提交和回滚。

二、注册事务解析器

实例代码:

本人的 gitee 仓库的:gitee.com/old_yogurt/… 中:

注解方式demo:在 a-transactional-learning 工程中的 com.transactional.annotation包中。

配置文件方式demo:在 a-transactional-learning 工程中的 com.transactional.annotation包中。

2.1 注解方式解析器注册

首先,我们知道,注解方式使用的注解:@EnableTransactionManagement

跟踪注解我们可以找到的:

public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
	/**
	 * 其实这里意思是:
	 * 	  在我们配置 @EnableTransactionManagement注解的时候,
	 * 	  配置属性 mode,有两个枚举:PROXY、ASPECTJ
	 * 	     我们选择事务拦截器的处理方式是使用 JDK代理方式(PROXY),还是使用ASPECTJ方式(ASPECTJ);
	 * 	     下面这个方法,就是根据不同的枚举注册相应的配置类。
	 */
	@Override
	protected String[] selectImports(AdviceMode adviceMode) {
		switch (adviceMode) {
			// JDK代理,注册 AutoProxyRegistrar,ProxyTransactionManagementConfiguration
			case PROXY:
				return new String[] {AutoProxyRegistrar.class.getName(),
						ProxyTransactionManagementConfiguration.class.getName()};
			//
			case ASPECTJ:
				return new String[] {
						TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}
}

首先,该类继承了 AdviceModeImportSelector类,这个类的 <>中对应的泛型,必须是注解类型,EnableTransactionManagement符合;

我们从该类中的唯一方法,往上找调用该方法的类

TransactionManagementConfigurationSelector#selectImports
  ->被 AdviceModeImportSelector#selectImports(org.springframework.core.type.AnnotationMetadata) 调用
    -> 被 org.springframework.context.annotation.ConfigurationClassParser#processImports 调用
    -> 其实来到ConfigurationClassParser类,我们就应该知道,该类是注解配置类(@Configuration)的解析类

PROXY方式为例,我们可以看到,这两个里面的 case 返回的数据是 对应类的名字:

注册的:AutoProxyRegistrarProxyTransactionManagementConfiguration

  • AutoProxyRegistrar

    • 如果看过AOP源码的比较熟悉,该类中的唯一一个方法 org.springframework.context.annotation.AutoProxyRegistrar#registerBeanDefinitions 又一步 AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry); 最终注册还是 AnnotationAwareAspectJAutoProxyCreator类,就是AOP的APC类型的类,并且将对应的代理类型属性设置进去。
  • ProxyTransactionManagementConfiguration

    • 代理方式的事务管理器配置类;
    • 进入该方法,我们可以看到,该类直接以 注解的方式,往Spring IOC中注册事务基础类,如:
      • BeanFactoryTransactionAttributeSourceAdvisor:事务通知
      • AnnotationTransactionAttributeSource:注册 注解方式的 事务源信息
      • TransactionInterceptor:事务拦截器;就是我们前面说的问题一;
      • 该类还是 AbstractTransactionManagementConfiguration 的子类,子类注册,父类也会注册:
        • 在该父类中会注册:
          • 我们最终要的 事务管理器 PlatformTransactionManager,就是我们前面说的问题二;
          • 事务监听器:TransactionalEventListenerFactory

就这么简单,这里我们只需要关注,我们在这里注册了那些类就行。

2.2 配置文件方式解析器注册

我们可以从配置文件中的 transaction-manager 节点出发找注册类:

 /*
 * NamespaceHandler允许使用XML或注解配置声明性事务管理。
 * 这个名称空间处理程序是Spring事务管理工具的核心功能,它提供了两种声明式管理事务的方法。
 * 一种方法使用XML中使用<tx:advice>元素定义的事务语义,另一种方法将注解与<tx:annotation-driven>元素结合使用。
 * 这两种方法在Spring参考手册中有很大程度的详细说明。
 */
public class TxNamespaceHandler extends NamespaceHandlerSupport {
	/**
	 * 这里只是注册下面三种节点的解析器
	 */
	@Override
	public void init() {
		// <tx:advice>标签解析流程
		registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
		// 这个也是注解方式
		// 只是说,我们在业务代码编写事务流程,然后再在xml中配置:
		// <tx:annotation-driven transaction-manager="transactionManager"/> 开启事务;
		// 当然,也可以基于注解方式开始事务 @EnableTransactionManagement,那就是注解注册流程了,最终肯定有公共的部分,和我们开启aop一毛一样。
		registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
		// JTA (Java Transaction Manager): 是基于Java规范,XA分布式事务方案在Java上的实现.
		registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
	}
}

​ 我们可以看到,注册节点名,对应的就是配置文件中的 事务的根节点,对应的解析器会遍历解析,将里面的节点信息以及节点上的属性信息给解析出来,构建实体。

​ 其实 我们可以 TxNamespaceHandler 所继承了的类是:NamespaceHandlerSupport,其实继承该类,都是为了解析XML方式的命名空间节点;我们可以看该类的子类:(有我们比较熟悉的:AopNamespaceHandler

Spring 事务源码执行流程

  • TxAdviceBeanDefinitionParser:

    • 该类里面,我们可以很多事务的属性信息:

      // 下面这些都是 <tx:advice/>标签的属性、子标签、子标签中的属性 信息。
      
      // 方法节点
      private static final String METHOD_ELEMENT = "method";
      
      // 方法名节点 属性 name
      private static final String METHOD_NAME_ATTRIBUTE = "name";
      
      // 属性节点
      private static final String ATTRIBUTES_ELEMENT = "attributes";
      
      // 方法名节点 属性 超时时间
      private static final String TIMEOUT_ATTRIBUTE = "timeout";
      
      // 方法名节点 属性 设置事务是否只读
      private static final String READ_ONLY_ATTRIBUTE = "read-only";
      
      // 方法名节点 属性 事务的传播行为
      private static final String PROPAGATION_ATTRIBUTE = "propagation";
      
      // 方法名节点 属性 事务的隔离级别
      private static final String ISOLATION_ATTRIBUTE = "isolation";
      
      // 方法名节点 属性 事务回滚
      private static final String ROLLBACK_FOR_ATTRIBUTE = "rollback-for";
      
      // 方法名节点 属性 事务不回滚
      private static final String NO_ROLLBACK_FOR_ATTRIBUTE = "no-rollback-for";
      
  • AnnotationDrivenBeanDefinitionParser

    • 配置文件方式 的注解事务解析器
      • 这里里面的 parse方法,又回到了两种 代理方式处理事务 一种:JDK代理方式,一种:ASPECTJ方式。
      • 可以看到事务管理器,事务拦截器,事务属性等信息的注册。
  • JtaTransactionManagerBeanDefinitionParser

    • JTA事务;想了解可以找找我在 spring-tx 工程中,对于这一块的源码注释,主流程中我们不关注。

2.3 总结

​ 不论是注解方式解析器注册还是配置文件方式解析器注册;我们可以看到二者最终都有注册相同类型的的bean;如:事务管理器,事务拦截器,事务属性源,事务增强器等。

三、事务源码执行流程

​ 这里,强烈建议先看 spring-aop源码执行流程(文章开头标注了本人另外一个关于spring-aop源码的文章),因为 spring-tx事务对于 aop来说,仅仅是一个方法拦截器,对一个加上了 @transactional注解的普通方法拦截增强。

​ 写这篇文章,本人已经把 spring-aop的源码都摸了个遍,包括源码执行流程。

​ 所以这里直接来到事务增强器 TransactionInterceptor;唯一一个方法:invoke(MethodInvocation invocation),该方法在 AOP 执行链中会被调用。( TransactionInterceptorinvoke(MethodInvocation invocation) 方法为什么会被 执行链调用呢?我们第二章讲过,两种注册方式都会将TransactionInterceptor注册到Spring 容器中,然后AOP会在 容器中找到实现了MethodInterceptor接口的类,然后加入到执行器链中,最终根据选择的代理方式 ASPECTJPROXY ,执行TransactionInterceptorinvoke(MethodInvocation invocation)方法**,然后这里就是对普通方法利用事务增强**。 )

/*
 * AOP Alliance MethodInterceptor,用于使用公共Spring事务基础架构(PlatformTransactionManager)进行声明性事务管理。
 * 派生自TransactionSpectSupport类,该类包含与Spring的底层事务API的集成。
 * TransactionInterceptor只需按照正确的顺序调用相关的超类方法,例如invokeWithinTransaction。
 * TransactionInterceptor是线程安全的。
 */    
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
    @Override
	@Nullable
	public Object invoke(MethodInvocation invocation) throws Throwable {
		// TransactionAttributeSource应该被传递给目标类和方法,后者可能来自接口。
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

		// 适配 TransactionAspectSupport的 invokeWithinTransaction方法
		return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
	}
}

​ 这里贴一张AOP执行事务拦截器的图;看到这个方法,看过AOP源码的同学应该很熟悉这里;

​ 我们可以看到interceptorOrInterceptionAdvice类型就是 TransactionInterceptor;这样下一步的执行就被 TransactionInterceptor拦截,执行代理方法: TransactionInterceptor#invoke(MethodInvocation invocation)

Spring 事务源码执行流程

这样,我们找到了事务源码执行流程的入口;执行流程大致分为下面几步:

  • 1、获取事务方法上事务属性;
  • 2、获取对应类型的事务管理器;
  • 3、主要是创建事务,根据事务管理器和事务属性创建事务。就是 begin阶段;
  • 4、执行事务增强的方法, 是的!! 目标方法;
  • 5、提交事务,commit阶段。

进入:org.springframework.transaction.interceptor.TransactionAspectSupport#invokeWithinTransaction方法中

public abstract class TransactionAspectSupport implements BeanFactoryAware, InitializingBean {
    /**
	 * @param method 被调用的方法
	 * @param targetClass 被调用的方法所在的类
	 * @param invocation 用于处理目标方法调用的回调
	 * @return 方法的返回值(如果有)
	 * @throws Throwable 从目标调用传播
	 *
	 * 用于基于 around-advice-based子类的常规委托,委托给该类上的其他几个模板方法。
	 * 能够处理CallbackPreferringPlatformTransactionManager以及常规PlatformTransactionManager实现。
	 */
    @Nullable
    protected Object invokeWithinTransaction(Method method,
                                             @Nullable Class<?> targetClass,
                                             final InvocationCallback invocation) throws Throwable {

        // 如果事务属性为null,则该方法是非事务性的,那就以非事务属性运行。
        TransactionAttributeSource tas = getTransactionAttributeSource();
        // 1、获取事务方法上事务属性
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        // 2、获取对应类型的事务管理器
        // 可以是 Jms,Jta,DataSource,Hibernate ...
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        // 获取 事务方法名(id) ,也即:连接点。
        // ege:方法名 com.transactional.xml.service.XmlTransactionalServiceImpl.xmlTransactionalMethod_update
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        // 事务属性为 null 或者 事务管理器不是 CallbackPreferringPlatformTransactionManager
        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // 使用getTransaction和提交/回滚调用进行标准事务划分。
            // 3、主要是创建事务,  根据事务管理器和事务属性创建事务。
            //    就是 begin阶段
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

            Object retVal;
            try {
                // 下面是一个通知:调用链中的下一个拦截器。
                // 这通常会导致调用目标对象。
                // 4、执行事务增强的方法??? 是的!! 目标方法
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                // 目标方法调用发生异常,对异常进行处理
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                // 清理事务信息 / 重置当前线程(ThreadLocal)的TransactionInfo。
                cleanupTransactionInfo(txInfo);
            }
            // 5、★在方法后置返回后,提交事务
            //   commit阶段
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        // 事务属性为null && 事务管理器是 CallbackPreferringPlatformTransactionManager
        // 可以看到 下面的 lambda表达式,只是执行了 被事务拦截器所拦截的方法,并没有开启事务。
        else {
           // 这里省略一万字... 
        }
    }
}

3.1 获取事务方法上事务属性

// 如果事务属性为null,则该方法是非事务性的,那就以非事务属性运行。
TransactionAttributeSource tas = getTransactionAttributeSource();
// 1、获取事务方法上事务属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

这里我们可以看到两个类:

Spring 事务源码执行流程

一个:TransactionAttributeSource:这里的类型是 子类:AnnotationTransactionAttributeSource

一个:TransactionAttribute:这里的类型是 子类:RuleBasedTransactionAttribute

UML图:

Spring 事务源码执行流程

Spring 事务源码执行流程

  • 第一个图:目的就是获取事务的属性源

    • TransactionAttributeSource:该类只有一个方法:

      // 利用给定的方法和类,获取 TransactionAttribute 事务属性
      TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass);
      
    • 再介绍一个子类 AnnotationTransactionAttributeSource ,该类是注解方式事务方法,解析注解方式事务属性之后得到的一个事务源信息;我们在getTransactionAttributeSource();这一步获取了该事务源对象。

  • 第二个图:事务的属性信息UML图;下面梳理一下该关系图

    • TransactionDefinition:定义事务属性常量,如:隔离级别,传播行为,超时的常量定义。

      • DefaultTransactionDefinition:子类默认实现,这个才是作为一个 对象,记录当前事务的属性信息的一个实体类(事务源);

      • TransactionAttribute:继承了TransactionDefinition,该接口新增 定义 rollback回滚规范;

        • DefaultTransactionAttribute:继承了 DefaultTransactionDefinition (也就说继承了DefaultTransactionDefinition 作为事务属性的实体);并且实现了 TransactionAttribute 接口(也就要去定义回滚的规范);
          • RuleBasedTransactionAttribute:继承了DefaultTransactionAttribute,也就是(DefaultTransactionDefinition 和 TransactionAttribute 对于事务的属性定义、回滚规范都有了,但该类具体还是对 回滚规范粒度降低:通过应用大量回滚规则(正负规则),计算给定异常是否应导致事务回滚。)

3.2 获取对应类型的事务管理器

// 2、获取对应类型的事务管理器
// 可以是 Jms,Jta,DataSource,Hibernate ...
final PlatformTransactionManager tm = determineTransactionManager(txAttr);

下面方法大致流程就是:

​ 先通过缓存map中取获取当前事务配置的事务管理器,如果没有,就从 beanFactory 中找,这就是我们第二节说到的,注册的事务管理器,现在用到了。

​ 然后,在我们编写spring事务的时候,不轮是xml方式,还是注解方式都是需要配置事务管理器,这里我自己的demo中,用的是 DataSourceTransactionManager

@Nullable
protected PlatformTransactionManager determineTransactionManager(@Nullable TransactionAttribute txAttr) {
    // Do not attempt to lookup tx manager if no tx attributes are set
    // 如果未设置tx属性,请勿尝试查找tx manager
    if (txAttr == null || this.beanFactory == null) {
        return getTransactionManager();
    }

    String qualifier = txAttr.getQualifier();  // qualifier:与当前事务相关联的限定符值
    if (StringUtils.hasText(qualifier)) { // 通过与当前事务相关联的限定符值 获取对应的事务管理器
        return determineQualifiedTransactionManager(this.beanFactory, qualifier);
    }
    else if (StringUtils.hasText(this.transactionManagerBeanName)) {
        return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
    }
    else {
        // 这里获取的是 DataSourceTransactionManager
        PlatformTransactionManager defaultTransactionManager = getTransactionManager(); 
        if (defaultTransactionManager == null) { // 如果是null,往里走
            // 默认的是null ,继续往里走
            defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY); 
            if (defaultTransactionManager == null) {
                // 从当期的beanFactory中获取
                defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); 
                this.transactionManagerCache.putIfAbsent(
                    DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); // put,缓存到 transactionManagerCache
            }
        }
        return defaultTransactionManager;
    }
}

讲到事务管理器,我们大致介绍一下:

Spring 事务源码执行流程

  • PlatformTransactionManager是super接口:里面有三个方法:

    - getTransaction():就是 事务的begin,开启事务
    - commit():提交事务
    - rollback():回滚事务
    
    • AbstractPlatformTransactionManager

      • 事务管理器的抽象基类,定义了下面一些事务处理工作:

        • 1、确定是否存在现有事务;
        • 2、应用适当的传播行为;
        • 3、必要时暂停并恢复事务;
        • 4、在提交时检查仅回滚标志;
        • 5、对回滚应用适当的修改(仅限实际回滚或设置回滚);
        • 6、触发已注册的同步回调(如果事务同步处于活动状态)。
      • 子类:

        • 像 JDBC、Hibernate、JPA 这些具体的事务管理器则需要提供 模板方法的具体实现:begin,supend,commit,rollback等;

          因为具体的事务管理器类型封装了 Connection 。

3.3 创建事务

进入方法之前,需要记住一点:这个方式是 事务的 begin阶段

// 3、主要是创建事务,  根据事务管理器和事务属性创建事务。
//    就是 begin阶段
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

进入:createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                                                       @Nullable TransactionAttribute txAttr,
                                                       final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name. 如果未指定名称,请将方法标识应用为事务名称。
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) { // 事务属性不为空
        if (tm != null) { // 事务管理器不为空
            // ★★★★★ 主要就是开启事务, begin ★★★★★
            status = tm.getTransaction(txAttr);
        }
        else {  // 如果走 else 说明当前方法,既没有事务属性设置,也没有配置事务管理器,那就不用创建事务
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                             "] because no transaction manager has been configured");
            }
        }
    }
    // 这里构建的TransactionInfo 表示事务状态处于准备好了。
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

核心流程:创建事务,进入:tm.getTransaction(txAttr);

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {

    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
        // 返回相应的数据源事务对象 ,
        // ege: DataSourceTransactionObject;为什么这里我只是举这个例子呢,因为我用的是mysql数据库,如果是别的场景,也可能是:jms,jpa这些。
        Object transaction = doGetTransaction(); 

        // Cache debug flag to avoid repeated checks. debug操作下,正常肯定不会走这里。
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // Use defaults if no transaction definition given. 如果没有给出事务定义,则使用默认值。
            definition = new DefaultTransactionDefinition();
        }

        // 1. 在该方法之前已经有事务存在。
        // 如果有,那么,根据事务的传播行为,是将该事务挂起;还是加入到当前事务;还是以嵌套事务方式,由事务的传播行为决定。
        if (isExistingTransaction(transaction)) {
            // 找到现有事务->检查传播行为以了解行为方式。
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        // 2. 未找到现有事务->检查传播行为以了解如何继续。

        // PROPAGATION_MANDATORY
        // 	1、A 没有事务,而B 事务传播特性定义为:MANDATORY
        // 			那么会抛出No existing transaction found for transaction marked with propagation ‘mandatory’;
        // 			而且需要注意的是:A Method()有数据库操作是正常执行的,不会回滚(因为本身就没有事务),B Method()也不会进行回滚;
        // 			其实应该是B都没有执行,因为在调用B的时候,spring事务会检查到B的注解上面的的参数是 MANDATORY就会抛出相应异常)。
        // 	2、A 开启事务,B 事务传播行为:MANDATORY
        // 			由于A开启事务,B会加入到A开始的事务中运行
        // 	3、A 开启事务并抛出异常,B 事务传播行为:MANDATORY
        // 			A method 和 B method()执行的操作被回滚。
        // 	4、A 开启事务,B 事务传播行为:MANDATORY,抛出异常,A不捕获B的异常
        // 			A method 和 B method()执行的操作被回滚。
        // 	5、A 开启事务,B 事务传播行为:MANDATORY,抛出异常,A捕获B的异常
        // 			A method 和 B method()执行的操作被回滚,并且抛出异常 Transaction rolled back because it has been marked as rollback-only
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        // PROPAGATION_REQUIRED :
        //	表示当前方法必须在一个具有事务的上下文中运行,如果有加入当前事务,如果没有开启一个新的事务。
        //	1、A method() 不开启事务, B method() 事务传播特性:REQUIRED
        //		那么 B method 开启事务。
        //	2、A method() 开启事务, B method() 事务传播行为:REQUIRED
        //		那么B method()会加入到 A的事务中!
        //	3、A method() 开启事务, B method() 事务传播行为:REQUIRED
        //		3.1 如果 B method() 抛出异常,B method()和A method()都会回滚。
        //		3.2 如果 A method 捕获了 B method()抛出的异常,则会出现异常Transaction rolled back because it has been marked as rollback-only。
        //		抛出的一次也说明了 rollback-only 只能回滚,所以 A,B都回滚。
        //
        // PROPAGATION_REQUIRES_NEW:
        // 	表示当前方法需要运行在一个新的事务中,并且 上一个事务会被挂起。
        //
        // PROPAGATION_NESTED
        // 如果存在当前事务,则在嵌套事务中执行,如果不存在当前事务,那么就新开启一个事务。
        //	1、 A method 不存在事务,B事务传播行为NESTED
        //		结果:则B method()运行在一个新的事务中。
        //	2、 A method()存在事务,B事务传播行为NESTED
        //		结果:B method()运行在嵌套事务中,(这就是上面补充说道的,在同一个事务,只是有保存点。)
        //	3、 A 作为外层事务,B 作为内层事务;A 发生异常
        //		结果:A,B都回滚;
        //	4、A 作为外层事务,B 作为内层事务;B 发生异常,A不捕获
        //		结果:A,B都回滚;
        //	5、A 作为外层事务,B 作为内层事务;B 发生异常,A捕获
        //		结果:B method发生异常,B回滚不影响A事务的提交。(也就是B的回滚,只是回滚到了上一个保存点)

        // PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED 来到这里是要开启新事务的!!!
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                 definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            // 这里置为null,就是无需挂起
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                // 利用上面的这些参数构建 DefaultTransactionStatus对象 并返回
                DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true,
                                                                       newSynchronization, debugEnabled, suspendedResources);
                // 开启事务;
                // 利用:对应的事务管理器 transaction,事务属性 definition 开启事务。
                doBegin(transaction, definition);
                // 就是将当期事务的状态和属性信息和线程绑定,做到同步,保证线程安全
                prepareSynchronization(status, definition);
                // 构建 DefaultTransactionStatus对象 并返回
                return status;
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            // 创建“空”事务:没有实际事务,但可能是同步。
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                            "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            // 同步
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
}
  • 大致流程:

    • 1、Object transaction = doGetTransaction(); 获取事务源对象

    • 2、根据事务源判断是否已经存在事务,以及当前方法所设置的事务属性(隔离级别,传播行为)做出相应的处理:

      • 3.2.1、在该方法之前已经有事务存在

        // 1. 在该方法之前已经有事务存在。
        // 如果有,那么,根据事务的传播行为,是将该事务挂起;还是加入到当前事务;还是以嵌套事务方式,由事务的传播行为决定。
        if (isExistingTransaction(transaction)) {
            // 找到现有事务->检查传播行为以了解行为方式。
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }
        
      • 3.2.2、在该方法之前不存在事务:

        • 如果传播行为是:PROPAGATION_MANDATORY

          必须要事务中运行,并且是加入已经存在的事务;如果不存在,就抛出异常。

        • 如果传播行为是:PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED

          则需要开启新事物。

          // 开启事务;
          // 利用:对应的事务管理器 transaction,事务属性 definition 开启事务。
          doBegin(transaction, definition);
          // 就是将当期事务的状态和属性信息和线程绑定,做到同步,保证线程安全
          prepareSynchronization(status, definition);
          
        • 其他的传播行为:

          就不用开启事务了。

重点说下 2.1 handleExistingTransaction(definition, transaction, debugEnabled); 和 2.2 的doBegin(transaction, definition);

3.2.1 handleExistingTransaction(definition, transaction, debugEnabled);

  • 大致流程:(首先进入该方法,代表着当前已经存在事务,然后根据传播行为,做出相应的处理)
    • 如果当前事务方法传播行为为 PROPAGATION_NEVER:不在事务中运行,如果存在事务,直接抛出异常;
    • 如果当前事务方法传播行为为 PROPAGATION_NOT_SUPPORTED:也不在事务中运行,如果当前有事务,则当前方法挂起,先执行事务,后面再执行它;
    • 如果当前事务方法传播行为为 PROPAGATION_REQUIRES_NEW:表示当前方法需要运行在一个新的事务中,并且 上一个事务会被挂起;
    • 如果当前事务方法传播行为为 PROPAGATION_NESTED: 如果存在当前事务,则在嵌套事务中执行,如果不存在当前事务,那么就新开启一个事务;
    • 如果当前事务方法传播行为为 PROPAGATION_SUPPORTS,PROPAGATION_REQUIRED :这种都会将当前方法加入到上一个方法所在的事务中去。
/**
 * 为现有事务创建 TransactionStatus
 * 注意:进入到该方法,说明 在当前方法之前已经存在事务了。
 */
private TransactionStatus handleExistingTransaction(TransactionDefinition definition, Object transaction,
                                                    boolean debugEnabled)
    throws TransactionException {

    // PROPAGATION_NEVER :
    //    表示当方法务不应该在一个事务中运行,如果存在一个事务,则抛出异常;
    //    	 1、A 不开启事务,B 传播行为为:NEVER
    //    		结果:A,B方法都没有在事务中运行
    //    	 2、A 开启事务,B 传播行为为:NEVER
    //    	    结果:抛出异常 IllegalTransactionStateException: Existing transaction found for transaction marked with propagation ‘never’
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        // 进入到if ,就是 2这种情况
        throw new IllegalTransactionStateException(
            "Existing transaction found for transaction marked with propagation 'never'");
    }

    // PROPAGATION_NOT_SUPPORTED:
    //    表示该方法不应该在一个事务中运行。如果有一个事务正在运行,他将在运行期被挂起,直到这个事务提交或者回滚才恢复执行。
    //        1、 A 不开启事务,B 事务传播行为:NOT_SUPPORTED
    //        		结果:B 没有开启事务
    //        2、 A method开启事务,B,事务传播行为:NOT_SUPPORTED
    //        		结果:A method 是在事务中运行的,并且事务是活动状态。
    //        			 B method 则是等 A事务提交结束,以非事务的状态执行。
    //        3、 A 开启事务并且A 抛出异常;B 事务传播行为:NOT_SUPPORTED
    //        		结果:这种属于,A method是在事务中,A 异常会回滚,B由于不在事务中执行,所以会被执行。
    //        		验证:表a没有插入数据库,表b中对应的数据被删除。
    //        4、A 开启事务;B 事务传播行为:NOT_SUPPORTED;B 抛出异常,A不捕获
    //        		结果:A 的insert操作回滚,B 的delete的操作正常执行
    //        5、A 开启事务;B 事务传播行为:NOT_SUPPORTED;B 抛出异常,不捕获
    //        		A的insert操作正常执行,B的delete的操作也正常执行
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        // 进入到if,就可以: 2,3,4,5 这几种情况。
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }

        // 把当前事务挂起
        Object suspendedResources = suspend(transaction);
        // 当前事务是否支持同步
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // 构建并返回当前事务的状态对象:TransactionStatus
        return prepareTransactionStatus(definition, null, false,
                                        newSynchronization, debugEnabled, suspendedResources);
    }

    // 表示当前方法需要运行在一个新的事务中,并且 上一个事务会被挂起。
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                         definition.getName() + "]");
        }
        // 1、挂起上一个事务
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 返回此事务管理器是否应激活线程绑定事务同步支持。
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 构建 TransactionStatus
            DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization,
                                                                   debugEnabled, suspendedResources);
            // 2、创建新的事务,让当前方法运行在新创建的事务中
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            //  构建并返回当前事务的状态对象:TransactionStatus
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // PROPAGATION_NESTED
    // 如果存在当前事务,则在嵌套事务中执行,如果不存在当前事务,那么就新开启一个事务。
    //	1、A method 不存在事务,B事务传播行为NESTED
    //		结果:则B method()运行在一个新的事务中。
    //	2、A method()存在事务,B事务传播行为NESTED
    //		结果:B method()运行在嵌套事务中,(这就是上面补充说道的,在同一个事务,只是有保存点。)
    //	3、A 作为外层事务,B 作为内层事务;A 发生异常
    //		结果:A,B都回滚;
    //	4、A 作为外层事务,B 作为内层事务;B 发生异常,A不捕获
    //		结果:A,B都回滚;
    //	5、A 作为外层事务,B 作为内层事务;B 发生异常,A捕获
    //		结果:B method发生异常,B回滚不影响A事务的提交。(也就是B的回滚,只是回滚到了上一个保存点)
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {  // 事务管理器不支持保存点的创建:就是可能有些数据库引擎不支持保存点(嵌套事务)
            throw new NestedTransactionNotSupportedException(
                "Transaction manager does not allow nested transactions by default - " +
                "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {  // 对于嵌套事务需要使用保存点
            // 在现有Spring托管事务中创建保存点,通过TransactionStatus实现的SavepointManager API。
            // 通常使用JDBC 3.0 保存点。从不激活Spring同步。
            DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false,
                                                                       debugEnabled, null);
            // 创建保存点
            status.createAndHoldSavepoint();
            //  构建并返回当前事务的状态对象:TransactionStatus
            return status;
        }
        else { // 来到这里应该是,事务管理器允许创建保存点,但是没有创建成功;那也就是说,当前没有事务;就创建一个新的事务
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            // 通过嵌套的 begin和commit/rollback 调用嵌套事务。
            // 通常仅适用于JTA:此处可能会激活Spring同步,对于预先存在的JTA 事务。
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization,
                                                                   debugEnabled, null);
            // 创建新事务
            doBegin(transaction, definition);
            // 同步
            prepareSynchronization(status, definition);
            //  构建并返回当前事务的状态对象:TransactionStatus
            return status;
        }
    }

    // 事务的传播行为可能是这两种: PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    // SUPPORTS:
    //   1、A 不开启事务,B 事务传播行为:SUPPORTS
    //   	 不会以事务方式运行
    //   2、A 开启事务,B 事务传播行为:SUPPORTS
    //   	 B 会在A开启的事务中运行
    //   3、A 开启事务;A抛出异常,B 事务传播行为:SUPPORTS
    //   	 A method,B method 都会回滚
    //   4、A 开启事务;B 事务传播行为SUPPORTS,抛出异常,A捕获B的异常
    //  	 A method()和B method()都会被回滚,并抛出 UnexpectedRollbackException: Transaction silently rolled back because it has been marked as rollback-only
    //   5、A 开启事务;B 事务传播行为SUPPORTS,抛出异常,A不捕获B的异常
    //   	 A method()和B method()都会被回滚
    //   4.1 A 不开启事务,B 事务传播

    // REQUIRED:
    // 	表示当前方法必须在一个具有事务的上下文中运行,如果有加入当前事务,如果没有开启一个新的事务。
    // 1、A method() 不开启事务, B method() 事务传播特性:REQUIRED
    //	   那么 B method 开启事务。
    // 2、A method() 开启事务, B method() 事务传播行为:REQUIRED
    // 	   那么B method()会加入到 A的事务中!
    // 3、A method() 开启事务, B method() 事务传播行为:REQUIRED
    // 		3.1 如果 B method() 抛出异常,B method()和A method()都会回滚。
    // 		3.2 如果 A method 捕获了 B method()抛出的异常,则会出现异常Transaction rolled back because it has been marked as rollback-only。
    // 抛出的一次也说明了 rollback-only 只能回滚,所以 A,B都回滚。

    // 因为 在来到本方法,本身就说了,在该方法之前,就已经存在事务;
    // 所以,PROPAGATION_SUPPORTS,PROPAGATION_REQUIRED 这种都会将当前方法加入到上一个方法所在的事务中去
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    if (isValidateExistingTransaction()) {
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                           definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                                           (currentIsolationLevel != null ?
                                                            isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                                            "(unknown)"));
            }
        }
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                                           definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 构建并返回当前事务的状态对象:TransactionStatus
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

3.2.2 doBegin(transaction, definition);

这里就是数据库的 begin 阶段方法!!!为事务设置:超时时间,隔离级别这些。

在这里对事务管理器(DataSourceTransactionManager)说明一下:事务的 begin 、commit、rollback、dataSource,connection,这些都在这里对 JDBC进行了封装。

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
	protected abstract void doBegin(Object transaction, TransactionDefinition definition)
			throws TransactionException;
}

// 下面有很多事务开始的实现类,都在对应的事务管理器中:
//   DataSourceTransactionManager,JmsTransactionManager,JpaTransactionManager,JtaTransactionManager等

// 我们这里用的是 DataSourceTransactionManager
public class DataSourceTransactionManager extends AbstractPlatformTransactionManager
		implements ResourceTransactionManager, InitializingBean {
   /**
	 * 开启事务。
	 * @param transaction 对应的事务管理器 transaction
	 * @param definition 事务属性: 传播行为,隔离解绑,是否只读标志,超时时间,事务名称
	 */
	@Override
	protected void doBegin(Object transaction, TransactionDefinition definition) {
		// 数据源事务对象
		DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
		Connection con = null;

		try {
			// 这个就是看当前事务是否绑定了线程,然后根据相应的数据源创建连接。
			if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
				Connection newCon = obtainDataSource().getConnection();
				if (logger.isDebugEnabled()) {
					logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
				}
				txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
			}

			// 将资源标记为与事务同步。
			txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
			// 获取连接
			con = txObject.getConnectionHolder().getConnection();

			// 获取 事务的隔离级别,并且将事务隔离基本设置到 con 数据源连接属性中
			Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
			// 设置事务的隔离级别
			txObject.setPreviousIsolationLevel(previousIsolationLevel);

			// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
			// so we don't want to do it unnecessarily (for example if we've explicitly
			// configured the connection pool to set it already).
			// 如有必要,切换到手动提交。这在一些JDBC驱动程序中非常昂贵,
			// 所以我们不想不必要地这样做(例如,如果我们明确已将连接池配置为已设置)。
			if (con.getAutoCommit()) { // 事务如果是自动提交
				// 将自动提交标志设置为 true
				txObject.setMustRestoreAutoCommit(true);
				if (logger.isDebugEnabled()) {
					logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
				}
				con.setAutoCommit(false);
			}

			// 如果设置了 事务的只读属性,那么这里设置只读属性
			prepareTransactionalConnection(con, definition);
			// 将当前事务状态设置为 true (即:活跃状态)
			txObject.getConnectionHolder().setTransactionActive(true);

			// 设置事务的超时时间
			int timeout = determineTimeout(definition);
			// 如果 自定义了超时时间(就是事务属性设置了超时时间),那么这里给资源连接持有者设置对应的超时时间
			if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
				txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
			}

			// Bind the connection holder to the thread. 将连接持有者绑定到线程上。
			if (txObject.isNewConnectionHolder()) {
				TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
			}
		}

		catch (Throwable ex) {
			if (txObject.isNewConnectionHolder()) {
				DataSourceUtils.releaseConnection(con, obtainDataSource());
				txObject.setConnectionHolder(null, false);
			}
			throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
		}
	}
}

3.4 执行目标方法

try {
    // 下面是一个通知:调用链中的下一个拦截器。
    // 这通常会导致调用目标对象。
    // 4、执行事务增强的方法??? 是的!! 目标方法
    retVal = invocation.proceedWithInvocation();
}

@FunctionalInterface
public interface InvocationCallback {

    @Nullable
    Object proceedWithInvocation() throws Throwable;
}

这里会回到:

org.springframework.aop.framework.ReflectiveMethodInvocation#proceed方法,来到 invokeJoinpoint()

Spring 事务源码执行流程

往下追:org.springframework.aop.framework.ReflectiveMethodInvocation#invokeJoinpoint

接着追: AopUtils.invokeJoinpointUsingReflection(this.target, this.method, this.arguments);

Spring 事务源码执行流程

该方法里面就会用利用反射执行目标方法。

即:执行目标方法 (注意:还没有提交事务)

Spring 事务源码执行流程

3.5 commit(提交事务)

// 5、★在方法后置返回后,提交事务
//   commit阶段
commitTransactionAfterReturning(txInfo);
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        // 事务管理器 commit事务
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

下面的 commit过程大致几种情况:

  • 1、如果当前事务已经完成,就抛出异常,不再执行;
  • 2、如果当前事务的属性是出现异常只能回滚,这里回滚;
  • 3、如果全局事务标记为仅回滚,但事务代码请求commit;回滚;
  • 4、除上述情况外,就 commit

这里面我们主要看 processCommit 方法 和 processRollback 方法。

public abstract class AbstractPlatformTransactionManager implements PlatformTransactionManager, Serializable {
    /**
	 * 此 事务执行或事务回滚的 commit阶段??  处理参与现有事务和编程回滚请求的情况。仅限isRollback、doCommit和rollback。
	 * @see org.springframework.transaction.TransactionStatus#isRollbackOnly()
	 * @see #doCommit
	 * @see #rollback
	 */
    @Override
    public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                //1、事务已完成-不要在每个事务中多次调用提交或回滚
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        //2、如果事务的属性是,出现异常只能回滚,这里回滚。
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
            // 这里回滚过程; 出现异常时的回滚
            processRollback(defStatus, false);
            return;
        }

        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            if (defStatus.isDebug()) {
                // 全局事务标记为仅回滚,但事务代码请求commit
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
            // 这里回滚过程; 为出现异常时的回滚
            processRollback(defStatus, true);
            return;
        }

        // 提交事务过程
        processCommit(defStatus);
    }
}

3.5.1 processCommit

/**
 * Process an actual commit.
 * Rollback-only flags have already been checked and applied.
 * 处理实际的提交。已检查并应用仅回滚标志。
 *
 * 该方法就是 提交事务的 最底层实现方法!!!!!
 *
 * @param status 表示事务的对象
 * @throws TransactionException in case of commit failure
 */
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            prepareForCommit(status);  		// 空方法,相当于一个钩子方法,由子类实现 commit前的处理
            triggerBeforeCommit(status);	// 触发 beforeCommit回调
            triggerBeforeCompletion(status);// 触发 beforeCompletion回调
            beforeCompletionInvoked = true; // 上面步骤完成,前置完成标志 置为 true

            // 当前事务,如果有保存点 (走这个if,事务的传播行为可能是:NESTED,因为只有它,事务才有所谓的保存点)
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                // 为什么要释放该保存点呢,因为都到这里,说明当前已经存在事务;
                // 然后将当前方法加入到事务中,这里事务上一个保存点,下面commit的时候 执行当前方法的事务,会创建新的保存点。
                status.releaseHeldSavepoint();
            }
            // 当前方法是在一个新的事务中执行的,走这个if ,然后执行 doCommit() 提交事务
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                // 将异常回滚标志置为 当前事务所配置的属性
                unexpectedRollback = status.isGlobalRollbackOnly();
                // commit !!!!!
                // 调用 doCommit(status) 的方法只有这一个位置!!!
                doCommit(status);
            }
            // isFailEarlyOnGlobalRollbackOnly() : 如果事务被全局标记为仅回滚,则返回是否提前失败。
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            // 如果我们有一个仅全局回滚标记,但仍然没有从提交中获得相应的异常,则抛出UnexpectedRollbackException。
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                    // 事务自动回滚,因为它已被标记为仅回滚
                    "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        // 当前方法可以处理的异常
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit 只能由 doCommit() 引起
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        // 被 spring 事务定义的异常,来这里处理
        catch (TransactionException ex) {
            // can only be caused by doCommit 只能由 doCommit() 引起
            if (isRollbackOnCommitFailure()) {  // 这里是,如果commit已经被标记为失败了回滚时
                // 回滚由 commit 引发的异常
                doRollbackOnCommitException(status, ex);
            }
            else { // 这里是 没有别标记为确定的回滚方式,即:未知的异常 的处理方式
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        // 最终的兜底: 运行时异常和错误,回滚。
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            // 回滚由 commit 引发的异常
            doRollbackOnCommitException(status, ex);
            throw ex;
        }

        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }

    }
    finally {
        cleanupAfterCompletion(status);
    }
}

该方法里面的 doCommit(status); 就是调用org.springframework.jdbc.datasource.DataSourceTransactionManager#doCommit 执行 commit

没什么好说的~

先介绍两个知识点:

  • 1、该方法我们主要看四个钩子方法:也是事务 前后阶段可以做的事情:

    • beforeCommit:在事务提交之前,例如:可以将事务性O/R映射会话刷新到数据库。(事务提交前的节点,准备工作)
    • afterCommit:在事务提交后调用。可以在主事务成功提交后立即执行进一步的操作。例如:可以在成功提交主事务之后进行的进一步操作,如:确认消息或电子邮件。(在这个阶段,是事务提交之后执行的阶段,事务已经成功提交到数据库
    • beforeCompletion:在事务提交/回滚之前调用。可以在事务完成之前执行资源清理(如:dataSource,Connection和当前线程解绑)。(在这个阶段,事务已经执行完所有的操作,但还没有提交到数据库
    • afterCompletion:在事务提交/回滚后调用。可以在事务完成后执行资源清理。(真正事务完成之后的尾声节点,对资源处理)
  • 2、事务监听器:

    代码太多,可以看我写的 注解demo包中的 com.transactional.eventlistener.TransactionalEventListenerConfig

    这里写一些大致的伪代码:

    • 事务监听器可以作用的四个阶段:(可以看 org.springframework.transaction.event.TransactionPhase 枚举类的类型)

      • TransactionPhase.BEFORE_COMMIT

      • TransactionPhase.AFTER_COMMIT

      • TransactionPhase.AFTER_ROLLBACK

      • TransactionPhase.AFTER_COMPLETION

    • 事务监听器的执行类:(各个阶段的执行,可以看该类中对应的方法)

      • org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter

    所以,在 commit 阶段,事务监听可以作用的阶段:

    • TransactionPhase.BEFORE_COMMITbeforeCommit
    • TransactionPhase.AFTER_COMMITafterCommit
    • TransactionPhase.AFTER_COMPLETIONafterCompletion
    @Component
    public class TransactionalEventListenerConfig {
      
        /**
        * 普通事件监听器
        */
        @EventListener
        public void listen(String str) {
            System.out.println("-----普通事件 : "+ "thread-name : " + Thread.currentThread().getName()+" , event : " +str);
        }
      
        /**
        * 事务回滚后 监听器
        * @param str 发布的事件,要求是String及其兼容类型
        */
        @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
        public void afterRollback(String str) {
            System.out.println("-----AFTER_ROLLBACK : "+ "thread-name : " + 
                               Thread.currentThread().getName()+" , event : " +str);
        }
      
        /**
        * 事务提交前 监听器
        */
        @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
        public void beforeCommit(String str) {
            System.out.println("-----BEFORE_COMMIT : "+ "thread-name : " + 
                               Thread.currentThread().getName()+" , event : " +str);
        }
      
        /**
        * 事务提交后 监听器
        * @param str 发布的事件,要求是String及其兼容类型
        */
        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
        public void afterCommit(String str) {
            System.out.println("-----AFTER_COMMIT : "+ "thread-name : " + 
                               Thread.currentThread().getName()+" , event : " +str);
        }
      
        /**
        * 事务完成后 监听器
        */
        @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
        public void afterCompletion(String str) {
            System.out.println("-----AFTER_COMPLETION : "+ "thread-name : " + 
                               Thread.currentThread().getName()+" , event : " +str);
        }
    }
    

    同时,还要在 service实现类的方法中 : 加上监听器

    @Service
    public class AnnotationTransactionalServiceImpl implements AnnotationTransactionalService{
        @Autowired
        private ApplicationEventPublisher applicationEventPublisher;
      
        @Transactional
        @Override
        public int annotationTransactionalMethod_update(Money money) {
      
            // 在事务中 发布事件
            applicationEventPublisher.publishEvent("注解方式 事务,插入数据流水");
      
            int flag = moneyMapper.insert(money);
            return flag;
        }
    }
    

这四个方法分别对阶段:

  • 1、triggerBeforeCommit(status);beforeCommit

    Spring 事务源码执行流程

    设置了该阶段的监听器方法,就是在这里执行。

  • 2、triggerBeforeCompletion(status);beforeCompletion

    首先,监听器中没有这个阶段的处理,我们进入方法中:

    protected final void triggerBeforeCompletion(DefaultTransactionStatus status) {
        if (status.isNewSynchronization()) {
            if (status.isDebug()) {
                logger.trace("Triggering beforeCompletion synchronization");
            }
            // 事务完成前的 同步回调
            TransactionSynchronizationUtils.triggerBeforeCompletion();
        }
    }
    

    进入:TransactionSynchronizationUtils.triggerBeforeCompletion();

    public static void triggerBeforeCompletion() {
        for (TransactionSynchronization synchronization : TransactionSynchronizationManager.getSynchronizations()) {
            try {
                synchronization.beforeCompletion();
            }
            catch (Throwable tsex) {
                logger.error("TransactionSynchronization.beforeCompletion threw exception", tsex);
            }
        }
    }
    

    TransactionSynchronizationManager.getSynchronizations()里面的列表有4个监听器方法,但是没有beforeCompletion 的处理,只有:

    • org.mybatis.spring.SqlSessionUtils.SqlSessionSynchronization#beforeCompletion

    • org.springframework.jdbc.datasource.DataSourceUtils.ConnectionSynchronization#beforeCompletion

      这两个类的方法里面有, 对应列表中索引 4,5下标。

      我们可以在这两个方法里面都打上断点,发现都会进入;其作用就是上面介绍的:可以在事务完成之前执行资源清理(如:dataSource,Connection和当前线程解绑)。

    Spring 事务源码执行流程

  • 3、triggerAfterCommit(status);afterCommit

    上面截图的 TransactionSynchronizationManager.getSynchronizations()列表类型中,没有一个实现了 afterCommit方法,所以,这里暂不做处理。

  • 4、triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);cleanupAfterCompletion(status);afterCompletion

    • 两个都是 afterCompletion 阶段;

      • 4.1 triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);

        该方法会完成:AFTER_COMMIT,AFTER_COMPLETION两个阶段的处理;

        /**
         * completionStatus 有三种状态:
         *    0 : STATUS_COMMITTED    正确提交时的完成状态
         *    1 : STATUS_ROLLED_BACK  正确回滚时的完成状态
         *    2 : STATUS_UNKNOWN      启发式混合完成或系统错误情况下的完成状态
         */
        private void triggerAfterCompletion(DefaultTransactionStatus status, int completionStatus) {
            if (status.isNewSynchronization()) {  //这说明,只有一个事务方法时,才有这里的 后置处理
                // 获取 当前线程中 事务同步列表
                List<TransactionSynchronization> synchronizations = TransactionSynchronizationManager.getSynchronizations();
                TransactionSynchronizationManager.clearSynchronization();
                if (!status.hasTransaction() || status.isNewTransaction()) {  // 这里说明是一个新开启的事务
                    if (status.isDebug()) {
                        logger.trace("Triggering afterCompletion synchronization");
                    }
                    // 当前范围没有事务或新事务 -> 立即调用afterCompletion回调
                    // 根据 completionStatus的类型,进行后置的处理
                    invokeAfterCompletion(synchronizations, completionStatus);
                }
                else if (!synchronizations.isEmpty()) {  // 事务同步管理器列表list中不为null
                    // 我们参与的、在这个Spring事务管理器范围之外控制的现有事务 -> 尝试向现有(JTA)事务注册完成后回调。
                    registerAfterCompletionWithExistingTransaction(status.getTransaction(), synchronizations);
                }
            }
        }
        
        //  -> 进入到该方法
        protected final void invokeAfterCompletion(List<TransactionSynchronization> synchronizations, 
                                                   int completionStatus) {
            TransactionSynchronizationUtils.invokeAfterCompletion(synchronizations, completionStatus);
        }
        
        public abstract class TransactionSynchronizationUtils {
            public static void invokeAfterCompletion(@Nullable List<TransactionSynchronization> synchronizations,
                                                     int completionStatus) {
                if (synchronizations != null) {
                    for (TransactionSynchronization synchronization : synchronizations) {
                        try {
                            synchronization.afterCompletion(completionStatus);
                        }
                        catch (Throwable tsex) {
                            logger.error("TransactionSynchronization.afterCompletion threw exception", tsex);
                        }
                    }
                }
            }
        }
        

        synchronizations 还是上面的 断点调试时候的6种类型。org.springframework.transaction.event.ApplicationListenerMethodTransactionalAdapter.TransactionSynchronizationEventAdapter#afterCompletion 监听器类型的4种情况,上面的 for 循环遍历,都会进入到下面的 afterCompletion中,有两种情况被被执行:

        • 1、AFTER_COMMIT; Spring 事务源码执行流程

        • 2、AFTER_COMPLETION Spring 事务源码执行流程

        @Override
        public void afterCompletion(int status) {
            if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
                processEvent(); // AFTER_COMMIT
            }
            else if (this.phase == TransactionPhase.AFTER_ROLLBACK && status == STATUS_ROLLED_BACK) {
                processEvent(); // AFTER_ROLLBACK
            }
            else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
                processEvent(); // AFTER_COMPLETION
            }
        }
        
      • 4.2 cleanupAfterCompletion(status);

        • 作用:
          • 1、清除当前事务事务的属性信息:事务名,事务隔离级别,同步信息,活跃状态等;
          • 2、清除当前线程持有的Connection
          • 3、如果当前的事务方法有将其他的事务挂起,这里进行恢复。

OK,processCommit方法完成!!!

3.5.2 processRollback

不在细讲,看看注释就行,大致和 commit 处理流程差不多。

/**
 * 处理实际回滚。已检查完成标志。
 * 该方法就是 回滚的 最底层实现方法!!!!!
 *
 * @param status object representing the transaction
 * @throws TransactionException in case of rollback failure
 */
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            // 回滚前的 回调接口
            triggerBeforeCompletion(status);

            // 是否是保存点,即,当前方法引起的异常,是否存在保存点,即:嵌套事务回滚的这种场景
            // 如果有,就走这个 if
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                // 对保存点进行回滚
                status.rollbackToHeldSavepoint();
            }
            // 如果是一个新事务,那就是直接对整个事务进行回滚。
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                // 回滚
                doRollback(status);
            }
            else {  // 分布式事务场景下的 本地分支事务??? 处理?
                // Participating in larger transaction
                if (status.hasTransaction()) {  // 本地分支存在事务的场景下,回滚
                    // 如果本地事务状态是 只能回滚和 全局事务已经标志失败
                    // 那么,就对该分支进行回滚
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        // 回滚
                        doSetRollbackOnly(status);
                    }
                    // 当前方法 不是事务,那就让 调用该方法的事务方法决定,这里只打印状态信息
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting 
                                         transaction originator decide on rollback");
                        }
                    }
                }
                else { // 非事务
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                // 只有当我们被要求提前失败时,意外的回滚才会起作用
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        }
        catch (RuntimeException | Error ex) {  // “|” 是按位的意思,属于运位算符。
            // 回滚过程中,出现异常
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }

        // 回滚完成之后的 同步处理
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                "Transaction rolled back because it has been marked as rollback-only");
        }
    }
    finally {
        // 完成之后的 清除同步
        cleanupAfterCompletion(status);
    }
}

四、总结

​ 至此,事务的源码流程就走完了,还是比较简单的。 ​ 复杂的是,整个事务处理过程,对于异常类型的处理,回滚,各种类型事务管理器的适配。

​ 从里面我们可以学到:事务管理器的各种实现策略;监听事务事件的异步处理;还有最最大的AOP应用就是事务的实现!

转载自:https://juejin.cn/post/7389913087481413632
评论
请登录