likes
comments
collection
share

xxl-job路由策略浅析

作者站长头像
站长
· 阅读数 8

一、概况

xxl-job调度平台在进行任务调度时,除分片任务外,需要从执行器列表中选择一个来调度。如何选择就需要路由策略。 xxl-job路由策略浅析 xxl-job路由策略浅析 策略实现在xxl-job-admin项目的core.route包中。抽象类com.xxl.job.admin.core.route.ExecutorRouter仅有一个抽象接口。枚举类ExecutorRouteStrategyEnum声明了支持的策略。

// ExecutorRouter接口中的唯一接口,返回执行器地址即ip:port
public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList); 

xxl-job路由策略浅析

二、路由策略实现

2.1 RouteFirst、RouteLast和RouteRandom

  • RouteFirst返回addressList.get(0)
  • RouteLast返回addressList.get(addressList.size()-1)。
  • RouteRandom返回addressList.get(new Random().nextInt(addressList.size())); xxl-job路由策略浅析

2.2 RouteRound

轮训算法的常规实现:定义一个计数器即int counter=0,每路由一次counter加1,当counter>Integer.MAX_VALUE时让ounter归零;路由时addressList.get(counter%addressList.size())。 xxl-job中对轮训做了增强:

  1. counter初始值随机获取100以内的数;
  2. counter>1000000,或初始赋值后24小时后,主动再次初始化。 xxl-job路由策略浅析

2.3 RouteConsistentHash

即一致性hash路由,简单实现即用外部key(如Id等)的hash值对列表size取余。但是这样分配往往是不均匀的。xxl-job中采用带虚拟节点的hash环,最终效果为:

  • 每个JOB固定调度执行器列表中一台机器,但是执行器数量发生变化时,调度的节点可能会发生变化;
  • 假设有10个job,它们的执行器列表相同;那么这些JOB将均匀散列在不同机器上。

实现中有下面两点需注意:

  • a、virtual node:解决不均衡问题。每个执行器虚拟为100个hash环节点。 xxl-job路由策略浅析
  • b、计算hash值时,没有使用String的hashCode(可能重复),使用md5散列计算hash值,将取值范围扩大到2^32。 xxl-job路由策略浅析 路由时,查找大于jobId hash值的第一个entry,entry.value就是目标address。 这儿hash环的实现方式,值得借鉴。

2.4 RouteLFU

LFU(Least Frequently Used),使用频率最低的优先被选举,频率统计使用计数器,不考虑在时间维度的分布情况(即20个小时前计数增加100,与刚才2秒钟内增加100,两者等效)。

  • 对某个JOB的每个执行器,每调用一次,计数器加1;
  • 初始化计数器时,取值是new Random().nextInt(addressList.size());当计数大于1000000时,重新初始化。
  • 每次调用,需将新增执行器的计数器初始化,将已下线执行器的计数器删除。
  • 选择时,将内层Map.Entry按照计数升序排列,获取第一个(即使用次数最少的节点)。
// 使用次数的缓存,24小时强制重置
// 外层Map的key即jobId,内存Map的key即执行器address,value即计数器。
private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();

xxl-job路由策略浅析

2.5 RouteLRU

LRU(Least Recently Used),(时间上)最近最久未使用。使用LinkedHashMap实现lru算法

// 缓存
// 外层Map的key即jobId,内存Map的key和value,都是address
private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
// lru算法使用LinkedHashMap实现
LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
if (lruItem == null) {
    /**
     * LinkedHashMap
     * a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
     * b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;
     可封装LinkedHashMap并重写该方法,比如定义最大容量,超出时返回true即可实现固定长度的LRU算法;
     */
    lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
    jobLRUMap.putIfAbsent(jobId, lruItem);
}

每次被访问的节点,被移到到队尾。选择时,获取链表的第一个节点即可 xxl-job路由策略浅析

2.6 RouteBusyover

忙碌转移。该策略在路由时,会遍历执行器列表,依次发起空闲检测,只要有节点返回true,就将向该节点发起调度。(节点处于何种状态时才算空闲?随后介绍)

// 简化后代码
for (String address : addressList) {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // 空闲检查
        idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
        // 返回第一个空闲节点
        if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
                idleBeatResult.setMsg(idleBeatResultSB.toString());
                idleBeatResult.setContent(address);
                return idleBeatResult;
        }
}

idleBeat由调度平台向执行器发起,是单向的。 xxl-job路由策略浅析

2.7 RouteFailover

故障转移。执行列表手动注册时,调度平台不会定期维护如添加新节点、移除下线节点。即使采取主动注册,在心跳检测的间隙,某个节点突然下线,此时正好调度到该节点,本次执行将会失败(如果配置了邮件告警,我们还能及时感知到调度异常)。 该策略在路由时,会遍历执行器列表,依次发起心跳检测,返回第一个响应成功的节点。可想而知,如果执行器列表中第一个节点一直正常,那么将一直路由到该节点。其他节点就是backup。

// 简化后代码
for (String address : addressList) {
        // 心跳检测
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        beatResult = executorBiz.beat();
        // 返回第一个响应成功的执行器。
        if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
                beatResult.setMsg(beatResultSB.toString());
                beatResult.setContent(address);
                return beatResult;
        }
}

2.8 分片广播

准确来说,分片广播不是路由策略,因为它将调度所有已注册的执行器,而不是从中选择一个。因此,它并不是ExecutorRouter接口的实现类。在com.xxl.job.admin.core.trigger.XxlJobTrigger#trigger中有如下代码:

	// 分片广播
	if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
			&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
			&& shardingParam==null) {
                 // 遍历调度每一个执行器
		for (int i = 0; i < group.getRegistryList().size(); i++) {
			processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
		}
	} else {
		// 其他路由策略,分片索引是0,分片总数为1
		if (shardingParam == null) {
			shardingParam = new int[]{0, 1};
		}
		processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
	}

processTrigger()方法最后两个参数,分别是sharding index、sharding total,其中获取address代码如下:

	// 3、init address
	String address = null;
	ReturnT<String> routeAddressResult = null;
	if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
		// 分片广播
		if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
			// 正常不会走else分支
			if (index < group.getRegistryList().size()) {
				address = group.getRegistryList().get(index);
			} else {
				address = group.getRegistryList().get(0);
			}
		} else {
		// 路由
			routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
			if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
				address = routeAddressResult.getContent();
			}
		}
	} else {
		routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
	}

三、选择路由策略

  • 通常选择随机或轮训即可。
  • 如果要保证每次调度成功,可选择故障转移
  • 如果要保证任务执行的及时性,可选择忙碌转移
  • 分片任务,自然得选分片广播。
  • 对于刷新程序内存缓存的定时任务,其实可以用spring的@Schedule来实现。用xxl-job实现的好处在于,支持主动执行来提前刷新缓存。此时,策略应该配置为分片广播,使所有执行器中缓存保持一致。而博主在工作中曾遇到,同事将刷新项目进程缓存的任务配置为故障转移,导致在执行器列表无变化时,始终只有一个节点的缓存被定时刷新,而其他节点没有被调度的机会,从而导致业务时好时坏的异常。

四、类比负载均衡算法

负载均衡,就是在有多个同等的服务组成的集群中,选择一个节点来处理请求。负载均衡过程与路由相似。Java编程中,博主知道使用负载均衡算法的组件如nginx、ribbon。下面仅对nginx的负载均衡做简单介绍。 nginx一个具有高性能的HTTP和反向代理的WEB服务器。它可以通过负载均衡策略将大量请求分发到应用集群,解决高并发压力,实现高可用、高扩展性。 xxl-job路由策略浅析 如下是nginx的一段配置:通过upstream定义了名为order-manage的应用集群,通过server将80端口的请求转发到order-manage集群,此时负载均衡策略默认是轮询。

upstream order-manage {
	server 192.168.200.6:9091;
	server 192.168.1.106:9092;
	server 192.168.25.80:9093;
}

server {
	listen 80;
	server_name localhost;
	location /{
		proxy_pass http://order-manage;
	}
}

Nginx的upstream支持如下六种方式的负载均衡算法

  • 轮询:默认方式
  • weight:加权轮训
  • ip_hash:依据发出请求的 客户端IP 的hash值来分配服务器,可以保证同IP发出的请求路由到同一服务器。与“一致性hash”路由相似
  • url_hash:根据请求的 URL 的hash值来分配服务器。与“一致性hash”路由相似
  • least_conn:最少连接,把请求转发给连接数较少的后端服务器。与“忙碌转移”路由相似
  • fair:由第三方模块提供,可以根据页面大小、加载时间长短智能的进行负载均衡。与“忙碌转移”路由相似 此外,可以用backup将某服务器标记为备用,当主服务器不可用时,将用它处理请求。与“故障转移”路由相似
upstream order-manage {
	server 192.168.200.6:9091;
	server 192.168.1.106:9092;
	server 192.168.25.80:9093 backup;
}

转载自:https://juejin.cn/post/7318214394612744228
评论
请登录