Java多线程最佳实践
前言: 这篇文章根据场景分类总结实践的最优做法以及注意事项, 不详细解释基本概念; 更灵活, 性能更好往往意味着代码复杂度的增加, 根据实际业务选择, 有时最简单的写法反而更合适
可见性
volatile: 对于一个多线程共享的变量, 每次访问变量时,总是获取主内存的最新值, 且当某个线程在其本地内存副本中修改了该变量的值, 立刻回写到主内存

线程同步
原子操作
赋值不需要同步
- 基本类型(
long和double除外)赋值,例如:int n = m,long和double是64位数据,JVM没有明确规定64位赋值操作是不是一个原子操作,不过在x64平台的JVM是把long和double的赋值作为原子操作实现的。 - 引用类型赋值,例如:
List<String> list = anotherList
Atomic 原子类
把单个变量引用或者值的变化封装为原子操作, 可分为4类
基本类型
使用原子的方式更新基本类型
AtomicInteger:整型原子类AtomicLong:长整型原子类AtomicBoolean:布尔型原子类
数组类型
使用原子的方式更新数组里的某个元素
AtomicIntegerArray:整型数组原子类AtomicLongArray:长整型数组原子类AtomicReferenceArray:引用类型数组原子类
引用类型
AtomicReference:引用类型原子类AtomicMarkableReference:原子更新带有标记的引用类型。该类将 boolean 标记与引用关联起来。AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于解决原子的更新数据和数据的版本号,可以解决使用CAS进行原子更新时可能出现的ABA问题。
ABA问题不是必须解决的, 如果业务关注变量的值而不在意值变化的过程, 那就不需要处理。
对象的属性修改类型
AtomicIntegerFieldUpdater:原子更新整型字段的更新器AtomicLongFieldUpdater:原子更新长整型字段的更新器AtomicReferenceFieldUpdater:原子更新引用类型里的字段
一般场景
使用synchronized或ReentrantLock
ReentrantLock性能更好, 提供了tryLock()方法限制了最大阻塞时间ReentrantLock更灵活, 临界区可跨多个代码块ReentrantLock更适用于顺序敏感的场景,synchronized和ReentrantLock默认都是非公平锁, 但ReentrantLock可以在构造时new ReentrantLock(true)设置为公平锁
读多写少
使用ReadWriteLock或StampedLock, 将读锁和写锁分离, 提高读并发性能
ReadWriteLock: 悲观读锁, 把读写操作分别用读锁和写锁来加锁, 允许多个线程同时读(当有一个线程持有读锁, 其他线程也可以获取读锁, 这样就大大提高了并发读的执行效率), 但它只允许一个线程写入(当有一个线程持有写锁, 其他线程读锁和写锁都获取不到)StampedLock: 乐观读锁, 它和ReadWriteLock相比,不同之处在于,读的过程中也允许获取写锁,这样一来,我们读的数据就可能不一致,但需要一点额外的代码来判断读的过程中是否有写入
public class Point {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
// 注意下面两行代码不是原子操作
// 假设x,y = (100,200)
double currentX = x;
// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
double currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
死锁
各线程获取可重入锁的顺序一定要相同
public void add(int m) {
synchronized(lockA) { // 获得lockA的锁
this.value += m;
synchronized(lockB) { // 获得lockB的锁
this.another += m;
} // 释放lockB的锁
} // 释放lockA的锁
}
public void dec(int m) {
synchronized(lockB) { // 获得lockB的锁
this.another -= m;
synchronized(lockA) { // 获得lockA的锁
this.value -= m;
} // 释放lockA的锁
} // 释放lockB的锁
}
对于上述代码,线程1和线程2如果分别执行add()和dec()方法时, 执行到内层的synchronized时就会永远等待下去, 造成死锁
线程协同
等待/唤醒
使用synchronized+wait/notify, 或者ReentrantLock+Condition可以做到最细粒度的控制, 而且进入等待状态会释放锁, 不会阻塞其他线程, 但代码过于繁琐, 实战中尽量用其他并发容器替代, 没有必要自己实现
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
this.notifyAll();
}
public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
}
CountDownLatch
CountDownLatch阻塞主线程, 等待指定数量的线程完成后, 执行指定的逻辑, 基本上可以被CompletableFuture的静态方法allOf取代, 实践中不考虑用它
CyclicBarrier
CyclicBarrier阻塞子线程, 作为一道屏障拦截在多个线程上, 屏障本身包含一段逻辑, 线程经过屏障时会等待, 所有线程都通过屏障时, 执行屏障逻辑, 各子线程也继续往下执行, 实践中还是很有用的
Semaphore
Semaphore用于限制同一时间并发访问的线程数量
CompletableFuture
CompletableFuture在实践中最为常用, 下面详细说明
1. 实例方法
实例方法较多, 建议使用Aync版本的方法, 防止阻塞主线程, 以及避免不确定性
当单个异步任务完成后
thenApply: 对其结果执行Function<T,U>, 返回一个新CompletionStagethenAccept: 对其结果执行ConsumerthenRun: 执行一个RunnablethenCompose: 类似thenApply, 他们的回参类型都是CompletionStage, 但thenCompose执行的是不是一个普通的Function<T,U>, 而是Function<T,CompletionStage<U>>, 当现有的方法返回已经是一个CompletionStage时, 相比thenApply,thenCompose不会嵌套, 因此thenApply适合用来编写新的异步逻辑, 而thenCompose更适合用来串接多个已有的CompletableFuture
// 回调是普通方法
CompletableFuture<Integer> futureApply = CompletableFuture
.supplyAsync(() -> 1)
.thenApply(x -> x+1);
CompletableFuture<Integer> futureCompose = CompletableFuture
.supplyAsync(() -> 1)
.thenCompose(x -> CompletableFuture.supplyAsync(() -> x+1));
// 回调是已有的异步方法, thenApply会嵌套一层而thenCompose不会
public CompletableFuture<UserInfo> getUserInfo(userId)
public CompletableFuture<UserRating> getUserRating(UserInfo)
CompletableFuture<CompletableFuture<UserRating>> f =
userInfo.thenApply(this::getUserRating);
CompletableFuture<UserRating> relevanceFuture =
userInfo.thenCompose(this::getUserRating);
当两个异步任务都完成后
thenCombine: 对它们的结果执行BiFunction, 返回一个新结果thenAcceptBoth: 对它们的结果执行BiConsumerrunAfterBoth: 执行一个Runnable
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);
int combinedResult = combinedFuture.join(); // 或者使用 get() 方法获取结果
System.out.println(combinedResult); // 输出:30,因为 future1 返回 10,future2 返回 20,合并结果为 10 + 20 = 30
当两个异步任务中的任意一个完成后
applyToEither: 对其结果后执行Function, 返回一个新结果, 不需要等待两个任务都完成acceptEither: 对其结果执行ConsumerrunAfterEither: 执行一个Runnable
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000); // 模拟任务1耗时2秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000); // 模拟任务2耗时1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
return 20;
});
CompletableFuture<Integer> resultFuture = future1.applyToEither(future2, result -> result * 2);
int result = resultFuture.join(); // 或者使用 get() 方法获取结果
System.out.println(result); // 输出:40,因为 future2 先完成,结果为 20,应用 fn 函数得到 20 * 2 = 40
异常处理相关
exceptionally: 入参是一个Function, 当exceptionally前的异步操作抛出异常时,可以对这个异常进行处理,并返回一个新的CompletionStagehandle: 和exceptionally类似, 但入参是一个BiFunction, 因此异常和正常的情况可以处理, ,并返回一个新的CompletionStage, 传递给后面whenComplete: 和handle类似, , 可以同时处理正常和异常的情况, 但入参是一个BiConsumer,Consumer是没有回参的, 所以whenComplete不产生新的异步结果
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
// Simulating an exception
throw new RuntimeException("Oops, something went wrong!");
}).exceptionally(ex -> {
// Handling the exception
System.out.println("Caught exception: " + ex.getMessage());
return 0; // Providing a default value
});
future.thenAccept(result -> {
System.out.println("Final result: " + result);
});
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10 / 2;
});
CompletableFuture<String> handledFuture = future.handle((result, ex) -> {
if (ex != null) {
return "Error: " + ex.getMessage();
} else {
return "Result: " + result;
}
});
handledFuture.thenAccept(result -> {
System.out.println(result);
});
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
return 10 / 2;
});
future.whenComplete((result, ex) -> {
if (ex != null) {
System.out.println("Exception occurred: " + ex.getMessage());
} else {
System.out.println("Result: " + result);
}
});
其它实例方法
Future接口下
get():调用方线程阻塞, 等待异步任务完成后获取结果get(long timeout, TimeUnit unit):同get(),但只等待指定的时间;cancel(boolean mayInterruptIfRunning):取消当前任务;isDone():判断任务是否已完成。join(): 调用方线程阻塞, 等待异步任务完成
CompletableFuture类下
getNow(T valueIfAbsent): 不会阻塞调用方线程, 如果任务已完成则返回结果, 否则返回给定的缺省值complete(T value)/completeExceptionally(Throwable ex): 直接手动完成异步任务, 返回给定的正常或异常结果, 如果在调用该方法之前已经有一个结果(包括正常结果或异常),则该方法不会生效, 这个方法可以用于模拟异步任务的完成,并将结果传递给等待该任务的其他部分。obtrudeValue(T value)/obtrudeException(Throwable ex): 类似complete(T value)/completeExceptionally(Throwable ex), 但会无视之前的结果, 强制替换为给定的值
备注
Future接口下的方法get()和CompletableFuture类下的join()方法的区别在于get()会抛出checked exception, 需要try...catch...手动处理异常, 而join()不需要, 发生异常时join()会抛出一个checked CompletionException,CompletionException中包裹着真正的异常信息- 对于异常的处理, 更好的方式还是在
get()或者join()之前就调用exceptionally
2. 静态方法
runAync/supplyAsync: 执行异步任务, 可指定提交的线程池, 默认使用ForkJoinPool.commonPool()anyOf/allOf:applyToEither/applyToBoth,acceptEither/acceptBoth,runAfterEither/runAfterBoth的强化版, 可以组合2个以上的CompletableFuturecompletedFuture: 创建一个已完成的任务, 并指定它的返回值, 可用于在异步调用链中返回常量
3. 属性
isCompletedExceptionally:如果异步任务异常结束时返回true, 未完成或已正常完成返回false
大任务分割
ForkJoinPool线程池可以把一个大任务递归地分拆成小任务并行执行,任务类必须继承自RecursiveTask或RecursiveAction,但代码较为繁琐, 普通场景推荐使用进一步封装的parallelStream()
parallelStream()的性能优于parallel()
常用并发容器
// TODO
线程池
// TODO
转载自:https://juejin.cn/post/7284221961621848119