并发-AQS之ThreadPoolExecutor源码解读(一)
ThreadPoolExecutor是Java中一个线程池的实现类,可以用于管理和调度多个线程执行任务。线程池中维护了一个线程池容量,当有任务需要执行时,线程池中的线程会被分配执行任务,执行完毕后又会返回线程池等待下一次执行任务。 整体UML类图如下
基础概念
核心参数
构造线程池需要7个参数,下面是ThreadPoolExecutor提供的构造方法
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)
corePoolSize
线程池核心线程数
private volatile int corePoolSize;
maximumPoolSize
线程池最大线程数
private volatile int maximumPoolSize;
keepAliveTime
空闲线程等待工作的超时时间(以纳秒为单位)。当存在超过corePoolSize数量的线程或者允许核心线程超时allowCoreThreadTimeOut时,线程使用此超时时间。否则,它们将永远等待新的工作任务。 它的单位是是由unit确定 线程池成员变量keepAliveTime的单位是纳秒nanoseconds
private volatile long keepAliveTime;
unit
主要讲构造参数keepAliveTime转换成单位为
this.keepAliveTime = unit.toNanos(keepAliveTime);
workQueue
阻塞队列,用来存放任务线程 其提供方法如下
public interface BlockingQueue<E> extends Queue<E> {
// 如果队列已满,无法插入新元素,则会抛出IllegalStateException异常。否则,将元素插入到队列的尾部,并返回true
boolean add(E e);
// 如果队列已满,无法插入新元素,则会返回false。否则,将元素插入到队列的尾部,并返回true
boolean offer(E e);
// 用于将指定元素插入到队列尾部。如果队列已满,则会阻塞当前线程,直到有空间可用为止。
void put(E e) throws InterruptedException;
// 用于将指定元素插入到队列尾部。如果队列已满,则会等待指定的时间,在超时时间内如果仍无法插入元素则返回false
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 用于从队列头部取出并移除一个元素。如果队列为空,则会阻塞当前线程,直到有元素可用为止。
E take() throws InterruptedException;
// 用于从队列头部取出并移除一个元素。如果队列为空,则会等待指定的时间,在超时时间内如果仍无法取出元素则返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
// 用于获取当前队列可用空间的大小
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
// 将队列中所有元素移除,并将它们添加到指定的集合中。移除的元素数量会作为方法的返回值返回。
int drainTo(Collection<? super E> c);
// 将队列中最多 maxElements 个元素移除,并将它们添加到指定的集合中。移除的元素数量会作为方法的返回值返回
int drainTo(Collection<? super E> c, int maxElements);
}
其在JUC包中的实现类如下
- ArrayBlockingQueue:一个基于数组的阻塞队列实现,有固定的容量,当队列满时,会阻塞插入操作,直到队列中有空间可用
- LinkedBlockingQueue:一个基于链表的阻塞队列实现,可以指定容量,如果未指定容量,则默认容量为Integer.MAX_VALUE。插入操作在队列已满时会阻塞,取出操作在队列为空时会阻塞
- PriorityBlockingQueue:一个具有优先级的阻塞队列实现,元素按照它们的优先级顺序被插入到队列中。如果两个元素具有相同的优先级,则它们将按照它们被插入到队列中的顺序进行比较
- SynchronousQueue:一个不存储元素的阻塞队列实现,它的每个插入操作必须等待一个对应的删除操作,否则插入操作会一直阻塞
- DelayQueue:一个具有延迟时间的阻塞队列实现,插入的元素必须实现Delayed接口,队列会按照延迟时间的大小来进行排序,延迟时间最小的元素将被首先取出
threadFactory
线程工厂,主要用于创建线程
public interface ThreadFactory {
Thread newThread(Runnable r);
}
handler
RejectedExecutionHandler 饱和策略,默认情况下使用AbortPolicy
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
其有四个实现策略
- AbortPolicy:默认的拒绝策略,当 Executor 无法接受新任务时,会抛出 RejectedExecutionException 异常
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
- CallerRunsPolicy:当 Executor 无法接受新任务时,会使用调用线程来执行该任务。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
- DiscardPolicy:当 Executor 无法接受新任务时,会直接丢弃该任务,不会抛出任何异常。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
- DiscardOldestPolicy:当 Executor 无法接受新任务时,会丢弃队列中最早的任务,并尝试重新提交新的任务。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
其他成员变量
ctl
一个原子整型变量,用于记录线程池的状态和线程数目。 ctl 的高 3 位表示线程池的状态,低 29 位表示线程数目
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
线程池状态
- RUNNING:线程池正在运行中,可以接受新的任务。
- SHUTDOWN:线程池正在关闭中,不再接受新的任务,但会继续处理队列中的任务。
- STOP:线程池已经停止,不再接受新的任务,也不再处理队列中的任务,并会中断正在执行的任务。
- TIDYING:线程池正在整理线程,即将所有线程都终止,转换为 TERMINATED 状态。
- TERMINATED:线程池已经终止,不再接受新的任务,也不再处理队列中的任务。
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
RUNNING 状态:当线程池被创建时,初始状态为 RUNNING。 SHUTDOWN 状态:当调用 shutdown() 方法时,线程池会进入 SHUTDOWN 状态。此时线程池不再接受新的任务,但会继续执行队列中的任务。 STOP 状态:当调用 shutdownNow() 方法时,线程池会进入 STOP 状态。此时线程池不再接受新的任务,并中断正在执行的任务。 TIDYING 状态:当线程池完成所有任务后,会进入 TIDYING 状态。在此状态下,线程池会将所有线程终止,并执行一些清理操作,如将工作队列置为 null。 TERMINATED 状态:当线程池完成清理操作后,会进入 TERMINATED 状态。此时线程池已经终止,不再接受新的任务。
allowCoreThreadTimeOut
一个布尔型参数,用于设置是否允许核心线程超时。如果设置为 true,则在任务执行完毕后,空闲的核心线程会在 keepAliveTime 时间内超时退出。
默认情况下,ThreadPoolExecutor 中的核心线程不会超时。即使在没有新任务可执行时,核心线程也会一直保持活动状态。这是为了保证线程池的响应速度,因为创建和销毁线程都需要一定的时间 通过下列方法设置
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
largestPoolSize
用于记录线程池中曾经出现过的最大线程数。具体来说,largestPoolSize 记录了线程池中在任何时刻的最大线程数(包括核心线程数和非核心线程数) largestPoolSize 可以用来记录线程池中曾经创建的最大线程数,以便于监控线程池的状态和性能
completedTaskCount
用于记录线程池中已经完成执行的任务数量。具体来说,completedTaskCount 记录了线程池中已经成功执行完成的任务数量,包括已经从工作队列中移除的任务和已经执行完毕的任务。 获取总的执行完成任务需要加上每个work的执行数量
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
workers
线程池中的工作线程。 Worker 继承自 AQS类,实现了 Runnable 接口。每个 Worker 对象都对应着一个工作线程。 Worker 类实现了 AQS 类的 acquire() 和 release() 方法,用于实现工作线程的锁定和解锁。这些方法的主要作用是保证同一时间只有一个工作线程在执行任务,从而保证线程池的安全性。
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
关键方法
execute(Runnable command)
将任务异步提交到线程池处理
public void execute(Runnable command) {
//如果传入的任务是空,抛异常
if (command == null)
throw new NullPointerException();
//获取当前线程池状态
int c = ctl.get();
//1. 如果活动线程数<核心线程数
if (workerCountOf(c) < corePoolSize) {
//创建核心线程来处理传入的任务
if (addWorker(command, true))
return;
//如果创建失败,说明有其他线程介入
//再次获取最新线程池状态
c = ctl.get();
}
//2. 如果当前线程池是运行态,并且成功将任务放入工作队列
if (isRunning(c) && workQueue.offer(command)) {
//添加任务队列成功
//再次检查最新线程状态
int recheck = ctl.get();
//如果当我们将任务放入队列后出现了:
//1. 线程池不再是运行态 --> 将任务从队列中剔除,并执行拒绝策略(非运行状态,拒绝接受任务)
//2. 活动线程数为0 --> 创建非核心线程,处理的任务由非核心线程自主去工作队列中获取
if (! isRunning(recheck) && remove(command))
//线程池状态不正确,执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
//情况1:RUNNING状态,但设置核心线程数为0,此时任务放入了工作队列,但没人处理
//情况2:SHUTDOWN状态,但remove任务失败。在此之前工作线程处理完所有任务后全部停止工作,为了保证队列为空,需要再创建一个空任务非核心线程,处理这个队列中的剩余任务
addWorker(null, false);
}
//3. 如果线程池不是运行态,或者是运行态,但任务队列已满,尝试添加非核心线程处理当前任务
else if (!addWorker(command, false))
//添加非核心线程失败,执行拒绝策略
reject(command);
}
整体流图基本如下
第一步:如果活动线程数<核心线程数,创建核心线程来处理传入的任务
计算线程数量
private static int workerCountOf(int c) {
return c & CAPACITY;
}
添加核心线程数addWorker(command, true)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
//如果出现以下情况,将不添加工作线程
//1. 线程池非运行态
//2. 在1的条件下,线程池为STOP状态(既不是运行态也不是SHUTDOWN态),且传入了任务
//3. 在2的条件下,工作队列不为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//如果线程池状态为运行态,进入该循环,自旋cas尝试增加工作线程数
for (;;) {
int wc = workerCountOf(c);
//创建失败条件:
//1. 如果当前活动线程数已经到达了最大线程数
//2. 如果请求创建核心线程,但核心线程数已经满了
//3. 如果请求非核心线程数,但总线线程数已经满了
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//如果还可以创建,先自旋cas尝试增加工作线程数
if (compareAndIncrementWorkerCount(c))
//中断循环退出
break retry;
//如果还可以创建,先自旋cas尝试增加工作线程数
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
//如果线程池运行状态变化(变成非运行态)重新进入retry循环查看条件
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//新的Worker,Worker封装了线程、任务
//初始Worker的state是-1
w = new Worker(firstTask);
final Thread t = w.thread;
//健壮性判断
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//1. 如果是运行态
//2. SHUTDOWN态且传入任务为空,这个情况创建线程的目的是处理工作队列中剩余任务。
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker主要是添加工作线程,然后再让线程跑起来,我们看下Worker的run方法
public void run() {
runWorker(this);
}
最终调用的runWorker方法
final void runWorker(Worker w) {
//获取当前线程环境
Thread wt = Thread.currentThread();
//取出Worker封装的初始任务
Runnable task = w.firstTask;
//将Worker封装的初始任务清空
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//循环获取可执行的任务,进入循环的条件:
//1. 如果worker中有初始任务
//2. 获取到工作队列中的任务
while (task != null || (task = getTask()) != null) {
//线程是临界区资源,处理任务的过程需要将线程上锁
w.lock();
//1.如果线程为STOP状态,或者TIDYING、TERMINATED状态,那么就将线程中断(将中断标记设为true)
//2.如果线程中断标记为真,且状态此时变为上述状态,那么就将线程中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//如果线程没被中断,为正常运行态
//执行任务前的钩子函数,注意,钩子函数抛出的异常是没人管的,会直接进入到finally代码块
beforeExecute(wt, task);
Throwable thrown = null;
try {
//在当前线程环境执行任务的run()方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后的钩子函数
afterExecute(task, thrown);
}
} finally {
//处理完任务,将task置空,等待GC
task = null;
//累计完成任务数+1
w.completedTasks++;
//处理任务完成,将线程资源释放
w.unlock();
}
}
//如果执行到这里之前有人抛出异常,且无人捕获,则不执行这句,直接进入finally
//(工作线程 task.run() 即使抛了异常,也被捕获了。只有可能在 getTask,w.lock(), 钩子函数 等无人捕获异常的地方抛出异常,才有可能跳过这句话的执行,直接进入finally)
//如果执行到这里都没人抛出异常,或者抛出的异常全都被捕获过,则执行这一句
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
上述方法有几个重要的方法,我们先看getTask()
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
//获取线程池最新状态
int c = ctl.get();
int rs = runStateOf(c);
//如果线程非运行态
//如果
//1. 是STOP、TIDYING、TERMINATED状态
//2. 是SHUTDOWN状态,且工作队列为空
//则自旋cas减小工作线程数,并return null,让所在线程获取任务失败,退出获取任务的循环,从而结束线程。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//如果线程为运行态
//或者是SHUTDOWN状态,但工作队列不为空
//就需要将任务取出处理
int wc = workerCountOf(c);
//allowCoreThreadTimeOut 一般都是 false
//判断当前线程是否有限时,是需要判断当前线程是否为核心线程的,这里用了很聪明的方法:判断工作线程数是否大于核心线程数
//如果工作线程数小于核心线程数,说明当前线程一定是核心线程,那么不限时
//反之,就直接认定进入到当前判断的是非核心线程,让它限时。
//注意,这里没有上锁
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果工作线程超过了最大值,或者当前线程超时了,那么当前线程应当被废弃
//额外条件:且此时 有别的工作线程(有别人处理任务) 或者 工作队列为空(没有任务需要处理)
//那么当前线程就确定可以废弃,return null,结束当前线程。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//CAS减小工作线程数,不取出任务
if (compareAndDecrementWorkerCount(c))
return null;
//自旋
continue;
}
//如果当前线程可用
try {
//如果限时,就通过限时方式poll()获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//如果不限时,直接阻塞获取任务
workQueue.take();
if (r != null)
//如果拿到了任务,直接返回执行
return r;
//进到这里说明超时了还没拿到任务
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
如果线程获取工作队列任务失败,或者被判定为当前线程应当结束,将会退出 runWorker() 的 while 循环,并进入 processWorkerExit() 处理线程结束的工作
//移除当前工作线程
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果执行了 completedAbruptly=false 语句,说明当前线程异常退出了。需要将线程池工作线程数更新-1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
//有线程退出的时候,将要操作 workers 这个非线程安全集合,它是临界区资源
//对临界区资源的 检查+修改,需要原子性操作,需要上锁。对 mainLock 上锁。
// mainLock 逻辑上用于 workers 资源的上锁, 这里顺便同时也对 completedTaskCount 这个互斥资源进行逻辑上的上锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//工作记录,统计线程池总完成任务数
completedTaskCount += w.completedTasks;
//将该线程从 workers 集合中取出
workers.remove(w);
} finally {
//释放对 workers 资源的 mainLock 锁
mainLock.unlock();
}
//释放线程之后,尝试终止线程池
//终止条件:SHUTDOWN状态且工作线程数为0且工作队列为空 || STOP状态且工作线程数为0
tryTerminate();
//获取当前线程池状态
int c = ctl.get();
//如果线程池状态为 RUNNING 或者 SHUTDOWN
if (runStateLessThan(c, STOP)) {
//如果是正常状态溢出当前线程,进入if方法块
if (!completedAbruptly) {
// 核心线程数最小值允许多少?
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//工作队列不为空,设置工作线程的最小值为1
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果工作线程数大于等于工作线程最小值,说明还有工作线程在线程池中,那就return,什么也不干。
if (workerCountOf(c) >= min)
return; // replacement not needed
}
//1. 如果是不正常的方式移除了当前工作线程,再补一个工作线程!
//2. 线程池工作队列不为空,并且没有工作线程,再添加一个工作线程。
addWorker(null, false);
}
}
上述在线程退出后尝试终止线程池tryTerminate 如果当前状态为 SHUTDOWN 或者 STOP ,而且满足变为 TIDYING 状态的条件,就CAS一次尝试将状态变为 TIDYING 。如果变为 TIDYING 成功,将继续变为 TERMINATED。在真正变为 TERMINATED 之前,会给一个钩子函数 terminated() 由外界/子类 实现,处理变为 TERMINATED 状态之前的最后操作。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//如果是运行态,就不终止线程
if (isRunning(c) ||
//如果已经是TIDYING状态了,就不作处理
runStateAtLeast(c, TIDYING) ||
//如果是SHUTDOWN,但是工作队列还有任务,不作处理( SHUTDOWN需要到工作队列为空的时候,才可以变成TIDYING)
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//如果是SHUTDOWN或者STOP状态,但是工作线程还没清空
if (workerCountOf(c) != 0) { // Eligible to terminate
//中断一个当前运行的线程
interruptIdleWorkers(ONLY_ONE);
return;
}
//如果可以变为TIDYING状态,上锁,因为terminated中的操作可能会是临界区资源
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//自定义处理线程池终止前的最后操作
terminated();
} finally {
//最终设置为TERMINATED线程池状态
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
最后我们看下addWorker中添加工作线程失败的addWorkerFailed方法
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
//移除工作线程
workers.remove(w);
//工作线程数量减一
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
第二步:活动线程数>核心线程数且是运行态,添加队列中
关键方法前面已经贴了,再贴一下
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
其他方法前面已经解读过,我们看下reject(command)
final void reject(Runnable command) {
//调用丢弃策略处理
handler.rejectedExecution(command, this);
}
第三步:如果线程池不是运行态,或者是运行态,但任务队列已满,尝试添加非核心线程处理当前任务
else if (!addWorker(command, false))
reject(command);
submit(Runnable task, T result)
用于提交一个 Runnable 任务,并返回一个 Future 对象,该对象可以用于获取任务执行的结果。 主要包含下面3个方法
//AbstractExecutorService
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
其核心方法是newTaskFor,最主要实现是FutureTask
//AbstractExecutorService
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
FutureTask 是 Java 中的一个类,实现了 Future 和 Runnable 接口,可以用于异步执行一个任务,并在任务完成后获取其结果。FutureTask 可以用于手动创建一个 Future 对象,或者作为 ThreadPoolExecutor 的任务提交到线程池中执行。 后续再阅读FutureTask
本文篇幅较长,后续内容请在下面网页查看 并发-AQS之ThreadPoolExecutor源码解读(二)
JDK中实现AQS简介
同步工具 | 与AQS关联 | 详细介绍 |
---|---|---|
AQS原理讲解 | AQS原理介绍 | 并发-AQS原理讲解 |
ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理。 | AQS之Reentrantlonk源码解读 |
Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数。 | Semaphore 源码分析以及AQS共享加解锁 |
CountDownLatch | 在多线程并发执行任务时,有时需要让某些线程等待某些条件达成后再开始执行,这时就可以使用CountDownLatch来实现 | CountDownLatch 源码分析 |
ThreadPoolExecutor | 创建线程池中的工作线程worker继承AQS,实现独占资源 | 参考 并发-AQS之ThreadPoolExecutor源码解读(一) |
CyclicBarrier | 多个线程等待彼此达到一个共同的屏障点,然后同时继续执行。 | 并发-AQS之CyclicBarrier源码解读 |
ReentrantReadWriteLock | 可重入读写锁,它允许多个线程同时读取一个共享资源,但只允许一个线程写入该共享资源。 | 参考 并发-AQS之ReentrantReadWriteLock源码解读(一) |
转载自:https://juejin.cn/post/7244466016000000055