3. xxl-job Source Code Analysis: executor-server in Detail
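In the previous article, "2. xxl-job Source Code Analysis: the Core XxlJobExecutor", we introduced `XxlJobExecutor`. One important part was not analyzed in depth there: the executor-server (RPC provider). We only mentioned `embedServer = new EmbedServer()` and went no further. In my view there is quite a lot inside it, so this article explains the executor-server in detail.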
Let's start with `embedServer.start`.
The `start` method of `EmbedServer`
How exactly does `EmbedServer` start? In other words, what does its `start` method actually do? Let's keep that question in mind as we read on.
```java
public void start(final String address, final int port, final String appname, final String accessToken) {
    // the actual business operations are implemented by ExecutorBizImpl
    executorBiz = new ExecutorBizImpl();
    // create a dedicated thread for the server
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // Netty provides the network service
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                // start the network service
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        // idle detection: no read or write for 90s (3 x 30s beat) fires an idle event
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                        // HTTP protocol support
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                                        // business logic
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // start registering this executor with the scheduling center
                startRegistry(appname, address);

                // wait until stop
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    // run as a daemon (background) thread
    thread.setDaemon(true);  // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    // start the thread
    thread.start();
}
```
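The `bizThreadPool` created in `start` is worth a closer look. Its maximum of 200 threads can be misleading, because the pool combines `corePoolSize = 0` with a bounded `LinkedBlockingQueue`.

Under the JDK `ThreadPoolExecutor` rules, a new task is queued first, and one worker is started if none exists. Extra workers are added only once the queue (2000 entries here) is full. The throwing `RejectedExecutionHandler` fires only after both the queue and the thread limit are exhausted.

The sketch below reproduces that configuration with plain JDK classes so the behaviour can be observed. `BizPoolDemo`, its helper methods and the reduced sizes in the rejection check are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: the same ThreadPoolExecutor shape as bizThreadPool in EmbedServer.start
// (core size 0, bounded LinkedBlockingQueue, a handler that throws on rejection).
// BizPoolDemo and the small sizes used in rejectsWhenSaturated() are hypothetical.
public class BizPoolDemo {

    static ThreadPoolExecutor newPool(int maxThreads, int queueCapacity) {
        return new ThreadPoolExecutor(
                0, maxThreads, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                r -> new Thread(r, "demo bizThreadPool-" + r.hashCode()),
                (r, executor) -> {
                    throw new RuntimeException("demo bizThreadPool is EXHAUSTED!");
                });
    }

    // Uses xxl-job's sizes (max 200, queue 2000): submits `tasks` tasks that all block,
    // then reports how many worker threads exist while they are stuck.
    static int workersWhileBlocked(int tasks) {
        ThreadPoolExecutor pool = newPool(200, 2000);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> awaitQuietly(release));
        }
        int workers = pool.getPoolSize();   // queue not full, so no worker beyond the first
        release.countDown();
        shutdownQuietly(pool);
        return workers;
    }

    // max 1 thread + queue capacity 1: one running and one queued task saturate the
    // pool, so a third submission reaches the throwing rejection handler.
    static boolean rejectsWhenSaturated() {
        ThreadPoolExecutor pool = newPool(1, 1);
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        pool.execute(() -> { started.countDown(); awaitQuietly(release); });
        awaitQuietly(started);                      // the only worker is now busy
        pool.execute(() -> awaitQuietly(release));  // this one fills the queue
        boolean rejected = false;
        try {
            pool.execute(() -> { });
        } catch (RuntimeException e) {
            rejected = true;
        }
        release.countDown();
        shutdownQuietly(pool);
        return rejected;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static void shutdownQuietly(ThreadPoolExecutor pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("workers while 10 tasks are blocked: " + workersWhileBlocked(10));
        System.out.println("rejects when saturated: " + rejectsWhenSaturated());
    }
}
```

With this shape, more than one worker appears only under a heavy backlog. A pool meant to scale earlier would need a non-zero core size or a `SynchronousQueue` instead.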
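Looking at the startup code, it all becomes clear. `start` simply creates a Netty server listening on the given port, runs it in a daemon thread, and lets Netty encode and decode the HTTP protocol. All we need to focus on is the business logic, that is, how `EmbedHttpServerHandler` processes each request.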
```java
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

    private ExecutorBiz executorBiz;
    private String accessToken;
    private ThreadPoolExecutor bizThreadPool;

    public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
        this.executorBiz = executorBiz;
        this.accessToken = accessToken;
        this.bizThreadPool = bizThreadPool;
    }

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        // read the request sent by the scheduling center; everything is copied out here
        // because SimpleChannelInboundHandler releases msg once channelRead0 returns
        String requestData = msg.content().toString(CharsetUtil.UTF_8);
        String uri = msg.uri();
        HttpMethod httpMethod = msg.method();
        boolean keepAlive = HttpUtil.isKeepAlive(msg);
        String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

        // process asynchronously in the thread pool so the Netty I/O thread is not blocked
        bizThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                // handle the request
                Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
                // serialize the result to JSON
                String responseJson = GsonTool.toJson(responseObj);
                // send it back to the scheduling center
                writeResponse(ctx, keepAlive, responseJson);
            }
        });
    }

    // the actual processing logic
    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        // only POST requests are accepted
        if (HttpMethod.POST != httpMethod) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri == null || uri.trim().length() == 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        // the token is checked only if one is configured on the executor
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }

        // dispatch by uri; all of the actual handling is delegated to executorBiz
        try {
            switch (uri) {
                case "/beat":
                    return executorBiz.beat();
                case "/idleBeat":
                    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                    return executorBiz.idleBeat(idleBeatParam);
                case "/run":
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                case "/kill":
                    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                    return executorBiz.kill(killParam);
                case "/log":
                    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                    return executorBiz.log(logParam);
                default:
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
        }
    }

    /**
     * Write the HTTP response message.
     */
    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
        // write response
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));  // Unpooled.wrappedBuffer(responseJson)
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");  // HttpHeaderValues.TEXT_PLAIN.toString()
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.channel().close();  // beat 3N, close if idle
            logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
```
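The logic of `EmbedHttpServerHandler` is not complicated either. It mainly defines the overall flow: validate the request, route it by URI, and write the response. The actual processing of each message is delegated to `executorBiz`.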
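The validation and routing done by `process` can be tried out without Netty. The sketch below keeps the same order of checks: HTTP method, empty URI, access token, then a `switch` on the URI.

It is only a minimal illustration under these assumptions:
- `RouterDemo`, `Biz` and `Result` are hypothetical stand-ins for the handler, `ExecutorBiz` and `ReturnT`.
- Only `/beat` and `/run` are routed.
- The numeric codes are illustrative.
- Request bodies are passed through as raw strings instead of being deserialized with `GsonTool`.

```java
// Sketch of EmbedHttpServerHandler.process without Netty or Gson.
// RouterDemo, Biz and Result are hypothetical stand-ins for the handler, ExecutorBiz and ReturnT;
// the code values are illustrative.
public class RouterDemo {

    static final int SUCCESS_CODE = 200;
    static final int FAIL_CODE = 500;

    static final class Result {
        final int code;
        final String msg;

        Result(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }
    }

    interface Biz {
        Result beat();

        Result run(String triggerJson);   // the real handler deserializes a TriggerParam first
    }

    private final Biz biz;
    private final String accessToken;

    RouterDemo(Biz biz, String accessToken) {
        this.biz = biz;
        this.accessToken = accessToken;
    }

    Result process(String httpMethod, String uri, String requestData, String accessTokenReq) {
        // 1. only POST is accepted
        if (!"POST".equals(httpMethod)) {
            return new Result(FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        // 2. the uri selects the operation, so it must be present
        if (uri == null || uri.trim().isEmpty()) {
            return new Result(FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        // 3. the token is checked only when one is configured on this side
        if (accessToken != null && !accessToken.trim().isEmpty()
                && !accessToken.equals(accessTokenReq)) {
            return new Result(FAIL_CODE, "The access token is wrong.");
        }
        // 4. route by uri and delegate the real work
        try {
            switch (uri) {
                case "/beat":
                    return biz.beat();
                case "/run":
                    return biz.run(requestData);
                default:
                    return new Result(FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
            }
        } catch (Exception e) {
            return new Result(FAIL_CODE, "request error:" + e);
        }
    }

    // Demo helper: a router whose Biz always succeeds.
    static RouterDemo withStubBiz(String accessToken) {
        Biz stub = new Biz() {
            @Override
            public Result beat() {
                return new Result(SUCCESS_CODE, null);
            }

            @Override
            public Result run(String triggerJson) {
                return new Result(SUCCESS_CODE, "queued " + triggerJson);
            }
        };
        return new RouterDemo(stub, accessToken);
    }

    public static void main(String[] args) {
        RouterDemo router = withStubBiz("secret");
        System.out.println(router.process("GET", "/beat", "", "secret").msg);
        System.out.println(router.process("POST", "/beat", "", "wrong").msg);
        System.out.println(router.process("POST", "/run", "{\"jobId\":1}", "secret").msg);
    }
}
```

As in the original handler, the token check is skipped entirely when the executor has no token configured, so an unset `accessToken` accepts any caller.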
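As the side that receives commands from the scheduling center, the executor's most important job is to execute those commands. So let's focus on `executorBiz.run(triggerParam)`.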
The `run` method of `ExecutorBizImpl`
`executorBiz` is an instance of the `ExecutorBizImpl` class. Let's look directly at how its `run` method is implemented.
```java
public ReturnT<String> run(TriggerParam triggerParam) {
    // look up the JobThread for this jobId
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread != null ? jobThread.getHandler() : null;
    String removeOldReason = null;

    // determine the glue type, i.e. how the job is implemented
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // Spring bean type: look up the JobHandler by its name
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // check whether the handler held by jobThread matches the requested one
        if (jobThread != null && jobHandler != newJobHandler) {
            // they differ: the handler has changed, so the old thread must be killed
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            // reset
            jobThread = null;
            jobHandler = null;
        }

        // (re)assign the handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // Groovy validation and assignment omitted; similar to the BEAN branch
    } else if (glueTypeEnum != null && glueTypeEnum.isScript()) {
        // script-type validation and assignment omitted; similar to the BEAN branch
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // apply the block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:" + ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // register a new JobThread if there is none (or the old one was discarded above)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // push the trigger into the blocking queue; the JobThread executes it asynchronously
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
```
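The first part of the code above is all validation. At the end it calls `jobThread.pushTriggerQueue(triggerParam)`, which puts the parameters into a blocking queue to wait for the `JobThread` to execute them. So `run` returns as soon as the trigger is queued. The execution result is reported back to the scheduling center later through a callback.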
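The block-strategy part of `run` is effectively a small decision table. Here is a sketch of it as a pure function.

`BlockStrategy` mirrors the three strategies of `ExecutorBlockStrategyEnum`: the two named in the code, plus serial execution, which falls into the "just queue trigger" branch. `Decision`, `decide` and `BlockStrategyDemo` are hypothetical names. `hasJobThread` means a `JobThread` exists and was not discarded by the handler-change check above it.

```java
// Sketch of the block-strategy branch in ExecutorBizImpl.run as a pure function.
// BlockStrategy mirrors ExecutorBlockStrategyEnum; Decision and BlockStrategyDemo are hypothetical.
public class BlockStrategyDemo {

    enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

    enum Decision {
        REJECT,       // return FAIL ("block strategy effect: ...") without queueing
        NEW_THREAD,   // registJobThread(...) creates a thread (replacing any old one), then queue
        QUEUE         // push into the existing JobThread's queue
    }

    // hasJobThread: a JobThread exists and its handler did not change
    // busy:         jobThread.isRunningOrHasQueue()
    // strategy:     null when the name is not recognised (match(..., null))
    static Decision decide(boolean hasJobThread, boolean busy, BlockStrategy strategy) {
        if (!hasJobThread) {
            return Decision.NEW_THREAD;          // jobThread == null
        }
        if (strategy == BlockStrategy.DISCARD_LATER && busy) {
            return Decision.REJECT;              // keep the old work, drop the new trigger
        }
        if (strategy == BlockStrategy.COVER_EARLY && busy) {
            return Decision.NEW_THREAD;          // removeOldReason is passed to registJobThread
        }
        return Decision.QUEUE;                   // serial execution / "just queue trigger"
    }

    public static void main(String[] args) {
        for (BlockStrategy s : BlockStrategy.values()) {
            System.out.println(s + ": busy -> " + decide(true, true, s)
                    + ", idle -> " + decide(true, false, s));
        }
    }
}
```

Two details are easy to miss. First, when the existing thread is idle, all three strategies simply queue the trigger. Second, an unrecognised strategy name behaves like serial execution.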
JobThread
`JobThread` extends the `Thread` class and internally maintains an unbounded blocking queue, `LinkedBlockingQueue<TriggerParam> triggerQueue`. The key part is the thread's `run` method.
```java
@Override
public void run() {
    // call init, i.e. the init method configured in the @XxlJob annotation
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // loop until stopped: the thread waits on a blocking queue indefinitely,
    // so a forced kill must be able to break out of this loop
    while (!toStop) {
        // not running a job by default
        running = false;
        // count one more idle round
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // to keep checking the toStop signal we must loop, so use poll(timeout) instead of queue.take()
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam != null) {
                // a trigger arrived: a job needs to run
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                        triggerParam.getJobId(),
                        triggerParam.getExecutorParams(),
                        logFileName,
                        triggerParam.getBroadcastIndex(),
                        triggerParam.getBroadcastTotal());

                // bind the context so the job can read its parameters while executing
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // write the job log
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                if (triggerParam.getExecutorTimeout() > 0) {
                    // a timeout is configured: interrupt the job once it expires
                    Thread futureThread = null;
                    try {
                        // wrap the job in a FutureTask on another thread so it can be timed out and interrupted
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                // init job context
                                XxlJobContext.setXxlJobContext(xxlJobContext);
                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {
                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);
                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // no timeout: execute directly on this thread
                    handler.execute();
                }

                // validate the execution result
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg != null && tempHandleMsg.length() > 50000)
                            ? tempHandleMsg.substring(0, 50000).concat("...")
                            : tempHandleMsg;
                    // store the (possibly truncated) result message back in the context
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }

                // write the job log
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg()
                );
            } else {
                if (idleTimes > 30) {
                    if (triggerQueue.size() == 0) {  // avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // handle result
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();

            XxlJobHelper.handleFail(errorMsg);
            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if (triggerParam != null) {
                // callback handler info
                if (!toStop) {
                    // common (normal completion)
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg())
                    );
                } else {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_CODE_FAIL,
                            stopReason + " [job running, killed]")
                    );
                }
            }
        }
    }

    // after a stop, triggers may still be waiting in the queue
    while (triggerQueue != null && triggerQueue.size() > 0) {
        // report each queued trigger back via callback
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam != null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_CODE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // call destroy, i.e. the destroy method configured in the @XxlJob annotation
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
```
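The timeout branch in `run` (`getExecutorTimeout() > 0`) uses a reusable pattern:
1. Wrap the job in a `FutureTask` and run it on its own thread.
2. Wait with `get(timeout, unit)`.
3. Interrupt that thread in `finally`.

The sketch below isolates the pattern with JDK classes only. `TimeoutDemo`, `runWithTimeout` and the demo helper are hypothetical names. It uses milliseconds instead of seconds so it runs quickly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Sketch of JobThread's timeout branch: FutureTask + dedicated thread + get(timeout) + interrupt.
// TimeoutDemo and its methods are hypothetical; milliseconds are used instead of seconds.
public class TimeoutDemo {

    // Returns true if the job finished within the timeout, false if it timed out.
    static boolean runWithTimeout(Callable<Boolean> job, long timeoutMillis) {
        FutureTask<Boolean> futureTask = new FutureTask<>(job);
        Thread futureThread = new Thread(futureTask, "demo-job-thread");
        futureThread.start();
        try {
            futureTask.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;   // JobThread calls XxlJobHelper.handleTimeout(...) here
        } catch (ExecutionException e) {
            // the job itself threw; in JobThread this reaches the outer catch (Throwable e)
            throw new RuntimeException(e.getCause());
        } catch (InterruptedException e) {
            // the waiting thread was interrupted: restore the flag and report "not finished"
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // as in JobThread, the job thread is interrupted whether or not it finished
            futureThread.interrupt();
        }
    }

    // Demo helper: a job that sleeps far longer than its timeout.
    // Returns true if a timeout is reported AND the job thread observes the interrupt.
    static boolean slowJobTimesOutAndIsInterrupted() {
        CountDownLatch interrupted = new CountDownLatch(1);
        boolean finished = runWithTimeout(() -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                interrupted.countDown();   // the interrupt from the finally block lands here
            }
            return true;
        }, 100);
        try {
            return !finished && interrupted.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("fast job finished: " + runWithTimeout(() -> true, 1_000));
        System.out.println("slow job timed out and was interrupted: " + slowJobTimesOutAndIsInterrupted());
    }
}
```

Interruption in Java is cooperative. `interrupt()` only sets a flag and wakes up interruptible blocking calls such as `Thread.sleep`. A job that ignores the flag keeps running after the timeout has been reported. Job code that may be timed out should therefore check `Thread.currentThread().isInterrupted()` or block only in interruptible calls.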
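From the code analysis above, I think several well-written points are worth learning from:
- The `toStop` variable. It is declared `volatile boolean`, which guarantees visibility across threads. That in itself is nothing special: any flag that one thread sets and another thread checks in a `while` loop needs `volatile` (or some other form of synchronization). What stands out is how the method handles the case where `toStop` has become `true`, i.e. the thread has been stopped. Instead of simply returning, it drains the queue and sends a callback both for the job that was running and for every trigger still waiting. I think this is good design: even when something unexpected happens, there is still a record of it.
- When taking values from `triggerQueue`, it uses `poll(3L, TimeUnit.SECONDS)` rather than `take()`. With `take()`, the thread would block indefinitely while the queue is empty. Even if `toStop` changed, the thread could never notice, because it would be stuck in the blocking call. `poll` leaves a way out: if nothing arrives within 3 seconds it returns `null`, so `toStop` is checked at least every 3 seconds.
- The thread also counts `idleTimes`, the number of consecutive idle rounds. Once it exceeds 30 (roughly 30 × 3 s, about 90 seconds without a trigger) and the queue is empty, the thread removes itself, so useless threads are not left waiting forever. This matters because each jobId gets its own `JobThread`. As seen in `ExecutorBizImpl.run`, `registJobThread` creates a new thread whenever there is none or the handler has changed. Without this cleanup, idle threads would keep accumulating.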
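The points above combine into a compact worker-loop pattern:
- a `volatile` stop flag,
- `poll` with a timeout instead of `take`,
- an idle counter,
- reporting queued triggers when stopping.

Here is a minimal sketch, assuming a hypothetical `MiniJobThread`. It treats triggers as plain strings, does no real work, and records outcomes in a list standing in for `TriggerCallbackThread.pushCallBack`. The poll interval and idle limit are constructor parameters so the sketch runs quickly; the original code uses 3 seconds and 30 rounds.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the JobThread loop pattern; MiniJobThread is hypothetical, not an xxl-job class.
public class MiniJobThread extends Thread {
    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final long pollMillis;      // xxl-job: 3 seconds
    private final int maxIdleRounds;    // xxl-job: 30
    private volatile boolean toStop = false;      // set by other threads, read by the loop
    private volatile boolean exitedIdle = false;
    // stands in for TriggerCallbackThread.pushCallBack: every trigger is reported here
    final List<String> callbacks = new CopyOnWriteArrayList<>();

    MiniJobThread(long pollMillis, int maxIdleRounds) {
        this.pollMillis = pollMillis;
        this.maxIdleRounds = maxIdleRounds;
    }

    void push(String triggerId) { triggerQueue.add(triggerId); }

    void toStop() { toStop = true; }

    boolean exitedBecauseIdle() { return exitedIdle; }

    @Override
    public void run() {
        int idleTimes = 0;
        while (!toStop) {
            idleTimes++;
            try {
                // poll with a timeout instead of take(): an empty queue delays
                // the toStop check by at most one poll interval
                String trigger = triggerQueue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (trigger != null) {
                    idleTimes = 0;
                    callbacks.add(trigger + ":done");   // the "job" itself is a no-op here
                } else if (idleTimes > maxIdleRounds && triggerQueue.isEmpty()) {
                    // xxl-job calls XxlJobExecutor.removeJobThread at this point
                    exitedIdle = true;
                    toStop = true;
                }
            } catch (InterruptedException e) {
                // woken up early (e.g. by a stop request); the loop re-checks toStop
            }
        }
        // triggers that never ran are still reported, marked as killed
        String left;
        while ((left = triggerQueue.poll()) != null) {
            callbacks.add(left + ":killed");
        }
    }

    // Demo helper: triggers are queued before start(), so none can be missed; the
    // thread runs them and then exits on its own after enough idle rounds.
    static MiniJobThread runUntilIdle(String... triggers) {
        MiniJobThread t = new MiniJobThread(10, 3);
        for (String id : triggers) {
            t.push(id);
        }
        t.start();
        joinQuietly(t);
        return t;
    }

    // Demo helper: stop is requested before the loop starts, so every queued trigger is drained as killed.
    static MiniJobThread stopBeforeRunning(String... triggers) {
        MiniJobThread t = new MiniJobThread(10, 3);
        t.toStop();
        for (String id : triggers) {
            t.push(id);
        }
        t.start();
        joinQuietly(t);
        return t;
    }

    static void joinQuietly(Thread t) {
        try {
            t.join(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runUntilIdle("a", "b").callbacks);
        System.out.println(stopBeforeRunning("x", "y").callbacks);
    }
}
```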
Summary
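Starting from `EmbedServer`'s `start` method, we went all the way down to the `execute` method of the `IJobHandler` inside `JobThread`, where the business logic finally runs. Going layer by layer, we covered the network model and the thread model, including blocking on the queue, timeouts, and stopping. Much of this is worth thinking about and learning from. If we were writing a network program ourselves, how should we handle network requests? How can asynchronous processing increase throughput and efficiency? How should we clean up properly after threads and exceptions? I believe readers will come away from the source with their own answers.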
Reposted from: https://juejin.cn/post/7251895374771224631