从应用到源码分析——聊一聊synchronized与ReentrantLock
上一篇文章对volatile做了一些简单的介绍,知道了并发三大特性中volatile无法满足原子性。那么现在就来聊聊并发中另外两个重要的应用synchronized和ReentrantLock。synchronized是JVM提供的一种隐式锁,而ReentrantLock是通过java代码编写的显式锁。下面就来简单了解这两个锁的应用以及实现原理。
显式锁:需要自己通过手动加锁或解锁
隐式锁:不需要我们手动加解锁,由JVM自动帮我们实现
synchronized
synchronized是一种互斥锁,可以说是并发编程中元老级别的存在,在JDK1.6以前,很多人称之为重量级锁,性能不高。但是在JDK1.6之后,对synchronized进行了一些优化,引入了偏向锁,轻量级锁以及重量级锁。这个时候,synchronized会根据线程竞争的程度进行锁升级或降级。很多的文章或者书籍中都表示synchronized锁升级后就不会降级,其实synchronized存在降级机制,只是这个条件比苛刻,后面会讲到在什么情况下会进行锁升级和锁降级。
synchronized锁的是谁
synchronized在日常开发中有三种用法,分别为作用在普通方法,作用在静态方法,作用在同步代码块。下面就来简单测试一下这三种用法。
作用在普通方法上
public class SyncCommonMethod {
public static void main(String[] args) throws InterruptedException {
CommonMethod commonMethod = new CommonMethod();
new Thread(() -> {
try {
commonMethod.method1();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
commonMethod.method2();
}, "线程B").start();
}
}
class CommonMethod {
public synchronized void method1() throws InterruptedException {
TimeUnit.SECONDS.sleep(2);
System.out.println("===>method1<===" + Thread.currentThread().getName());
}
public synchronized void method2() {
System.out.println("===>method2<===" + Thread.currentThread().getName());
}
}
上段代码中使用两个线程去调用同一个对象的两个方法,两个方法都是用synchronized进行了修饰,无论怎么运行,结果都如下图。
这是因为synchronized锁住的是对象的实例,也就是说创建出的commonMethod实例只有一把锁,两个线程谁先获取这把锁,谁就先执行,另一个线程则进行阻塞。那么再次基础上,再次新建一个CommonMethod,继续执行并查看运行效果。
public class SyncCommonMethod {
public static void main(String[] args) throws InterruptedException {
CommonMethod commonMethod = new CommonMethod();
CommonMethod commonMethod2 = new CommonMethod();
new Thread(() -> {
try {
commonMethod.method1();
} catch (Exception e) {
e.printStackTrace();
}
}, "线程A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
commonMethod2.method2();
}, "线程B").start();
new Thread(() -> {
commonMethod2.method3();
}, "线程C").start();
}
}
class CommonMethod {
public synchronized void method1() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
}
System.out.println("===>method1<===" + Thread.currentThread().getName());
}
public synchronized void method2() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (Exception e) {
}
System.out.println("===>method2<===" + Thread.currentThread().getName());
}
// 不使用synchronized
public void method3() {
System.out.println("===>method3<===" + Thread.currentThread().getName());
}
}
运行结果发生了变化,因为两个对象实例分别持有自己的锁,两个线程不存在竞争关系。那对于线程B和线程C来讲,两个线程中调用的是同一个对象实例中的不同方法,那为什么就没有出现竞争呢?这是因为线程C中调用的method3方法并没有使用synchronized,所以不会受到锁的影响。
===>method3<===线程C
===>method2<===线程B
===>method1<===线程A
作用在静态方法上
public class SyncCommonMethod {
public static void main(String[] args) throws InterruptedException {
StaticMethod staticMethod = new StaticMethod();
StaticMethod staticMethod2 = new StaticMethod();
new Thread(() -> {
try {
staticMethod.method1();
} catch (Exception e) {
e.printStackTrace();
}
}, "线程A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
staticMethod2.method2();
}, "线程B").start();
}
}
class StaticMethod {
public synchronized static void method1() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
}
System.out.println("===>method1<===" + Thread.currentThread().getName());
}
public synchronized static void method2() {
System.out.println("===>method2<===" + Thread.currentThread().getName());
}
}
因为方法被static修饰,所以在类加载的时候就已经有了锁,并且锁的是Class模板。一个类的Class模板是全局唯一的,所以无论创建了多少个对象,使用的都是同一把锁,所以上述代码的执行任然是谁先获取到锁,谁就先执行。与所属实例对象无关,与实例对象的class有关。所以输出结果为
===>method1<===线程A
===>method2<===线程B
作用在同步代码块
synchroized(this)
public static void main(String[] args) throws InterruptedException {
SyncCommonMethod syncCommonMethod = new SyncCommonMethod();
new Thread(()->{
syncCommonMethod.method();
},"线程A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(()->{
syncCommonMethod.method2();
},"线程B").start();
}
public void method(){
synchronized (this){
System.out.println("=====>"+Thread.currentThread().getName()+"<======");
}
}
public void method2(){
synchronized (this){
System.out.println("=====>"+Thread.currentThread().getName()+"<======");
}
}
synchroized(this)代码块中的this表示当前对象实例,所以该作用与作用在普通方法上的用处一样。每个对象实例都会持有自己的一把锁。
synchroized(Object.class)
public static void main(String[] args) throws InterruptedException {
SynClassMethod method = new SynClassMethod();
new Thread(() -> {
method.method1();
}, "线程A").start();
TimeUnit.SECONDS.sleep(1);
new Thread(() -> {
method.method2();
}, "线程B").start();
}
public void method1() {
synchronized (SynClassMethod.class) {
System.out.println("======>method1<=======" + Thread.currentThread().getName());
}
}
public void method2() {
synchronized (SynClassMethod.class) {
System.out.println("======>method1<=======" + Thread.currentThread().getName());
}
}
synchroized(Object.class)中Object.class表示对Object的Class模板进行加锁,作用与静态同步方法一致,只是语法不同。经过上面几个例子可以总结出synchroized锁住的要么是实例对象,要么是Class类。
用法 | 锁住目标 |
---|---|
普通方法 | 锁住的是实例对象 |
静态方法 | 锁住的是Class类 |
synchroized(this) / synchroized(Object) | 锁住的是实例对象 |
synchroized(Object.class) | 锁住的是Class类 |
synchronized如何加锁
在虚拟机中,创建的对象在内存中可以三个部分:对象头,实例数据以及对其填充。对象头中可以分成三个部分:Mark Word,MetaData元数据指针以及数组长度(数组对象才会有数组长度)。synchronized加锁标识就存储在Mark Word中,除了锁的标志之外Mark Word还包含了偏向锁的标识,偏向锁的线程id,以及分代年龄和Epoch。
对象头信息是对对象本身自定义数据无关的额外存储,考虑到虚拟机的空间效率,Mark Word被设计成一个非固定的数据结构,从而达到以很小的内存空间存储更多的数据,Mark Word会随着程序的执行而发生变化。下面的表格就是对象中锁不同状态对应不同的标识。
锁的状态 | 是否偏向锁(1bit) | 锁的标志位(2bit) |
---|---|---|
无锁 | 0 | 01 |
偏向锁 | 1 | 01 |
轻量级锁 | 00 | |
重量级锁 | 10 |
如果想要查看这些内存信息,可以在pom中引入下面这个依赖包,这个依赖包为我们提供了ClassLayout类,通过这个类可以打印出对象的内存信息。
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.10</version>
</dependency>
public static void main(String[] args) {
Object o = new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 01 00 00 00 (00000001 00000000 00000000 00000000) (1)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
下面是上述代码的输出结果,其中00000001 00000000 00000000 00000000 标识对象的加锁信息。但是发现后两位的结果为00啊,与表格内的标志不符啊。这是因为JVM使用的是小端模式。小端模式,是指数据的高字节保存在内存的高地址中,而数据的低字节保存在内存的低地址中,这种存储模式将地址 * 的高低和数据位权有效地结合起来,高地址部分权值高,低地址部分权值低。 所以实际应该是 00000000 00000000 00000000 00000001。那么这个时候就可以进行判定了后两个比特位为01,标识有可能是无锁状态或者是偏向锁状态。再看倒数第三位标识为0,那么就表示为非偏向锁。那就可以断定,此时对象是一个无锁状态。那么现在对Object对象进行加锁,再来查看内存数据,是否发生变化。
public static void main(String[] args) throws InterruptedException {
TimeUnit.SECONDS.sleep(5);
Object o = new Object();
System.out.println(ClassLayout.parseInstance(o).toPrintable());
synchronized (o){
System.out.println(ClassLayout.parseInstance(o).toPrintable());
}
}
0 4 (object header) 05 00 00 00 (00000101 00000000 00000000 00000000) (5)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
0 4 (object header) 05 d0 e3 ea (00000101 11010000 11100011 11101010) (-354168827)
4 4 (object header) 38 01 00 00 (00111000 00000001 00000000 00000000) (312)
8 4 (object header) e5 01 00 f8 (11100101 00000001 00000000 11111000) (-134217243)
可以看出当对Object加锁后,同步代码块内的输出00000101,参照表格可以知道这是一个偏向锁的标识。但是你会发现未加锁之前的Object输出结果尽然也是00000101,这是因为Object知道会发生偏向,但是此时并没有发生,这种叫做匿名偏向或者叫做可偏向状态。如果输出结果为11101000,有可能是关闭了JVM偏向锁,也有可能JVM对偏向锁进行了延迟处理,这种情况下可以使用-XX:BiasedLockingStartupDelay=0
调整延迟处理时间,使用-XX:±UseBiasedLocking
开启或关闭偏向锁。
为什么要引入锁升级
在JDK1.6以前,synchronized加锁是依赖与对象的,当对象创建后会维护一个Monitor管程,而Monitor又依赖于底层操作系统,synchronized的互斥是由操作系统中Mutex互斥量来实现的。当JVM需要使用到Mutex时,CPU需要由用户态切换到内核态,这个操作是比较消耗性能的。这种方式适用于线程经常非常激烈的场景,但是如果只有一两个线程进行竞争 ,没必要使用这种模式。考虑到这个问题,所以在JDK1.6便进行了优化。
锁升级的过程
在解释synchronized加锁时,已经知道了锁的状态会随着程序的运行发生升级,由起初的无锁状态升级到重量级锁。在对象刚被创建时,此时处于一个无锁状态,如果有且仅有一个线程对对象进行了加锁,那么此时将会升级到偏向锁状态,记录当前线程id并通过CAS操作修改Mark Word中的标志位。当有第二个线程来竞争锁时,并且两个线程执行速度非常快并且竞争比较小,那么此时为抢到锁的线程将会一直处于自旋状态并等待对象所被释放。当有多个线程来竞争锁并且竞争非常强烈时,那么此时会升级到重量级锁。在很多文章中都有提到说一旦锁升级,则无法降级,其实锁降级是存在的,只是条件比较苛刻。具体的触发时机:在全局安全点(safepoint)中,执行清理任务的时候会触发尝试降级锁。
锁的粗化以及锁消除
锁粗化和锁消除是JVM对synchronized的一个优化。锁粗化是指将多个连续的同步代码块合并到同一个同步代码块中,从而减少加锁,解锁的次数。锁消除是指,JVM通过检测发现同步代码块内并不存在线程竞争,那么JVM会将该锁进行一个消除。锁粗化与锁消除都需要依赖于逃逸分析,有关逃逸分析可以参考这篇文章www.cnblogs.com/tiancai/p/1…
Reentrantlock
ReentrantLock是java并发包下面的Lock接口的一个实现类,其丰富的功能与synchronized对比更加灵活。ReentrantLock的底层实现依赖于AQS,支持手动加解锁,支持手动设置公平与非公平锁,也支持线程中断。在ReentrantLock类中定义了一个抽象内部类Sync,而Sync继承了AQS。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {...}
Sync有两个子类,分别为NonfairSync非公平锁实现类和FairSync公平锁实现类。如果在创建ReentrantLock对象并且没有指定是否公平时,此时默认为非公平锁。
// 非公平锁
static final class NonfairSync extends Sync{....}
// 公平锁
static final class FairSync extends Sync{....}
// 默认为非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 指定公平锁或非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁
公平锁指线程必须依靠先后顺序先获取锁,也就是常说的FIFO队列。公平锁的实现逻辑先判断当前AQS的CLH队列中是否正在排队的线程,如果队列为空,那么通过CAS加锁,如果加锁成功则继续向下执行。如果AQS有正在持有锁的线程,那么会判断持有锁的线程是否为当前线程,如果与当前线程是同一个那么就可以实现锁重入。如果获取锁失败,那么将会插入到CLH队列。
// 尝试加锁
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 当前线程
final Thread current = Thread.currentThread();
// AQS中判断当前是否有线程获取锁的状态值,如果为0表示没有加锁。
int c = getState();
if (c == 0) {
//hasQueuedPredecessors()判断当前队列里是否存在线程。
//如果队列里没有线程,则compareAndSetState(0, acquires)从过CAS修改status值并获取锁。
//如果获取锁成功,setExclusiveOwnerThread(current)将持有锁的线程设置为当前线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 持有锁的线程为当前线程,则重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平锁
非公平锁指线程比较暴力,上来就进行锁资源的争夺。如果获得锁,则继续向下执行,否则依然会插入到FIFO队列,等待被唤醒。一般情况下非公平锁的效率要优于公平锁。具体实现就是线程直接通过CAS抢锁,如果抢到锁,那么将持有锁的线程设置为自己。如果抢锁失败,那么就再次尝试获取锁,如果还是失败那只能老老实实的加入队列。
final void lock() {
//先通过CAS抢锁
if (compareAndSetState(0, 1))
//抢锁成果,设置持有锁的线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
//抢锁失败,再次尝试获取锁,如果获取锁失败,则插入队列
acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 再次抢锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
经过上面的源码,我们知道公平锁与非公平锁的区别在于:公平锁按照先到先获取锁的方式进行执行,这样的有点是每个线程都能够获取锁,但是几乎每个线程都要先被阻塞,在被唤醒,这样性能开销较大。非公平锁是按照暴力抢锁的方式,这种方式的优点在于减少线程的阻塞,提高吞吐量,但缺点在于可能会导致有些线程一直获取不到锁,造成线程饥饿。
CLH同步队列
上面提到的CLH队列,是 AQS内部实现的一个双向链表,用于存放抢锁失败的线程。当链表中前置节点执行完成并释放锁后,那么会唤醒下一个节点来尝试获取锁。在AQS中创建了一个Node静态内部类,在这个内部类中定义了前置节点,后置节点,抢锁失败的线程等成员属性。
static final class Node {
// 共享模式的节点
static final Node SHARED = new Node();
// 独占模式的节点
static final Node EXCLUSIVE = null;
// 节点状态
volatile int waitStatus;
// 前置节点
volatile Node prev;
// 后置节点
volatile Node next;
// 被阻塞的线程
volatile Thread thread;
}
基于Node类,在AQS类中有维护了一个head头节点和一个tail尾节点,用与构建链表的头部和尾部。
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
公平锁与非公平锁公用获取锁失败时,插入队列的方法在下面这块代码中
public final void acquire(int arg) {
// tryAcquire() 尝试获取锁
// 尝试获取失败,则调用addWaiter()方法像CLH队列里插入一个独占类型的Node节点
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
CLH插入Node节点
在抢锁失败时,会调用addWaiter方法,将失败线程封装到Node节点中。然后才会将Node节点插入到CLH队列中。
/**
* 将线程插入到队列
*/
private Node addWaiter(Node mode) {
// 构建Node节点
Node node = new Node(Thread.currentThread(), mode);
// 首次插入 tail一定为null , 所以会走enq方法
Node pred = tail;
if (pred != null) { //如果尾节点不是空
node.prev = pred; // 当前节点的前置节点指向尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node; wei'jie'dian
return node;
}
}
enq(node); // 自旋插入节点,直到节点插入成功
return node;
}
/**
* 初始换空节点并插入新节点
*/
private Node enq(final Node node) {
//自旋,直到节点插入成功为止
for (;;) {
Node t = tail;
if (t == null) { // 首次插入前,必须要进行队列初始化一个空节点
if (compareAndSetHead(new Node()))
tail = head; // head,tail指向同一个空节点。
} else {
node.prev = t; // 插入节点的前置节点指向初始化的空节点
if (compareAndSetTail(t, node)) {
t.next = node; // 空节点的next后置节点指向给当前node
return t;
}
}
}
}
从源码中可以知道在首次插入节点时,AQS会初始化一个空节点插入到CLH队列中。当初始化一个空节点后,才是真正的将主节点插入到空节点后。
那为什么要新建一个空节点呢,这是因为每个节点都需要设置前置节点的waitstatus状态,如果值为1,则表示节点被释放,如果节点为-1表示可被唤醒节点,所以每个节点必须要有前置节点,而空节点存在了意义就是为了第一个入队的节点。当线程入队后,状态并没有被阻塞,那么什么是被阻塞呢。在节点入队完成后,会继续执行acquireQueued()方法。在该方法内线程可能会再次抢锁,如果抢锁成功,那么就将head节点由空节点重新指向自己,否则才会阻塞线程。
/**
* 尝试阻塞节点线程,如果节点为头节点,那么会再一次尝试获取锁
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 获取当前节点的前置节点
//如果当前节点为队列的第一个节点,会再次进行抢锁
if (p == head && tryAcquire(arg)) {
// 将head指针指向当前节点,并将当前节点前置节点和线程属性置空
setHead(node);
p.next = null; // 原头指针指向的节点被置空
failed = false;
return interrupted;
}
// 阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 判断前置节点waitStatus状态,决定当前节点是否要阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //前置节点的waitStatus状态
if (ws == Node.SIGNAL) // 如果为-1 表示可以被唤醒,那么当前节点可以被阻塞
return true;
if (ws > 0) { //如果大于0,说明节点出现了问题,将节点丢弃。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果waitStatus为0,那么会将waitStatus状态更改为-1,这样的话下一个节点可以被唤醒。下一轮循环时ws=-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
好吧,坦白了!!!到这里我就扛不住了,这鬼东西太绕了,代码也不太好理解,我也懵逼了。
CLH唤醒下一节点
在调用unlock方法时,当前线程除了会释放锁,在内部方法中也会唤起下一个线程。在获取下一个唤醒的节点时,会校验下个节点的waitStatus是不是-1,如果不是,那么就会从后向前遍历找到离头节点最近的一个节点,并准备唤醒。
//释放锁的方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//当前线程释放锁
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 当前线程释放锁
*
*/
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
*唤起下一个节点
*/
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将 head 节点的 ws 改成 0,清除信号。表示,他已经释放过了。不能重复释放。
compareAndSetWaitStatus(node, ws, 0);
// 找到下一个节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//如果下个节点为空,或者状态不是-1(被取消),那么从后往前寻找,直到找到最前面未被取消的节点。准备唤醒
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒下一个节点
if (s != null)
LockSupport.unpark(s.thread);
}
总结
上述对并发中两个重要的应用synchronized和ReentrantLock做了简单的介绍以及源码分析。对于synchronized,简单介绍了锁的升级,如何加锁以及加锁的原理等。对于ReentrantLock,简单的介绍了公平锁与非公平锁的工作流程,节点的入队以及节点的唤醒等。当然,其中某些点来说可能存在一些疏漏,后续如有发现先会及时补充或修正。
转载自:https://juejin.cn/post/7130991628046581767