likes
comments
collection
share

reentrantlock源码分析

作者站长头像
站长
· 阅读数 41

首先,reentrantlock是基于aqs实现的,我们都知道,reentrantlock有公平锁和非公平锁。我们明确下什么是公平锁,何为公平,one by one,有一定的顺序,有队列。

在分析源码之前,我们首先要记住,state为0是未持有锁,其他为持有锁,下面我们进入源码分析。

aqs这几个变量我们关注下

# 持有锁的状态
private volatile int state;
# node节点
private transient volatile Node tail;
static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;
    volatile Thread thread;

因为aqs是双向链表,node节点为排队的线程封装的对象

我们先分析lock方法

非公平锁

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

公平锁

final void lock() {
    acquire(1);
}

我们以公平锁来分析

  1. 进来先尝试抢锁,如果成功,设置占有锁的线程为当前线程
  2. 失败,进入acquire方法,我们看下acquire方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这里主要有三步

  1. 尝试获取锁
  2. 获取失败,尝试获取队列
  3. 自动中断

我们一步步分析,先看tryAcquire方法

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
  1. 获取锁的状态,如果没有线程占有锁,尝试占有锁,如果成功,设置占有锁的线程为当前线程。
  2. 判断是否当前线程占有锁,如果是当前线程(说明是重入锁)设置获取锁的次数,如果数字小于0,说明有问题,直接报错。设置状态数字,返回true,否则直接返回false

再分析下acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法,首先分析下addWaiter方法

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

这个方法其实很简单,就是将当前线程加入队列尾部。

  1. 初始化当前线程为node
  2. 获取队列尾部线程,不为空,设置当前线程的node的前一个线程为tail,尝试将尾部线程替换为当前线程,将之前tail线程的尾指针指向当前线程的node,返回当前线程的node。
  3. 如果队列尾部线程为空,初始化线程,初始化线程也很简单,获取当前队列的尾部node,如果为空,设置一个空node为头和尾,否则将当前线程加入尾部。

我们再分析下acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
  1. 首先,失败初始化,然后循环获取当前线程node的前一个节点,如果前一个节点为头节点并且获取锁成功,设置当前线程的node节点为头节点。
  2. 获取当前线程node的前一个node的waitStauts,如果是阻塞的,返回true,还没有获取锁,如果大于0,证明前一个线程的node取消了,把当前线程的node指向上上个node,跳过取消的node,否则设置前一个node的状态为挂起。
  3. 如果前一个node被挂起,设置当前线程为挂起。阻塞,打断当前线程。
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

这个方法清除中断位置。 公平锁和非公平锁lock方法就一个区别(公平锁会乖乖排队,非公平锁会尝试获取,获取不到,才会排队)

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

hasQueuedPredecessors 就是排队方法

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

公平锁什么时候可以走到第二步呢,队列头不等于队列尾,并且(头节点的下一个node等于空或者头节点的线程不是当前线程) 换一种说法就是头节点的线程为当前线程才可以尝试设置

lock方法就这样,我们再看下unLock方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

先看下,tryRelease 方法,获取状态,减去1,如果占有线程不是当前线程,报错 如果状态为0,设置node节点的占有线程为空,设置状态值。

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

如果头节点不为空,并且头节点的waitstatus不为0,设置头节点的waitStauts的值为0。唤醒下一个不为0的节点。

转载自:https://juejin.cn/post/7244109491897417765
评论
请登录