
5. xxl-job Source Code Analysis: How Scheduling Triggers Work (Part 1)

Author: 站长

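The previous article walked through the main flow of XxlJobScheduler and what each step does, but it did not dig into the core mechanics. This article continues with the second half of the JobTriggerPoolHelper class. It looks at why the author splits triggering into a fast pool and a slow pool, and at how a trigger is actually carried out.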

JobTriggerPoolHelper.trigger

public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {
    // Choose a trigger pool; the fast pool is the default
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      
        // More than 10 timeouts within the current minute: use the slow trigger pool
        triggerPool_ = slowTriggerPool;
    }

    // Submit the trigger to the chosen pool
    triggerPool_.execute(new Runnable() {
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            try {
                // Fire the trigger
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                // Clear the timeout-count map once per minute
                long minTim_now = System.currentTimeMillis()/60000;
                if (minTim != minTim_now) {
                    minTim = minTim_now;
                    jobTimeoutCountMap.clear();
                }
                // Measure how long the trigger took
                long cost = System.currentTimeMillis()-start;
                if (cost > 500) {       
                    // Anything over 500 ms counts as one timeout
                    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                    if (timeoutCount != null) {
                        timeoutCount.incrementAndGet();
                    }
                }

            }

        }
    });
}

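The code shows that the fast trigger pool is used by default. Once a job's triggers have timed out more than 10 times within the current minute, its subsequent triggers go to a separate slow trigger pool. The purpose appears to be to keep slow triggers from holding up fast ones. A thread pool can only create a limited number of threads. Once they are all busy, new tasks wait in the work queue. If many slow triggers pile up in one shared pool, they delay every other job's triggers. With two pools, jobs are separated by how quickly they trigger, so slow jobs mostly delay only each other.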

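A trigger counts as timed out when the whole trigger call takes more than 500 ms. That call is XxlJobTrigger.trigger, and it includes the remote call to the executor. So the check measures how long dispatching the trigger takes, not how long the job itself runs on the executor. Each timeout increments a counter in a separate map keyed by jobId. The map is cleared once per minute: the first trigger that finishes in a new minute clears it.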
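To make the routing rule concrete, here is a small, self-contained model of it. It is a simplified sketch, not xxl-job code:

- TriggerPoolRouter, Pool, choose and record are hypothetical names.
- The two thread pools are replaced by an enum value.
- Time is passed in explicitly, so the per-minute reset can be tested.

The thresholds (500 ms, more than 10 timeouts) are taken from the excerpt.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the fast/slow routing in addTrigger.
// Thread pools are replaced by an enum so the decision logic can be tested.
class TriggerPoolRouter {
    enum Pool { FAST, SLOW }

    static final long TIMEOUT_MS = 500;   // a trigger slower than this counts as a timeout
    static final int SLOW_THRESHOLD = 10; // more than this many timeouts -> slow pool

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCounts = new ConcurrentHashMap<>();
    private volatile long currentMinute;

    TriggerPoolRouter(long nowMillis) {
        this.currentMinute = nowMillis / 60_000;
    }

    // Decide which pool a new trigger for jobId should go to.
    Pool choose(int jobId) {
        AtomicInteger count = timeoutCounts.get(jobId);
        return (count != null && count.get() > SLOW_THRESHOLD) ? Pool.SLOW : Pool.FAST;
    }

    // Record a finished trigger; mirrors the finally block of addTrigger.
    void record(int jobId, long costMillis, long nowMillis) {
        long minute = nowMillis / 60_000;
        if (minute != currentMinute) {   // a new wall-clock minute: reset every counter
            currentMinute = minute;
            timeoutCounts.clear();
        }
        if (costMillis > TIMEOUT_MS) {
            timeoutCounts.computeIfAbsent(jobId, id -> new AtomicInteger()).incrementAndGet();
        }
    }
}
```

As in the excerpt, the reset is lazy and tied to the wall-clock minute, so this is a fixed one-minute window rather than a sliding one. A job that ends up in the slow pool returns to the fast pool as soon as the counters are cleared.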

Next, let's look at the method XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList):

public static void trigger(int jobId,
                           TriggerTypeEnum triggerType,
                           int failRetryCount,
                           String executorShardingParam,
                           String executorParam,
                           String addressList) {
    // Load the job info
    XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
    if (jobInfo == null) {
        logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
        return;
    }
    if (executorParam != null) {
        // Override the executor parameter
        jobInfo.setExecutorParam(executorParam);
    }
    int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
    XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

    // Override the executor addresses
    if (addressList!=null && addressList.trim().length()>0) {
        group.setAddressType(1);
        group.setAddressList(addressList.trim());
    }

    // Parse the sharding parameter ("index/total")
    int[] shardingParam = null;
    if (executorShardingParam!=null){
        String[] shardingArr = executorShardingParam.split("/");
        if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
            shardingParam = new int[2];
            shardingParam[0] = Integer.valueOf(shardingArr[0]);
            shardingParam[1] = Integer.valueOf(shardingArr[1]);
        }
    }
    if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
        && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
        && shardingParam==null) {
        // Sharding broadcast: each registered executor runs its own shard; pass the shard index and the total
        for (int i = 0; i < group.getRegistryList().size(); i++) {
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
        }
    } else {
        // Not sharding broadcast: trigger once (default shard 0/1)
        if (shardingParam == null) {
            shardingParam = new int[]{0, 1};
        }
        processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
    }
}

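trigger starts by processing its parameters. It loads the job, applies any override executor parameter and resolves the retry count. If an explicit address list was passed, it also overrides the group's addresses. It then checks the route strategy and the number of registered executors. Under the sharding-broadcast strategy, it sends one trigger per registered executor, telling each executor the total number of shards and which shard it owns. Otherwise it sends a single trigger, using either the explicit sharding parameter or the default single shard (0/1). Once the sharding information is settled, it calls processTrigger.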
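The sharding decision above can be modeled in isolation. This is a simplified sketch, not the xxl-job implementation:

- ShardingPlanner, parseShardingParam and plan are hypothetical names.
- The strategy check is reduced to a boolean.
- Numeric validation is approximated with Integer.parseInt.

The sketch returns one (index, total) pair per trigger that would be sent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how trigger() turns sharding input into (index, total) pairs.
class ShardingPlanner {

    // Parses "index/total" (e.g. "1/3"); returns null for anything malformed,
    // matching the excerpt, which silently ignores an invalid value.
    static int[] parseShardingParam(String param) {
        if (param == null) return null;
        String[] parts = param.split("/");
        if (parts.length != 2) return null;
        try {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        } catch (NumberFormatException e) {
            return null; // not numeric
        }
    }

    // One entry per trigger that would be sent.
    static List<int[]> plan(boolean shardingBroadcast, int registeredExecutors, String executorShardingParam) {
        int[] explicit = parseShardingParam(executorShardingParam);
        List<int[]> triggers = new ArrayList<>();
        if (shardingBroadcast && registeredExecutors > 0 && explicit == null) {
            // Broadcast: the i-th trigger carries shard i of registeredExecutors.
            for (int i = 0; i < registeredExecutors; i++) {
                triggers.add(new int[]{i, registeredExecutors});
            }
        } else {
            // Single trigger: the explicit shard if given, otherwise the default 0/1.
            triggers.add(explicit != null ? explicit : new int[]{0, 1});
        }
        return triggers;
    }
}
```

When an explicit sharding parameter is present, only that shard is triggered, even under the broadcast strategy. This presumably lets a single shard be re-triggered without re-running all the others.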

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
     // Resolve the block strategy, route strategy and sharding parameter
     ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
     ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
     String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

     // 1. Save a log record to obtain the log id
     XxlJobLog jobLog = new XxlJobLog();
     jobLog.setJobGroup(jobInfo.getJobGroup());
     jobLog.setJobId(jobInfo.getId());
     jobLog.setTriggerTime(new Date());
     XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
     logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

     // 2. Build the trigger parameters
     TriggerParam triggerParam = new TriggerParam();
     triggerParam.setJobId(jobInfo.getId());
     triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
     triggerParam.setExecutorParams(jobInfo.getExecutorParam());
     triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
     triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
     triggerParam.setLogId(jobLog.getId());
     triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
     triggerParam.setGlueType(jobInfo.getGlueType());
     triggerParam.setGlueSource(jobInfo.getGlueSource());
     triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
     triggerParam.setBroadcastIndex(index);
     triggerParam.setBroadcastTotal(total);

     // 3. Resolve the executor address
     String address = null;
     ReturnT<String> routeAddressResult = null;
     if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
         if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
             if (index < group.getRegistryList().size()) {
                 address = group.getRegistryList().get(index);
             } else {
                 address = group.getRegistryList().get(0);
             }
         } else {
             routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
             if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                 address = routeAddressResult.getContent();
             }
         }
     } else {
         routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
     }

     // 4. Call the remote executor to fire the trigger
     ReturnT<String> triggerResult = null;
     if (address != null) {
         triggerResult = runExecutor(triggerParam, address);
     } else {
         triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
     }

     // 5. Collect the trigger result into a message
     StringBuffer triggerMsgSb = new StringBuffer();
     triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
         .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
     if (shardingParam != null) {
         triggerMsgSb.append("("+shardingParam+")");
     }
     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
     triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

     triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
         .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

     // 6. Save the trigger information to the log
     jobLog.setExecutorAddress(address);
     jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
     jobLog.setExecutorParam(jobInfo.getExecutorParam());
     jobLog.setExecutorShardingParam(shardingParam);
     jobLog.setExecutorFailRetryCount(finalFailRetryCount);
     //jobLog.setTriggerTime();
     jobLog.setTriggerCode(triggerResult.getCode());
     jobLog.setTriggerMsg(triggerMsgSb.toString());
     XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
     logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}

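processTrigger first creates and saves a log record, which yields the log id. It then builds the trigger parameters, which carry everything the executor needs to run the job, most importantly the executor handler. Next it resolves the executor address. Under sharding broadcast, shard i goes to the i-th registered executor. Under every other strategy, the configured router picks one address. Sharding splits a job into parts that run on different machines. Because each executor handles only its own part, the parts do not interfere and can run in parallel, which speeds up processing. Finally, processTrigger fires the trigger and saves the result to the log. A trigger is not guaranteed to succeed, so recording detailed information when it fails makes later troubleshooting much easier.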
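Step 3 of processTrigger (address selection) can be sketched on its own. This is a simplified illustration, not xxl-job's code:

- AddressSelector and Router are hypothetical names.
- FIRST and roundRobin are minimal stand-ins for the route strategies that xxl-job ships, not copies of them.

The broadcast branch follows the excerpt: shard i uses the i-th address, and an out-of-range index falls back to the first address.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of step 3 of processTrigger: choose the executor address.
class AddressSelector {

    interface Router {
        String route(List<String> addresses);
    }

    // Always the first registered executor.
    static final Router FIRST = addresses -> addresses.get(0);

    // Simple round robin shared across calls (floorMod keeps the index non-negative on overflow).
    static Router roundRobin() {
        AtomicInteger counter = new AtomicInteger();
        return addresses -> addresses.get(Math.floorMod(counter.getAndIncrement(), addresses.size()));
    }

    // Returns null when no executor is registered, as the excerpt does.
    static String select(List<String> registry, boolean shardingBroadcast, int index, Router router) {
        if (registry == null || registry.isEmpty()) {
            return null;
        }
        if (shardingBroadcast) {
            return index < registry.size() ? registry.get(index) : registry.get(0);
        }
        return router.route(registry);
    }
}
```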

The code above still does not show how the trigger is actually fired. Let's go one level deeper into triggerResult = runExecutor(triggerParam, address);

public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult = null;
    try {
        // Get the executor client for this address
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // Fire the trigger
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}

Once the executor client is obtained, the trigger can be fired. But ExecutorBiz has two implementations, so which one is used here?

public static ExecutorBiz getExecutorBiz(String address) throws Exception {
    // valid
    if (address==null || address.trim().length()==0) {
        return null;
    }

    // load-cache
    address = address.trim();
    ExecutorBiz executorBiz = executorBizRepository.get(address);
    if (executorBiz != null) {
        return executorBiz;
    }

    // set-cache
    executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());

    executorBizRepository.put(address, executorBiz);
    return executorBiz;
}
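getExecutorBiz is a get-or-create cache keyed by the trimmed address. Below is a sketch of the same idea written with ConcurrentHashMap.computeIfAbsent. ClientCache and its factory are hypothetical, and the type parameter C stands in for ExecutorBiz. The excerpt does a separate get and put. Two threads that miss at the same moment can therefore each build a client, and the later put simply overwrites the earlier one. computeIfAbsent closes that gap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical per-address client cache; C stands in for ExecutorBiz.
class ClientCache<C> {
    private final ConcurrentMap<String, C> clients = new ConcurrentHashMap<>();
    private final Function<String, C> factory; // e.g. address -> new client for that address

    ClientCache(Function<String, C> factory) {
        this.factory = factory;
    }

    C get(String address) {
        if (address == null || address.trim().isEmpty()) {
            return null;                               // same validation as the excerpt
        }
        // Atomic get-or-create: one client per trimmed address, even under concurrency.
        return clients.computeIfAbsent(address.trim(), factory);
    }
}
```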

If no client is cached for the address, a new ExecutorBizClient is created and cached. Finally, let's look at the run method of ExecutorBizClient.

public ReturnT<String> run(TriggerParam triggerParam) {
    // Send a POST request to the executor's "run" endpoint
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

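At this point the picture is clear. The run method sends a POST request to the executor registered with the admin, asking it to run the job. When the executor receives the request, it uses the parameters, in particular the executor handler, to find the matching Handler and run its logic.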
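For a sense of what goes over the wire, here is a sketch that builds an equivalent request with the JDK's java.net.http API (Java 11+). It is not XxlJobRemotingUtil. RunRequestBuilder is a hypothetical name, and two details are assumptions rather than facts from the article:

- The access token travels in an "XXL-JOB-ACCESS-TOKEN" header.
- TriggerParam has already been serialized to JSON.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;

// Hypothetical sketch of the POST that ExecutorBizClient.run sends.
class RunRequestBuilder {
    static final String ACCESS_TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN"; // assumed header name

    static HttpRequest buildRunRequest(String address, String accessToken,
                                       int timeoutSeconds, String triggerParamJson) {
        // Ensure a trailing slash so that base + "run" yields ".../run".
        String base = address.endsWith("/") ? address : address + "/";
        HttpRequest.Builder builder = HttpRequest.newBuilder()
                .uri(URI.create(base + "run"))
                .timeout(Duration.ofSeconds(timeoutSeconds))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(triggerParamJson));
        if (accessToken != null && !accessToken.isEmpty()) {
            builder.header(ACCESS_TOKEN_HEADER, accessToken);
        }
        return builder.build();
    }
}
```

To actually send it, pass the request to HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) and deserialize the response body into a result object.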

Summary

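This article explained why the author creates separate fast and slow trigger pools, and then focused on the trigger method. None of it is especially difficult. Much source code isn't, and you can follow it by tracing the author's line of thought. The key is to think about the design while reading and ask what each choice buys. In the code above, the author accounts for how fast each job triggers. The author also logs every trigger to make troubleshooting easier, and uses sharding so a job can be distributed across multiple executors. If we designed and wrote this scheduler ourselves, would we have thought of all of this? Answering that question honestly shows where the gap is and what is worth learning.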

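This article does not cover when a trigger is fired, that is, how the scheduler manages to fire jobs precisely on time. That is the topic of the next article.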

Reprinted from: https://juejin.cn/post/7257048132064346168