likes
comments
collection
share

Spring Event使用和实现原理

作者站长头像
站长
· 阅读数 1

Spring Event使用和实现原理

铿然架构  |  作者  /  铿然一叶 这是  铿然架构  的第 115  篇原创文章

1. 介绍

事件驱动是一种很常用且重要的设计模式,它通过发布/订阅方式实现了生产者和消费者之间的解耦。

对于事件驱动模式的关注点有:

● 事件队列

是否有事件队列来排队,队列是有界还是无界?

● 任务队列

是否有任务队列?支持多大并发?

● 同步/异步

事件处理是同步还是异步,会不会阻塞?

● 持久化

排队中的事件是否支持持久化,进程挂掉后是否会丢失?

本章我们将带着这些问题去分析Spring Event支持的能力,以及核心实现逻辑。

2. 核心类结构

Spring Event使用和实现原理

描述
ApplicationEvent应用事件类,要发布的事件需要继承此类。
ApplicationListener事件监听接口,实现此接口才能接收到发布的事件,此为监听事件的方式1。
ApplicationEventPublisher事件发布者接口,要通过它的子类发布事件。
ApplicationEventPublisherAware事件发布者aware,实现此接口可以获得事件发布者实例,通过事件发布者实例发布事件。
SimpleApplicationEventMulticaster系统默认的事件广播者,所有事件都通过它广播出去,这样事件监听者才能收到事件。
ErrorHandler事件接收异常处理类,处理“事件监听者收到事件并处理时”抛出的异常。
Executor事件监听者异步调用接口,通过它的实例异步调用事件监听者要执行的动作,避免阻塞,此为异步调用实现方式1。
AbstractApplicationContext抽象应用上下文,实现了事件发布者接口,并调用SimpleApplicationEventMulticaster广播事件。
@EventListener事件监听注解,作用于类的方法上,实现事件监听,此为事件监听方式2
@Async异步调用注解,作用在@EventListener注解的事件监听方法上,使得可以异步调用,此为异步调用实现方式2。
@EnableAsync使能异步调用注解,spring中@Async注解修饰的方法,均需要在启动类上使用@EnableAsync注解才有效。
AsyncAnnotationAdvisor支持Async注解的Advisor。
AnnotationAsyncExecutionInterceptor@Async注解拦截器,获取该注解上的Executor bean名称,决定一步调用时使用哪个Executor bean实例。
AsyncExecutionInterceptor异步处理拦截器,完成异步调用的核心逻辑。
ThreadPoolTaskExecutor系统默认的异步调用执行者,如果@Async注解没有指定,则使用它。

3. 验证场景

3.1 基本场景

通过编码方式实现事件通知。

3.1.1 类结构

Spring Event使用和实现原理

描述
PayWagesEvent发薪事件
PayWagesNotifier发薪事件监听者
PayWagesService发薪事件发布者

3.1.2 代码

3.1.2.1 PayWagesEvent

发薪事件类,需要继承ApplicationEvent类:

public class PayWagesEvent extends ApplicationEvent {

    private PayWages payWages;

    public PayWagesEvent(PayWages source) {
        super(source);
        this.payWages = source;
    }

    public PayWages getPayWages() {
        return payWages;
    }
}

3.1.2.2 PayWagesNotifier

发薪事件监听者,需要实现ApplicationListener接口:

@Component
public class PayWagesNotifier implements ApplicationListener<PayWagesEvent> {
    @Override
    public void onApplicationEvent(PayWagesEvent event) {
        System.out.println("received PayWagesEvent, payWages: " + event.getPayWages().toString());
    }
}

3.1.2.3 PayWagesService

发薪事件发布者,发布发薪事件,需要实现ApplicationEventPublisherAware接口,以获取ApplicationEventPublisher实例来发布事件:

@Service
public class PayWagesService implements ApplicationEventPublisherAware {

    private ApplicationEventPublisher publisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }

    public void exec(int amount) {
        publisher.publishEvent(new PayWagesEvent(new PayWages(amount)));
    }
}

3.1.2.4 PayWagesController

触发服务调用:

@RestController
@RequestMapping(path = "/payWagesController", method = {RequestMethod.GET})
public class PayWagesController {

    @Autowired
    private PayWagesService service;

    @RequestMapping("/exec")
    public void exec() {
        int amount = 100;
        service.exec(amount);
    }
}

3.1.3 验证

发起请求,打印日志如下:

received PayWagesEvent, payWages: PayWages(amount=100)

事件发布和监听处理成功。

3.2 同步逻辑验证

验证默认事件处理是同步还是异步。

3.2.1 代码

3.2.1.1 PayWagesNotifierBlock

模拟收到事件后的耗时处理,休眠3秒:

@Component
public class PayWagesNotifierBlock implements ApplicationListener<PayWagesEventBlock> {
    @Override
    public void onApplicationEvent(PayWagesEventBlock event) {
        System.out.println("received PayWagesEventBlock, payWages: " + event.getPayWages().toString());
        pause(3);
    }

    private void pause(long seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

3.2.1.2 PayWagesService

循环发布多次事件,并记录耗时时间,验证是否会阻塞:

@Service
public class PayWagesService implements ApplicationEventPublisherAware {

    public void blockExec(int amount) {
        long start = System.nanoTime();
        for (int i = 0; i < 3; i++) {
            System.out.println("publish Event PayWagesEventBlock: " + i + ", amount: " + amount);
            publisher.publishEvent(new PayWagesEventBlock(new PayWages(amount)));

            long costTime = (System.nanoTime() - start) / 1000 / 1000 / 1000;
            System.out.println("cost time: " + costTime);
        }
    }
}

3.2.2 验证

发起请求,打印日志如下:

publish Event PayWagesEventBlock: 0, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 3
publish Event PayWagesEventBlock: 1, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 6
publish Event PayWagesEventBlock: 2, amount: 500
received PayWagesEventBlock, payWages: PayWages(amount=500)
cost time: 9

可以看到每次事件发布间隔3秒,即下一次事件发布要等上一次已发布事件处理完,说明默认多个事件之间会同步等待,而在实际业务中同步阻塞处理方式可能不是我们想要的,需要改成异步处理方式。

3.2.3 同步处理原因分析

SimpleApplicationEventMulticaster类中广播事件处理时,判断了是否存在Executor实例,存在就异步处理,否则同步处理:

	@Override
	public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
		ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
		Executor executor = getTaskExecutor();
		for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
			// 异步处理
			if (executor != null && listener.supportsAsyncExecution()) {
				executor.execute(() -> invokeListener(listener, event));
			}
			else {
				// 同步处理
				invokeListener(listener, event);
			}
		}
	}

而在AbstractApplicationContext中初始化事件广播处理者时会先判断是否有定制的事件广播处理者,如果没有则会创建SimpleApplicationEventMulticaster实例,但此时不会传入Executor实例,所以默认是同步模式:

	protected void initApplicationEventMulticaster() {
		ConfigurableListableBeanFactory beanFactory = getBeanFactory();
		if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
			this.applicationEventMulticaster =
					beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
			if (logger.isTraceEnabled()) {
				logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
			}
		}
		else {
			// 创建实例,没有传入Executor和ErrorHandler
			this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
			beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
			if (logger.isTraceEnabled()) {
				logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
						"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
			}
		}
	}

3.3 定制事件广播类实现异步处理

上个场景验证了同步处理逻辑,并解释了其原理,这回就按照该原理修改为异步处理方式。

3.3.1 代码

3.3.1.1 MulticasterConfigure

定制事件广播类,bean的name必须设置为spring默认的事件广播bean名称,及“applicationEventMulticaster”,同时注入一个异步任务执行者,以实现异步处理:

@Configuration
public class MulticasterConfigure {

    @Bean(name = "applicationEventMulticaster") // bean名称必须是这个,这是默认名称
    public SimpleApplicationEventMulticaster simpleApplicationEventMulticaster() {
        SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
        // 添加异步任务执行者,即可实现异步调用
        SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor();
        eventMulticaster.setTaskExecutor(taskExecutor);
        return eventMulticaster;
    }
}

除此之外,不用修改其他任何类。

3.3.2 验证

参考前面3.2同步阻塞的例子,再次发起请求,输出日志如下:

publish Event PayWagesEventBlock: 0, amount: 500
cost time: 0
publish Event PayWagesEventBlock: 1, amount: 500
cost time: 0
publish Event PayWagesEventBlock: 2, amount: 500
cost time: 0
received PayWagesEventBlock, payWages: PayWages(amount=500)
received PayWagesEventBlock, payWages: PayWages(amount=500)
received PayWagesEventBlock, payWages: PayWages(amount=500)

可以看到,多次发布事件之间耗时为0,没有阻塞,验证通过。

3.4 注解+异步

前面都是通过编码方式实现事件监听和异步处理,本场景验证通过注解方式来实现。

3.4.1 代码

3.4.1.1 PayWagesNotifierAsync

在方法上加了Async和EventListener注解,前者表示异步执行,后者表示监听事件,监听的事件为方法输入参数:

@Component
public class PayWagesNotifierAsync {
    @EventListener
    @Async
    public void onEvent(PayWagesEventAsync event) {
        System.out.println("received async PayWagesEventAsync, payWages: " + event.getPayWages().toString());
    }
}

3.4.1.2 启动类

启动类上必须加上EnableAsync注解,前面的Async注解才会起作用:

@EnableAsync
@SpringBootApplication
public class StartupApplication {
    public static void main(String[] args) {
        SpringApplication.run(StartupApplication.class, args);
    }
}

3.4.2 验证

发起请求,打印日志如下:

publish Event PayWagesEventAsync: 0, amount: 300
cost time: 0
publish Event PayWagesEventAsync: 1, amount: 300
cost time: 0
publish Event PayWagesEventAsync: 2, amount: 300
cost time: 0
received PayWagesEventAsync, payWages: PayWages(amount=300)
received PayWagesEventAsync, payWages: PayWages(amount=300)
received PayWagesEventAsync, payWages: PayWages(amount=300)

可以看到多次发布事件之间耗时为0,没有阻塞,验证通过。

3.4.3 原理

添加Async注解后会被AsyncExecutionInterceptor拦截: Spring Event使用和实现原理 另外可以看到此时Executor是ThreadPoolTaskExecutor。

接着在AsyncExecutionAspectSupport类中异步提交事件处理任务: Spring Event使用和实现原理 通过上述操作实现了异步处理。

所以,编码方式和注解方式的异步处理是两个不同的处理流程。

4. 核心逻辑

需要关注的核心逻辑包括如下几点:

● 事件广播者的创建和处理逻辑

● 事件监听者注册逻辑

● 事件发布逻辑

● Async注解的方法如何被异步调用

4.1 事件广播者

4.1.1 实例化

AbstractApplicationContext.java

先通过beanFactory获取“applicationEventMulticaster”,获取不到则new一个默认的SimpleApplicationEventMulticaster实例,new的时候没有传入Executor和ErrorHandler。

	protected void initApplicationEventMulticaster() {
		ConfigurableListableBeanFactory beanFactory = getBeanFactory();
		// 先通过beanFactory获取
		if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
			this.applicationEventMulticaster =
					beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
			if (logger.isTraceEnabled()) {
				logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
			}
		}
		else {
			// 创建实例,没有传入Executor和ErrorHandler
			this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
			beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
			if (logger.isTraceEnabled()) {
				logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
						"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
			}
		}
	}

因此,正如前面的示例,可以定制实现事件广播者并传入定制的Executor和ErrorHandler实现类。

4.1.2 事件广播处理

SimpleApplicationEventMulticaster.java

前面例子也提到了,处理时如果存在Executor且listener支持异步处理则异步调用,否则是同步调用:

	public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
		ResolvableType type = (eventType != null ? eventType : ResolvableType.forInstance(event));
		Executor executor = getTaskExecutor();
		for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
			// 异步处理
			if (executor != null && listener.supportsAsyncExecution()) {
				executor.execute(() -> invokeListener(listener, event));
			}
			else {
				// 同步处理
				invokeListener(listener, event);
			}
		}
	}

ApplicationListener接口中默认支持异步处理,因此只要实现类不重载接口方法都支持异步处理:

	default boolean supportsAsyncExecution() {
		return true;
	}

4.2 事件监听者注册逻辑

4.2.1 相关类

核心类结构如下:

Spring Event使用和实现原理

描述
ApplicationListenerDetector识别出实现了ApplicationListener的listener bean,通过AbstractApplicationContext添加到SimpleApplicationEventMulticaster中。
EventListenerMethodProcessor识别EventListener注解的方法,通过DefaultEventListenerFactory创建ApplicationListenerMethodAdapter,添加到SimpleApplicationEventMulticaster中。
DefaultEventListenerFactoryApplicationListener创建工厂,将EventListener注解的方法生成ApplicationListenerMethodAdapter类。
ApplicationListenerMethodAdapter根据EventListener注解的方法生成的类,记录了EventListener注解的类和方法等信息,实现了ApplicationListener接口。
SimpleApplicationEventMulticaster事件广播者,将事件广播出去,让事件监听者处理。

4.2.2 代码

4.2.2.1 ApplicationListenerDetector

实现了BeanPostProcessor接口,在bean初始化完成的后处理中,识别出ApplicationListener实例的bean,添加到applicationContext中:

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		if (bean instanceof ApplicationListener<?> applicationListener) {
			// potentially not detected as a listener by getBeanNamesForType retrieval
			Boolean flag = this.singletonNames.get(beanName);
			if (Boolean.TRUE.equals(flag)) {
				// singleton bean (top-level or inner): register on the fly
				this.applicationContext.addApplicationListener(applicationListener);
			}
			else if (Boolean.FALSE.equals(flag)) {
				if (logger.isWarnEnabled() && !this.applicationContext.containsBean(beanName)) {
					// inner bean with other scope - can't reliably process events
					logger.warn("Inner bean '" + beanName + "' implements ApplicationListener interface " +
							"but is not reachable for event multicasting by its containing ApplicationContext " +
							"because it does not have singleton scope. Only top-level listener beans are allowed " +
							"to be of non-singleton scope.");
				}
				this.singletonNames.remove(beanName);
			}
		}
		return bean;
	}

4.2.2.2 AbstractApplicationContext

添加listener到SimpleApplicationEventMulticaster:

	public void addApplicationListener(ApplicationListener<?> listener) {
		Assert.notNull(listener, "ApplicationListener must not be null");
		if (this.applicationEventMulticaster != null) {
			this.applicationEventMulticaster.addApplicationListener(listener);
		}
		this.applicationListeners.add(listener);
	}

4.2.2.3 AbstractApplicationEventMulticaster

将listener添加到集合中:

	public void addApplicationListener(ApplicationListener<?> listener) {
		synchronized (this.defaultRetriever) {
			// Explicitly remove target for a proxy, if registered already,
			// in order to avoid double invocations of the same listener.
			Object singletonTarget = AopProxyUtils.getSingletonTarget(listener);
			if (singletonTarget instanceof ApplicationListener) {
				this.defaultRetriever.applicationListeners.remove(singletonTarget);
			}
			this.defaultRetriever.applicationListeners.add(listener);
			this.retrieverCache.clear();
		}
	}

4.2.2.4 EventListenerMethodProcessor

实现BeanPostProcessor接口,在初始化后处理时获取ApplicationListener bean工厂:

	@Override
	public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) {
		this.beanFactory = beanFactory;

		Map<String, EventListenerFactory> beans = beanFactory.getBeansOfType(EventListenerFactory.class, false, false);
		List<EventListenerFactory> factories = new ArrayList<>(beans.values());
		AnnotationAwareOrderComparator.sort(factories);
		this.eventListenerFactories = factories;
	}

满足条件的实现类只有一个,即DefaultEventListenerFactory。

另外还实现了SmartInitializingSingleton接口,在afterSingletonsInstantiated方法中通过前面获取到的DefaultEventListenerFactory创建ApplicationListenerMethodAdapter:

	private void processBean(final String beanName, final Class<?> targetType) {
		if (!this.nonAnnotatedClasses.contains(targetType) &&
				AnnotationUtils.isCandidateClass(targetType, EventListener.class) &&
				!isSpringContainerClass(targetType)) {

			Map<Method, EventListener> annotatedMethods = null;
			try {
				annotatedMethods = MethodIntrospector.selectMethods(targetType,
						(MethodIntrospector.MetadataLookup<EventListener>) method ->
								AnnotatedElementUtils.findMergedAnnotation(method, EventListener.class));
			}
			catch (Throwable ex) {
				// An unresolvable type in a method signature, probably from a lazy bean - let's ignore it.
				if (logger.isDebugEnabled()) {
					logger.debug("Could not resolve methods for bean with name '" + beanName + "'", ex);
				}
			}

			if (CollectionUtils.isEmpty(annotatedMethods)) {
				this.nonAnnotatedClasses.add(targetType);
				if (logger.isTraceEnabled()) {
					logger.trace("No @EventListener annotations found on bean class: " + targetType.getName());
				}
			}
			else {
				// Non-empty set of methods
				ConfigurableApplicationContext context = this.applicationContext;
				Assert.state(context != null, "No ApplicationContext set");
				List<EventListenerFactory> factories = this.eventListenerFactories;
				Assert.state(factories != null, "EventListenerFactory List not initialized");
				// 有EventListener注解的方法,
				for (Method method : annotatedMethods.keySet()) {
					for (EventListenerFactory factory : factories) {
						if (factory.supportsMethod(method)) {
							Method methodToUse = AopUtils.selectInvocableMethod(method, context.getType(beanName));
							ApplicationListener<?> applicationListener =
									factory.createApplicationListener(beanName, targetType, methodToUse);
							if (applicationListener instanceof ApplicationListenerMethodAdapter alma) {
								alma.init(context, this.evaluator);
							}
							context.addApplicationListener(applicationListener);
							break;
						}
					}
				}
				if (logger.isDebugEnabled()) {
					logger.debug(annotatedMethods.size() + " @EventListener methods processed on bean '" +
							beanName + "': " + annotatedMethods);
				}
			}
		}
	}

4.2.2.5 DefaultEventListenerFactory

创建ApplicationListenerMethodAdapter,逻辑比较简单:

	@Override
	public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
		return new ApplicationListenerMethodAdapter(beanName, type, method);
	}

4.2.2.6 ApplicationListenerMethodAdapter

有了beanName,method信息,通过反射来调用EventListener注解的事件监听方法:

	public void onApplicationEvent(ApplicationEvent event) {
		processEvent(event);
	}
    
	// 先获取事件参数
	public void processEvent(ApplicationEvent event) {
		Object[] args = resolveArguments(event);
		if (shouldHandle(event, args)) {
			Object result = doInvoke(args);
			if (result != null) {
				handleResult(result);
			}
			else {
				logger.trace("No result object given - no result to handle");
			}
		}
	}

    // 接着通过java反射调用
	protected Object doInvoke(Object... args) {
		Object bean = getTargetBean();
		// Detect package-protected NullBean instance through equals(null) check
		if (bean.equals(null)) {
			return null;
		}

		ReflectionUtils.makeAccessible(this.method);
		try {
			if (KotlinDetector.isSuspendingFunction(this.method)) {
				return CoroutinesUtils.invokeSuspendingFunction(this.method, bean, args);
			}
			return this.method.invoke(bean, args);
		}
	// ......
	}

4.3 事件发布

事件发布在最开始的类结构中已经提到,回顾一下:

Spring Event使用和实现原理

事件发布者实现类实现ApplicationEventPublisherAware接口,通过该接口注入AbstractApplicationContext的实例。

然后通过这个实例对象发布事件给SimpleApplicationEventMulticaster,由SimpleApplicationEventMulticaster调用事件监听者实现类,并将发布的事件传递给监听者处理。

这段逻辑比较简单,代码不再赘述。

4.4 Async注解异步调用逻辑

Async注解不仅仅用于spring event,系统内的其他异步处理逻辑也使用此注解。

4.4.1 核心类结构

Async注解的异步处理核心类结构如下:

Spring Event使用和实现原理

描述
AsyncConfigurer异步配置接口,用于返回提交异步处理任务的Executor和异步处理异常类,可以定制实现类返回需要的处理类。
AbstractAsyncConfiguration异步配置抽象类,注入AsyncConfigurer,获取它的成员,另外会校验EnableAsync注解,前面也提到了Async注解必须在EnableAsync注解启用前提下才会生效。
ProxyAsyncConfiguration注册AsyncAnnotationBeanPostProcessor。
AsyncAnnotationBeanPostProcessor将AsyncAnnotationAdvisor添加到代理工厂,这样Async注解的类和方法都能被代理处理。
AsyncAnnotationAdvisor支持Async注解的Advisor。
AnnotationAsyncExecutionInterceptorAsync注解拦截器,会获取Async注解的参数,以得到限定的Executor实现类。
AsyncExecutionInterceptor异步处理拦截器。
AsyncExecutionAspectSupport异步处理支持类,拦截器的父类,提供公共方法。

4.4.2 关键代码

4.4.2.1 AbstractAsyncConfiguration

自动注入AsyncConfigurer:

	@Autowired
	void setConfigurers(ObjectProvider<AsyncConfigurer> configurers) {
		Supplier<AsyncConfigurer> configurer = SingletonSupplier.of(() -> {
			List<AsyncConfigurer> candidates = configurers.stream().toList();
			if (CollectionUtils.isEmpty(candidates)) {
				return null;
			}
			if (candidates.size() > 1) {
				throw new IllegalStateException("Only one AsyncConfigurer may exist");
			}
			return candidates.get(0);
		});
		this.executor = adapt(configurer, AsyncConfigurer::getAsyncExecutor);
		this.exceptionHandler = adapt(configurer, AsyncConfigurer::getAsyncUncaughtExceptionHandler);
	}

4.4.2.2 AsyncAnnotationAdvisor

初始化,创建advice和pointcut,支持多个异步处理注解:

public AsyncAnnotationAdvisor(
			@Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);

		ClassLoader classLoader = AsyncAnnotationAdvisor.class.getClassLoader();
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("jakarta.ejb.Asynchronous", classLoader));
		}
		catch (ClassNotFoundException ex) {
			// If EJB API not present, simply ignore.
		}
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("jakarta.enterprise.concurrent.Asynchronous", classLoader));
		}
		catch (ClassNotFoundException ex) {
			// If Jakarta Concurrent API not present, simply ignore.
		}

		this.advice = buildAdvice(executor, exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

4.4.2.3 AsyncExecutionInterceptor

拦截处理,任务被异步调用:

	public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}

		Callable<Object> task = () -> {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future<?> future) {
					return future.get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};

		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}

4.4.2.4 AsyncExecutionAspectSupport

获取Executor:

	protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
		AsyncTaskExecutor executor = this.executors.get(method);
		if (executor == null) {
			Executor targetExecutor;
			String qualifier = getExecutorQualifier(method);
			if (this.embeddedValueResolver != null && StringUtils.hasLength(qualifier)) {
				qualifier = this.embeddedValueResolver.resolveStringValue(qualifier);
			}
			if (StringUtils.hasLength(qualifier)) {
				targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
			}
			else {
				targetExecutor = this.defaultExecutor.get();
			}
			if (targetExecutor == null) {
				return null;
			}
			executor = (targetExecutor instanceof AsyncTaskExecutor asyncTaskExecutor ?
					asyncTaskExecutor : new TaskExecutorAdapter(targetExecutor));
			this.executors.put(method, executor);
		}
		return executor;
	}

提交异步任务:

	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
			return executor.submitCompletable(task);
		}
		else if (org.springframework.util.concurrent.ListenableFuture.class.isAssignableFrom(returnType)) {
			return ((org.springframework.core.task.AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
			return executor.submit(task);
		}
		else if (void.class == returnType) {
			executor.submit(task);
			return null;
		}
		else {
			throw new IllegalArgumentException(
					"Invalid return type for async method (only Future and void supported): " + returnType);
		}
	}

5. 总结

5.1 问题回顾

开篇提到的几个问题,通过前面的验证和分析,得到的结论如下:

问题描述
事件队列不支持,事件直接提交执行。
任务队列支持,异步模式下取决于使用的线程池对象。
同步/异步支持同步和异步处理。
持久化不支持

5.2 本文小结

本文探讨了Spring Event的具体用法,包括编码和注解两种实现方式。同时深入了解了事件的广播、监听者注册、发布机制以及异步处理的关键原理。这些知识不仅加深了我们对Spring生命周期钩子和拦截器等底层框架的理解,还为定制化业务实现提供了实用参考。

综合来看,Spring Event适用于事件驱动的场景,但在使用时需注意以下几点:

● 同步模式可能导致阻塞问题,必要时应考虑切换至异步模式。

● 在自定义SimpleApplicationEventMulticaster进行异步处理时,应确保线程池大小和任务队列能够满足业务需求。

● 由于事件本身不支持持久化,系统宕机可能对业务造成的影响要考虑到。

6. 思考

● Spring Event异步处理下的性能考量

当Spring Event配置为异步模式时,如果同时发布大量事件,可能会对系统性能产生什么样的影响?需要考虑的因素包括线程池的大小、任务队列的容量以及系统资源的限制。

● Spring Event与开源消息队列的对比

虽然Spring Event和开源消息队列(如RabbitMQ, Kafka等)都采用了发布/订阅模式,但它们在实现机制、使用场景和性能方面有何不同?特别是在分布式环境和高可靠性要求下的表现差异。

● 简化进程内事件处理框架的设计

在实现一个仅限于进程内的事件处理框架时,Spring Event涉及的处理类众多。如果是你,怎样简化这个设计,以便更高效地实现事件的发布和处理?


其他阅读: