likes
comments
collection
share

如何利用spring实现一个简单任务调度 (二)

作者站长头像
站长
· 阅读数 22

schedule注解加载机制

schedule注解是通过beanPostProcessor来进行的bean实例化后置处理。核心逻辑在postProcessAfterInitialization方法中。

判断类或者方法上是否有schedule注解

如何利用spring实现一个简单任务调度 (二)

  • 先判断类上面是否有Scheduled,Schedules注解
  • 在判断方法上是否有Scheduled注解

通过上述步骤后,得到Scheduled的Set集合,走processScheduled方法来判断是fixed,rated,cron哪一种定时任务,进行定时任务的创建。 代码比较多,核心我们看到主要是通过注册机加载三种类型的任务

this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone)))
this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay))
this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay))

统一放到scheduledTasks中存储起来。

如何利用spring实现一个简单任务调度 (二)

手动停止schedule注册

通过listener停止所有已经启动的task。

package com.wuhanpe.task.processor;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor;
import org.springframework.scheduling.config.ScheduledTask;
import org.springframework.scheduling.config.TaskManagementConfigUtils;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.Set;

/**
 * @author:zooooooooy
 * @date: 2022/12/27 - 11:51
 */
@Component
@Slf4j
public class ClientScheduledListener implements ApplicationListener<ApplicationStartedEvent> {

    @Override
    public void onApplicationEvent(ApplicationStartedEvent event) {
        // 获取task,进行cancel
        ConfigurableApplicationContext applicationContext = event.getApplicationContext();
        ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor = (ScheduledAnnotationBeanPostProcessor) applicationContext.getBean(TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME);
        Set<ScheduledTask> scheduledTasks = scheduledAnnotationBeanPostProcessor.getScheduledTasks();

        // cancel所有任务
        scheduledTasks.forEach(st -> st.cancel());

    }

}

获取到对应processor,然后获取到所有的task,进行cancel处理。

获取到所有任务并存储到服务端

// 通过url请求注册到task server端
List<ServiceInstance> instanceList = discoveryClient.getInstances("task-service");
// 取第一个
ServiceInstance instance = instanceList.get(0);
scheduledTasks.forEach(st -> {
    Task task = st.getTask();
    ScheduledMethodRunnable smr = (ScheduledMethodRunnable)task.getRunnable();
    TaskEntity.TaskEntityBuilder taskEntityBuilder = TaskEntity.builder().serviceName(instance.getServiceId())
            .serviceClass(smr.getTarget().getClass().getName())
            .serviceMethod(smr.getMethod().getName());
    if (task instanceof FixedDelayTask) {
        FixedDelayTask fixedDelayTask = (FixedDelayTask) task;
        taskEntityBuilder.fixedDelay(fixedDelayTask.getInterval());
        taskEntityBuilder.initialDelay(fixedDelayTask.getInitialDelay());
        taskEntityBuilder.taskType("fixed");
    } else if(task instanceof FixedRateTask) {
        FixedRateTask fixedRateTask = (FixedRateTask) task;
        taskEntityBuilder.fixedRate(fixedRateTask.getInterval());
        taskEntityBuilder.initialDelay(fixedRateTask.getInitialDelay());
        taskEntityBuilder.taskType("rate");
    } else if (task instanceof CronTask) {
        CronTask cronTask = (CronTask) task;
        taskEntityBuilder.cron(cronTask.getExpression());
        taskEntityBuilder.taskType("cron");
    }
    HttpUtil.post("http://" + instance.getHost() + ":" + instance.getPort() + "/v1/task/remoteAdd", JSONUtil.toJsonStr(taskEntityBuilder.build()));
});
  • 获取任务执行的target类和方法
  • 判断是哪一种类型的任务并进行类型标记
  • 通过discovery获取到服务端的ip,发起post请求。

服务端接收

public void remoteTaskAdd(TaskBase taskBase) {
    // 判断是否有重复任务
    TaskBase localTaskBase = taskRepository.selectOne(new QueryWrapper<TaskBase>().lambda()
            .eq(TaskBase::getServiceName, taskBase.getServiceName())
            .eq(TaskBase::getServiceClass, taskBase.getServiceClass())
            .eq(TaskBase::getServiceMethod, taskBase.getServiceMethod())
    );

    if (localTaskBase == null) {
        taskRepository.insert(taskBase);
        startTask(taskBase);
    } else {
        taskRepository.update(null, new UpdateWrapper<TaskBase>().lambda()
                .eq(TaskBase::getId, localTaskBase.getId())
                .set(TaskBase::getFixedDelay, taskBase.getFixedDelay())
                .set(TaskBase::getInitialDelay, taskBase.getInitialDelay())
                .set(TaskBase::getFixedRate, taskBase.getFixedRate())
                .set(TaskBase::getCron, taskBase.getCron())
        );
        stopTask(localTaskBase);
        startTask(localTaskBase);
    }
}

判断本地是否已经存储有该任务,如果有则存储,没有则更新。同步更新任务。

自定义注解

上传了spring自带的scheduled注解后,我们也可以自定义自己的注解,并通过BeanPostProcessor来进行加载并推送到task服务端。

package com.wuhanpe.task.annotation;

import com.wuhanpe.task.enumration.TaskTypeEnum;

import java.lang.annotation.*;

/**
 * @author:zooooooooy
 * @date: 2022/12/27 - 15:41
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TaskScheduled {

    TaskTypeEnum taskType() default TaskTypeEnum.CRON;

    String cron() default "";

    long fixedRate() default -1;

    long initialDelay() default -1;

    long fixedDelay() default -1;

}

自定义BeanPostProcessor

public class ClientScheduledProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {

        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
                if(AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(TaskScheduled.class))){
            Map<Method, TaskScheduled> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<TaskScheduled>) method -> AnnotatedElementUtils.getMergedAnnotation(
                            method, TaskScheduled.class));

                    annotatedMethods.forEach((m, ts) -> log.info("method is {} ", m));
        }
        return bean;
    }

}

可以获取到对应method集合,参照上面的步骤,直接将数据推送到task服务端即可。至此,一个简单的自动化任务推送就完成了。

转载自:https://juejin.cn/post/7181735632643817532
评论
请登录