likes
comments
collection
share

Spring本地事务回调

作者站长头像
站长
· 阅读数 89

背景

在项目中,会经常遇到一种场景:

开启事务
执行业务方法
事务完成
通知其他业务系统

而在代码里,通常用的是声明式事务

@Transaction
public void test(){
    dosometing();
    //通知其他业务系统
    sendMessage();
}

那实际上,很有可能出现发送完消息后,事务回滚了,导致了数据不一致性。

原理

Spring本地事务回调

把关注点TransactionAspectSupport#commitTransactionAfterReturning方法中

Spring本地事务回调

进入到AbstractPlatformTransactionManager#commit方法,进入到processCommit方法中。

Spring本地事务回调

这个几个地方就是对Spring 事务的回调:

  1. beforeCommit 事务提交前回调
  2. beforeCompletion 事务完成前回调
  3. afterCommit 事务提交后回调
  4. afterCompletion 事务完成后回调

点进其中一个看一下

Spring本地事务回调

调用的是TransactionSynchronization(事务同步器)接口的beforecommit方法(这个方法会抛出异常,导致事务回滚)。

Spring本地事务回调

在这个方法中方法如上,其他方法可以看文末参考资料。

示例

/**
 * 事务工具类
 */
@Configuration
@Slf4j
public class TransactionUtils implements BeanPostProcessor {

    /**
     * spring线程池缓存
     */
    private static Map<String, Executor> EXECUTOR_MAP = new ConcurrentHashMap<String, Executor>(16);

    @FunctionalInterface
    public interface TaskDo{
        /**
         * 执行方法 没有返回值
         * @throws Exception
         */
        void taskDo() throws Exception;
    }


    /**
     * 注册一个默认事务线程池
     * @return
     */
    @Bean(DEFAULT_EXECUTOR)
    public Executor transactionDefault(){
        //返回一个线程池
    }

    /**
     * 事务成功之后执行
     * @param taskDo
     */
    public static void doAfterTransaction(TaskDo taskDo){
        doAfterTransaction(DEFAULT_EXECUTOR, taskDo);
    }

    /**
     * 事务成功之后执行
     * @param executorName spring中线程池bean名称
     * @param taskDo
     */
    public static void doAfterTransaction(String executorName, TaskDo taskDo){
        //如果存在事务,则等待事务执行完成执行
        if (TransactionSynchronizationManager.isActualTransactionActive()){
            Executor executor = getExecutorByMap(executorName);
            TransactionSynchronizationManager.registerSynchronization(new DoTransactionCompletion(executor, taskDo));
        } else {
            log.info("不存在事务,异步执行");
            getExecutorByMap(executorName).execute(() -> {
                try {
                    taskDo.taskDo();
                } catch (Exception e) {
                    log.error("异步回调异常", e);
                }
            });
        }
    }

    /**
     * 根据线程池bean名称获得线程池
     * @param executorName
     * @return
     */
    private static Executor getExecutorByMap(String executorName){
        if (EXECUTOR_MAP.isEmpty()){
            throw new Exception("线程缓存为空");
        }
        //如果线程名称为空获取默认线程池
        if (StringUtils.isEmpty(executorName)){
            executorName = DEFAULT_EXECUTOR;
        }
        if (!EXECUTOR_MAP.containsKey(executorName)){
            throw new Exception(executorName + "线程不存在");
        }
        return EXECUTOR_MAP.get(executorName);
    }

    /**
     * bean初始化后置接口 把类型是线程类型的放入缓存
     * @param bean
     * @param beanName
     * @return
     * @throws BeansException
     */
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof Executor){
            EXECUTOR_MAP.put(beanName, (Executor)bean);
        }
        return bean;
    }
}

/**
 * 事务回调接口
 */
@Slf4j
class DoTransactionCompletion implements TransactionSynchronization {

    private TransactionUtils.TaskDo taskDo;

    private Executor executor;

    public DoTransactionCompletion(Executor executor ,TransactionUtils.TaskDo taskDo) {
        this.taskDo = taskDo;
        this.executor = executor;
    }

    @Override
    public void afterCompletion(int status){
        //如果事务执行成功了
        if (TransactionSynchronization.STATUS_COMMITTED == status){
            //提交异步方法
            executor.execute(() -> {
                try {
                    taskDo.taskDo();
                } catch (Exception e) {
                    log.error("事务完成异步回调异常", e);
                }
            });
        } else {
            log.info("事务执行状态:" + status + ",不执行异步回调方法");
        }
    }

    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    @Override
    public  void flush() {}
}

这里封装了一个工具类TransactionUtils, 这里的思路如下:

  1. 实现BeanPostProcessor接口,重写postProcessAfterInitializationbean初始化后方法,获取在Spring中声明的线程池bean,放入缓存中。

  2. 声明一个类,实现TransactionSynchronization, 重写afterCompletion事务回调完成方法,当状态时事务提交成功,则启用的一个线程去执行调用者的方法。

这里有一点,就是线程池的异常处理问题,因为我把spring的线程池单独拿出来用了,无法受spring asny注解的统一管理(用Spring AOP实现的)

Spring本地事务回调

即使写了异常处理器也应用不了。 解决思路有三个

1.try cath 捕获 2. 新建一个ThreadFactory

Spring本地事务回调

  1. 重写线程次afterExecute方法,但是Spring的org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor和java的线程池类不是一个,也没有继承或者实现关系(这块是我自己看的,详细看看文末参考)。

这里选择最简单try cath。

其他方式

也可以使用@TransactionalEventListener,具体怎么样没有研究。待续。

参考

参考文章,写的非常好

Spring 源码分析衍生篇十三 :事务扩展机制 TransactionSynchronization_猫吻鱼的博客-CSDN博客_transactionsynchronization

线程池中线程抛了异常如何处理?_知识分子_的博客-CSDN博客_线程池抛出异常

转载自:https://juejin.cn/post/7166157691679670286
评论
请登录