diff --git a/week_03/05/12-AbstractQueuedSynchronizer.md b/week_03/05/12-AbstractQueuedSynchronizer.md new file mode 100644 index 0000000000000000000000000000000000000000..9e0b51e1367d38d35d51c35481a2a4b3a548cadf --- /dev/null +++ b/week_03/05/12-AbstractQueuedSynchronizer.md @@ -0,0 +1,405 @@ +# AbstractQueuedSynchronizer 源码分析 + +## TOP 带着问题看源码 + +1. AQS 的数据结构 +2. AQS 的设计模式 +3. AQS 的核心思想 + +## 1. 基本介绍 + +前面我们已经介绍和分析了管程,以及 JVM 层面的管程而 AQS 则是 Java 并发包中管程的一种实现。 + +下面是 AQS 的类实现关系图 + +![](http://qiniu.itliusir.com/aqs_dependent.png) + +## 2. 成员变量分析 + +```java +// 头结点 +private transient volatile Node head; +// 尾节点 +private transient volatile Node tail; +// 同步状态 +private volatile int state; +// AbstractOwnableSynchronizer.class +// 当前持有独占锁的线程,类似对象头的Thread ID,可以用来判断是否为重入 +private transient Thread exclusiveOwnerThread; +``` + +### 2.1 Node 节点分析 + +```java +// 节点状态,有下面几种取值 +volatile int waitStatus; +// 共享模式 +static final Node SHARED = new Node(); +// 独占模式 +static final Node EXCLUSIVE = null; +// 当前线程取消了锁的竞争 +static final int CANCELLED = 1; +// 后继节点需要被唤醒 +static final int SIGNAL = -1; +// 当前节点线程正在等待条件 +static final int CONDITION = -2; +static final int PROPAGATE = -3; +// 双向链表 +// 前驱节点 +volatile Node prev; +// 后继节点 +volatile Node next; +// 节点封装的线程 +volatile Thread thread; +// 条件队列的单向链表 +Node nextWaiter; +``` + +回到问题 **TOP 1** ,可以分析得到 AQS 的数据结构是一个双向链表,并维护了一个全局状态 + +## 3. 核心方法分析 + +### 3.1 尝试获取锁 + +`tryAcquire` 这个方法 AQS 是定义了一个空方法,交由子类自行实现,这里也是采用了 **模板设计模式** + +我们先暂时理解 `tryAcquire` 是尝试获取一下锁,后面会结合具体实现类来分析 + +可以看到如果 `tryAcquire(arg)` 返回 true,方法就结束了,如果返回 false 则往下走。 + +```java +public final void acquire(int arg) { + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); +} +``` + +接下来我们来分析 *addWaiter(Node.EXCLUSIVE)* + +如果你看过之前分析 `Synchronized` 那一篇的重量锁阶段相信你看到这会发现很熟悉,没错!就是在竞争失败后把当前线程封装到 node 节点(独占模式),与 `Synchronized` 不同的是 node 不是放在队列头部而是塞到队列的队尾处。 + +核心逻辑就是通过 CAS 自旋进行入队。 + +```java +private Node addWaiter(Node mode) { + Node node = new Node(Thread.currentThread(), mode); + // Try the fast path of enq; backup to full enq on failure + Node pred = tail; + // 队列不为空 直接 CAS 入队,成功直接返回 + if (pred != null) { + node.prev = pred; + if (compareAndSetTail(pred, node)) { + pred.next = node; + return node; + } + } + // 队列为空 或 CAS竞争失败 调用 enq 自旋入队 + enq(node); + return node; +} +``` + +我们来看下 *enq(node)* 的逻辑,很简单就是: + +1. 队列为空就初始化 head 节点(初始化之前是null,这里new Node中waitStatus是0) ,队列不为空后就继续循环走下面入队的逻辑 +2. 不为空说明是 CAS 竞争失败,尝试自旋入队 + +```java +private Node enq(final Node node) { + for (;;) { + Node t = tail; + if (t == null) { // Must initialize + if (compareAndSetHead(new Node())) + tail = head; + } else { + node.prev = t; + if (compareAndSetTail(t, node)) { + t.next = node; + return t; + } + } + } +} +``` + +*acquire(arg)* 中的 *addWaiter(Node.EXCLUSIVE)* 已经分析完了,接下来我们分析外层的 *acquireQueued()* 方法。 + +上面逻辑已经把节点放入队列了,接下来的逻辑就是会把放入队列的节点不断获取锁,直到成功或者中断 + +```java +final boolean acquireQueued(final Node node, int arg) { + boolean failed = true; + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + // 如果前驱节点是队头就尝试获取锁,因为这个节点有可能是刚初始化的 + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + // 注意,唯一 return 跳出方法的地方 + return interrupted; + } + // 说明要么上面分支没获取到锁,要么不是头节点 + // 接下来我们分析 shouldParkAfterFailedAcquire + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + // 根据 return 之前的赋值可以知道,failed 为 true 只有 tryAcquire 异常时候会出现 + if (failed) + // 将 node 节点设置为 CANCELLED 状态 + cancelAcquire(node); + } +} +``` + +接下来我们分析 *shouldParkAfterFailedAcquire(p, node)* 方法,注意传过来的第一个节点是 **前驱节点** ,第二个是当前节点 + +该方法核心思想就是判断当前线程是否应该被挂起 + +```java +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int ws = pred.waitStatus; + // 前驱节点是唤醒状态 + if (ws == Node.SIGNAL) + /* + * This node has already set status asking a release + * to signal it, so it can safely park. + */ + return true; + // ws>0 代表前驱节点取消了排队 + if (ws > 0) { + /* + * Predecessor was cancelled. Skip over predecessors and + * indicate retry. + */ + do { + // 因为依赖前驱节点的唤醒,所以前驱不能是取消状态,再往前找,一直找到前驱不是取消状态的才停止 + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + // 排除 ws = -1 和 ws > 0,加上前面初始化节点并没有看见设置 waitStatus + // 所以进入这个分支的也就是 waitStatus 为0 + // 使用 CAS 把前驱节点状态设置为 唤醒状态,再次循环时候就会从第一个分支 return true + } else { + /* + * waitStatus must be 0 or PROPAGATE. Indicate that we + * need a signal, but don't park yet. Caller will need to + * retry to make sure it cannot acquire before parking. + */ + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; +} +``` + +如果上个方法返回为 true ,就会接着调用 park 挂起当前线程。 + +正常来说 第一次都会为 false,因为 第一次只是设置状态,第二次才会校验状态 + +#### 3.1.1 tryAcquire(arg) 的实现 + +我们主要拿 `ReentrantLock` 和 `Semaphore` 的实现来举例子 + +`ReentrantLock` 有两个版本的实现,一个是公平锁,一个是非公平锁 + +我们先来看非公平锁,并没有什么特殊处理,就是先尝试获取一下,成功就返回,失败就入 AQS 的等待队列 + +*ReentrantLock.Sync#nonfairTryAcquire* + +```java +final boolean nonfairTryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + // c 为 0,说明还没有线程持有锁 + if (c == 0) { + // 尝试 CAS 一下,成功就直接返回 + if (compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + // 如果有线程且是当前线程,说明是重入锁,state+1 + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; +} +``` + +接下来我们来看公平锁,可以看到在非公平的基础上多判断一次 `hasQueuedPredecessors` , `hasQueuedPredecessors` 的逻辑很简单,就是判断队列为不为空,如果不为空说明还有等待的,就不往下走了 + +*ReentrantLock.FairSync#tryAcquire* + +```java +protected final boolean tryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + // 队列为空,就 CAS 尝试获取一下锁 + if (!hasQueuedPredecessors() && + compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + // 重入 + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; +} + +// 队列不为空返回 true +public final boolean hasQueuedPredecessors() { + Node t = tail; // Read fields in reverse initialization order + Node h = head; + Node s; + return h != t && + ((s = h.next) == null || s.thread != Thread.currentThread()); +} +``` + +`Semaphore` 是 Java 层面实现的一个信号量,也是分为公平和非公平版本,`Semaphore` 也是基于 AQS 来实现的,它是通过一个许可介质,获取许可就把许可减少,如果许可数小于0,就入队列阻塞等待许可的归还;归还许可的时候就把许可数增加。信号量这块可以参考前面的文章,有专门讲解这个机制。 + +我们先来看非公平的实现,其实就是对许可的减少 + +```java +final int nonfairTryAcquireShared(int acquires) { + for (;;) { + // 获取许可数量 + int available = getState(); + // 减去当前要获取的许可的剩余的许可数量 + int remaining = available - acquires; + // CAS 修改许可的数量,如果小于0,则返回负数,在上一层调用的时候如果为负数会加入 AQS 的队列 + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } +} +``` + +再来看看公平锁的实现,相信看到 `hasQueuedPredecessors` 你又懂了 + +```java +protected int tryAcquireShared(int acquires) { + for (;;) { + if (hasQueuedPredecessors()) + return -1; + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } +} +``` + +### 3.2 尝试释放锁 + +同 `tryAcquire` 一样, `tryRelease` 也是 AQS 中的一个模板方法,我们后面会分析 `tryRelease` 的具体实现,我们先来分析 `release` + +同获取锁 `acquire` 不一样,这里是子类实现返回 true才往下走(后面我们会知道,这个是指是否完全释放),后面会调用 `unparkSuccessor` 方法来唤醒后继节点,需要注意的是传入的节点是 **head 节点** + +```java +public final boolean release(int arg) { + if (tryRelease(arg)) { + Node h = head; + if (h != null && h.waitStatus != 0) + unparkSuccessor(h); + return true; + } + return false; +} +``` + +*unparkSuccessor* 方法核心逻辑就是唤醒 head 的后继节点,如果后继节点的状态不是需要被唤醒的状态,就从后往前找到 waitStatus 是唤醒状态的最前面的节点 + +```java +private void unparkSuccessor(Node node) { + int ws = node.waitStatus; + if (ws < 0) + compareAndSetWaitStatus(node, ws, 0); + Node s = node.next; + // 有可能后继节点取消了等待 + if (s == null || s.waitStatus > 0) { + s = null; + // 从后往前遍历 + for (Node t = tail; t != null && t != node; t = t.prev) + // 找到最前面的一个waitStatus <= 0的节点,赋值给 s 等待被唤醒 + if (t.waitStatus <= 0) + s = t; + } + // 后继节点正常且不为空就唤醒 + if (s != null) + LockSupport.unpark(s.thread); +} +``` + +#### 3.2.1 tryRelease(arg) 的实现 + +同样的,我们主要拿 `ReentrantLock` 和 `Semaphore` 的实现来举例子 + +先来看 `ReentrantLock` 的实现,如果可重入的次数已经减少完了就返回true,走 AQS 的模板方法调用唤醒操作 + +*ReentrantLock.Sync#tryRelease* + +```java +protected final boolean tryRelease(int releases) { + // 减少可重入次数 + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + // 这里考虑到了是否为重入锁,也就是是否完全释放, + if (c == 0) { + free = true; + setExclusiveOwnerThread(null); + } + // 更新状态数 + setState(c); + return free; +} +``` + +然后来看下 `Semaphore` 的实现,更新许可,成功则返回 true,走 AQS 的模板进行唤醒操作 + +```java +protected final boolean tryReleaseShared(int releases) { + for (;;) { + // 当前许可数量,因为有可能有竞争,所以每次自旋后重新获取许可数量进行归还 + int current = getState(); + // 新增许可数量 + int next = current + releases; + if (next < current) // overflow + throw new Error("Maximum permit count exceeded"); + // CAS 更新许可数量,失败就重试 + if (compareAndSetState(current, next)) + return true; + } +} +``` + +通过对核心方法的分析,我们可以知道 AQS 定义了很多模板方法,扩展逻辑交由子类实现。 + +回到问题 **TOP 2** ,可以知道采用的设计模式是模板设计模式 + +## 4. 总结 + +由上面对成员变量和核心方法的分析,我们可以看到 AQS 这个管程的实现其实和概念上是相同的,就是对队列和状态值的一个维护,也可以明白 Java 为什么使用管程为核心实现同步,其优势是面向对象,把复杂逻辑封装起来,对于使用更友好。 + +同样我们对 `Semaphore` 这个 Java 层面的信号量实现的分析,也明白了管程那篇文章中写的 **管程和信号量等价** ,因为我们可以使用管程来实现信号量,也可以使用信号量来实现管程,只是管程对我们更加友好! + +以上的总结也是对问题 **TOP 3** 的一个回答。 \ No newline at end of file