likes
comments
collection
share

3. xxl-job源码分析-executor-server详解

作者站长头像
站长
· 阅读数 35

上一篇2. xxl-job源码分析-核心XxlJobExecutor我们介绍了XxlJobExecutor。里面其实有个很重要的点当时没有深入分析,就是在executor-server (rpc provider)部分,当时只是介绍了embedServer = new EmbedServer()就没继续深入了。但是个人认为这里面的内容还是很多的,所以本篇详细讲解下这个executor-server的内容。

我们首先从这个embedServer.start开始。

EmbedServer的start方法

EmbedServer到底是如何启动的呢?也可以说这个start方法到底做了啥?我们带着问题往下看。

public void start(final String address, final int port, final String appname, final String accessToken) {
    // 实现业务操作功能
    executorBiz = new ExecutorBizImpl();
    // 创建一个线程
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // 采用netty进行网络服务
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                0,
                200,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, EmbedServer bizThreadPool-" + r.hashCode());
                    }
                },
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                    }
                });
            try {
                // 开启网络服务
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                // 空闲检测
                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                // 支持http协议
                                .addLast(new HttpServerCodec())
                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                // 业务逻辑处理
                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                        }
                    })
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

                // bind
                ChannelFuture future = bootstrap.bind(port).sync();

                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

                // 启动注册
                startRegistry(appname, address);

                // wait util stop
                future.channel().closeFuture().sync();

            } catch (InterruptedException e) {
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    // 设置为后台线程
    thread.setDaemon(true);    // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    // 启动线程
    thread.start();
}

看到了启动的源码,你是否会恍然大悟,原来就是创建了一个Netty服务端,监听端口。然后由Netty来帮忙进行Http协议的编码和解码。而我们只需要关注业务,也就是EmbedHttpServerHandler的处理逻辑。

public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);

    private ExecutorBiz executorBiz;
    private String accessToken;
    private ThreadPoolExecutor bizThreadPool;

    public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
        this.executorBiz = executorBiz;
        this.accessToken = accessToken;
        this.bizThreadPool = bizThreadPool;
    }

    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        // 获取调用中心发送过来的请求
        String requestData = msg.content().toString(CharsetUtil.UTF_8);
        String uri = msg.uri();
        HttpMethod httpMethod = msg.method();
        boolean keepAlive = HttpUtil.isKeepAlive(msg);
        String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);

        // 由线程池进行异步处理,防止阻塞IO
        bizThreadPool.execute(new Runnable() {
            @Override
            public void run() {
                // 处理请求
                Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
                // 得到的结果转成JSON
                String responseJson = GsonTool.toJson(responseObj);
                // 返回给调度中心
                writeResponse(ctx, keepAlive, responseJson);
            }
        });
    }

    // 具体的处理逻辑
    private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
        // 校验POST请求
        if (HttpMethod.POST != httpMethod) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
        }
        if (uri == null || uri.trim().length() == 0) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
        }
        if (accessToken != null
            && accessToken.trim().length() > 0
            && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }

        // 根据uri进行不同的处理,不过这些处理逻辑全部委托给了executorBiz
        try {
            switch (uri) {
                case "/beat":
                    return executorBiz.beat();
                case "/idleBeat":
                    IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                    return executorBiz.idleBeat(idleBeatParam);
                case "/run":
                    TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                    return executorBiz.run(triggerParam);
                case "/kill":
                    KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                    return executorBiz.kill(killParam);
                case "/log":
                    LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                    return executorBiz.log(logParam);
                default:
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
        }
    }

    /**
     * 写入返回的http的响应报文
     */
    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, String responseJson) {
        // write response
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.copiedBuffer(responseJson, CharsetUtil.UTF_8));   //  Unpooled.wrappedBuffer(responseJson)
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");       // HttpHeaderValues.TEXT_PLAIN.toString()
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
        ctx.writeAndFlush(response);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error(">>>>>>>>>>> xxl-job provider netty_http server caught exception", cause);
        ctx.close();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.channel().close();      // beat 3N, close if idle
            logger.debug(">>>>>>>>>>> xxl-job provider netty_http server close an idle channel.");
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

EmbedHttpServerHandler的逻辑也不复杂,主要是对整理流程和功能进行了定义,其实主要的报文处理逻辑还是委托executorBiz进行处理。

作为一个客户端,最重要的就是执行调度中心发过来的命令,所以这里重点分析一下executorBiz.run(triggerParam)

ExecutorBizImplrun方法

executorBizExecutorBizImpl类的一个实例。直接查看它的run方法的实现逻辑

public ReturnT<String> run(TriggerParam triggerParam) {
    // 根据jobId获取JobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // 获取执行处理的类型
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // 如果是spring bean的类型
        // 根据handler的名字得到JobHandler处理类
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // 校验jobThread中的Handler是否和参数中的传的一致
        if (jobThread!=null && jobHandler != newJobHandler) {
            // 如果不一致,说明handler改变了,需要kill之前的
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            // 重置
            jobThread = null;
            jobHandler = null;
        }
        // 重新赋值handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // 忽略groovy的校验赋值代码,类似于bean的
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
        // 忽略脚本类的校验赋值代码,类似于bean的
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // 处理阻塞策略
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }

    // 重新注册,设置jobThread
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // 将需要执行的参数放入阻塞队列等待线程执行,异步处理
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

上面代码前部分都是校验内容,最后执行了一个jobThread.pushTriggerQueue(triggerParam)方法,将参数放入到阻塞队列中,等待'JobThread的执行。

JobThread

JobThread继承了Thread类,内部维护着一个无界的阻塞队列LinkedBlockingQueue<TriggerParam> triggerQueue。其重点就是线程的run方法。

@Override
public void run() {
    // 调用init方法,也就是@XxlJob注解中配置的init方法
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // 判断线程是否停止,因为内部是个阻塞队列,会一直等待执行,当强制kill的时候,需要进行停止
    while(!toStop){
        // 默认还没有跑任务
        running = false;
        // 空闲次数累加
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // 要检查 toStop 信号,我们需要循环,所以不能使用 queue.take(),instand of poll(timeout)
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                // 不为空的情况,说明需要执行任务了
                running = true;
                idleTimes = 0;
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                    triggerParam.getJobId(),
                    triggerParam.getExecutorParams(),
                    logFileName,
                    triggerParam.getBroadcastIndex(),
                    triggerParam.getBroadcastTotal());

                // 初始化上下文,相当于在执行的时候放入参数
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // 记录日志
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

                if (triggerParam.getExecutorTimeout() > 0) {
                    // 如果有超时时间,需要在超时后中断
                    Thread futureThread = null;
                    try {
                        // 包装成FutureTask,启动另一个线程,进行线程的超时中断处理
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {
                                // init job context
                                XxlJobContext.setXxlJobContext(xxlJobContext);

                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();
                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {

                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);

                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // 无超时时间的,直接执行
                    handler.execute();
                }

                // 校验执行结果
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                        ?tempHandleMsg.substring(0, 50000).concat("...")
                        :tempHandleMsg;
                    // 在context中设置结果信息
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                // 打印日志
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                                 + XxlJobContext.getXxlJobContext().getHandleCode()
                                 + ", handleMsg = "
                                 + XxlJobContext.getXxlJobContext().getHandleMsg()
                                );

            } else {
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // handle result
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();

            XxlJobHelper.handleFail(errorMsg);

            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if(triggerParam != null) {
                // callback handler info
                if (!toStop) {
                    // commonm
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.getXxlJobContext().getHandleCode(),
                        XxlJobContext.getXxlJobContext().getHandleMsg() )
                     );
                } else {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.HANDLE_CODE_FAIL,
                        stopReason + " [job running, killed]" )
                    );
                }
            }
        }
    }

    // 如果强制停止,队列还有未处理的数据的时候
    while(triggerQueue !=null && triggerQueue.size()>0){
        // 队列中的回调触发器请求
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                triggerParam.getLogId(),
                triggerParam.getLogDateTime(),
                XxlJobContext.HANDLE_CODE_FAIL,
                stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // 调用destroy方法,也就是@XxlJob注解中配置的destroy方法
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }
    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}

经过上面的代码分析,我觉得有几个写的很好的点可以参考学习的:

  1. 它的toStop变量。这个toStop变量是个volatile boolean类型的,保证可见性。这个并没有什么特别的,只要在while中进行判断,都需要使用volatile类型。但是它在判断toStop为false的时候,也就是可能被停止了,这个方法还做了处理,进行了一个回调。而不是直接返回,做清空队列的操作,我认为这点很好,毕竟遇到了意外的情况,还是有地方可以查到的。
  2. triggerQueue中获取队列中的值的时候,使用了poll(3L, TimeUnit.SECONDS),并没有直接使用take()。如果使用take()的话,没有元素会一直阻塞,这样即使是toStop已经变了,也无法检测到,因为阻塞了,无法继续往后执行了,这里留了一个口子,3s后如果没有poll到会立马返回null,保证3s后会进行一个toStop检测。
  3. 线程中还使用了一个idleTimes空闲次数的检测。如果超过了30次,就移除销毁这个线程。防止一些无用的线程一直在等待。因为在之前分析ExecutorBizImplrun方法中也可以看到有个重置后重新注册的一个过程,这个registJobThread会创建新的线程,如果改变了handler而不清理之前的线程,会导致线程越来越多。

总结

EmbedServer的start方法到最后执行的业务逻辑的JobThreadIJobHandlerexecute方法。我们逐层深入,从网络模型到线程模型,包括线程队列的阻塞,超时,停止。这里面的很多内容都值得我们进行深入的思考和学习的。假如我们也写一个网络程序,应该如何处理网络请求,应该如何采用异步处理来增大吞吐量,提高程序运行的效率。还要如何做好线程和异常的善后工作。相信读者在阅读完源码后,一定会有着自己的思考。

转载自:https://juejin.cn/post/7251895374771224631
评论
请登录