likes
comments
collection
share

优秀开源项目解读(八) - 基于配置中心的轻量级动态可监控线程池Dynamic-tp

作者站长头像
站长
· 阅读数 13

🌈简介

🔥🔥🔥 Dynamic-tp是基于配置中心的轻量级动态可监控线程池,基于dtp可以实现项目运行时动态调整线程池参数,包括黑心线程数、最大线程数、空闲线程超时时间、任务队列打的大小等,dtp目前支持调参通知、活性、队列容量、拒绝策略、超时共六类通知报警维度,在项目运行时实时+定时检测,触发阈值进行推送,定时采集线程池运行指标数据,提供jsonlogmicrometerendpoint三种指标数据采集方式,可灵活选择,集成了三方中间件线程池管理,目前已接入dubborocketmqhystrixgrpctomcatundertowjetty等组件线程池管理。

🎉项目背景

以往我们使用线程池ThreadPoolExecutor过程中你是否有以下痛点呢?

  1. 代码中创建了一个 ThreadPoolExecutor,但是不知道那几个核心参数设置多少比较合适
  2. 凭经验设置参数值,上线后发现需要调整,改代码重新发布服务,非常麻烦
  3. 线程池相对开发人员来说是个黑盒,运行情况不能及时感知到,直到出现问题

现在大多数的互联网项目其实都会微服务化部署,有一套自己的服务治理体系,微服务组件中的分布式配置中心扮演的就是动态修改配置, 实时生效的角色。那么我们是否可以结合配置中心来做运行时线程池参数的动态调整呢?答案是肯定的,而且配置中心相对都是高可用的, 使用它也不用过于担心配置推送出现问题这类事儿,而且也能减少研发动态线程池组件本身的难度和工作量。

综上,可以总结出以下的背景

  • 广泛性:在 Java 开发中,想要提高系统性能,线程池已经是一个 90%以上的人都会选择使用的基础工具
  • 不确定性:项目中可能会创建很多线程池,既有 IO 密集型的,也有 CPU 密集型的,但线程池的参数并不好确定;需要有套机制在运行过程中动态去调整参数
  • 无感知性:线程池运行过程中的各项指标一般感知不到;需要有套监控报警机制在事前、事中就能让开发人员感知到线程池的运行状况,及时处理
  • 高可用性:配置变更需要及时推送到客户端,需要有高可用的配置管理推送服务,配置中心是现在大多数互联网系统都会使用的组件,与之结合可以极大提高系统可用性

功能特性✔

  • 代码零侵入:我们改变了线程池以往的使用姿势,所有配置均放在配置中心,服务启动时会从配置中心拉取配置生成线程池对象放到 Spring 容器中,使用时直接从 Spring 容器中获取,对业务代码零侵入
  • 通知告警:提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝触发报警、任务执行或等待超时报警),已支持企业微信、钉钉、飞书、邮件报警,同时提供 SPI 接口可自定义扩展实现
  • 运行监控:定时采集线程池指标数据,支持通过 MicroMeter、JsonLog 日志输出、Endpoint 三种方式,可通过 SPI 接口自定义扩展实现
  • 任务增强:提供任务包装功能,实现 TaskWrapper 接口即可,如 MdcTaskWrapper、TtlTaskWrapper、SwTraceTaskWrapper,可以支持线程池上下文信息传递
  • 多配置中心支持:基于主流配置中心实现线程池参数动态调整,实时生效,已支持 Nacos、Apollo、Zookeeper、Consul、Etcd,同时也提供 SPI 接口可自定义扩展实现
  • 中间件线程池管理:集成管理常用第三方组件的线程池,已集成 Tomcat、Jetty、Undertow、Dubbo、RocketMq、Hystrix、Grpc、Motan、Okhttp3、Brpc、Tars 等组件的线程池管理(调参、监控报警)
  • 轻量简单:基于 SpringBoot 实现,引入 starter,接入只需简单 4 步就可完成,顺利 3 分钟搞定
  • 多模式:参考 Tomcat 线程池提供了 IO 密集型场景使用的 EagerDtpExecutor 线程池
  • 兼容性:JUC 普通线程池和 Spring 中的 ThreadPoolTaskExecutor 也可以被框架监控,@Bean 定义时加 @DynamicTp 注解即可
  • 可靠性:框架提供的线程池实现 Spring 生命周期方法,可以在 Spring 容器关闭前尽可能多的处理队列中的任务
  • 高可扩展:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装等等)
  • 线上大规模应用:参考美团线程池实践open in new window,美团内部已经有该理论成熟的应用经验

💡技术架构

优秀开源项目解读(八) - 基于配置中心的轻量级动态可监控线程池Dynamic-tp

项目模块设计

优秀开源项目解读(八) - 基于配置中心的轻量级动态可监控线程池Dynamic-tp

配置变更监听模块

  1. 监听特定配置中心的指定配置文件(已实现 Nacos、Apollo、Zookeeper、Consul、Etcd),可通过内部提供的SPI接口扩展其他实现
  2. 解析配置文件内容,内置实现 yml、properties、json 配置文件的解析,可通过内部提供的 SPI 接口扩展其他实现
  3. 通知线程池管理模块实现参数的刷新

服务内部线程池管理模块

  1. 服务启动时从配置中心拉取配置,生成线程池实例注册到内部线程池注册中心以及 Spring 容器中
  2. 接受配置监听模块的刷新事件,实现线程池参数的刷新
  3. 代码中通过依赖注入(推荐)或者 DtpRegistry.getDtpExecutor() 方法根据线程池名称来获取线程池实例

三方组件线程池管理

  1. 服务启动获取第三方中间件的线程池,被框架管理起来
  2. 接受参数刷新、指标收集、通知报警事件,进行相应的处理

监控模块

实现监控指标采集以及输出,默认提供以下三种方式,也可通过内部提供的 SPI 接口扩展其他实现:

  1. 默认实现 JsonLog 输出到磁盘,可以自己采集解析日志,存储展示
  2. MicroMeter采集,引入 MicroMeter 相关依赖,暴露相关端点,采集指标数据,结合 Grafana 做监控大盘
  3. 暴雷自定义 Endpoint 端点(dynamic-tp),可通过 http 方式实时访问

通知告警模块

对接办公平台,实现通知告警功能,已支持钉钉、企微、飞书、邮件,可通过内部提供的 SPI 接口扩展其他实现,通知告警类型如下:

  1. 线程池主要参数变更通知
  2. 阻塞队列容量达到设置的告警阈值
  3. 线程池活性达到设置的告警阈值
  4. 触发拒绝策略告警,格式:A/B,A:该报警项前后两次报警区间累加数量,B:该报警项累计总数
  5. 任务执行超时告警,格式:A/B,A:该报警项前后两次报警区间累加数量,B:该报警项累计总数
  6. 任务等待超时告警,格式:A/B,A:该报警项前后两次报警区间累加数量,B:该报警项累计总数

优秀开源项目解读(八) - 基于配置中心的轻量级动态可监控线程池Dynamic-tp

代码模块

Dynamic-Tp目前代码主要分为七大块:

  • adapter模块: 主要是适配一些第三方组件的线程池管理,目前已经实现的有 SpringBoot 内置的三大 web 容器(Tomcat、Jetty、Undertow)、Dubbo、RocketMq、Hystrix、Grpc 的线程池管理, 后续会接入其他常用组件的线程池管理。
  • common模块: 主要是一些各个模板都会用到的类,解耦依赖,复用代码,大家日常开发中可能也经常会这样做。
  • core模块: 该框架的核心代码都在这个模块里,包括动态调整参数,监控报警,以及串联整个项目流程都在此。
  • example模块: 提供一个简单使用示例,方便使用者参照
  • extension模块: 放一些扩展功能实现,比如基于 redis 的流控扩展、邮件发送扩展、skywalking 上下文传递扩展等
  • logging模块: 用于配置框架内部日志的输出,目前主要用于输出线程池监控指标数据到指定文件
  • starter模块: 提供独立功能模块的依赖封装、自动配置等相关。
├─ dynamic-tp
├─adapter # 适配器
│  │  
│  ├─adapter-brpc #baidu rpc
│  │                                      
│  ├─adapter-common #common
│  │                                  
│  ├─adapter-dubbo #dubbon
│  │                                      
│  ├─adapter-grpc #grpc
│  │                                  
│  ├─adapter-hystrix #hystric熔断限流
│  │                                  
│  ├─adapter-motan #
│  │                                  
│  ├─adapter-okhttp3 #okhttp远程调用工具
│  │                                  
│  ├─adapter-rocketmq #消息队列RocketMq
│  │                                  
│  ├─adapter-tars #tars
│  │                                  
│  └─adapter-webserver #webservers

├─common #common公共模块

├─core #核心业务陌模块
│  │                 
├─example #示例
│  │  
│  ├─example-adapter
│  │  │     
│  ├─example-apollo
│  │  │                  
│  ├─example-consul-cloud
│  │  │             
│  ├─example-etcd
│  │  │                       
│  ├─example-nacos
│  │  │                
│  ├─example-nacos-cloud
│  │  │  
│  │  │  
│  ├─example-zookeeper
│  │  │               
│  └─example-zookeeper-cloud

├─extension #扩展
│  │  
│  ├─extension-limiter-redis
│  │  │                      
│  ├─extension-notify-email
│  │  │                  
│  └─extension-skywalking

├─logging #日志

├─starter #项目启动starter
│  │  pom.xml
│  │  
│  ├─starter-adapter
│  │  │  
│  │  ├─starter-adapter-brpc
│  │  │  
│  │  ├─starter-adapter-common
│  │  │                 
│  │  ├─starter-adapter-dubbo
│  │  │    
│  │  ├─starter-adapter-grpc
│  │  │ 
│  │  ├─starter-adapter-hystrix
│  │  │ 
│  │  ├─starter-adapter-motan
│  │  │         
│  │  ├─starter-adapter-okhttp3
│  │  │            
│  │  ├─starter-adapter-rocketmq
│  │  │          
│  │  ├─starter-adapter-tars
│  │  │  
│  │  └─starter-adapter-webserver
│  │       
│  ├─starter-common
│  │  
│  ├─starter-configcenter
│  │  │  
│  │  ├─cloud-starter-consul
│  │  │                      
│  │  ├─cloud-starter-nacos
│  │  │                      
│  │  ├─cloud-starter-zookeeper  
│  │  │                      
│  │  ├─starter-apollo
│  │  │                      
│  │  ├─starter-etcd
│  │  │                                   
│  │  ├─starter-nacos
│  │  │                      
│  │  └─starter-zookeeper
│  │                          
│  └─starter-extension
│      │  
│      ├─starter-extension-limiter-redis
│      │                      
│      └─starter-extension-notify-email

└─test

Maven依赖

  1. apollo 应用接入用此依赖
<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-apollo</artifactId>
    <version>1.0.9</version>
</dependency>
  1. spring-cloud 场景下的 nacos 应用接入用此依赖
<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-cloud-starter-nacos</artifactId>
    <version>1.0.9</version>
</dependency>
  1. 非 spring-cloud 场景下的 nacos 应用接入用此依赖
<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-nacos</artifactId>
    <version>1.0.9</version>
</dependency>

注意版本:nacos-config-spring-boot-starter 0.2.10 及以下版本对应 springboot 2.3.12.RELEASE及以下版本, 0.2.11-beta及以上版本对应springboot 版本2.4.0及以上版本,具体看官方说明。

  1. zookeeper 应用接入用此依赖
<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-zookeeper</artifactId>
    <version>1.0.9</version>
</dependency>

application.yml 需配置 zk 地址节点信息

spring:
  application:
    name: dynamic-tp-zookeeper-demo
  dynamic:
    tp:
      config-type: properties         # zookeeper支持 properties & json 配置
      zookeeper:
        config-version: 1.0.0
        zk-connect-str: 127.0.0.1:2181
        root-node: /configserver/dev
        node: dynamic-tp-zookeeper-demo

注意:配置中心配置文件参考example-zookeeper/resource下的config.txt / config.json,该文件可以通过ZKUI工具导入到Zookeeper

  1. spring-cloud 场景下 zookeeper 应用接入用此依赖
<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-cloud-starter-zookeeper</artifactId>
    <version>1.0.9</version>
</dependency>

注意:配置中心配置文件参考example-zookeeper-cloud/resource下的config.txt,该文件可以通过ZKUI工具导入到Zookeeper

  1. spring-cloud 场景 consul 应用接入用此依赖
<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-cloud-starter-consul</artifactId>
    <version>1.0.9</version>
</dependency>
  1. etcd 应用接入用此依赖
<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-etcd</artifactId>
    <version>1.0.9</version>
</dependency>
  1. 无配置中心应用接入用此依赖,无动态调整能力,有监控告警能力
<dependency>
    <groupId>cn.dynamictp</groupId>
    <artifactId>dynamic-tp-spring-boot-starter-common</artifactId>
    <version>1.0.9</version>
</dependency>

注意:一定要根据应用类型引入正确的依赖,不然会集成失败,有版本兼容性问题可以提 Issues 或加群反馈。

如何快速创建线程池

core模块ThreadPoolCreator类提供快速创建内存安全线程池的静态方法,可以用来替换 Executors类, 内部是基于ThreadPoolBuilder来创建的,也可以通过ThreadPoolBuilder来创建线程池对象。

public static ThreadPoolExecutor createCommonFast(String threadPrefix) {
        return ThreadPoolBuilder.newBuilder()
                .threadFactory(threadPrefix)
                .buildCommon();
    }

    public static ExecutorService createCommonWithTtl(String threadPrefix) {
        return ThreadPoolBuilder.newBuilder()
                .dynamic(false)
                .threadFactory(threadPrefix)
                .buildWithTtl();
    }

    public static DtpExecutor createDynamicFast(String poolName) {
        return createDynamicFast(poolName, poolName);
    }

    public static DtpExecutor createDynamicFast(String poolName, String threadPrefix) {
        return ThreadPoolBuilder.newBuilder()
                .threadPoolName(poolName)
                .threadFactory(threadPrefix)
                .buildDynamic();
    }

    public static ExecutorService createDynamicWithTtl(String poolName) {
        return createDynamicWithTtl(poolName, poolName);
    }

    public static ExecutorService createDynamicWithTtl(String poolName, String threadPrefix) {
        return ThreadPoolBuilder.newBuilder()
                .threadPoolName(poolName)
                .threadFactory(threadPrefix)
                .buildWithTtl();
    }

    public static ThreadPoolExecutor newSingleThreadPool(String threadPrefix, int queueCapacity) {
        return newFixedThreadPool(threadPrefix, 1, queueCapacity);
    }

    public static ThreadPoolExecutor newFixedThreadPool(String threadPrefix, int poolSize, int queueCapacity) {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(poolSize)
                .maximumPoolSize(poolSize)
                .workQueue(QueueTypeEnum.MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), queueCapacity, null)
                .threadFactory(threadPrefix)
                .buildDynamic();
    }

    public static ExecutorService newCachedThreadPool(String threadPrefix, int maximumPoolSize) {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(0)
                .maximumPoolSize(maximumPoolSize)
                .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, null)
                .threadFactory(threadPrefix)
                .buildDynamic();
    }

    public static ThreadPoolExecutor newThreadPool(String threadPrefix, int corePoolSize,
                                                   int maximumPoolSize, int queueCapacity) {
        return ThreadPoolBuilder.newBuilder()
                .corePoolSize(corePoolSize)
                .maximumPoolSize(maximumPoolSize)
                .workQueue(QueueTypeEnum.MEMORY_SAFE_LINKED_BLOCKING_QUEUE.getName(), queueCapacity, null)
                .threadFactory(threadPrefix)
                .buildDynamic();
    }

通知报警

调整参数通知

  • 配置变更会推送通知消息,且会高亮变更的字段。

优秀开源项目解读(八) - 基于配置中心的轻量级动态可监控线程池Dynamic-tp

告警

DynamicTp框架目前提供以下告警功能,每一个告警项都可以独立配置是否开启、告警阈值、告警间隔时间、平台等,具体代码请看core模块notify包, 告警信息同时会高亮与该项相关的字段。

  1. 核心参数变更通知
  2. 线程池活跃度告警
  3. 队列容量告警
  4. 拒绝策略告警
  5. 任务执行超时告警
  6. 任务排队超时告警

优秀开源项目解读(八) - 基于配置中心的轻量级动态可监控线程池Dynamic-tp

Dynamic-tp作为dromara组织的其中一员,有很多值得借鉴和学习的地方,本期内容分享就到这里了,我是👨‍🎓austin流川枫,如果有帮助的,欢迎点赞+关注+收藏,谢谢大家~