likes
comments
collection
share

使用Spring事务监听机制解决异步事务问题

作者站长头像
站长
· 阅读数 1

业务场景

业务系统先生成申请单application, 当申请单提交后,会生成办件order, 然后需要根据申请单application数据和办件order信息制作一份文件PDF。 由于文件PDF较复杂,且该业务上存在多份不同的业务的PDF,故项目决定将该功能做成通用的,通过配置SQL实现模板数据的自动填充。由于制作文件的流程一般耗时较长(freemaker模板),一般选择异步完成。

使用Spring事务监听机制解决异步事务问题 为下文方便描述,简单分以下步骤

步骤1. 开启主线程事务(spring @transactional事务管理)
步骤2. 主线程处理申请单数据和办件数据
步骤3. 发起制作文件(异步处理)
步骤4. 提交事务

这个就存在一个问题,当业务系统在调起动态脚本想要制作文件时,主线程的事务是还没有提交的,那么异步开启线程去查询当前申请单和办件信息时时查询不到的。

解决思路

  1. 延迟等待主线程提交后,再去执行步骤3的操作
  2. 使用MQ延迟队列执行步骤3,上述步骤3只需要往MQ发送消息即可 延迟一段时间后执行(主线程事务一般都已经提交了)
  3. 使用rocketMQ的事务消息机制,绑定主线程事务一起提交,当主线程事务提交后,MQ中的消息才能被消费者消费
  4. 使用spring的事务监听机制 ....

分析

上述方案东拼西凑的大概也能解决一般的业务需要,因为业务系统本身使用用户不多,且对数据一致性要求也没那么高。各个方案各有优劣,看具体项目使用,没有最好的技术,只有最合适的应用场景嘛。我们挑选方案4去实现。

缺陷

  1. 没有补偿机制,主线程提交后,一旦子线程制作文件失败,不能自动重试
  2. 占用线程资源,需要做好线程池管控

实现步骤

  1. 主线程发布消息
    @Autowired private ApplicationEventPublisher applicationEventPublisher;
    
    public void publishMakeFileEvent(Long applicaitonId, Long orderId){
        applicationEventPublisher.publishEvent(new MyAfterTransactionEvent(this, "我是和事务相关的事件,请事务提交后执行我~~~", applicaitonId, orderId));
    }

  1. 开启线程监听
@Slf4j 
@Component 
private class MyTransactionListener { 
    
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) 
    private void onHelloEvent(MyAfterTransactionEvent event) { 
        // 执行制作文件相关事务操作
        // 这里只有当主线程事务提交后才会开始执行哦 !!!
        log.info("event info:{}"", event); 
    } 
}

  1. 定义监听类
// 定一个事件,继承自ApplicationEvent 
private class MyAfterTransactionEvent extends ApplicationEvent { 
    private String desc; 
    private Long applicationId;
    private Long orderId; 
    public MyAfterTransactionEvent(Object source, String desc, Long applicationId, Long orderId) { 
        super(source);
        this.desc = desc;
        this.applicationId = applicationId;
        this.orderId = orderId;
    } 
    
}

多学习一点点

本质上Spring事务监听机制采用了监听器模式。

什么是监听器模式?

Spring事件机制原理分析

Spring事件机制涉及四个重要类:

  1. ApplicationEvent:事件对象,继承自JDK的类EventObject,可以携带事件的时间戳。 使用Spring事务监听机制解决异步事务问题 ApplicationEvent有個子類ApplicationContextEvent,他又有四個子類:
  • ContextStartedEvent:当Spring容器context启动后触发
  • ContextRefreshEvent: 当Spring容器context刷新时触发
  • ContextClosedEvent: 当Spring容器停止时触发
  • ContextStopedEvent: 当容器context停止时触发 当每个事件触发时,相关的监听器就会监听到对应的事件,然后触发onApplicationEvent方法执行。
  1. ApplicationListener: 事件监听器,继承自JDK的EventListener接口, 该接口被所有的事件监听器实现。 查看spring实现的一个广播事件逻辑:
public class SimpleApplicationEventMulticaster extends AbstractApplicationEventMulticaster {  
  
@Nullable  
private Executor taskExecutor;  
  
@Nullable  
private ErrorHandler errorHandler;  
  
  
/**  
* Create a new SimpleApplicationEventMulticaster.  
*/  
public SimpleApplicationEventMulticaster() {  
}  
  
/**  
* Create a new SimpleApplicationEventMulticaster for the given BeanFactory.  
*/  
public SimpleApplicationEventMulticaster(BeanFactory beanFactory) {  
setBeanFactory(beanFactory);  
}  
  
  
/**  
* Set a custom executor (typically a {@link org.springframework.core.task.TaskExecutor})  
* to invoke each listener with.  
* <p>Default is equivalent to {@link org.springframework.core.task.SyncTaskExecutor},  
* executing all listeners synchronously in the calling thread.  
* <p>Consider specifying an asynchronous task executor here to not block the  
* caller until all listeners have been executed. However, note that asynchronous  
* execution will not participate in the caller's thread context (class loader,  
* transaction association) unless the TaskExecutor explicitly supports this.  
* @see org.springframework.core.task.SyncTaskExecutor  
* @see org.springframework.core.task.SimpleAsyncTaskExecutor  
*/  
public void setTaskExecutor(@Nullable Executor taskExecutor) {  
this.taskExecutor = taskExecutor;  
}  
  
/**  
* Return the current task executor for this multicaster.  
*/  
@Nullable  
protected Executor getTaskExecutor() {  
return this.taskExecutor;  
}  
  
/**  
* Set the {@link ErrorHandler} to invoke in case an exception is thrown  
* from a listener.  
* <p>Default is none, with a listener exception stopping the current  
* multicast and getting propagated to the publisher of the current event.  
* If a {@linkplain #setTaskExecutor task executor} is specified, each  
* individual listener exception will get propagated to the executor but  
* won't necessarily stop execution of other listeners.  
* <p>Consider setting an {@link ErrorHandler} implementation that catches  
* and logs exceptions (a la  
* {@link org.springframework.scheduling.support.TaskUtils#LOG_AND_SUPPRESS_ERROR_HANDLER})  
* or an implementation that logs exceptions while nevertheless propagating them  
* (e.g. {@link org.springframework.scheduling.support.TaskUtils#LOG_AND_PROPAGATE_ERROR_HANDLER}).  
* @since 4.1  
*/  
public void setErrorHandler(@Nullable ErrorHandler errorHandler) {  
this.errorHandler = errorHandler;  
}  
  
/**  
* Return the current error handler for this multicaster.  
* @since 4.1  
*/  
@Nullable  
protected ErrorHandler getErrorHandler() {  
return this.errorHandler;  
}  
  
  
@Override  
public void multicastEvent(ApplicationEvent event) {  
multicastEvent(event, resolveDefaultEventType(event));  
}  
  
@Override  
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {  
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));  
Executor executor = getTaskExecutor();  
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {  
if (executor != null) {  
executor.execute(() -> invokeListener(listener, event));  
}  
else {  
invokeListener(listener, event);  
}  
}  
}  
  
private ResolvableType resolveDefaultEventType(ApplicationEvent event) {  
return ResolvableType.forInstance(event);  
}  
  
/**  
* Invoke the given listener with the given event.  
* @param listener the ApplicationListener to invoke  
* @param event the current event to propagate  
* @since 4.1  
*/  
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {  
ErrorHandler errorHandler = getErrorHandler();  
if (errorHandler != null) {  
try {  
doInvokeListener(listener, event);  
}  
catch (Throwable err) {  
errorHandler.handleError(err);  
}  
}  
else {  
doInvokeListener(listener, event);  
}  
}  
  
@SuppressWarnings({"rawtypes", "unchecked"})  
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {  
try {  
listener.onApplicationEvent(event);  
}  
catch (ClassCastException ex) {  
String msg = ex.getMessage();  
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {  
// Possibly a lambda-defined listener which we could not resolve the generic event type for  
// -> let's suppress the exception and just log a debug message.  
Log logger = LogFactory.getLog(getClass());  
if (logger.isTraceEnabled()) {  
logger.trace("Non-matching event type for listener: " + listener, ex);  
}  
}  
else {  
throw ex;  
}  
}  
}  
  
private boolean matchesClassCastMessage(String classCastMessage, Class<?> eventClass) {  
// On Java 8, the message starts with the class name: "java.lang.String cannot be cast..."  
if (classCastMessage.startsWith(eventClass.getName())) {  
return true;  
}  
// On Java 11, the message starts with "class ..." a.k.a. Class.toString()  
if (classCastMessage.startsWith(eventClass.toString())) {  
return true;  
}  
// On Java 9, the message used to contain the module name: "java.base/java.lang.String cannot be cast..."  
int moduleSeparatorIndex = classCastMessage.indexOf('/');  
if (moduleSeparatorIndex != -1 && classCastMessage.startsWith(eventClass.getName(), moduleSeparatorIndex + 1)) {  
return true;  
}  
// Assuming an unrelated class cast failure...  
return false;  
}  
  
}
  • 首先根据事件类型,获取事件监听器列表
  • 遍历监听器列表
  • 判断是否有线程池,如果存在则在线程池执行
  1. ApplicationEventMulticaster: 事件管理者,管理监听器和发布事件,ApplicationContext通过委托ApplicationEventMulticaster来发布事件。
  2. ApplicationEventPublisher: 事件发布者

Spring事务监听机制同样有以下几种模式:

@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@EventListener 
public @interface TransactionalEventListener {
    // 这个取值有:BEFORE_COMMIT、AFTER_COMMIT、AFTER_ROLLBACK、AFTER_COMPLETION
    TransactionPhase phase() default TransactionPhase.AFTER_COMMIT;

    // 若没有事务的时候,对应的event默认值为false, 事务就不执行
    boolean fallbackExecution() default false;

    @AliasFor(annotation = EventListener.class, attribute = "classes")
    Class<?>[] value() default {};
    
    @AliasFor(annotation = EventListener.class, attribute = "classes")
    Class<?>[] classes() default {};

    String condition() default "";
}

总结

  1. 事务监听机制就是事件监听机制就是监听器模式
  2. JDK本身就提供了事件监听机制
  3. Spring的事件机制也是基于JDK来扩展的
转载自:https://juejin.cn/post/7342702838461972519
评论
请登录