Java并发编程(下)
volatile关键字
volatile关键字的两个作用
- 变量线程间可见
- 不同线程之间往往是隔离的,一个线程无法得知另一个线程中的数据。所以这就有可能导致多处理的计算机中线程运行在不同的处理器上,而不同处理器中的寄存器或内存缓存中的值可能不一样。所以需要在声明实例变量时使用
volatile
关键字保证当一个线程修改了一个值,另一个线程能够看到相同变量的最新的值(其实是不通过缓存而是重新去内存中拿值)。
- 不同线程之间往往是隔离的,一个线程无法得知另一个线程中的数据。所以这就有可能导致多处理的计算机中线程运行在不同的处理器上,而不同处理器中的寄存器或内存缓存中的值可能不一样。所以需要在声明实例变量时使用
- 禁止指令重排序
- 如果一个操作不是原子性的,也就是可以被拆分的,那么它的执行顺序就有可能被编译器或更底层的cpu修改。虽然编译器不会修改有依赖关系的代码顺序,但是这只对单线程有作用。因为在多线程环境,可能因为其他线程的执行导致依赖关系被破坏。
volatile关键字无法保证原子性
什么是死锁
private static ReentrantLock lock = new ReentrantLock();
private static Condition c = lock.newCondition();
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(()->{
lock.lock();
if(1==1){
try {
System.out.println(Thread.currentThread().getName()+":死锁");
c.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("解除");
c.signalAll();
lock.unlock();
}).start();
}
}
}
总结一下就是当所有的线程都被阻塞,就会形成死锁。必须仔细设计程序,必须保证不会出现死锁!
线程局部变量
当多个线程在使用同一个共享变量的时候往往容易造成混乱。但是如果使用局部变量则能够实现线程间变量互不影响
使用局部变量前
private static Integer count = 0;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
count++;
}).start();
}
Thread.sleep(1000);
System.out.println("count : "+count);
}
输出:3
可以看到所有线程的操作都直接影响到了共享变量count
。
使用局部变量后
//为每个线程构造一个实例
private static ThreadLocal<Integer> t = ThreadLocal.withInitial(() -> Integer.valueOf(10));
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 3; i++) {
new Thread(() -> {
//获取当前线程的实例
int count = t.get();
count++;
}).start();
}
Thread.sleep(1000);
System.out.println("count : "+t.get());
}
输出:10
t.get()
方法可以得到属于当前线程的Integer
对象。可以看到无论线程中如何操作这个变量,它操作的其实都是属于当前线程的那个实例(局部变量)。所以说使用局部变量可以提高线程的安全性。
线程池
什么时候需要用到线程池?当程序中有大量生命周期很短的线程时则应该使用线程池。
Future
Future
是一个带参数类型的接口,它有一个实现类FutureTask
,实现了Future
和Runnable接口。这个类的构造方法中可以传递一个Callable
对象。Callable
是一个封装了带有返回值方法的泛型类,返回值类型就是为泛型指定的类型。而Future
就保存了Callable
返回的这个结果。当完成上面的工作之后,就可以将FutureTask
对象传到new Thread()
构造器,并调用start()
运行。
这种方式并不常用,一般不会将
Callable
直接传给Future
,现在只需要记得Future
可以保存异步计算结果即可
创建线程池
执行器Executors
类有很多静态方法用来获取线程池
方法 | 描述 |
---|---|
newCachedThreadPool | 必要的时候会创建新的线程;池中的空闲线程会保留60秒 |
newFixedThreadPool | 包含固定线程数的线程池;线程会一直保留 |
newSingleThreadExecutor | 只包含一个线程,一次只能运行一个任务,排队执行 |
使用线程池
//1、创建一个固定大小的线程池,因为我的cpu是4核4线程所以我指定16个线程
ExecutorService es = Executors.newFixedThreadPool(16);
//2、将Callable提交给ExecutorService,执行完的结果保存在Future中,这里只返回1024
Future<Integer> f = es.submit(() -> 1024);
//3、取出执行结果
Integer i = f.get();
System.out.println(i);
//4、如果不需要线程池工作了,记得关闭
es.shutdown();
以上就是线程池使用的步骤,总结以下就是
- 通过
Executors
的静态工厂创建一个指定类型的线程池 - 将
Callable
或者Runnable
接口实例提交到ExecutorService
- 通过专门保存执行结果的
Future
实例用它的get()
方法获取结果 - 如果没有要执行的任务时,就关闭线程池服务
Callable
和Runnable
很类似,但是Callable可以有返回值,并且它是一个参数化的函数式接口。
线程池执行任务组
上面介绍了线程池的基本用法,并且演示了如何将一个Callable
任务对象提交给执行器执行。那么能不能传递多个任务让执行器执行呢?实际上ExecutorService
执行器接口提供了invokeAny(Collection<Callable<T>> tasks)
以及invokeAll(Collection<Callable<T>> tasks)
方法;他们的用法基本相同,只不过前者返回一个任务结果,后者返回所有的任务结果。
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Callable<Integer>> listCall = new ArrayList<>();
Callable<Integer> c = () -> 1024;
listCall.add(c);
listCall.add(c);
ExecutorService es = Executors.newFixedThreadPool(16);
Integer i = es.invokeAny(listCall);
System.out.println("invokeAny:");
System.out.println(i);
List<Future<Integer>> list = es.invokeAll(listCall);
System.out.println("invokeAll:");
list.forEach((f) -> {
try {
System.out.println(f.get());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
});
es.shutdown();
}
运行结果: invokeAny: 1024 invokeAll: 1024 1024
现在还有一个问题,invokeAll
的返回的多个结果并不是按计算结果顺序存储的。如果想要顺序存储可以使用ExecutorCompletionService
处理器来完成,这个处理器中包含执行器Executor
变量。
示例代码
public static void main(String[] args) throws InterruptedException, ExecutionException {
List<Callable<Integer>> listCall = new ArrayList<>();
Callable<Integer> c = () -> 1024;
listCall.add(c);
listCall.add(c);
ExecutorService es = Executors.newFixedThreadPool(16);
var service = new ExecutorCompletionService<Integer>(es);
for (Callable<Integer> call : listCall) {
//将任务提交给这个处理器
service.submit(call);
}
for (int i = 0; i < listCall.size(); i++) {
//移出一个已完成的任务并返回结果
service.take().get();
}
es.shutdown();
}
想要顺序获取任务的计算结果,只需要构建这个处理器,传入线程池执行器对象,并调用处理器所提供的响应的方法就好了。
fork-join线程池
从字面意思来看,fork-join
是分-和的意思,实际上它的功能也确实如此。它的主要用途是将一个大任务拆分成多个小任务并分别计算结果,最后将所有小任务的结果加在一起形成最终结果。
public class ceshi {
public static void main(String[] args) throws InterruptedException, ExecutionException {
//创建一个实现了RecursiveTask<T>接口的类型,该类中只有一个方法用来执行具体的任务
var fjt = new ForkJoinTest("hello hello");
//创建fork-join线程池
var pool = new ForkJoinPool();
//运行任务
pool.invoke(fjt);
//返回指定类型的结果,这里是String。
System.out.println(fjt.join());
}
}
class ForkJoinTest extends RecursiveTask<String>{
private String msg;
public ForkJoinTest(String msg){
this.msg = msg;
}
@Override
protected String compute() {
if(!msg.contains(" ")){
return msg+="happy";
}else{
String[] msg = this.msg.split(" ");
var first = new ForkJoinTest(msg[0]);
var second = new ForkJoinTest(msg[1]);
//这里会阻塞,知道所有任务全部完成
invokeAll(first,second);
return first.join()+first.join();
}
}
}
运行结果:hellohappyhellohappy
除了向fork-join线程池中传递RecursiveTask<T>
实例外,还可以传递RecursiveAction
实例,只不过后者没有返回值;他们都是Future
接口的扩展。同时还需要注意一点:在compute()
方法中,除了用join()
方法获取返回值外,还可以使用get()
方法,但是不建议这么做,因为有可能抛出检查型
异常,但是在compute()
方法中不能抛出这种异常。
CompletableFuture
CompletableFuture
实现了Future
接口,它解决了一个问题,那就是当我们使用Future
对象来获取任务结果的时候,如果还有任务没有执行完,那么这个方法就会干等着。
而CompletableFuture
不仅实现了Future
接口,还实现了CompletionStage
接口。前者是用来获取计算结果,后者则是用来组合异步任务,通过对任务合理的组合就可以完成无需等待的异步任务。那么如何组合任务呢?
先看看它内部提供的部分方法:
- supplyAsync(Supplier,Executor) 这是一个静态方法,用来开启一个任务。它的第一个参数是用来返回一个指定类型值的函数式接口、第二个参数可以传递一个线程池类型。
- thenCompose(T) 这是一个实例方法,他可以传任意类型的参数,他用来处理一个数据并返回
CompletableFuture
类型 - thenCombine(CompletableFuture,BiFunction) 这是一个实例方法,它用来组合两个任务并组合两个任务的结果。它有两个参数,第一个参数类型是
CompletableFuture
,这是它要组合的CompletableFuture
;第二个参数是一个函数式接口BiFunction
,用来返回组合的结果。
示例代码1:
CompletableFuture<String> c1 = CompletableFuture.supplyAsync(()->"秦军正在攻打赵国上党",
executor).thenCompose(t -> CompletableFuture.supplyAsync(()->t + "\n燕军正在自不量力搞背后偷袭",executor));
System.out.println(c1.join());
运行结果: 秦军正在攻打赵国上党 燕军正在自不量力搞背后偷袭
示例代码1用supplyAsync()
方法开启了一个任务,之后用thenCompose()
方法连接了一个任务,他们是按顺序执行的。
示例代码2:
CompletableFuture<String> c2 = CompletableFuture.supplyAsync(()->"秦军正在攻打赵国上党",
executor).thenCombine(CompletableFuture.supplyAsync(()->"\n燕军正在自不量力搞背后偷袭",executor),
(a,b)-> a+b+"\n燕军被廉颇按在地上摩擦");
System.out.println(c2.join());
运行结果: 秦军正在攻打赵国上党 燕军正在自不量力搞背后偷袭 燕军被廉颇按在地上摩擦
示例代码2用supplyAsync()
方法开启了一个任务,之后用thenCombine()
方法将这个任务与方法中第一个参数传递的任务合并,并将这两个任务的结果合并。
CompletableFuture类中还有很多其他好用的方法,他们的用法大同小异。具体可以查看JDK8以上版本的文档。
转载自:https://juejin.cn/post/7233961132540969021