Spring本地事务回调
背景
在项目中,会经常遇到一种场景:
而在代码里,通常用的是声明式事务
@Transaction
public void test(){
dosometing();
//通知其他业务系统
sendMessage();
}
那实际上,很有可能出现发送完消息后,事务回滚了,导致了数据不一致性。
原理
把关注点TransactionAspectSupport#commitTransactionAfterReturning
方法中
进入到AbstractPlatformTransactionManager#commit
方法,进入到processCommit
方法中。
这个几个地方就是对Spring 事务的回调:
- beforeCommit 事务提交前回调
- beforeCompletion 事务完成前回调
- afterCommit 事务提交后回调
- afterCompletion 事务完成后回调
点进其中一个看一下
调用的是TransactionSynchronization
(事务同步器)接口的beforecommit
方法(这个方法会抛出异常,导致事务回滚)。
在这个方法中方法如上,其他方法可以看文末参考资料。
示例
/**
* 事务工具类
*/
@Configuration
@Slf4j
public class TransactionUtils implements BeanPostProcessor {
/**
* spring线程池缓存
*/
private static Map<String, Executor> EXECUTOR_MAP = new ConcurrentHashMap<String, Executor>(16);
@FunctionalInterface
public interface TaskDo{
/**
* 执行方法 没有返回值
* @throws Exception
*/
void taskDo() throws Exception;
}
/**
* 注册一个默认事务线程池
* @return
*/
@Bean(DEFAULT_EXECUTOR)
public Executor transactionDefault(){
//返回一个线程池
}
/**
* 事务成功之后执行
* @param taskDo
*/
public static void doAfterTransaction(TaskDo taskDo){
doAfterTransaction(DEFAULT_EXECUTOR, taskDo);
}
/**
* 事务成功之后执行
* @param executorName spring中线程池bean名称
* @param taskDo
*/
public static void doAfterTransaction(String executorName, TaskDo taskDo){
//如果存在事务,则等待事务执行完成执行
if (TransactionSynchronizationManager.isActualTransactionActive()){
Executor executor = getExecutorByMap(executorName);
TransactionSynchronizationManager.registerSynchronization(new DoTransactionCompletion(executor, taskDo));
} else {
log.info("不存在事务,异步执行");
getExecutorByMap(executorName).execute(() -> {
try {
taskDo.taskDo();
} catch (Exception e) {
log.error("异步回调异常", e);
}
});
}
}
/**
* 根据线程池bean名称获得线程池
* @param executorName
* @return
*/
private static Executor getExecutorByMap(String executorName){
if (EXECUTOR_MAP.isEmpty()){
throw new Exception("线程缓存为空");
}
//如果线程名称为空获取默认线程池
if (StringUtils.isEmpty(executorName)){
executorName = DEFAULT_EXECUTOR;
}
if (!EXECUTOR_MAP.containsKey(executorName)){
throw new Exception(executorName + "线程不存在");
}
return EXECUTOR_MAP.get(executorName);
}
/**
* bean初始化后置接口 把类型是线程类型的放入缓存
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof Executor){
EXECUTOR_MAP.put(beanName, (Executor)bean);
}
return bean;
}
}
/**
* 事务回调接口
*/
@Slf4j
class DoTransactionCompletion implements TransactionSynchronization {
private TransactionUtils.TaskDo taskDo;
private Executor executor;
public DoTransactionCompletion(Executor executor ,TransactionUtils.TaskDo taskDo) {
this.taskDo = taskDo;
this.executor = executor;
}
@Override
public void afterCompletion(int status){
//如果事务执行成功了
if (TransactionSynchronization.STATUS_COMMITTED == status){
//提交异步方法
executor.execute(() -> {
try {
taskDo.taskDo();
} catch (Exception e) {
log.error("事务完成异步回调异常", e);
}
});
} else {
log.info("事务执行状态:" + status + ",不执行异步回调方法");
}
}
@Override
public int getOrder() {
return Ordered.LOWEST_PRECEDENCE;
}
@Override
public void flush() {}
}
这里封装了一个工具类TransactionUtils
, 这里的思路如下:
-
实现
BeanPostProcessor
接口,重写postProcessAfterInitialization
bean初始化后方法,获取在Spring中声明的线程池bean,放入缓存中。 -
声明一个类,实现
TransactionSynchronization
, 重写afterCompletion
事务回调完成方法,当状态时事务提交成功,则启用的一个线程去执行调用者的方法。
这里有一点,就是线程池的异常处理问题,因为我把spring的线程池单独拿出来用了,无法受spring asny注解的统一管理(用Spring AOP实现的)
即使写了异常处理器也应用不了。 解决思路有三个
1.try cath 捕获 2. 新建一个ThreadFactory
- 重写线程次afterExecute方法,但是Spring的
org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
和java的线程池类不是一个,也没有继承或者实现关系(这块是我自己看的,详细看看文末参考)。
这里选择最简单try cath。
其他方式
也可以使用@TransactionalEventListener
,具体怎么样没有研究。待续。
参考
参考文章,写的非常好
Spring 源码分析衍生篇十三 :事务扩展机制 TransactionSynchronization_猫吻鱼的博客-CSDN博客_transactionsynchronization
转载自:https://juejin.cn/post/7166157691679670286