likes
comments
collection
share

并发编程-ReentrantLook底层设计

作者站长头像
站长
· 阅读数 16

线程的基础和使用 Synchronized原理分析 并发编程-探索可见性背后的本质以及vloatile原理 并发编程-死锁/ThreadLocal 并发编程-Condition底层设计 并发编程-常见并发工具BlockingQueue的使用及原理解析

重入锁

顾名思义:就是可以重入的互斥锁,但是这个重入是有条件的,允许同一个线程多次获得同一个锁,避免了死锁的发生。

重入锁在实现上比 synchronized 关键字更加灵活,提供了一些额外的特性,比如可定时的锁等待(tryLock)、可中断的锁等待(lockInterruptibly)、公平性等。另外,使用重入锁需要手动加锁和解锁,使得线程控制更加精细。

使用重入锁可以避免死锁的发生,提高程序的可靠性。但是,过多地使用重入锁可能会影响程序的性能,因为重入锁需要进行额外的状态管理和线程切换操作。因此,在使用重入锁时需要谨慎考虑锁的范围和粒度,以及锁的公平性和性能等方面的问题。

这里可能有人对tryLocklockInterruptibly有疑问 具体的使用方式如下:

  • tryLock ReentrantLock lock = new ReentrantLock(); if (lock.tryLock()) { try { // 获得锁后的操作 } finally { lock.unlock(); } } else { // 获取锁失败的操作 } 它的作用可以让线程在获取锁的时候最多等待一定的时间,如果在等待时间内没有获取到锁,则返回false,表示获取锁失败
  • lockInterruptibly Thread t = new Thread(() -> { ReentrantLock lock = new ReentrantLock(); try { lock.lockInterruptibly(); try { // 获得锁后的操作 while (!Thread.currentThread().isInterrupted()) { System.out.println("1"); } } finally { lock.unlock(); } } catch (InterruptedException e) { // 当前线程被中断的操作 Thread.currentThread().interrupt(); } }); t.start(); Thread.sleep(1000); t.interrupt(); 作用就是可以抛出InterruptedException异常,所以线程可以使用interrupt的方式被中断

Lock

Lock(锁)是一种线程同步机制,用于实现互斥和协作访问共享资源。与 synchronized 关键字相比,Lock 接口提供了更加灵活的锁实现,可以实现更加精细的线程控制和高级功能。

其实Lock就是一个接口,定义了锁的一些方法: 并发编程-ReentrantLook底层设计

  1. lock(): 获取锁,如果锁被其他线程持有,则线程会阻塞等待锁的释放。
  2. unlock(): 释放锁,如果当前线程持有锁,则释放锁,否则会抛出 IllegalMonitorStateException 异常。
  3. tryLock(): 尝试获取锁,如果锁当前没有被其他线程持有,则获取锁并立即返回 true,否则立即返回 false,不会阻塞线程。
  4. tryLock(long time, TimeUnit unit): 在一定时间内尝试获取锁,如果锁在给定的时间内未被其他线程持有,则获取锁并立即返回 true,否则等待指定时间,如果在指定时间内还未获取到锁,则返回 false。
  5. newCondition(): 获取与该锁关联的 Condition 对象,用于实现等待/通知模式。

ReentrantLock

ReentrantLock就是Lock的一种实现: 并发编程-ReentrantLook底层设计ReentrantLock是基于AbstractQueuedSynchronizer实现的可重入锁

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer基于 FIFO 等待队列的同步器框架,它为自定义同步器的实现提供了一种底层框架和算法,可以方便地实现各种类型的同步器,用于存储等待获取锁的线程 我们先看一下AQS的代码结构: 并发编程-ReentrantLook底层设计

lock源码实现

并发编程-ReentrantLook底层设计 发现是调用了sync.lock();而sync就是一个内部类并且集成AQS: 并发编程-ReentrantLook底层设计 并且我们可以发现sync是个抽象类一共有两个实现: 并发编程-ReentrantLook底层设计 也就是公平锁和非公平锁(下文会说到详细的实现)至于用哪个是在实例化的时候赋值的如下: 并发编程-ReentrantLook底层设计 接着看sync.lock()源码实现如下: 并发编程-ReentrantLook底层设计 这里我们就拿非公平锁举例:源码实现是这样的

initialTryLock
final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    if (compareAndSetState(0, 1)) { // first attempt is unguarded
        setExclusiveOwnerThread(current);
        return true;
    } else if (getExclusiveOwnerThread() == current) {
        int c = getState() + 1;
        if (c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
        return false;
}

compareAndSetState方法使用了unsafe类的一个方法具体实现在jvm里面 并发编程-ReentrantLook底层设计 并发编程-ReentrantLook底层设计 所以这个方法呢就是对比state状态如果是0 则说明当前没有线程占用则会赋值1然后调用setExclusiveOwnerThread,而setExclusiveOwnerThread方法就是给exclusiveOwnerThread赋值记录一下当前这个锁是被这个线程占用的:

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

如果不是state不是0则说明当前的锁被占用了,所以此时需要判断被占用的线程是否是自己, 并发编程-ReentrantLook底层设计 这里也比较简单就是state加一加好了,然后返回true,当前线程获取了所以可以执行对应的代码块否则就会返回false

acquire

上文我们知道如果锁被占用了initialTryLock会返回false则此时会执行acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg))
        acquire(null, arg, false, false, false, 0L);
}

首先是tryAcquire

protected final boolean tryAcquire(int acquires) {
    if (getState() == 0 && compareAndSetState(0, acquires)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

这里又尝试着获取锁,因为可能到这一步刚好这个锁就释放掉了所以就可以直接获取到锁 如果还是没有获取到锁才会走到acquire方法

final int acquire(Node node, int arg, boolean shared,
                  boolean interruptible, boolean timed, long time) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
    boolean interrupted = false, first = false;
    Node pred = null;                // predecessor of node when enqueued

    for (;;) {
        if (!first && (pred = (node == null) ? null : node.prev) != null &&
            !(first = (head == pred))) {
            if (pred.status < 0) {
                cleanQueue();           // predecessor cancelled
                continue;
            } else if (pred.prev == null) {
                Thread.onSpinWait();    // ensure serialization
                continue;
            }
        }
        if (first || pred == null) {
            boolean acquired;
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            if (acquired) {
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        signalNextIfShared(node);
                    if (interrupted)
                        current.interrupt();
                }
                return 1;
            }
        }
        if (node == null) {                 // allocate; retry before enqueue
            if (shared)
                node = new SharedNode();
            else
                node = new ExclusiveNode();
        } else if (pred == null) {          // try to enqueue
            node.waiter = current;
            Node t = tail;
            node.setPrevRelaxed(t);         // avoid unnecessary fence
            if (t == null)
                tryInitializeHead();
            else if (!casTail(t, node))
                node.setPrevRelaxed(null);  // back out
            else
                t.next = node;
        } else if (first && spins != 0) {
            --spins;                        // reduce unfairness on rewaits
            Thread.onSpinWait();
        } else if (node.status == 0) {
            node.status = WAITING;          // enable signal and recheck
        } else {
            long nanos;
            spins = postSpins = (byte)((postSpins << 1) | 1);
            if (!timed)
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                LockSupport.parkNanos(this, nanos);
            else
                break;
            node.clearStatus();
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    return cancelAcquire(node, interrupted, interruptible);
}

梳理一下逻辑:

初始变量:
spins = 0,   
postSpins = 0;   
interrupted = false,   
first = false;  
pred = null;  
node = null  
share = false        

第一次循环:
第一个if中 !first -> true  (pred = null) != null -> false 所以第一个if不执行

第二个if (first || pred == null)  pred == null -> true
所以此时执行了,执行的结果其实就是再一次尝试获取锁,获取成功就返回了否自继续往下执行

第三个if(node == null)第一个条件命中 node = new ExclusiveNode();

第一次循环结束

第二次循环
此时条件有一些变化
spins = 0, 
postSpins = 0; 
interrupted = false, 
first = false;
pred = null;
node = new ExclusiveNode()
share = false 

第一个if:(pred = (node == null) ? null : node.prev) != null -> false 所以没有执行

第二个if: pred == null -> true 一样的继续尝试获取锁

第三个if:走到了else if (pred == null) 这里
这里将waiter赋值给当前线程
然后将node->prev指向tail,但是此时tail = null
继续走到tryInitializeHead方法
private void tryInitializeHead() {
    Node h = new ExclusiveNode();
    if (U.compareAndSetReference(this, HEAD, null, h))
        tail = h;
}
就是新创建一个node,然后head和tail都指向新节点h

第三次循环
第一个if和第二个if不说了还是一样
到第三个else if (pred == null) {    
此时就会将node结点的prev指针指向tail,然后tail的next指针指向node
至此一个链表的结构也就行程了


第四次循环
直接看最后一个if直接走到了最后一个else
LockSupport.park(this);
开始阻塞线程,等待被唤醒了

unlock源码实现

看完上文的逻辑这里就很简单了

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    //这里其实就是将state-1
    int c = getState() - releases;
    if (getExclusiveOwnerThread() != Thread.currentThread())
       //如果当前线程和记录的所占用的线程不符合则抛出异常
        throw new IllegalMonitorStateException();
    
    //判断是否释放成功了
    //释放成功的条件是state == 0
    boolean free = (c == 0);
    if (free)
        //如果释放成功则修改当前占用线程为null
        setExclusiveOwnerThread(null);
    setState(c);
    return free;
}

如果没有释放成功则继续不修改锁的状态,释放成功了则会走到signalNext方法

private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        s.getAndUnsetStatus(WAITING);
        LockSupport.unpark(s.waiter);
    }
}

可以发现是将当前等待的线程唤醒并且设置status为0(这里也可以看出是FIFO结构,去的是头结点的next结点)这又回到了上文中的循环会走到第二个if

if (first || pred == null) {
    boolean acquired;
    try {
        if (shared)
            acquired = (tryAcquireShared(arg) >= 0);
        else
            acquired = tryAcquire(arg);
    } catch (Throwable ex) {
        cancelAcquire(node, interrupted, false);
        throw ex;
    }
    if (acquired) {
        if (first) {
            node.prev = null;
            head = node;
            pred.next = null;
            node.waiter = null;
            if (shared)
                signalNextIfShared(node);
            if (interrupted)
                current.interrupt();
        }
        return 1;
    }
}

尝试获取锁,如果获取不到

 else if (node.status == 0) {
    node.status = WAITING;          // enable signal and recheck
}

又继续将status置为1

公平性体现

我们可以看一下initialTryLock代码的实现 FairSync:

final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    } else if (getExclusiveOwnerThread() == current) {
        if (++c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    }
    return false;
}

在这里会先去判断一下当前的锁是不是被占用了,被占用的是谁,而且这里还多了一个判断是线程队列中是否有等待的线程,如果有直接返回false了 而后续的唤醒线程都是从队列的第一个开始唤醒一定是顺序的,所以这个是公平的 NonfairSync:

final boolean initialTryLock() {
    Thread current = Thread.currentThread();
    if (compareAndSetState(0, 1)) { // first attempt is unguarded
        setExclusiveOwnerThread(current);
        return true;
    } else if (getExclusiveOwnerThread() == current) {
        int c = getState() + 1;
        if (c < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(c);
        return true;
    } else
        return false;
}

当然后续的唤醒线程都是从队列的第一个开始唤醒一定是顺序的,这个是公平的, 但是区别于上文中的公平锁的实现是他没有判断当前是否有等待的队列,如果此时一个新的线程进来需要获取锁,恰巧一个老的线程正在释放锁已经将state置为0了所以此时新线程不就插队成功了,所以非公平就体现在这里

底层实现逻辑流程图

并发编程-ReentrantLook底层设计