java多线程面试——新版api
在Android开发的面试中,Java多线程的问题是绕不开的。这个系列主要介绍面试过程中涉及到的多线程的知识点,以及相关的面试题。这是本系列的第三篇,介绍Java中多线程的新版api,及其对应的常见的面试题。
ReteenLock
ReteenLock 是 java 1.5 以后提出的加锁API。使用它加锁解锁非常简单,只需要调用 lock
、 unlock
方法
private Lock lock = new ReentrantLock();
lock.lock();//加锁
//执行一些操作
...
lock.unlock();//解锁
除此之外,ReteenLock 还提供了 lockInterruptibly
和 tryLock
的不同的加锁接口,对加锁操作进行更细致的控制。
- lockInterruptibly
lockInterruptibly 可以中断等待锁的线程。当线程调用 lockInterruptibly ,没有获取锁时,线程处于等待锁的状态,这时就可以通过 interrupt 方法来中断线程的等待状态。注意使用 lock 方法、synchronized 关键字是不能中断等待或者阻塞状态的。
- tryLock
//直接尝试获取锁,没有获取则直接返回
boolean tryLock();
//在一定时间尝试获取锁,否则等待获取锁,超时会返回 false
boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;
tryLock 来尝试获取锁,如果没有获取到锁会返回 false。我们也可以设置规定时间内等待获取锁。
Lock lock = new ReentrantLock();
if(lock.tryLock()) {
//执行一些操作
...
lock.unlock();
} else {
//未获取锁的操作
}
await 、signal 和 signalAll
Condition condition = lock.newCondition();
try {
condition.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
condition.signalAll();
在新版java多线程的api中,我们可以通过 Condition 的 await 和 signal 方法来等待和唤醒线程。await 和 wait 对应,signal 和 notify 对应。调用 await 方法会让线程进入等待状态(WAITING),同时释放掉锁。如果需要唤醒线程,需要调用 signal 或者 signalAll 方法。signal 方法是随机唤醒一个线程,而 signalAll 方法是会唤醒所有等待线程。
面试题:Synchronized的原理以及与ReentrantLock的区别。
- ReenTrantLock 可以指定是公平锁还是非公平锁。而synchronized只能是非公平锁。
- ReenTrantLock 提供了一个Condition(条件)类,用来实现分组唤醒需要唤醒的线程们,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。
- ReenTrantLock 通过 lock.lockInterruptibly() 提供了一种能够中断等待锁的线程的机制
ReentrantReadWriteLock 和 StampedLock
ReentrantReadWriteLock 是读写锁。解决的场景是:读多写少。当没有写操作时,多线程读取数据,此时加锁会影响并发的性能。这时就需要读写锁,它有两个特点:
- 当线程写操作时,其他线程不能读取
- 当线程读操作时,允许其他线程读操作,但是不能写操作
代码示例如下:
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock(); //获取读锁
Lock writeLock = readWriteLock.writeLock();//获取写锁
StampedLock 和 ReentrantReadWriteLock 类似。它特殊的地方是,它可以乐观读,即当线程读取共享变量时,其他线程可以写共享变量。而这在 ReentrantReadWriteLock 中是不被允许的。
代码示例如下
/*
*使用StampedLock读操作模板
*/
final StampedLock sl = new StampedLock();
// 乐观读 ,获取 stamp 版本号
long stamp = sl.tryOptimisticRead();
// 读取共享变量,并用局部变量保存
......
// 校验 stamp 版本号
if (!sl.validate(stamp)){
// 如果失败,则升级为悲观读锁
stamp = sl.readLock();
try {
// 再次读取共享变量,并用局部变量保存
.....
} finally {
// 释放悲观读锁
sl.unlockRead(stamp);
}
}
// 执行业务操作
......
/*
*使用StampedLock写操作模板
*/
long stamp = sl.writeLock();
try {
// 写共享变量
......
} finally {
sl.unlockWrite(stamp);
}
StampedLock 虽然通过共享读提升了读多写少场景的性能,但是也提高了它的复杂度。需要注意的是:StampedLock 是不可重入的
Semaphore
Semaphore 是一个限制器,可以限制访问资源的线程数量。代码如下:
//限制只有四个线程可以访问
//第5个线程会被阻塞,直到其他线程release
Semaphore semaphore = new Semaphore(4);
try {
semaphore.acquire();
//执行操作
...
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
semaphore.release();
}
CountDownLatch 和 CyclicBarrier
CountDownLatch 和 CyclicBarrier 都可以解决多线程并发依赖的问题。它们的主要区别是:CountDownLatch 主要用来 解决一个线程等待多个线程的场景;而CyclicBarrier 解决的是一组线程之间互相等待的场景。
如上图所示,当我们需要判断当前两个用户是否是好友时,线程3需要依赖线程1和线程2的结果,才可以执行。这时就可以使用 CountDownLatch,代码示例如下:
final CountDownLatch countDownLatch = new CountDownLatch(2);
Runnable runnable1 = new Runnable() {
@Override
public void run() {
//获取用户的id
...
//计数减一
countDownLatch.countDown();
}
};
Runnable runnable2 = new Runnable() {
@Override
public void run() {
//获取其他用户的id
...
//计数减一
countDownLatch.countDown();
}
};
Runnable runnable3 = new Runnable() {
@Override
public void run() {
try {
//等待其他线程执行完成
countDownLatch.await();
//判断两个人是否是好友
...
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
new Thread(runnable1).start();
new Thread(runnable2).start();
new Thread(runnable3).start();
如上图所示,CyclicBarrier 解决的是一组线程之间互相等待的场景,代码如下:
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2, new Runnable() {
@Override
public void run() {
//计数归0时的回调
...
}
});
Runnable runnable1 = new Runnable() {
@Override
public void run() {
while (是否满足退出条件) {
try {
//执行操作
...
//阻塞线程,计数减少1
cyclicBarrier.await();
//执行操作
...
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
};
Runnable runnable2 = new Runnable() {
@Override
public void run() {
while (是否满足退出条件) {
try {
//执行操作
...
//阻塞线程,计数减少1
cyclicBarrier.await();
//执行操作
...
} catch (InterruptedException | BrokenBarrierException e) {
throw new RuntimeException(e);
}
}
}
};
new Thread(runnable1).start();
new Thread(runnable2).start();
CyclicBarrier 类是通过 await 方法来让计数减一,同时会阻塞当前线程。通过这种方式,让不同线程步调保持一致,以此来实现一组线程之间的互相等待。
需要注意,CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。但是 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。
原子类
如上图,是在java 1.5 以后新增了原子类,这些类可以分成五种类型:基本类型、数组类型、对象引用类型、对象属性类型、累加器类型。
基本类型
基本类型的原子类有三个,分别是:AtomicBoolean、AtomicInteger、AtomicLong。它们的方法都是类似的,这里以 AtomicInteger 为例。AtomicInteger 对基本数据类型的 自增(加1操作),自减(减1操作)、以及加法操作(加一个数),减法操作(减一个数)进行了封装,保证这些操作是原子性操作。
AtomicInteger的方法 | 作用 |
---|---|
int get() | 获取当前值 |
void set(int newValue) | 设置 value 值 |
int getAndIncrement() | 先取得值,然后加1 |
int getAndDecrement() | 先取得值,然后减1 |
int incrementAndGet() | 加1,然后返回新值 |
int decrementAndGet() | 减1,然后返回新值 |
int getAndAdd(int delta) | 先取得值,然后增加指定值 |
int addAndGet(int delta) | 增加指定值,然后返回新值 |
boolean compareAndSet(int expect, int update) | 将旧值设置成新值(先要获取当前值) |
数组类型
数组类型的原子类有三个,分别是:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray。这里以 AtomicIntegerArray 为例,它的方法与上面的方法类似,只是多了index 的参数。
AtomicIntegerArray的方法 | 作用 |
---|---|
int get(int index) | 获取数组index位置的值 |
void set(int index, int newValue) | 设置数组index位置的value 值 |
int getAndIncrement(int index) | 获取数组index位置的值,然后加1 |
int getAndDecrement(int index) | 数组index位置的值,然后减1 |
int incrementAndGet(int index) | 让数组index位置的值加1,然后返回新值 |
int decrementAndGet(int index) | 让数组index位置的值减1,然后返回新值 |
int getAndAdd(int index, int delta) | 先获取数组index位置的值,然后增加指定值 |
int addAndGet(int index, int delta) | 让数组index位置的值增加指定值,然后返回新值 |
boolean compareAndSet(int index, int expect, int update) | 让数组index位置的值设置成新值(先要获取当前值) |
代码示例如下:
int[] v = new int[]{1, 2, 3};
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(v);
int value = atomicIntegerArray.get(0);
System.out.println(atomicIntegerArray.compareAndSet(0, value, 100));
对象引用类型
对象引用类型的原子类有三个,分别是 AtomicReference、AtomicStampedReference、AtomicMarkableReference。AtomicReference 的方法相对于上面就少了很多,但是大致的功能是一样的。
AtomicReference的方法 | 作用 |
---|---|
int get() | 获取当前对象值 |
void set(T newValue) | 设置对象值 |
int getAndSet(T newValue) | 先取得对象值,然后设置新的对象值 |
boolean compareAndSet(T expect, T update) | 将旧对象值设置成新对象值(先要获取当前值) |
代码示例如下:
Test test = new Test();
AtomicReference<Test> atomicReference = new AtomicReference<>(test);
atomicReference.compareAndSet(atomicReference.get(), new Test());
AtomicStampedReference 和 AtomicMarkableReference 相对于 AtomicReference 的不同是,它们分别通过 Stamp(整数标记) 和 Mark(布尔标记) 解决了ABA问题。
对象属性类型
对象属性类型的原子类有三个,分别是 AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。下面代码以 AtomicIntegerFieldUpdater 为例,其方法与 AtomicInteger 类似,区别是新增了对象入参。代码示例如下:
Test test = new Test();
test.id = 0;
AtomicIntegerFieldUpdater<Test> updater = AtomicIntegerFieldUpdater.newUpdater(Test.class, "id");
//获取当前对象的id值,并加1
updater.getAndIncrement(test);
注意:对象属性类型的原子类只支持被 volatile 关键字修饰的可见成员属性
累加器类型
累加器类型的原子类有四个,分别是LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator。它们是java 1.8加入的,专门用来执行数值类型的数据累加操作,相对于 AtomicLong 性能更好。代码如下:
LongAdder longAdder = new LongAdder();
longAdder.increment();
LongAdder 和 LongAccumulator 的区别:LongAdder的功能增强版,它支持自定义的函数操作。DoubleAdder 和 DoubleAccumulator 的区别也一样。
并发容器
如上图,java新版的并发容器可以分成 List、Set、Map、Queue 四种类型。
List
对于List,新版Api只提供了 CopyOnWriteArrayList 这个并发容器。 Copy On Write(写时复制),意思就是在对其进行修改操作的时候,复制一个新的ArrayList,在新的ArrayList上进行修改操作,从而不影响旧的ArrayList的读操作。
Map
对于Map类型,有两个实现类,分别是 ConcurrentHashMap、ConcurrentSkipListMap。从应用的角度来看,主要区别在于ConcurrentHashMap 的 key 是无序的,而 ConcurrentSkipListMap 的 key 是有序的。所以如果需要保证 key 的顺序,就只能使用 ConcurrentSkipListMap。
使用 ConcurrentHashMap 和 ConcurrentSkipListMap 需要注意的地方是,它们的 key和 value 都不能为空,否则会抛出NullPointerException这个运行时异常
Set
对于Set类型,有两个实现类,分别是 CopyOnWriteArraySet、ConcurrentSkipListSet。
- CopyOnWriteArraySet:基于数组实现的并发 Set,内部是使用 CopyOnWriteArrayList 来实现的
- ConcurrentSkipListSet:基于跳表实现的并发 Set,其内部是通过 ConcurrentSkipListMap 来实现的
Queue
对于Queue类型,有九个实现类,分别是 ArrayBlockingQueue、LinkedBlockingQueue、 SynchronousQueue、LinkedTransferQueue、PriorityBlockingQueue、 DelayQueue、LinkedBlockingDeque、ConcurrentLinkedQueue、ConcurrentLinkedDeque。
如上图所示,根据其数据结构方式和是否可以阻塞可以分成四种。Queue表示单端队列,遵循的先进先出的原则;Deque表示双端队列,该队列两端的元素既能入队,也能出队。阻塞指的是当队列已满时,入队操作阻塞,直到队列有空位才能插入;当队列已空时,出队操作阻塞,直到队列不为空才返回。非阻塞则是指入队出队操作不会阻塞,如果队列已满或者为空,(根据调用的方法)直接返回null或者报错。
队列 | 作用 |
---|---|
ArrayBlockingQueue | 基于数组的阻塞队列,使用数组存储数据,并需要指定其长度,所以是一个有界队列 |
LinkedBlockingQueue | 基于链表的阻塞队列,使用链表存储数据,默认是一个无界队列;也可以通过构造方法中的capacity 设置最大元素数量,所以也可以作为有界队列 |
SynchronousQueue | 一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并且立刻消费 |
PriorityBlockingQueue | 优先级别的阻塞队列,底层基于数组实现,是一个无界队列 |
DelayQueue | 延迟队列,其中的元素只有到了其指定的延迟时间,才能够从队列中出队 |
LinkedTransferQueue | 基于链表的数据交换队列,基于链表实现,是一个无界队列 |
线程池
java中自带的线程池
线程池 | 特点 | 获取方式 |
---|---|---|
FixedThreadPool | 线程数固定的线程池 | Executors.newFixedThreadPool |
CachedThreadPool | 线程数根据任务动态调整的线程池 | Executors.newCachedThreadPool |
SingleThreadExecutor | 只有一个线程的线程池 | Executors.newSingleThreadExecutor() |
ScheduledThreadPool | 定时或周期性执行任务的线程池 | Executors.newScheduledThreadPool() |
SingleThreadScheduledExecutor | 定时或周期性执行任务的线程池,但是它的线程数只有一个 | Executors.newSingleThreadScheduledExecutor() |
一般在业务中,我们不会使用java中自带的线程池,而是根据自己的需要自定义线程池。
ThreadPoolExecutor
自定义的线程池需要通过 ThreadPoolExecutor 来创建。ThreadPoolExecutor 的构造函数非常复杂,如下面代码所示:
ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数意义如下:
- corePoolSize:核心线程数,由于频繁创建线程会对性能产生影响,因此就需要线程被创建后一直存在,这就是核心线程。Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,包括核心线程
- maximumPoolSize:线程池创建的最大线程数。当核心线程都在执行任务时,还有任务需要处理,就会创建新的线程来处理,但是系统的资源不是无限的,因此需要限制最多创建的线程。
- keepAliveTime :非核心线程的存在时间。当一个线程如果在一段时间内,都没有执行任务,就回收该非核心线程
- unit :上面 keepAliveTime 的时间参数,有秒、分钟等
- workQueue:阻塞队列(BlockingQueue),具体实现类上面已经介绍过了。当线程数达到最大时,这时还有任务来,就把任务放到这个任务队列中,等待处理。
- threadFactory:自定义如何创建线程,例如可以给线程指定一个有意义的名字。
- handler:自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。这时你可以通过 handler 这个参数来指定拒绝的策略
面试题:shutdown 、shutdownNow 的区别
- shutdown() : 执行后停止接受新任务,会把队列的任务执行完毕。
- shutdownNow() : 执行后停止接受新任务,但会中断所有的任务(不管是否正在执行中),将线程池状态变为 STOP状态。
面试题:当任务超过阻塞队列数量时,有哪些拒绝策略
ThreadPoolExecutor 已经提供了以下 4 种策略:
- CallerRunsPolicy:提交任务的线程自己去执行该任务。
- AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
- DiscardPolicy:直接丢弃任务,没有任何异常抛出。
- DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列
面试题:使用线程池要注意些什么
- 不要使用无界的 LinkedBlockingQueue,在高负载情境下,无界队列很容易导致 OOM。很多Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,使用前需要特别注意。
- 默认拒绝策略要慎重使用。当任务过多时,会有拒绝策略,最好针对业务的情况来自定义拒绝策略
面试题:自定义线程池的参数如何配置
根据任务的不同,推荐配置如下:
- CPU密集型: cpu数量 + 1
- IO密集型: cpu 数量 * 2
也可以使用动态线程池,可以看动态线程池
面试题:线程池都有哪些状态?
- RUNNING:这是最正常的状态,接受新的任务,处理等待队列中的任务。
- SHUTDOWN:不接受新的任务提交,但是会继续处理等待队列中的任务。
- STOP:不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程。
- TIDYING:所有的任务都销毁了,workCount 为 0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()。
- TERMINATED:terminated()方法结束后,线程池的状态就会变成这个。
面试题: 线程池中 submit() 和 execute() 方法有什么区别?
- execute():只能执行 Runnable 类型的任务。
- submit():可以执行 Runnable 和 Callable 类型的任务。
Callable 类型的任务可以获取执行的返回值,而 Runnable 执行无返回值。
ForkJoinPool
ForkJoinPool 是java7引入的线程池,它可以把一个大任务拆成多个小任务并行执行。代码示例如下:
class SumTask extends RecursiveTask<Long> {
protected Long compute() {
if(判断是否需要拆分任务) {
//创建两个子任务
SumTask task1 = new SumTask(...);
SumTask task2 = new SumTask(...);
// invokeAll会并行运行两个子任务:
invokeAll(task1, task2);
// 等待获得子任务的结果:
Long result1 = task1.join();
Long result2 = task2.join();
return result1 + result2;
} else {
//执行sum操作
...
return result;
}
}
}
ForkJoinTask<Long> task = new SumTask(...);
Long result = ForkJoinPool.commonPool().invoke(task);
Fork/Join线程池在Java标准库中就有应用。Java标准库提供的
java.util.Arrays.parallelSort(array)
可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。
CompletableFuture
CompletableFuture 是 java 1.8 以后提供的类。它可以处理任务之间的时序关系,如串行关系、并行关系、汇聚关系等用来简化异步编程。它内部默认是通过ForkJoinPool线程池来执行任务,当然我们也可以设置自己的线程池。CompletableFuture 是官方提供的异步编程类,可以满足简单的异步编程需求,在Android中复杂的异步编程使用最多的是RxJava,或者现在的kotlin 协程,这个了解即可。
代码如下
//串行任务,任务1、2、3串行执行
CompletableFuture<String> result =
CompletableFuture.supplyAsync(() -> "hello") //任务1
.thenApply(s -> s + " world"); //任务2
.thenApply(String::toUpperCase); //任务3
System.out.println(result.join());
//汇聚关系
CompletableFuture<String> result =
CompletableFuture.supplyAsync(() -> "a")
.thenCombineAsync(CompletableFuture.supplyAsync(() -> "b"),
(a, b) -> a + b );
System.out.println(result.join());
CompletionService
CompletionService 是一种能处理批量异步任务并在完成时获取结果的并发工具类。你可以把它看成 线程池 + 队列,当一个任务完成时,就可以通过 completionService.take().get() 获取返回值(任务执行完的值存储在队列)。如果所有任务都在执行,调用 take 方法时会阻塞。
ExecutorService executorService = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);
completionService.submit(() -> { //任务1
Thread.sleep(200);
return 1;
});
completionService.submit(() -> { //任务2
Thread.sleep(100);
return 2;
});
completionService.submit(() -> { //任务3
Thread.sleep(150);
return 3;
});
int sum = 0;
for(int i = 0; i < 3; i++) {
try {
//这里获取顺序是 2 3 1
sum += completionService.take().get();
System.out.println(sum);
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
参考
转载自:https://juejin.cn/post/7361726155718492197