likes
comments
collection
share

Java中扩展线程池的拒绝策略

作者站长头像
站长
· 阅读数 13

今天学点优雅的,兄弟们👌👌👌

我们想在任务被拒绝时,扩展一些,比如在任务被拒绝时,发出警报、统计次数等

别急🙌🙌🙌,先看一下源码中拒绝策略是怎么实现的?

 /**
  * Invokes the rejected execution handler for the given command.
  * Package-protected for use by ScheduledThreadPoolExecutor.
  */
 final void reject(Runnable command) {
     handler.rejectedExecution(command, this);
 }

很好😢,线程池底层设计中,将抛出拒绝策略方法的访问级别设置为 默认访问权限,并添加了 final 关键字,所以我们没法继承和直接调用reject方法

那怎么办,😭😭😭

用代理!!!开始跟着我学习🙌🙌🙌

代理模式

代理模式(Proxy Design Pattern),在不改变原始类代码的情况下,引入代理类对原始类的功能作出增强。

Java中扩展线程池的拒绝策略

很有干劲,直接来实践:

首先我们来扩展线程池,添加一个拒绝策略次数统计参数,并添加原子自增和查询方法。

 public class SupportThreadPoolExecutor extends ThreadPoolExecutor {
 ​
     /**
      * 拒绝策略次数统计
      */
     private final AtomicInteger rejectCount = new AtomicInteger();
 ​
     public SupportThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
     }
 ​
     /**
      * 设置 {@link SupportThreadPoolExecutor#rejectCount} 自增
      */
     public void incrementRejectCount() {
         rejectCount.incrementAndGet();
     }
 ​
     /**
      * 获取拒绝次数
      *
      * @return
      */
     public int getRejectCount() {
         return rejectCount.get();
     }
 }

好了,然后呢,我有点懵,别急,慢慢捋一下,接下来是不是要用到上面的扩展的东西了?

就是该使用自己扩展的线程池了!

使用之前,是不是得自定义一个拒绝策略?是的,来直接扩展拒绝策略处理器🙌🙌🙌

 public interface SupportRejectedExecutionHandler extends RejectedExecutionHandler {
 ​
     /**
      * 拒绝策略记录时, 执行某些操作
      *
      * @param executor
      */
     default void beforeReject(ThreadPoolExecutor executor) {
         if (executor instanceof SupportThreadPoolExecutor) {
             SupportThreadPoolExecutor supportExecutor = (SupportThreadPoolExecutor) executor;
             // 发起自增
             supportExecutor.incrementRejectCount();
             // 触发报警...
             System.out.println("线程池触发了任务拒绝...");
         }
     }
 }

很简答,在这个接口中我们新写了一个接口的方法的默认实现(jdk8新特性),在这个里面实现了上面我们的两个要求:次数自增、触发警报

然后,现在我们开始自定义一个拒绝策略:

 public class SupportAbortPolicyRejected extends ThreadPoolExecutor.AbortPolicy implements SupportRejectedExecutionHandler {
 ​
     @Override
     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
         beforeReject(e);
         super.rejectedExecution(r, e);
     }
 }

打工告成😭😭😭

开始测试:

 @SneakyThrows
 public static void main(String[] args) {
     SupportThreadPoolExecutor executor = new SupportThreadPoolExecutor(
             1,
             1,
             1024,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue(1),
             // 使用自定义拒绝策略
             new SupportAbortPolicyRejected()
     );
 ​
     // 测试流程
     for (int i = 0; i < 3; i++) {
         try {
             // 无限睡眠, 以此触发拒绝策略.(此处有异常, 为了减少无用代码, 省略...)
             executor.execute(() -> Thread.sleep(Integer.MAX_VALUE));
         } catch (Exception ignored) {
         }
     }
 ​
     Thread.sleep(50);
     System.out.println(String.format("线程池拒绝策略次数 :: %d", executor.getRejectCount()));
 }
 ​
 /**
  * 日志打印:
  *
  * 线程池触发了任务拒绝...
  * 线程池拒绝策略次数 :: 1
  */

根据日至打印得知,我们的扩展需求完整的实现了。当线程池执行任务拒绝行为时,首先会调用 SupportRejectedExecutionHandler#beforeReject,然后才是执行真正的拒绝策略行为💕

静态代理

很明显,上面的代理思路很简单,就是用扩展类替换了原来要使用的类,在扩展类中扩展了我们想要的东西

Java中扩展线程池的拒绝策略

但是这种方式优雅吗?🤷‍♂️🤷‍♂️🤷‍♂️

很不优雅,静态代理会造成系统设计中类的数量增加、增加了系统复杂度

怎么解决,动态代理缓缓走来🐕🐕🐕

动态代理

顾名思义,就是在程序运行时来动态增强类的行为,它取消了对被代理类的扩展限制,遵循开闭原则(对开放扩展,对修改关闭)

如何将动态代理代入到线程池拒绝策略呢?文章采用 JDK 动态代理的例子和大家说明:

回顾一下JDK动态代理怎么用:

先创建一个实现 InvocationHandler 接口的类,实现里面的invoke类

创建一个接口和一个接口实现的类(这个类是用来被扩展的)

然后使用:在代码中,创建这个接口实现的类的实例和实现 InvocationHandler 接口的类的实例

然后使用 Proxy.newProxyInstance 创建动态代理对象,将这两个类放进去,然后直接调用代理对象的实例的方法就完成了代理

好🙌🙌🙌,跟着节奏来

创建核心类 InvocationHandler,这个类主要负责代理拒绝策略执行的,重写里面的invoke方法,看代码:

 @AllArgsConstructor
 public class RejectedExecutionProxyInvocationHandler implements InvocationHandler {
 ​
     private RejectedExecutionHandler target;
 ​
     private SupportThreadPoolExecutor executor;
 ​
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
         // 执行拒绝策略前自增拒绝次数 & 发起报警
         executor.incrementRejectCount();
         System.out.println("线程池触发了任务拒绝...");
         return method.invoke(target, args);
     }
 }

创建一个接口和一个接口实现的类(这个类是用来被扩展的),这个用的是线程池的拒绝策略实例abortPolicy

  ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();

然后创建这两个类的实例,并在代码中测试:

 @SneakyThrows
 public static void main(String[] args) {
     // 删除 SupportThreadPoolExecutor 构造方法中的拒绝策略
     SupportThreadPoolExecutor executor = new SupportThreadPoolExecutor(
             1,
             1,
             1024,
             TimeUnit.SECONDS,
             new LinkedBlockingQueue(1)
     );
 ​
     ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
     // 创建拒绝策略代理类
     RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) Proxy.newProxyInstance(
             abortPolicy.getClass().getClassLoader(),
             abortPolicy.getClass().getInterfaces(),
             new RejectedExecutionProxyInvocationHandler(abortPolicy, executor)
     );
     // 线程池 set 拒绝策略代理类
     executor.setRejectedExecutionHandler(rejectedExecutionHandler);
 ​
     // 测试流程
     for (int i = 0; i < 3; i++) {
         try {
             // 无限睡眠, 以此触发拒绝策略.(此处有异常, 为了减少无用代码, 省略...)
             executor.execute(() -> Thread.sleep(Integer.MAX_VALUE));
         } catch (Exception ex) {
             // ignore
         }
     }
 ​
     Thread.sleep(50);
     System.out.println(String.format("线程池拒绝策略次数 :: %d", executor.getRejectCount()));
 }
 ​
 /**
  * 日志打印:
  *
  * 线程池触发了任务拒绝...
  * 线程池拒绝策略次数 :: 1
  */

好的,完成,至此,已经成功了一半多了

虽然创建动态代理可以解决 类的数量增加;但是,代理类的创建依然需要开发人员操作,这样上面说的静态代理的第二个缺点依然无法解决

我们换一种思路去创建代理拒绝策略类,从外部的创建变更到内部就可以了;这一版选择在线程池的构造方法内部实现代理类:

 public class SupportThreadPoolExecutor extends ThreadPoolExecutor {
 ​
     // 省略代码...
 ​
     public SupportThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
 ​
         RejectedExecutionHandler rejectedExecutionHandler = (RejectedExecutionHandler) Proxy.newProxyInstance(
                 handler.getClass().getClassLoader(),
                 handler.getClass().getInterfaces(),
                 new RejectedExecutionProxyInvocationHandler(handler, this)
         );
 ​
         setRejectedExecutionHandler(rejectedExecutionHandler);
     }
 ​
     // 省略代码...
 }

完结撒花😜😜😜,总的来说不难,捋清楚了很简单,你搞清楚了吗?

转载自:https://juejin.cn/post/7316202800663691274
评论
请登录