Spring Scheduling 本地任务调度设计与实现解析
一、Spring Boot 集成 Scheduling
对于不涉及分布式计算又关于时间的任务处理,也就是本地任务调度(Task Schduling)既是 Spring Framework 的集成功能,也是 Spring Boot 的重要特性。在 Spring Boot 中,任务调度的使用得到了极大简化。
1、简单任务调度
从官方文档介绍中,在任务调度类上声明 EnableScheduling
和 @Configuration
,并在调度方法添加 @Scheduled
就可以完成任务调度。
@Configuration
@EnableScheduling
public class SayHelloTask {
@Scheduled(cron = "${hello.schedule.cron:*/10 * * * * *}")
public void sayHello() {
System.out.print("Hello,Schedule");
}
}
Spring Boot 做了增强,可以添加默认值,优先从配置文件中读取 hello.schedule.cron
,如果为空则使用后面默认的值,也就是每十秒钟执行一次 sayHello 方法。
线程池默认使用一个线程,可使用 spring.task.scheduling
命名空间进行如下微调:
spring.task.scheduling.thread-name-prefix=scheduling-
spring.task.scheduling.pool.size=2
2、自定义任务调度
如果需要扩展任务调度,可以实现 SchedulingConfigurer
来完成。
@Component
public class SayHiSchedulingConfigurer implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(4);
threadPoolTaskScheduler.setThreadNamePrefix("helloschedules");
threadPoolTaskScheduler.initialize();
taskRegistrar.setScheduler(threadPoolTaskScheduler);
}
}
这样,调整了任务调度的线程池大小,也修改了线程日志名称,便于日志分析定位。
二、Spring Scheduling 设计说明
Spring Boot 对 TaskExecution and Scheduling 做了简要使用说明,深入了解本地任务调度可以参考 Spring Framework 对 TaskExecution and Scheduling 的介绍。
下面就以 Spring Boot 3.1.2 以及 Spring Framework 6.0.11 版本为例,重点分析 Spring 对本地任务调度的实现方法。
首先,打开 @EnableScheduling 注解:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
这个注解写了大段的注释用于解释任务调度的用法,从注解中我们可以了解到本地调度的设计说明,这是后面分析源码、扩展实现、最佳实践的基础:
- 简单的任务调度可以通过在方法上标记
@Scheduled
实现,Spring 提供了三种调度模式cron
、fixedRate
和fixedDelay
,也就是 cron 表达式执行、固定频率执行与固定延迟执行。 - Spring 容器会先扫描
org.springframework.scheduling.TaskScheduler
和java.util.concurrent.ScheduledExecutorService
两个 bean,如果都没有注入的话会创建一个默认的单线程任务调度器。 - 如果 @Scheduled 无法满足任务调度配置的话,或者需要运行时配置 fixRate 与 fixDelay,可以通过标记 @Configuration 来实现
SchedulingConfigurer
,这样就可以访问底层的ScheduledTaskRegistrar
实例,实现细粒度的任务控制。注意使用类似@Bean(destroyMethod="shutdown")
来保证自定义任务执行器在 Spring 应用程序上下文关闭时也能够正确关闭。 - 最重要的,
EnableScheduling
仅适用于本地应用程序上下文,也就是我们说的本地任务调度。分布式任务调度会有其他解决方案。
三、Spring Scheduling 关键类初探
1、SchedulingConfiguration
使用 @EnableScheduling 就会自动引入这个注解,它只做了一件事就是注入了一个 bean ScheduledAnnotationBeanPostProcessor
。
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
2、ScheduledAnnotationBeanPostProcessor
ScheduledAnnotationBeanPostProcessor 是任务调度的核心类,由 @EnableScheduling 注解自动注册,作为 bean 的后置处理器实现了大量接口。核心功能是提供 cron、fixedRate 和 fixedDelay 三种调度模式,识别 @Scheduled 标记的方法,并转换成 TaskScheduler 可调度的任务。以及,识别所有 SchedulingConfigurer 的实例,允许自定义使用调度任务或对任务进行细粒度的控制。
(1)postProcessAfterInitialization
忽略掉细枝末节,方法的核心功能是扫描带有 @Scheduled 注解的方法,并处理成标准化任务。
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
// ...
AnnotationUtils.isCandidateClass(targetClass, List.of(Scheduled.class, Schedules.class))) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
});
// ...
annotatedMethods.forEach((method, scheduledAnnotations) ->
scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
// ...
return bean;
}
(2)processScheduled
将扫描到的所有调度配置借助 ScheduledTaskRegistrar 转换成标准化调度任务,直接被异步执行,返回结果交给 ScheduledTask.future。
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// ...
// Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay.isNegative(), "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// ...
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
// ...
}
这里分析一下 fixedDelay 与 fixedRate 的区别:
fixedDelay 属性可确保在执行任务的结束时间与下一次执行任务的开始时间之间有 n 毫秒的延迟。当我们需要确保只有一个任务实例一直在运行时,该属性特别有用。
而 fixedRate 属性是每 n 毫秒运行一次计划任务。它不会检查任务之前的执行情况。如果任务的所有执行都是独立的,这一点就很有用。如果我们不希望超出内存和线程池的大小,那么 fixedRate 就会非常方便。不过,如果进入的任务不能快速完成,就有可能出现内存不足异常。
(3)finishRegistration
这个方法完成任务调度器注册。提供自定义调度任务扩展点的 SchedulingConfigurer 接口,并被全部扫描进来。
最后,查找 TaskScheduler,添加到 ScheduledTaskRegistrar 中。
private void finishRegistration() {
if (this.scheduler != null) {
this.registrar.setScheduler(this.scheduler);
}
if (this.beanFactory instanceof ListableBeanFactory lbf) {
Map<String, SchedulingConfigurer> beans = lbf.getBeansOfType(SchedulingConfigurer.class);
List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (SchedulingConfigurer configurer : configurers) {
configurer.configureTasks(this.registrar);
}
}
// ...
this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true /false));
// ...
this.registrar.afterPropertiesSet();
}
3、ScheduledTaskRegistrar
ScheduledAnnotationBeanPostProcessor 被 SchedulingConfiguration 创建的时候二话没说,初始化就创建了一个 ScheduledTaskRegistrar
。
ScheduledTaskRegistrar 用于辅助任务注册到 TaskScheduler
中,尤其是结合 @EnableAsync
注解和 SchedulingConfigurer
的回调方法时。这个 bean 在创建时会先判断是否存在 TaskScheduler,没有就会创建一个单线程任务调度池,然后逐一把调度任务添加到任务队列中。
(1)scheduleTasks
在被 ScheduledAnnotationBeanPostProcessor#finishRegistration 调用时,就是完成任务标准化,afterPropertiesSet 只完成了一个方法完成,也就是 scheduleTasks。
protected void scheduleTasks() {
if (this.taskScheduler == null) {
this.localExecutor = Executors.newSingleThreadScheduledExecutor();
this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
}
// ...
if (this.cronTasks != null) {
for (CronTask task : this.cronTasks) {
addScheduledTask(scheduleCronTask(task));
}
}
// ...
}
(2)scheduleCronTask
使用适配器模式将各个 Task 转换成 ScheduledTask,成为标准化任务执行。
其他几个方法 scheduleTriggerTask,scheduleFixedRateTask,scheduleFixedDelayTask 都是类似的。
public ScheduledTask scheduleCronTask(CronTask task) {
ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
boolean newTask = false;
if (scheduledTask == null) {
scheduledTask = new ScheduledTask(task);
newTask = true;
}
if (this.taskScheduler != null) {
scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
}
else {
addCronTask(task);
this.unresolvedTasks.put(task, scheduledTask);
}
return (newTask ? scheduledTask : null);
}
四、Spring Scheduling 调用时序
- @EnableScheduling 的引入使得 Spring 容器开启了对本地调度任务扫描,并自动装配了 SchedulingConfiguration,实际上是引入了 ScheduledAnnotationBeanPostProcessor。
- ScheduledAnnotationBeanPostProcessor 实现了 postProcessAfterInitialization 首先被执行,在 bean 初始化完成后对 @Scheduled 注解进行扫描并转换成标准化本地调度任务 ScheduledTask。
- 任务转换完成后就会异步执行任务,但是需要等待主线程对线程池的初始化。ScheduledAnnotationBeanPostProcessor 实现了 onApplicationEvent 完成对任务注册的初始化,包括自定义调度任务配置 SchedulingConfigurer 的所有实现的扫描,以及对调度任务执行者 TaskScheduler 的初始化,如果都没有注入的话会创建一个默认的单线程任务调度器。
- 最后,准备工作完成后,由 ScheduledTaskRegistrar 的各类型调度任务分别调用 ScheduledFuture 的 schedule 方法完成任务。
转载自:https://juejin.cn/post/7268673775033024571