likes
comments
collection
share

【坑】: 谁教你的自定义拒绝策略不抛异常?

作者站长头像
站长
· 阅读数 57

概述

最近写一个批量业务接口,为了降低响应时间,遂使用了自定义线程池 + CompletableFuture异步执行任务,阻塞获取结果,

但在测试过程中,发现一个很奇怪的问题

下面造一个测试案例来复现问题,也方便带大家了解~


问题案例

@SpringBootTest
public class OtherTest {
​
   @Resource
   private ThreadPoolExecutor threadPoolExecutor;
​
   @Test
   public void test() {
      List<Integer> list = new ArrayList<>(100);
      for (int i = 0; i < 100; i++) {
         list.add(i);
      }
​
      List<CompletableFuture<Integer>> futureList = list.stream().map(num -> {
         return CompletableFuture.supplyAsync(() -> {
            // 模拟业务
            return num + 1;
         }, threadPoolExecutor);
      }).collect(Collectors.toList());
​
      CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
​
      for (CompletableFuture<Integer> completableFuture : futureList) {
         try {
            System.out.println(completableFuture.get());
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
   }
​
}

执行案例后,我们会发现主线程一直被阻塞,下面的打印根本没有执行!这 就是我遇到的问题

【坑】: 谁教你的自定义拒绝策略不抛异常?


为啥被阻塞了?

我们直接Dump Threads,可以看到主线程处于WAITING状态,通过堆栈跟踪,我们可以看到是被LockSupport.park

【坑】: 谁教你的自定义拒绝策略不抛异常?

因为我们最终会CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();阻塞获取结果

根据join方法一路跟踪,最终到达java.util.concurrent.CompletableFuture.Signaller#block,也符合上图的堆栈信息

【坑】: 谁教你的自定义拒绝策略不抛异常?

ok,问题算是找到了,但是为啥会一直阻塞呢?

根据我们之前的运行结果,可以发现,打印出了任务被拒绝的error日志,难不成是因为被拒绝的原因???

【坑】: 谁教你的自定义拒绝策略不抛异常?

想到这里,我直接调大了线程和队列容量

这里是之前的线程池配置参数

@Slf4j
@Configuration
public class ThreadPoolConfig {
​
   @Bean(name = "threadPoolExecutor")
   public ThreadPoolExecutor prepareThreadPoolExecutor() {
      return new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(5), new RejectedExecutionHandler() {
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.error("ThreadPoolExecutor 任务被拒绝,请及时检查!!!");
         }
      });
   }
​
}

下面是调整之后的线程池参数,增加了队列容量和最大线程数

@Slf4j
@Configuration
public class ThreadPoolConfig {
​
   @Bean(name = "threadPoolExecutor")
   public ThreadPoolExecutor prepareThreadPoolExecutor() {
      return new ThreadPoolExecutor(2, 10, 1, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500), new RejectedExecutionHandler() {
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.error("ThreadPoolExecutor 任务被拒绝,请及时检查!!!");
         }
      });
   }
​
}

调整之后再次执行任务,可以看到执行成功

【坑】: 谁教你的自定义拒绝策略不抛异常?


看源码,找问题~

通过上面我们已经了解到,是因为任务被拒绝,导致主线程LockSupport.park一直处于WAITING状态,无法继续往下执行。

既然被park了,那肯定要unpark才行

那,什么时候会unpark呢?

当然是任务都执行完之后啦,才回去unpark主线程

【坑】: 谁教你的自定义拒绝策略不抛异常?

但之前我们通过日志看到,有些任务被拒绝了,任务被拒绝了就不会被执行呀,那阻塞获取该任务的线程就不会unpark,导致之后的任务也不执行,所以一直不能去unpark主线程,所以主线程一直被阻塞住。

下面再看看线程池的源码:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
​
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

线程池的八股文相信大家都知道,看上面源码我们知道,当线程数已经不小于核心线程数且队列容量也满的情况下,会再去尝试addWorker

// 此时core为false
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;
​
        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                // 当前线程数超过核心线程数,直接返回false去触发拒绝策略
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // ...省略
    return workerStarted;
}

addWorker中当前线程数超过核心线程数,直接返回false去触发拒绝策略,且不会把任务添加到队列中

【坑】: 谁教你的自定义拒绝策略不抛异常?


CompletableFuture不行,那Future呢?

可能有小伙伴会说既然CompletableFuture不行,那我用Future。

其实原理上都是一样的,Future也不行

@SpringBootTest
public class OtherTest {
​
   @Resource
   private ThreadPoolExecutor threadPoolExecutor;
​
   @Test
   public void test() {
      List<Integer> list = new ArrayList<>(100);
      for (int i = 0; i < 100; i++) {
         list.add(i);
      }
​
      List<Future<Integer>> futureList = list.stream().map(num -> {
         return threadPoolExecutor.submit(() -> {
            return num + 1;
         });
      }).collect(Collectors.toList());
​
      for (Future<Integer> future : futureList) {
         try {
            System.out.println(future.get());
         } catch (Exception e) {
            e.printStackTrace();
         }
      }
​
   }
​
}

使用Future同样会导致主线程park,处于WAITING状态

【坑】: 谁教你的自定义拒绝策略不抛异常?

下面是FutureTask的get源码,会通过LockSupport.park(this);进行阻塞获取

具体解释见 -> 异步超时中断,知其然,也要知其所以然~

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}
​
private int awaitDone(boolean timed, long nanos)
  throws InterruptedException {
  long startTime = 0L;
  WaitNode q = null;
  boolean queued = false;
  for (;;) {
    int s = state;
    if (s > COMPLETING) {
      if (q != null)
        q.thread = null;
      return s;
    }
    else if (s == COMPLETING)
      Thread.yield();
    else if (Thread.interrupted()) {
      removeWaiter(q);
      throw new InterruptedException();
    }
    else if (q == null) {
      if (timed && nanos <= 0L)
        return s;
      q = new WaitNode();
    }
    else if (!queued)
      queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
    else if (timed) {
      final long parkNanos;
      if (startTime == 0L) {
        startTime = System.nanoTime();
        if (startTime == 0L)
          startTime = 1L;
        parkNanos = nanos;
      } else {
        long elapsed = System.nanoTime() - startTime;
        if (elapsed >= nanos) {
          removeWaiter(q);
          return state;
        }
        parkNanos = nanos - elapsed;
      }
      if (state < COMPLETING)
        LockSupport.parkNanos(this, parkNanos);
    }
    else
      LockSupport.park(this);
  }
}

而,unpark时机就是在任务正常执行完后的finishCompletion,该方法会去唤醒所有阻塞获取该任务的线程

但就是任务被拒绝了,所以无法去唤醒阻塞获取该任务的线程,造就了惨案~

CompletableFuture其实也是这个原因

本质上都是因为任务被拒绝无法执行,导致无法去唤醒线程

public void run() {
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
      // ...省略
    }
}
​
​
protected void set(V v) {
  if (STATE.compareAndSet(this, NEW, COMPLETING)) {
    outcome = v;
    STATE.setRelease(this, NORMAL); // final state
    // 唤醒阻塞获取该任务的线程
    finishCompletion();
  }
}

解决方案

既然了解到坑的原因所在,那么找出对应的解决方案也不难

  1. 拒绝策略记得抛出异常,避免主线程一直等待,比如DiscardPolicyDiscardOldestPolicy都会存在一定的问题,没有对外抛出异常。
  2. 设置超时时间,及时释放资源,比如CompletableFuture在jdk9后,可以通过orTimeout设置超时时间,Future的话,他的get方法也支持设置等待时间。

相关文章

异步超时中断,知其然,也要知其所以然~

揭秘Future cancel迷惑性boolean入参~

ScheduledThreadPoolExecutor有坑嗷~

转载自:https://juejin.cn/post/7214124426047750203
评论
请登录