diff --git a/second/week_03/81/AbstractQueuedSynchronizer.md b/second/week_03/81/AbstractQueuedSynchronizer.md new file mode 100644 index 0000000000000000000000000000000000000000..492c579e78dffcfdb48537718c57ec9fc745cdbd --- /dev/null +++ b/second/week_03/81/AbstractQueuedSynchronizer.md @@ -0,0 +1,1392 @@ +### 简述 + +AbstractQueuedSynchronizer 是 JUC 中通过 Sync Queue(并发安全的 CLH Queue), Condition Queue, volatile 变量 state 提供的控制线程获取统一资源(state)的 Synchronized 工具. + +主要特点: + ++ 内部含有两条 Queue(Sync Queue, Condition Queue) ++ AQS 内部定义获取锁(acquire), 释放锁(release)的主逻辑, 子类实现响应的模版方法即可 ++ 支持共享锁与独占锁两种模式,共享模式时只用 Sync Queue, 独占模式有时只用 Sync Queue, 但若涉及 Condition, 则还有 Condition Queue。 ++ 支持包括:不响应中断获取独占锁(acquire), 响应中断获取独占锁(acquireInterruptibly), 超时获取独占锁(tryAcquireNanos); 不响应中断获取共享锁(acquireShared), 响应中断获取共享锁(acquireSharedInterruptibly), 超时获取共享锁(tryAcquireSharedNanos); + +AQS 是一个并发框架,针对具体的场景,需要子类继承 AQS 并实现以下主要方法: + +实现独占:tryAcquire、tryRelease、isHeldExclusively +实现共享:tryAcquireShared、tryReleaseShared + +一般的lock获取释放流程如下: + +```java +# lock 获取 +Acquire: +while(!tryAcquire(arg)){ // tryAcquire交由子类来实现, 改变 AQS 的state的值 + 1. tryAcquire 获取lock没成功, 则入 Sync Queue + 2. 若当前节点是 head.next, 则再次尝试获取一下lock (tryAcquire) + 3. 获取 lock 失败, 则改变 前继节点的 waitStatus 的值(变成SIGNAL), 进行 blocked +} + +# lock 释放 +Release: +if(tryRelease(arg)){ / tryRelease交由子类来实现, 改变 AQS 的state的值 + 1. 判断 lock 是否释放彻底 + 2. 若自己被标记为SIGNAL, 则唤醒后继节点, 通知其去获取 AQS 中 state 的值 + 3. 将自己的 waitStatus 进行复位到 0 +} +``` + +整个 AQS 分为以下几部分: + ++ Node 节点, 用于存放获取线程的节点, 存在于 Sync Queue, Condition Queue, 这些节点主要的区分在于 waitStatus 的值 ++ Condition Queue, 这个队列是用于独占模式中, 只有用到 Condition.awaitXX 时才会将 node加到 tail 上(PS: 在使用 Condition的前提是已经获取 Lock) ++ Sync Queue, 独占 共享的模式中均会使用到的存放 Node 的 CLH queue(主要特点是, 队列中总有一个 dummy 节点, 后继节点获取锁的条件由前继节点决定, 前继节点在释放 lock 时会唤醒sleep中的后继节点) ++ ConditionObject, 用于独占的模式, 主要是线程释放lock, 加入 Condition Queue, 并进行相应的 signal 操作 ++ 独占的获取lock (acquire, release), 例如 ReentrantLock 就是使用这种 ++ 共享的获取lock (acquireShared, releaseShared), 例如 ReeantrantReadWriteLock, Semaphore, CountDownLatch + +### 源码分析 + +#### **AbstractQueuedSynchronizer 内部类 Node** + +Node 节点是代表获取lock的线程, 存在于 Condition Queue, Sync Queue 里面, 而其主要的分别就是 nextWaiter (标记共享还是独占) + +waitStatus 标记node的状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0。 + ++ CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 ++ SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。 ++ CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。 ++ PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。 ++ 0:新结点入队时的默认状态。 + +**注意** 负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。 + +{% asset_img node.jpg %} + +```java +/** + * 代表 Thread 存在于 Sync Queue 与 Condition Queue 的节点 + */ +static final class Node { + /** marker to indicate a node is wating in shared mode */ + /** 标识节点是否是 共享的节点(这样的节点只存在于 Sync Queue 里面) */ + static final Node SHARED = new Node(); + /** marker to indicate a node is waiting in exclusive mode */ + /** 标识节点是 独占模式 */ + static final Node EXCLUSIVE = null; + + /** waitStatus value yto indicate thread has cancelled */ + /** + * CANCELLED 说明节点已经 取消获取 lock 了(一般是由于 interrupt 或 timeout 导致的) + * 很多时候是在 cancelAcquire 里面进行设置这个标识 + */ + static final int CANCELLED = 1; + + /** waitStatus value to indicate successor;s thread needs unparking */ + /** + * SIGNAL 标识当前节点的后继节点需要唤醒(PS: 这个通常是在 独占模式下使用, 在共享模式下有时用 PROPAGATE) + * + */ + static final int SIGNAL = -1; + + /** waitStatus value to indicate thread is waiting on condition */ + /** + * 当前节点在 Condition Queue 里面 + */ + static final int CONDITION = -2; + /** + * waitStatus value to indicate the next acquireShared should + * unconditionally propagate + */ + /** + * 当前节点获取到 lock 或进行 release lock 时, 共享模式的最终状态是 PROPAGATE(PS: 有可能共享模式的节点变成 PROPAGATE 之前就被其后继节点抢占 head 节点, 而从Sync Queue中被踢出掉) + */ + static final int PROPAGATE = -3; + + /** + * Status field, taking only the values: + * + * SIGNAL: The successor of this node is (or will soon be) + * blocked (via park), so the current node must + * unpark its successor when is releases or + * cancels. To avoid races, acquire methods must + * first indicate they need a signal, + * then retry the atomic acquire, and then, + * on failure, block + * CANCELLED: This node is cancelled due to timeout or interrupt + * Nodes never leave this state. In particular, + * a thread with cancelled node never again blocks + * CONDITION: This node is currently on a condition queue. + * It will not be used as a sync queue node + * until transferred, at which time the status + * will be set to 0. (Use of this value here has + * nothing to do with other uses of the + * field, but simplifies mechanics) + * PROPAGATE: A releaseShared should be propagated to other + * nodes. This is set (for head node only) in + * doReleaseShared to ensure propagation + * continues, even if other operations hava + * since intervened + * 0: None of the above(以上) + * + * The values are arranged numerically to simplify use. + * Non-negative values mean that a node doesn't need to + * signal. So, most code doesn't need to check for particular + * values, just for sign + * + * The field is initialized to 0 for narmal sync nodes, and + * CONDITION for condition nodes. It is modified using CAS + * (or when possible, unconditional volatile writes) + * + */ + volatile int waitStatus; + + /** + * 节点在 Sync Queue 里面时的前继节点(主要来进行 skip CANCELLED 的节点) + * 注意: 根据 addWaiter方法: + * 1. prev节点在队列里面, 则 prev != null 肯定成立 + * 2. prev != null 成立, 不一定 node 就在 Sync Queue 里面 + */ + volatile Node prev; + + /** + * Node 在 Sync Queue 里面的后继节点, 主要是在release lock 时进行后继节点的唤醒 + * 而后继节点在前继节点上打上 SIGNAL 标识, 来提醒他 release lock 时需要唤醒 + */ + volatile Node next; + + /** 获取 lock 的引用 */ + volatile Thread thread; + + /** + * 作用分成两种: + * 1. 在 Sync Queue 里面, nextWaiter用来判断节点是 共享模式, 还是独占模式 + * 2. 在 Condition queue 里面, 指向其后继节点 (Condition queue是一个单向的, 不支持并发的 list) + */ + Node nextWaiter; + + /** 当前节点是否是共享模式 */ + final boolean isShared() { + return nextWaiter == SHARED; + } + + /** + * 获取 node 的前继节点 + */ + final Node predecessor() throws NullPointerException{ + Node p = prev; + if(p == null){ + throw new NullPointerException(); + }else{ + return p; + } + } + + Node(){ + // Used to establish initial head or SHARED marker + } + + /** + * 初始化 Node 用于 Sync Queue 里面 + */ + Node(Thread thread, Node mode){ // Used by addWaiter + this.nextWaiter = mode; + this.thread = thread; + } + + /** + * 初始化 Node 用于 Condition Queue 里面 + */ + Node(Thread thread, int waitStatus){ // Used by Condition + this.waitStatus = waitStatus; + this.thread = thread; + } +} +``` + +waitStatus的状态变化: + +1. 线程刚入 Sync Queue 里面, 发现 独占锁被其他人获取, 则将其前继节点标记为 SIGNAL, 然后再尝试获取一下锁(调用 tryAcquire 方法) +2. 若 调用 tryAcquire 方法获取失败, 则判断一下是否前继节点被标记为 SIGNAL, 若是的话 直接 block(block前会确保前继节点被标记为SIGNAL, 因为前继节点在进行释放锁时根据是否标记为 SIGNAL 来决定唤醒后继节点与否 <- 这是独占的情况下) +3. 前继节点使用完lock, 进行释放, 因为自己被标记为 SIGNAL, 所以唤醒其后继节点 + +waitStatus 变化过程: + +1. 独占模式下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0) +2. 独占模式 + 使用 Condition情况下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0) + 其上可能涉及 中断与超时, 只是多了一个 CANCELLED, 当节点变成 CANCELLED, 后就等着被清除 +3. 共享模式下: 0(初始) -> PROPAGATE(获取 lock 或release lock 时) (获取 lock 时会调用 setHeadAndPropagate 来进行 传递式的唤醒后继节点, 直到碰到 独占模式的节点) +4. 共享模式 + 独占模式下: 0(初始) -> signal(被后继节点标记为release需要唤醒后继节点) -> 0 (等释放好lock, 会恢复到0) + +#### **AbstractQueuedSynchronizer 内部Queue :Condition Queue** + +Condition Queue 是一个并发不安全的, 只用于独占模式的队列(PS: 为什么是并发不安全的呢? 主要是在操作 Condition 时, 线程必需获取 独占的 lock, 所以不需要考虑并发的安全问题); +而当Node存在于 Condition Queue 里面, 则其只有 waitStatus, thread, nextWaiter 有值, 其他的都是null(其中的 waitStatus 只能是 CONDITION, 0(0 代表node进行转移到 Sync Queue里面, 或被中断/timeout)); 这里有个注意点, 就是 当线程被中断或获取 lock 超时, 则一瞬间 node 会存在于 Condition Queue, Sync Queue 两个队列中. + +{% asset_img condition_queue.jpg %} + +节点 Node4, Node5, Node6, Node7 都是调用 Condition.awaitXX 方法 加入 Condition Queue(PS: 加入后会将原来的 lock 释放) + +**Condition Queue 入队列方法 addConditionWaiter** + +```java +/** + * Adds a new waiter to wait queue + * 将当前线程封装成一个 Node 节点 放入大 Condition Queue 里面 + * 大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况 + * @return + */ +private Node addConditionWaiter(){ + Node t = lastWaiter; // 1. Condition queue 的尾节点 + // If lastWaiter is cancelled, clean out + // 2.尾节点已经Cancel, 直接进行清除, + // 这里有1个问题, 1 何时出现t.waitStatus != Node.CONDITION -> 在对线程进行中断时 ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 导致这种情况一般是 线程中断或 await 超时 + // 一个注意点: 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其他 await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会) + if(t != null && t.waitStatus != Node.CONDITION){ + unlinkCancelledWaiters(); // 3. 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt)) + t = lastWaiter; // 4. 获取最新的 lastWaiter + } + Node node = new Node(Thread.currentThread(), Node.CONDITION); // 5. 将线程封装成 node 准备放入 Condition Queue 里面 + if(t == null){ + firstWaiter = node; // 6 .Condition Queue 是空的 + }else{ + t.nextWaiter = node; // 7. 最加到 queue 尾部 + } + lastWaiter = node; // 8. 重新赋值 lastWaiter + return node; +} +``` + +**Condition Queue 删除Cancelled节点的方法 unlinkCancelledWaiters** + +当Node在Condition Queue 中, 若状态不是 CONDITION, 则一定是 被中断或超时 + +```java +/** + * 在 调用 addConditionWaiter 将线程放入 Condition Queue 里面时 或 awiat 方法获取 差不多结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点 + * 这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点 + */ +private void unlinkCancelledWaiters(){ + Node t = firstWaiter; + Node trail = null; + while(t != null){ + Node next = t.nextWaiter; // 1. 先初始化 next 节点 + if(t.waitStatus != Node.CONDITION){ // 2. 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的) + t.nextWaiter = null; // 3. Node.nextWaiter 置空 + if(trail == null){ // 4. 一次都没有遇到有效的节点 + firstWaiter = next; // 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理) + }else{ + trail.nextWaiter = next; // 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t + } + if(next == null){ // 7. next == null 说明 已经 traverse 完了 Condition Queue + lastWaiter = trail; + } + }else{ + trail = t; // 8. 将有效节点赋值给 trail + } + t = next; + } +} +``` + +**Condition Queue 转移节点的方法 transferForSignal** + +transferForSignal只有在节点被正常唤醒才调用的正常转移的方法 + +```java +/** + * 将 Node 从Condition Queue 转移到 Sync Queue 里面 + * 在调用transferForSignal之前, 会 first.nextWaiter = null; + * 而我们发现 若节点是因为 timeout / interrupt 进行转移, 则不会进行这步操作; 两种情况的转移都会把 wautStatus 置为 0 + */ +final boolean transferForSignal(Node node){ + /** + * If cannot change waitStatus, the node has been cancelled + */ + if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 若 node 已经 cancelled 则失败 + return false; + } + + /** + * Splice onto queue and try to set waitStatus of predecessor to + * indicate that thread is (probably) waiting, If cancelled or + * attempt to set waitStatus fails, wake up to resync (in which + * case the waitStatus can be transiently and harmlessly wrong) + */ + Node p = enq(node); // 2. 加入 Sync Queue + int ws = p.waitStatus; + if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ // 3. 这里的 ws > 0 指Sync Queue 中node 的前继节点cancelled 了, 所以, 唤醒一下 node ; compareAndSetWaitStatus(p, ws, Node.SIGNAL)失败, 则说明 前继节点已经变成 SIGNAL 或 cancelled, 所以也要 唤醒 + LockSupport.unpark(node.thread); + } + return true; +} +``` + +**Condition Queue 转移节点的方法 transferAfterCancelledWait** + +transferAfterCancelledWait 在节点获取lock时被中断或获取超时才调用的转移方法 + +```java +/** + * 将 Condition Queue 中因 timeout/interrupt 而唤醒的节点进行转移 + */ +final boolean transferAfterCancelledWait(Node node){ + if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 没有 node 没有 cancelled , 直接进行转移 (转移后, Sync Queue , Condition Queue 都会存在 node) + enq(node); + return true; + } + + /** + * If we lost out to a signal(), then we can't proceed + * until it finishes its enq(). Cancelling during an + * incomplete transfer is both race and transient, so just + * spin + */ + while(!isOnSyncQueue(node)){ // 2.这时是其他的线程发送signal,将本线程转移到 Sync Queue 里面的工程中(转移的过程中 waitStatus = 0了, 所以上面的 CAS 操作失败) + Thread.yield(); // 这里调用 isOnSyncQueue判断是否已经 入Sync Queue 了 + } + return false; +} +``` + +#### **AbstractQueuedSynchronizer 内部 Queue Sync Queue** + +Sync Queue 是一个类似于 CLH Queue 的并发安全, 双向, 用于独占和共享两种模式下的 queue. +而当 Node 存在于 Sync Queue 时, waitStatus,, prev, next, thread, nextWaiter 均可能有值; waitStatus 可能是 SIGNAL, 0, PROPAGATE, CANCELLED; 当节点不是 head 时一定prev != null(而 node.prev != null 不能说明节点一定存在于 Sync Queue); node.next != null 则 node一定存在于Sync Queue, 而 node存在于 Sync Queue 则 node.next 就不一定 != null; thread 则代表获取 lock 的线程; nextWaiter 用于标示共享还是独占的获取 lock + +{% asset_img sync_queue.jpg %} + +这个图代表有个线程获取lock, 而 Node1, Node2, Node3 则在Sync Queue 里面进行等待获取lock(PS: 注意到 dummy Node 的SINGNAL 这是叫获取 lock 的线程在释放lock时通知后继节点的标示) + +**Sync Queue 节点入Queue方法** + +这里有个地方需要注意, 就是初始化 head, tail 的节点, 不一定是 head.next, 因为期间可能被其他的线程进行抢占了 + +```java +/** + * Creates and enqueues node for current thread and given mode. + * + * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared + * @return the new node + */ +/** + * 将当前的线程封装成 Node 加入到 Sync Queue 里面 + */ +private Node addWaiter(Node mode){ + Node node = new Node(Thread.currentThread(), mode); // 1. 封装 Node + // Try the fast path of enq; backup to full enq on failure + Node pred = tail; + if(pred != null){ // 2. pred != null -> 队列中已经有节点, 直接 CAS 到尾节点 + node.prev = pred; // 3. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时 node.prev 一定 != null(除 dummy node), 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 ) + if(compareAndSetTail(pred, node)){ // 4. CAS node 到 tail + pred.next = node; // 5. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null) + return node; + } + } + enq(node); // 6. 队列为空, 调用 enq 入队列 + return node; +} + + +/** + * 这个插入会检测head tail 的初始化, 必要的话会初始化一个 dummy 节点, 这个和 ConcurrentLinkedQueue 一样的 + * Insert node into queue, initializing if necessary. See picture above. + * @param node the node to insert + * @return node's predecessor 返回的是前继节点 + */ +/** + * 将节点 node 加入队列 + * 这里有个注意点 + * 情况: + * 1. 首先 queue是空的 + * 2. 初始化一个 dummy 节点 + * 3. 这时再在tail后面添加节点(这一步可能失败, 可能发生竞争被其他的线程抢占) + * 这里为什么要加入一个 dummy 节点呢? + * 这里的 Sync Queue 是CLH lock的一个变种, 线程节点 node 能否获取lock的判断通过其前继节点 + * 而且这里在当前节点想获取lock时通常给前继节点 打上 signal 的标识(表示前继节点释放lock需要通知我来获取lock) + * 若这里不清楚的同学, 请先看看 CLH lock的资料 (这是理解 AQS 的基础) + */ +private Node enq(final Node node){ + for(;;){ + Node t = tail; + if(t == null){ // Must initialize // 1. 队列为空 初始化一个 dummy 节点 其实和 ConcurrentLinkedQueue 一样 + if(compareAndSetHead(new Node())){ // 2. 初始化 head 与 tail (这个CAS成功后, head 就有值了, 详情将 Unsafe 操作) + tail = head; + } + }else{ + node.prev = t; // 3. 先设置 Node.pre = pred (PS: 则当一个 node在Sync Queue里面时 node.prev 一定 != null, 但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 ) + if(compareAndSetTail(t, node)){ // 4. CAS node 到 tail + t.next = node; // 5. CAS 成功, 将 pred.next = node (PS: 说明 node.next != null -> 则 node 一定在 Sync Queue, 但若 node 在Sync Queue 里面不一定 node.next != null) + return t; + } + } + } +} +``` + +**Sync Queue 节点出Queue方法** + +这里的出Queue的方法其实有两个 + +1. 新节点获取lock, 调用setHead抢占head, 并且剔除原head +2. 节点因被中断或获取超时而进行 cancelled, 最后被剔除 + +```java +/** + * 设置 head 节点(在独占模式没有并发的可能, 当共享的模式有可能) + */ +private void setHead(Node node){ + head = node; + node.thread = null; // 清除线程引用 + node.prev = null; // 清除原来 head 的引用 <- 都是 help GC +} + + +/** + * Cancels an ongoing attempt to acquire. + * + * @param node the node + */ +/** + * 清除因中断/超时而放弃获取lock的线程节点(此时节点在 Sync Queue 里面) + */ +private void cancelAcquire(Node node) { + // Ignore if node doesn't exist + if (node == null) + return; + + node.thread = null; // 1. 线程引用清空 + + // Skip cancelled predecessors + Node pred = node.prev; + while (pred.waitStatus > 0) // 2. 若前继节点是 CANCELLED 的, 则也一并清除 + node.prev = pred = pred.prev; + + // predNext is the apparent node to unsplice. CASes below will + // fail if not, in which case, we lost race vs another cancel + // or signal, so no further action is necessary. + Node predNext = pred.next; // 3. 这里的 predNext也是需要清除的(只不过在清除时的 CAS 操作需要 它) + + // Can use unconditional write instead of CAS here. + // After this atomic step, other Nodes can skip past us. + // Before, we are free of interference from other threads. + node.waitStatus = Node.CANCELLED; // 4. 标识节点需要清除 + + // If we are the tail, remove ourselves. + if (node == tail && compareAndSetTail(node, pred)) { // 5. 若需要清除额节点是尾节点, 则直接 CAS pred为尾节点 + compareAndSetNext(pred, predNext, null); // 6. 删除节点predNext + } else { + // If successor needs signal, try to set pred's next-link + // so it will get one. Otherwise wake it up to propagate. + int ws; + if (pred != head && + ((ws = pred.waitStatus) == Node.SIGNAL || // 7. 后继节点需要唤醒(但这里的后继节点predNext已经 CANCELLED 了) + (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && // 8. 将 pred 标识为 SIGNAL + pred.thread != null) { + Node next = node.next; + if (next != null && next.waitStatus <= 0) // 8. next.waitStatus <= 0 表示 next 是个一个想要获取lock的节点 + compareAndSetNext(pred, predNext, next); + } else { + unparkSuccessor(node); // 若 pred 是头节点, 则此刻可能有节点刚刚进入 queue ,所以进行一下唤醒 + } + + node.next = node; // help GC + } +} +``` + +**AbstractQueuedSynchronizer 独占的获取lock** + +独占方式获取lock主要流程: + +1. 调用 tryAcquire 尝试性的获取锁(一般都是由子类实现), 成功的话直接返回 +2. tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号 +3. 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking) +4. 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt), 若是响应中断的则直接抛出异常 + +独占方式获取lock主要分成下面3类: + +1. acquire 不响应中断的获取lock, 这里的不响应中断指的是线程被中断后会被唤醒, 并且继续获取lock,在方法返回时, 根据刚才的获取过程是否被中断来决定是否要自己中断一下(方法 selfInterrupt) +2. doAcquireInterruptibly 响应中断的获取 lock, 这里的响应中断, 指在线程获取 lock 过程中若被中断, 则直接抛出异常 +3. doAcquireNanos 响应中断及超时的获取 lock, 当线程被中断, 或获取超时, 则直接抛出异常, 获取失败 + +**AbstractQueuedSynchronizer 独占的获取lock 方法 acquire** + +```java +/** acquire 是用于获取锁的最常用的模式 + * 步骤 + * 1. 调用 tryAcquire 尝试性的获取锁(一般都是又子类实现), 成功的话直接返回 + * 2. tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号 + * 3. 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking) + * 4. 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt) + * + */ +public final void acquire(int arg){ + if(!tryAcquire(arg)&& + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){ + selfInterrupt(); + } +} +``` + +**AbstractQueuedSynchronizer 循环获取lock 方法 acquireQueued** + +```java + /** + * 不支持中断的获取锁 + * 主逻辑: + * 1. 当当前节点的前继节点是head节点时先 tryAcquire获取一下锁, 成功的话设置新 head, 返回 + * 2. 第一步不成功, 检测是否需要sleep, 需要的话就 sleep, 等待前继节点在释放lock时唤醒 或通过中断来唤醒 + * 3. 整个过程可能需要blocking nonblocking 几次 + */ + final boolean acquireQueued(final Node node, int arg){ + boolean failed = true; + try { + boolean interrupted = false; + for(;;){ + final Node p = node.predecessor(); // 1. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null) + if(p == head && tryAcquire(arg)){ // 2. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下 + setHead(node); // 3. 获取 lock 成功, 直接设置 新head(原来的head可能就直接被回收) + p.next = null; // help GC // help gc + failed = false; + return interrupted; // 4. 返回在整个获取的过程中是否被中断过 ; 但这又有什么用呢? 若整个过程中被中断过, 则最后我在 自我中断一下 (selfInterrupt), 因为外面的函数可能需要知道整个过程是否被中断过 + } + if(shouldParkAfterFailedAcquire(p, node) && // 5. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal)) + parkAndCheckInterrupt()){ // 6. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒 + interrupted = true; + } + } + }finally { + if(failed){ // 7. 在整个获取中出错 + cancelAcquire(node); // 8. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除) + } + } + } +``` + +shouldParkAfterFailedAcquire(Node, Node) + +**shouldParkAfterFailedAcquire(Node, Node)** + +此方法主要用于检查状态,看看自己是否真的可以去休息了,万一队列前边的线程都放弃了只是瞎站着,那也说不定,对吧! + +```java +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int ws = pred.waitStatus;//拿到前驱的状态 + if (ws == Node.SIGNAL) + //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了 + return true; + if (ws > 0) { + /* + * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。 + * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)! + */ + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢! + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; +} +``` + +整个流程中,如果前驱结点的状态不是SIGNAL,那么自己就不能安心去休息,需要去找个安心的休息点,同时可以再尝试下看有没有机会轮到自己拿号。 + +**parkAndCheckInterrupt()** + +如果线程找好安全休息点后,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。 + +```java +private final boolean parkAndCheckInterrupt() { + LockSupport.park(this);//调用park()使线程进入waiting状态 + return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。 + } +``` + +park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。 + +**AbstractQueuedSynchronizer 支持中断获取lock 方法 doAcquireInterruptibly** + +```java +/** + * Acquire in exclusive interruptible mode. + * @param arg the acquire argument + */ +private void doAcquireInterruptibly(int arg) throws InterruptedException{ + final Node node = addWaiter(Node.EXCLUSIVE); // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面 + boolean failed = true; + try { + for(;;){ + final Node p = node.predecessor(); // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null) + if(p == head && tryAcquire(arg)){ // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下 + setHead(node); + p.next = null; // help GC + failed = false; + return; + } + + if(shouldParkAfterFailedAcquire(p, node) && // 4. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal)) + parkAndCheckInterrupt()){ // 5. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒 + throw new InterruptedException(); // 6. 线程此时唤醒是通过线程中断, 则直接抛异常 + } + } + }finally { + if(failed){ // 7. 在整个获取中出错(比如线程中断) + cancelAcquire(node); // 8. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除) + } + } +} +``` + +**AbstractQueuedSynchronizer 支持超时&中断获取lock 方法 doAcquireNanos(int arg, long nanosTimeout)** + +```java +/** + * Acquire in exclusive timed mode + * + * @param arg the acquire argument + * @param nanosTimeout max wait time + * @return {@code true} if acquired + */ +private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException{ + if(nanosTimeout <= 0L){ + return false; + } + + final long deadline = System.nanoTime() + nanosTimeout; // 0. 计算截至时间 + final Node node = addWaiter(Node.EXCLUSIVE); // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面 + boolean failed = true; + + try { + for(;;){ + final Node p = node.predecessor(); // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null) + if(p == head && tryAcquire(arg)){ // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquire尝试获取一下 + setHead(node); + p.next = null; // help GC + failed = false; + return true; + } + + nanosTimeout = deadline - System.nanoTime(); // 4. 计算还剩余的时间 + if(nanosTimeout <= 0L){ // 5. 时间超时, 直接返回 + return false; + } + if(shouldParkAfterFailedAcquire(p, node) && // 6. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal)) + nanosTimeout > spinForTimeoutThreshold){ // 7. 若没超时, 并且大于spinForTimeoutThreshold, 则线程 sleep(小于spinForTimeoutThreshold, 则直接自旋, 因为效率更高 调用 LockSupport 是需要开销的) + LockSupport.parkNanos(this, nanosTimeout); + } + if(Thread.interrupted()){ // 8. 线程此时唤醒是通过线程中断, 则直接抛异常 + throw new InterruptedException(); + } + } + }finally { + if(failed){ // 9. 在整个获取中出错(比如线程中断/超时) + cancelAcquire(node); // 10. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除) + } + } +} +``` + +**AbstractQueuedSynchronizer 释放lock方法** + +整个释放 lock 流程 + +1. 调用子类的 tryRelease 方法释放获取的资源 +2. 判断是否完全释放lock(这里有 lock 重复获取的情况) +3. 判断是否有后继节点需要唤醒, 需要的话调用unparkSuccessor进行唤醒 + +看代码: + +```java +/** + * Releasing in exclusive mode. Implemented by unblocking one or + * more threads if {@link #tryRelease(int)} returns true. + * This method can be used to implement method {@link "Lock#unlock}. + * + * @param arg the release argument. This value is conveyed to + * {@link #tryRelease(int)} but is otherwise uninterpreted and + * can represent anything you like. + * @return the value returned from {@link #tryRelease(int)} + */ +public final boolean release(int arg){ + if(tryRelease(arg)){ // 1. 调用子类, 若完全释放好, 则返回true(这里有lock重复获取) + Node h = head; + if(h != null && h.waitStatus != 0){ // 2. h.waitStatus !=0 其实就是 h.waitStatus < 0 后继节点需要唤醒 + unparkSuccessor(h); // 3. 唤醒后继节点 + } + return true; + } + return false; +} + + +/** + * Wakes up node's successor, if one exists. + * 唤醒 node 的后继节点 + * 这里有个注意点: 唤醒时会将当前node的标识归位为 0 + * 等于当前节点标识位 的流转过程: 0(刚加入queue) -> signal (被后继节点要求在释放时需要唤醒) -> 0 (进行唤醒后继节点) + * + */ +private void unparkSuccessor(Node node) { + logger.info("unparkSuccessor node:" + node + Thread.currentThread().getName()); + /* + * If status is negative (i.e., possibly needing signal) try + * to clear in anticipation of signalling. It is OK if this + * fails or if status is changed by waiting thread. + */ + int ws = node.waitStatus; + if (ws < 0) + compareAndSetWaitStatus(node, ws, 0); // 1. 清除前继节点的标识 + + /* + * Thread to unpark is held in successor, which is normally + * just the next node. But if cancelled or apparently null, + * traverse backwards from tail to find the actual + * non-cancelled successor. + */ + Node s = node.next; + logger.info("unparkSuccessor s:" + node + Thread.currentThread().getName()); + if (s == null || s.waitStatus > 0) { // 2. 这里若在 Sync Queue 里面存在想要获取 lock 的节点,则一定需要唤醒一下(跳过取消的节点) (PS: s == null发生在共享模式的竞争释放资源) + s = null; + for (Node t = tail; t != null && t != node; t = t.prev) + if (t.waitStatus <= 0) // 3. 找到 queue 里面最前面想要获取 Lock 的节点 + s = t; + } + logger.info("unparkSuccessor s:"+s); + if (s != null) + LockSupport.unpark(s.thread); +} +``` + +**AbstractQueuedSynchronizer 获取共享lock** + +共享方式获取lock流程: + +1. 调用 tryAcquireShared 尝试性的获取锁(一般都是由子类实现), 成功的话直接返回 +2. tryAcquireShared 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号 +3. 在 Sync Queue 里面进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking +4. 当获取失败, 则判断是否可以 block(block的前提是前继节点被打上 SIGNAL 标示) +5. 共享与独占获取lock的区别主要在于 在共享方式下获取 lock 成功会判断是否需要继续唤醒下面的继续获取共享lock的节点(及方法 doReleaseShared) + +共享方式获取lock主要分成下面3类: + +1. acquireShared 不响应中断的获取lock, 这里的不响应中断指的是线程被中断后会被唤醒, 并且继续获取lock,在方法返回时, 根据刚才的获取过程是否被中断来决定是否要自己中断一下(方法 selfInterrupt) +2. doAcquireSharedInterruptibly 响应中断的获取 lock, 这里的响应中断, 指在线程获取 lock 过程中若被中断, 则直接抛出异常 +3. doAcquireSharedNanos 响应中断及超时的获取 lock, 当线程被中断, 或获取超时, 则直接抛出异常, 获取失败 + +**AbstractQueuedSynchronizer 获取共享lock 方法 acquireShared** + +```java +/** + * 获取 共享 lock + */ +public final void acquireShared(int arg){ + if(tryAcquireShared(arg) < 0){ // 1. 调用子类, 获取共享 lock 返回 < 0, 表示失败 + doAcquireShared(arg); // 2. 调用 doAcquireShared 当前 线程加入 Sync Queue 里面, 等待获取 lock + } +} +``` + +**AbstractQueuedSynchronizer 获取共享lock 方法 doAcquireShared** + +```java +/** + * Acquire in shared uninterruptible mode + * @param arg the acquire argument + */ +private void doAcquireShared(int arg){ + final Node node = addWaiter(Node.SHARED); // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面 + boolean failed = true; + + try { + boolean interrupted = false; + for(;;){ + final Node p = node.predecessor(); // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null) + if(p == head){ + int r = tryAcquireShared(arg); // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquireShared 尝试获取一下 + if(r >= 0){ + setHeadAndPropagate(node, r); // 4. 获取 lock 成功, 设置新的 head, 并唤醒后继获取 readLock 的节点 + p.next = null; // help GC + if(interrupted){ // 5. 在获取 lock 时, 被中断过, 则自己再自我中断一下(外面的函数可能需要这个参数) + selfInterrupt(); + } + failed = false; + return; + } + } + + if(shouldParkAfterFailedAcquire(p, node) && // 6. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal)) + parkAndCheckInterrupt()){ // 7. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒 + interrupted = true; + } + } + }finally { + if(failed){ // 8. 在整个获取中出错(比如线程中断/超时) + cancelAcquire(node); // 9. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除) + } + } +} +``` + +**AbstractQueuedSynchronizer 获取共享lock 方法 doAcquireSharedInterruptibly** + +```java +/** + * Acquire in shared interruptible mode + * @param arg the acquire argument + */ +private void doAcquireSharedInterruptibly(int arg) throws InterruptedException{ + final Node node = addWaiter(Node.SHARED); // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面 + boolean failed = true; + + try { + for(;;){ + final Node p = node.predecessor(); // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null) + if(p == head){ + int r = tryAcquireShared(arg); // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquireShared 尝试获取一下 + if(r >= 0){ + setHeadAndPropagate(node, r); // 4. 获取 lock 成功, 设置新的 head, 并唤醒后继获取 readLock 的节点 + p.next = null; // help GC + failed = false; + return; + } + } + + if(shouldParkAfterFailedAcquire(p, node) && // 5. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal)) + parkAndCheckInterrupt()){ // 6. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒 + throw new InterruptedException(); // 7. 若此次唤醒是 通过线程中断, 则直接抛出异常 + } + } + }finally { + if(failed){ // 8. 在整个获取中出错(比如线程中断/超时) + cancelAcquire(node); // 9. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除) + } + } +} +``` + +**AbstractQueuedSynchronizer 获取共享lock 方法 doAcquireSharedNanos** + +```java +/** + * Acquire in shared timed mode + * + * @param arg the acquire argument + * @param nanosTimeout max wait time + * @return {@code true} if acquired + */ +private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException{ + if (nanosTimeout <= 0L){ + return false; + } + + final long deadline = System.nanoTime() + nanosTimeout; // 0. 计算超时的时间 + final Node node = addWaiter(Node.SHARED); // 1. 将当前的线程封装成 Node 加入到 Sync Queue 里面 + boolean failed = true; + + try { + for(;;){ + final Node p = node.predecessor(); // 2. 获取当前节点的前继节点 (当一个n在 Sync Queue 里面, 并且没有获取 lock 的 node 的前继节点不可能是 null) + if(p == head){ + int r = tryAcquireShared(arg); // 3. 判断前继节点是否是head节点(前继节点是head, 存在两种情况 (1) 前继节点现在占用 lock (2)前继节点是个空节点, 已经释放 lock, node 现在有机会获取 lock); 则再次调用 tryAcquireShared 尝试获取一下 + if(r >= 0){ + setHeadAndPropagate(node, r); // 4. 获取 lock 成功, 设置新的 head, 并唤醒后继获取 readLock 的节点 + p.next = null; // help GC + failed = false; + return true; + } + } + + nanosTimeout = deadline - System.nanoTime(); // 5. 计算还剩余的 timeout , 若小于0 则直接return + if(nanosTimeout <= 0L){ + return false; + } + if(shouldParkAfterFailedAcquire(p, node) && // 6. 调用 shouldParkAfterFailedAcquire 判断是否需要中断(这里可能会一开始 返回 false, 但在此进去后直接返回 true(主要和前继节点的状态是否是 signal)) + nanosTimeout > spinForTimeoutThreshold){// 7. 在timeout 小于 spinForTimeoutThreshold 时 spin 的效率, 比 LockSupport 更高 + LockSupport.parkNanos(this, nanosTimeout); + } + if(Thread.interrupted()){ // 7. 若此次唤醒是 通过线程中断, 则直接抛出异常 + throw new InterruptedException(); + } + } + }finally { + if (failed){ // 8. 在整个获取中出错(比如线程中断/超时) + cancelAcquire(node); // 10. 清除 node 节点(清除的过程是先给 node 打上 CANCELLED标志, 然后再删除) + } + } +} +``` + +**AbstractQueuedSynchronizer 释放共享lock 方法** + +特点: 当 Sync Queue中存在连续多个获取 共享lock的节点时, 会出现并发的唤醒后继节点(因为共享模式下获取lock后会唤醒近邻的后继节点来获取lock) + +流程: + +1. 调用子类的 tryReleaseShared来进行释放 lock +2. 判断是否需要唤醒后继节点来获取 lock + +调用流分类 + +场景1: Sync Queue 里面存在 : 1(共享) -> 2(共享) -> 3(共享) -> 4(共享) + 节点1获取 lock 后调用 setHeadAndPropagate -> doReleaseShared 唤醒 节点2 —> 接下来 node 1 在 release 时再次 doReleaseShared, 而 node 2在获取 lock 后调用 setHeadAndPropagate 时再次 doReleaseShared -> 直至到 node 4, node 4的状态变成 PROPAGATE (期间可能有些节点还没设置为 PROPAGATE 就被其他节点调用 setHead 而踢出 Sync Queue) + +场景2: Sync Queue 里面存在 : 1(共享) -> 2(共享) -> 3(独占) -> 4(共享) + 节点1获取 lock 后调用 setHeadAndPropagate -> doReleaseShared 唤醒 节点2 —> 接下来 node 1 在 release 时再次 doReleaseShared, 而 node 2 在获取 lock 后 + 这是发现后继节点不是共享的, 则 Node 2 不在 setHeadAndPropagate 中调用 doReleaseShared, 而Node 3 没有获取lock, 将 Node 2 变成 SIGNAL, 而 node 2 在 release lock 时唤醒 node 3, 而 node 3 最终在 release lock 时 释放 node 4, node 4在release lock后状态还是保持 0 + +看代码: + +```java +private void doReleaseShared(){ + /** + * Ensure that a release propagates, even if there are other + * in-progress acquires/releases. This proceed in the usual + * way of trying to unparkSuccessor of the head if it needs + * signal. But if it does not, status is set to PROPAGATE to + * ensure that upon release, propagation continues. + * Additionally, we must loop in case a new node is added + * while we are doing this. Also, unlike other uses of + * unparkSuccessor, we need to know if CAS to reset status + * fails, if so rechecking. + */ + for(;;){ + Node h = head; // 1. 获取 head 节点, 准备 release + if(h != null && h != tail){ // 2. Sync Queue 里面不为 空 + int ws = h.waitStatus; + if(ws == Node.SIGNAL){ // 3. h节点后面可能是 独占的节点, 也可能是 共享的, 并且请求了唤醒(就是给前继节点打标记 SIGNAL) + if(!compareAndSetWaitStatus(h, Node.SIGNAL, 0)){ // 4. h 恢复 waitStatus 值置0 (为啥这里要用 CAS 呢, 因为这里的调用可能是在 节点刚刚获取 lock, 而其他线程又对其进行中断, 所用cas就出现失败) + continue; // loop to recheck cases + } + unparkSuccessor(h); // 5. 唤醒后继节点 + } + else if(ws == 0 && + !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){ //6. h后面没有节点需要唤醒, 则标识为 PROPAGATE 表示需要继续传递唤醒(主要是区别 独占节点最终状态0 (独占的节点在没有后继节点, 并且release lock 时最终 waitStatus 保存为 0)) + continue; // loop on failed CAS // 7. 同样这里可能存在竞争 + } + } + + if(h == head){ // 8. head 节点没变化, 直接 return(从这里也看出, 一个共享模式的 节点在其唤醒后继节点时, 只唤醒一个, 但是 它会在 获取 lock 时唤醒, 释放 lock 时也进行, 所以或导致竞争的操作) + break; // head 变化了, 说明其他节点获取 lock 了, 自己的任务完成, 直接退出 + } + + } +} +``` + +**AbstractQueuedSynchronizer 判断是否阻塞线程方法 shouldParkAfterFailedAcquire** + +```java + /** + * shouldParkAfterFailedAcquire 这个方法最终的作用: + * 本节点在进行 sleep 前一定需要给 前继节点打上 SIGNAL 标识( + * 因为前继节点在 release lock 时会根据 这个标识决定是否需要唤醒后继节点来获取 lock, + * 若释放时 标识是0, 则说明 Sync queue 里面没有等待获取lock的线程, 或Sync queue里面的节点正在获取 lock) + * + * 一般流程: + * 1. 第一次进入此方法 前继节点状态是 0, 则 CAS 为SIGNAL 返回 false(干嘛返回的是FALSE <- 主要是为了再次 tryAcquire 一下, 说不定就能获取锁呢) + * 2. 第二次进来 前继节点标志为SIGNAL, ok, 标识好了, 这下就可以安心睡觉, 不怕前继节点在释放lock后不进行唤醒我了 + */ + private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){ + int ws = pred.waitStatus; + if(ws == Node.SIGNAL){ // 1. 判断是否已经给前继节点打上标识SIGNAL, 为前继节点释放 lock 时唤醒自己做准备 + /** + * This node has already set status asking a release + * to signal it, so it can safely park + */ + return true; + } + + if(ws > 0){ // 2. 遇到个 CANCELLED 的节点 (ws > 0 只可能是 CANCELLED 的节点, 也就是 获取中被中断, 或超时的节点) + /** // 这里我们帮助删除 + * Predecessor was cancelled. Skip over predecessors and + * indicate retry + */ + do{ + node.prev = pred = pred.prev; // 3. 跳过所有 CANCELLED 的节点 + }while(pred.waitStatus > 0); + pred.next = node; // 跳过 CANCELLED 节点 + } + else{ + /** + * waitStatus must be 0 or PROPAGATE. Indicate that we + * need a signal, but don't park yet. Caller will need to + * retry to make sure it cannot acquire before parking + */ + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 4. 到这里 ws 只可能是 0 或 PROPAGATE (用于 共享模式的, 所以在共享模式中的提醒前继节点唤醒自己的方式, + // 也是给前继节点打上 SIGNAL标识 见 方法 "doReleaseShared" -> "!compareAndSetWaitStatus(h, Node.SIGNAL, 0)" -> unparkSuccessor) + } + + return false; + } +``` + +**AbstractQueuedSynchronizer 线程自己中断方法selfInterrupt** + +```java + /** + * 自我中断, 这主要是怕外面的线程不知道整个获取的过程中是否中断过, 所以才 .... + */ + static void selfInterrupt(){ + Thread.currentThread().interrupt(); + } +``` + +**AbstractQueuedSynchronizer 中断线程方法parkAndCheckInterrupt** + +```java + /** + * Convenience method to park and then check if interrupted + * + * @return {@code true} if interrupted + */ + /** + * 中断当前线程, 并且返回此次的唤醒是否是通过中断 + */ + private final boolean parkAndCheckInterrupt() { + LockSupport.park(this); + logger.info(Thread.currentThread().getName() + " " + "parkAndCheckInterrupt , ThreadName:" + Thread.currentThread().getName()); + return Thread.interrupted(); // Thread.interrupted() 会清除中断标识, 并返上次的中断标识 + } +``` + +#### **AbstractQueuedSynchronizer 一般方法/队列检查方法** + +```java + /******************************************* Queue inspection methods ****************************/ + + + /** + * SyncQueue 里面是否有 node 节点 + */ + public final boolean hasQueuedThreads() { + return head != tail; + } + + + /** + * 获取 lock 是否发生竞争 + */ + public final boolean hasContented(){ + return head != null; + } + + + /** + * Sync Queue 里面的有效的, 最前面的 node 节点 + */ + public final Thread getFirstQueuedThread(){ + return (head == tail) ? null : fullGetFirstQueuedThread(); + } + + /** + * Sync Queue 里面的有效的, 最前面的 node 节点 + */ + private Thread fullGetFirstQueuedThread(){ + /** + * The first node is normally head next. Try to get its + * thread field, ensuring consistent reads: If thread + * field is nulled out or s.prev is no longer head, then + * some other thread(s) concurrently performed sethead in + * between some of our reads. we try this twice before + * restorting to traversal + */ + + Node h, s; + Thread st; + /** + * 这里两次检测是怕线程 timeout 或 cancelled + */ + if(( + (h = head) != null && (s = h.next) != null && + s.prev == head && (st = s.thread) != null || + ( + (h = head) != null && (s = h.next) != null && + s.prev == head && (st = s.thread) != null + ) + )){ + return st; + } + + /** + * Head's next field might not have been set yet, or may have + * been unset after setHead, So we must check to see if tail + * is actually first node. If not, we continue on, safely + * traversing from tail back to head to find first, + * guaranteeing termination + */ + /** + * 从 tail 开始找 + */ + Node t = tail; + Thread firstThread = null; + while(t != null && t != head){ + Thread tt = t.thread; + if(tt != null){ + firstThread = tt; + } + t = t.prev; + } + return firstThread; + } + + + /** + * 判断线程是否在 Sync Queue 里面 + */ + public final boolean isQueued(Thread thread){ + if(thread == null){ + throw new NullPointerException(); + } + for(Node p = tail; p != null; p = p.prev){ // 从tail 开始 + if(p.thread == thread){ + return true; + } + } + return false; + } + + + /** + * 判断 Sync Queue 中等待获取 lock 的第一个 node 是否是 获取 writeLock 的(head 节点是已经获取 lock 的节点) + */ + public final boolean apparentlyFirstQueuedIsExclusive(){ + Node h, s; + return (h = head) != null && + (s = h.next) != null && + !s.isShared() && + s.thread != null; + } + + + /** + * 当前节点之前在 Sync Queue 里面是否有等待获取的 Node + */ + public final boolean hasQueuedPredecessors(){ + /** + * The correctness of this depends on head being initialized + * before tail and on head next being accurate if the current + * thread is first in queue + */ + Node t = tail; // Read fields in reverse initialization order + Node h = head; + Node s; + return h != t && // h != t 表示 Sync Queu 里面至少存在 一个节点 (这时的 h节点可能是 null) + ((s = h.next) == null || s.thread != Thread.currentThread()); // (s = h.next) == null 说明 h节点获取 lock, 而后又被其他获取 lock 的节点从 Sync Queue 里面剔除掉了 + } + + + /********************************************* Instrumentation and monitoring methods **************************/ + + + /** + * 获取 Sync Queue 里面 等待 获取 lock 的 长度 + */ + public final int getQueueLength(){ + int n = 0; + for(Node p = tail; p != null; p = p.prev){ + if(p.thread != null){ + ++n; + } + } + return n; + } + + + /** + * 获取 Sync Queue 里面 等待 获取 lock 的 thread + */ + public final Collection getQueuedThreads(){ + ArrayList list = new ArrayList<>(); + for(Node p = tail; p != null; p = p.prev){ + Thread t = p.thread; + if(t != null){ + list.add(t); + } + } + return list; + } + + + /** + * 获取 Sync Queue 里面 等待 获取 writeLock 的 thread + */ + public final Collection getExclusiveQueuedThreads(){ + ArrayList list = new ArrayList<>(); + for(Node p = tail; p != null; p = p.prev){ + if(!p.isShared()){ + Thread t = p.thread; + if(t != null){ + list.add(t); + } + } + } + return list; + } + + + /** + * 获取 Sync Queue 里面 等待 获取 readLock 的 thread + */ + public final Collection getSharedQueuedThreads(){ + ArrayList list = new ArrayList<>(); + for(Node p = tail; p != null; p = p.prev){ + if(p.isShared()){ + Thread t = p.thread; + if(t != null){ + list.add(t); + } + } + } + return list; + } + + + public String toString(){ + int s = getState(); + String q = hasQueuedThreads() ? "non" : ""; + return super.toString() + "[State = " + s + ", " + q + " empty queue]"; + } + + + /*********************** Internal support methods for Conditions ***********************/ + + /** + * 判断 node 是否在 Sync Queue 里面 + */ + final boolean isOnSyncQueue(Node node){ + /** + * 这里有点 tricky, + * node.waitStatus == Node.CONDITION 则说明 node 一定在 Condition 里面 + * node.prev == null 说明 node 一定不在 Sync Queue 里面 + */ + if(node.waitStatus == Node.CONDITION || node.prev == null){ + return false; + } + // node.next != null 则 node 一定在 Sync Queue; 但是反过来 在Sync Queue 里面的节点 不一定 node.next != null + if(node.next != null){ // If has successor, it must be on queue + return true; + } + + /** + * node.prev can be non-null, but not yet on queue because + * the CAS to place it on queue can fail. So we have to + * traverse from tail to make sure it actually make it. It + * will always be near the tail in calls to this method, and + * unless the CAS failed (which is unlikely), it will be + * there, so we hardly ever traverse much + */ + /** + * 因为这里存在 node 开始enq Sync Queue 的情形, 所以在此查找一下 + */ + return findNodeFromTail(node); + } + + /** + * 从 tail 开始查找 node + */ + private boolean findNodeFromTail(Node node){ + Node t = tail; + for(;;){ + if(t == node){ + return true; + } + if(t == null){ + return false; + } + t = t.prev; + } + } + + /** + * 将 Node 从Condition Queue 转移到 Sync Queue 里面 + * 在调用transferForSignal之前, 会 first.nextWaiter = null; + * 而我们发现 若节点是因为 timeout / interrupt 进行转移, 则不会清除两种情况的转移都会把 wautStatus 置为 0 + */ + final boolean transferForSignal(Node node){ + /** + * If cannot change waitStatus, the node has been cancelled + */ + if(!compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 若 node 已经 cancelled 则失败 + return false; + } + + /** + * Splice onto queue and try to set waitStatus of predecessor to + * indicate that thread is (probably) waiting, If cancelled or + * attempt to set waitStatus fails, wake up to resync (in which + * case the waitStatus can be transiently and harmlessly wrong) + */ + Node p = enq(node); // 2. 加入 Sync Queue + int ws = p.waitStatus; + if(ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){ // 3. 这里的 ws > 0 指Sync Queue 中node 的前继节点cancelled 了, 所以, 唤醒一下 node ; compareAndSetWaitStatus(p, ws, Node.SIGNAL)失败, 则说明 前继节点已经变成 SIGNAL 或 cancelled, 所以也要 唤醒 + LockSupport.unpark(node.thread); + } + return true; + } + + + /** + * 将 Condition Queue 中因 timeout/interrupt 而唤醒的节点进行转移 + */ + final boolean transferAfterCancelledWait(Node node){ + if(compareAndSetWaitStatus(node, Node.CONDITION, 0)){ // 1. 没有 node 没有 cancelled , 直接进行转移 (转移后, Sync Queue , Condition Queue 都会存在 node) + enq(node); + return true; + } + + /** + * If we lost out to a signal(), then we can't proceed + * until it finishes its enq(). Cancelling during an + * incomplete transfer is both race and transient, so just + * spin + */ + while(!isOnSyncQueue(node)){ // 2.这时是其他的线程发送signal,将本线程转移到 Sync Queue 里面的工程中(转移的过程中 waitStatus = 0了, 所以上面的 CAS 操作失败) + Thread.yield(); // 这里调用 isOnSyncQueue判断是否已经 入Sync Queue 了 + } + return false; + } + + /******************** Instrumentation methods for conditions ***************/ + + + /** + * condition 是否属于这个 AQS 的 + */ + public final boolean owns(ConditionObject condition){ + return condition.isOwnedBy(this); + } + + + /** + * 这个 condition Queue 里面是否有等待的线程 + */ + public final boolean hasWaiters(ConditionObject condition){ + if(!owns(condition)){ + throw new IllegalArgumentException(); + } + return condition.hasWaiters(); + } + + + /** + * 这个 condition Queue 里面等待的线程的量 + */ + public final int getWaitQueueLength(ConditionObject condition){ + if(!owns(condition)){ + throw new IllegalArgumentException("Not owner"); + } + return condition.getWaitQueueLength(); + } + + /** + * 这个 condition Queue 里面等待的线程 + */ + public final Collection getWaitingThreads(ConditionObject condition){ + if(!owns(condition)){ + throw new IllegalArgumentException("not owner"); + } + return condition.getWaitingThreads(); + } +``` \ No newline at end of file diff --git a/second/week_03/81/Java_memory_model.md b/second/week_03/81/Java_memory_model.md new file mode 100644 index 0000000000000000000000000000000000000000..9e4876847f094bdea67204c8b49506694cb28892 --- /dev/null +++ b/second/week_03/81/Java_memory_model.md @@ -0,0 +1,86 @@ +#### 为什么定义Java内存模型 + +现代计算机体系大部是采用的对称多处理器的体系架构。每个处理器均有独立的寄存器组和缓存,多个处理器可同时执行同一进程中的不同线程,这里称为处理器的乱序执行。在Java中,不同的线程可能访问同一个共享或共享变量。如果任由编译器或处理器对这些访问进行优化的话,很有可能出现无法想象的问题,这里称为编译器的重排序。除了处理器的乱序执行、编译器的重排序,还有内存系统的重排序。因此Java语言规范引入了Java内存模型,通过定义多项规则对编译器和处理器进行限制,主要是针对可见性和有序性。 + +导致可见性的原因是缓存,导致有序性的原因是编译优化,那最直接的解决方式,就是按需禁用缓存与编译优化。Java 内存模型可以理解为:规范了 JVM 如何提供按需禁用缓存和编译优化的方法。 + +具体来说,这些方法包括 volatile、synchronized 和 final 三个关键字,以及七项 Happens-Before 规则。 + +#### Happens-Before 规则 + +Happens-Before 规则最初是在一篇叫做 **Time, Clocks, and the Ordering of Events in a Distributed System** 的论文中提出来的,在这篇论文中,Happens-Before 的语义是一种因果关系。在现实世界里,如果 A 事件是导致 B 事件的起因,那么 A 事件一定是先于(Happens-Before)B 事件发生的,这个就是 Happens-Before 语义的现实理解。 + +Happens-Before 真正要表达的是:**前面一个操作的结果对后续操作是可见的。** Happens-Before 约束了编译器的优化行为,虽允许编译器优化,但是要求编译器优化后一定遵守 Happens-Before 规则。 + +1. **程序次序规则** : 指在一个线程中,按照程序顺序,前面的操作 Happens-Before 于后续的任意操作。 + +2. **volatile变量规则** : 对一个 volatile 变量的写操作 Happens-Before 于后续对这个 volatile 变量的读操作。"后面"是指时间上的先后顺序。 + +3. **传递性规则** : 如果 A Happens-Before B,且 B Happens-Before C,那么 A Happens-Before C。 + +3. **管程锁定规则** : 指对一个锁的解锁 Happens-Before 于后续对这个锁的加锁。这里必须强调的是同一个锁,"后面"是指时间上的先后顺序。 + +4. **线程 start() 规则** : 指主线程 A 启动子线程 B 后,子线程 B 能够看到主线程在启动子线程 B 前的操作。换句话说就是,如果线程 A 调用线程 B 的 start() 方法(即在线程 A 中启动线程 B),那么该 start() 操作 Happens-Before 于线程 B 中的任意操作。 + +5. **线程 join() 规则** : 指主线程 A 等待子线程 B 完成(主线程 A 通过调用子线程 B 的 join() 方法实现),当子线程 B 完成后(主线程 A 中 join() 方法返回),主线程能够看到子线程的操作。当然所谓的“看到”,指的是对共享变量的操作。换句话说就是,如果在线程 A 中,调用线程 B 的 join() 并成功返回,那么线程 B 中的任意操作 Happens-Before 于该 join() 操作的返回。 + +6. **线程 interrupt() 规则** : 对线程 interrupt() 方法的调用 Happens-Before 被中断线程的代码检测到中断事件的发生,可以通过 Thread.interrupted() 方法检测到是否有中断发生。 + +7. **对象 finalize() 规则** : 一个对象的初始化完成(构造函数执行结束) Happens-Before 它的 finalize() 方法的开始。 + +**传递性** 是 java 1.5 之后版本对 volatile 语义的增强。 + +假设线程 A 执行 writer() 方法,按照 volatile 语义,会把变量 “v=true” 写入内存;假设线程 B 执行 reader() 方法,同样按照 volatile 语义,线程 B 会从内存中读取变量 v,如果线程 B 看到 “v == true” 时,那么线程 B 看到的变量 x 是多少呢? + +```java +class VolatileExample { + int x = 0; + volatile boolean v = false; + + public void writer() { + x = 42; + v = true; + } + + public void reader() { + if (v == true) { + // 这里x会是多少呢? + } + } + } +``` +在 1.5 之前是不确定的,可能是 0,可能是 42。但是在 1.5 之后: + +1. “x=42” Happens-Before 写变量 “v=true” ,这是规则 1 的内容 +2. 写变量“v=true” Happens-Before 读变量 “v=true”,这是规则 2 的内容 +3. 根据传递性,“x=42” Happens-Before 读变量“v=true” + +如果线程 B 读到了“v=true”,那么线程 A 设置的“x=42”对线程 B 是可见的。也就是说,线程 B 能看到 “x == 42”。 + +**管程** 是一种通用的同步原语,在 Java 中指的就是 synchronized,synchronized 是 Java 里对管程的实现。 + +管程中的锁在 Java 里是隐式实现的,例如下面的代码,在进入同步块之前,会自动加锁,而在代码块执行完会自动释放锁,加锁以及释放锁都是编译器帮我们实现的。 + +```java +synchronized (this) { //此处自动加锁 + // x是共享变量,初始值=10 + if (this.x < 12) { + this.x = 12; + } +} +//此处自动解锁 +``` + +**volatile** 可以解决可见性、排序性,但不能解决原子性。当一个变量被声明为 volatile 时: + ++ 线程在【读取】共享变量时,会先清空本地内存变量值,再从主内存获取最新值 ++ 线程在【写入】共享变量时,不会把值缓存在寄存器或其他地方(就是刚刚说的所谓的「工作内存」),而是会把值刷新回主内存。 + +**synchronized** 和 volatile 在解决可见性问题上是一样的操作: + ++ 【进入】synchronized 块的内存语义是把在 synchronized 块内使用的变量从线程的工作内存中清除,从主内存中读取 ++ 【退出】synchronized 块的内存语义事把在 synchronized 块内对共享变量的修改刷新到主内存中 + +区别是:volatile不保证原子性,而 synchronized 使用悲观锁的方式,保证了原子性。 + +**final** 修饰变量时,初衷是告诉编译器:这个变量生而不变,可以可劲儿优化。在 1.5 之前可能会由于优化过度导致错误,但是 1.5 之后,Java 内存模型对 final 类型变量的重排进行了约束。现在只要我们提供正确构造函数没有“逸出”,就不会出问题了。 \ No newline at end of file diff --git a/second/week_03/81/ReentrantLock.md b/second/week_03/81/ReentrantLock.md new file mode 100644 index 0000000000000000000000000000000000000000..06b2fc013b7f9fd67dce55856b857d881ce6f5f6 --- /dev/null +++ b/second/week_03/81/ReentrantLock.md @@ -0,0 +1,985 @@ +#### 简述 + +```java +public class ReentrantLock extends Object implements Lock, Serializable +``` + +一个可重入互斥锁,具有与使用同步的方法和语句访问隐式监视器锁的方式,相同的基本行为和语义,但具有扩展功能。 + +当另一个线程不拥有该锁时,调用该锁的线程将成功返回该锁。 如果当前线程已经拥有该锁,则该方法将立即返回。 可以使用 isHeldByCurrentThread()和 getHoldCount() 方法进行检查。 + +此类的构造函数接受一个可选的 fairness 参数。 设置为 true 时,在争用下,锁倾向于授予对等待时间最长的线程的访问。否则,此锁不能保证任何特定的访问顺序。使用许多线程访问的公平锁的程序可能会比使用默认的设置(非公平锁)体现较低的总体吞吐量(即较慢,通常要慢得多),但是获取锁并保证缺乏饥饿的时间变化较小。但是请注意,锁的公平性不能保证线程调度的公平性。因此,使用公平锁的许多线程之一可能会连续多次获得它,而其他活动线程没有进行且当前未持有该锁。 还要注意,未定时的tryLock()方法不支持公平性设置。如果锁可用,即使其他线程正在等待,它将成功。 + +建议的做法是始终在调用后使用 try 块进行锁定,最常见的是在构造之前/之后,例如: + +```java +class X { + private final ReentrantLock lock = new ReentrantLock(); + // ... + + public void m() { + lock.lock(); // block until condition holds + try { + // ... method body + } finally { + lock.unlock(); + } + } + } +``` + +除了实现 Lock 接口之外,此类还定义了许多用于检查锁状态的公共方法和受保护方法。 其中一些方法仅对 instrumentation 和 monitoring 有用。 + +此类的序列化与内置锁的行为相同:反序列化的锁处于解锁状态,而不管序列化时的状态如何。 + +此锁通过同一线程最多支持 2147483647 个递归锁。 尝试超过此限制会导致锁定方法引发错误。 + +**ReentrantLock 特点** + ++ 可重入, 一个线程获取独占锁后, 可多次获取, 多次释放(synchronized 也一样, 只是 synchronized 内的代码执行异常后会自动释放到 monitor 上的锁) ++ 支持中断(synchronized 不支持) ++ 支持超时机制, 支持尝试获取 lock, 支持公平与不公平的获取 lock(主要区别在 判断 AQS 中的 Sync Queue 里面是否有其他线程等待获取 lock) ++ 支持调用 Condition 提供的 await(释放 lock, 并等待), signal(将线程节点从 Condition Queue 转移到 Sync Queue 里面) ++ 在运行 synchronized 里面的代码若抛出异常, 则会自动释放监视器上的 lock, 而 ReentrantLock 是需要显示的调用 unlock方法 + +#### 源码分析 + +```java +package java.util.concurrent.locks; +import java.util.concurrent.TimeUnit; +import java.util.Collection; + +public class ReentrantLock implements Lock, java.io.Serializable { + private static final long serialVersionUID = 7373984872572414699L; + /** Synchronizer providing all implementation mechanics */ + //内部类,继承自 AQS + private final Sync sync; + + /** + * Base of synchronization control for this lock. Subclassed + * into fair and nonfair versions below. Uses AQS state to + * represent the number of holds on the lock. + */ + abstract static class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = -5179523762034025860L; + + /** + * Performs {@link Lock#lock}. The main reason for subclassing + * is to allow fast path for nonfair version. + */ + //抽象方法,由子类实现,也就是 NonfairSync(非公平获取锁) 和 FairSync(公平获取锁) + abstract void lock(); + + /** + * Performs non-fair tryLock. tryAcquire is implemented in + * subclasses, but both need nonfair try for trylock method. + */ + //非公平获取锁的方法,与公平获取锁的差别就是先用 CAS 设置 State,如果成功了直接抢占锁,否所加入队列。 + //而公平锁,会先查看队列中是否有线程正在等待获取锁,如果有则自己排队,否则才去尝试获取锁 + final boolean nonfairTryAcquire(int acquires) { + //获取当前线程 + final Thread current = Thread.currentThread(); + //获取当前占用锁计数 + int c = getState(); + // == 0 说明当前没有线程占用锁 + if (c == 0) { + if (compareAndSetState(0, acquires)) { //直接 CAS 获取锁 + setExclusiveOwnerThread(current); //获取成功,则直接抢占锁 + return true; + } + } + else if (current == getExclusiveOwnerThread()) { //如果锁已经被线程占用,且这个线程就是自己,那在此重入,增加 state 计数,然后直接返回 + int nextc = c + acquires; //重入锁计数增加 + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(nextc); //设置占锁状态 + return true; + } + return false; + } + + //释放锁 + protected final boolean tryRelease(int releases) { + int c = getState() - releases; //计算释放后剩余量 + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + //如果 == 0 说明将锁完全释放 + if (c == 0) { + free = true; + setExclusiveOwnerThread(null); //设置独占锁的线程为 null + } + setState(c); + return free; + } + + //当前线程是否占有锁 + protected final boolean isHeldExclusively() { + // While we must in general read state before owner, + // we don't need to do so to check if current thread is owner + return getExclusiveOwnerThread() == Thread.currentThread(); + } + + final ConditionObject newCondition() { + return new ConditionObject(); + } + + // Methods relayed from outer class 外部类依赖的方法 + //获取占有锁的线程 + final Thread getOwner() { + return getState() == 0 ? null : getExclusiveOwnerThread(); + } + + //获取锁 state 计数 + final int getHoldCount() { + return isHeldExclusively() ? getState() : 0; + } + + final boolean isLocked() { + return getState() != 0; + } + + /** + * Reconstitutes the instance from a stream (that is, deserializes it). + */ + //用于反序列化,并解除锁定状态 + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + setState(0); // reset to unlocked state + } + } + + /** + * Sync object for non-fair locks + */ + //非公平占锁实现 + static final class NonfairSync extends Sync { + private static final long serialVersionUID = 7316153563782823691L; + + /** + * Performs lock. Try immediate barge, backing up to normal + * acquire on failure. + */ + final void lock() { + if (compareAndSetState(0, 1)) //先直接抢占锁,如果成功,则获取锁 + setExclusiveOwnerThread(Thread.currentThread()); + else + acquire(1); //否则进入队列 + } + + protected final boolean tryAcquire(int acquires) { + return nonfairTryAcquire(acquires); + } + } + + /** + * Sync object for fair locks + */ + //公平占锁实现 + static final class FairSync extends Sync { + private static final long serialVersionUID = -3000897897090466540L; + + final void lock() { //直接进队列 + acquire(1); + } + + /** + * Fair version of tryAcquire. Don't grant access unless + * recursive call or no waiters or is first. + */ + //尝试占锁 + protected final boolean tryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + if (!hasQueuedPredecessors() && //会先判断队列中,是否有其他线程正在等待锁资源 + compareAndSetState(0, acquires)) { //如果没有其他线程等待锁,则尝试 CAS 占锁 + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { //如果是当前线程已经占有锁,则重入后直接返回 + int nextc = c + acquires; + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; + } + } + + /** + * Creates an instance of {@code ReentrantLock}. + * This is equivalent to using {@code ReentrantLock(false)}. + */ + //默认构造函数,默认使用非公平模式 + public ReentrantLock() { + sync = new NonfairSync(); + } + + /** + * Creates an instance of {@code ReentrantLock} with the + * given fairness policy. + * + * @param fair {@code true} if this lock should use a fair ordering policy + */ + //使用 fair 参数,来使用公平还是非公平模式 + public ReentrantLock(boolean fair) { + sync = fair ? new FairSync() : new NonfairSync(); + } + + /** + * Acquires the lock. + * + *

Acquires the lock if it is not held by another thread and returns + * immediately, setting the lock hold count to one. + * + * 获取锁。如果没有其他线程持有该锁,则获取该锁并立即返回,将锁保持计数设置为1。 + * + *

If the current thread already holds the lock then the hold + * count is incremented by one and the method returns immediately. + * + * 如果当前线程已经持有该锁,则持有计数将增加一,该方法将立即返回。 + * + *

If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until the lock has been acquired, + * at which time the lock hold count is set to one. + * + * 如果锁是由另一个线程持有的,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到获取了锁为止,此时,锁持有计数被设置为1。 + */ + public void lock() { + sync.lock(); + } + + /** + * Acquires the lock unless the current thread is + * {@linkplain Thread#interrupt interrupted}. + * + *

Acquires the lock if it is not held by another thread and returns + * immediately, setting the lock hold count to one. + * + *

If the current thread already holds this lock then the hold count + * is incremented by one and the method returns immediately. + * + *

If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of two things happens: + * + *

    + * + *
  • The lock is acquired by the current thread; or + * + *
  • Some other thread {@linkplain Thread#interrupt interrupts} the + * current thread. + * + *
+ * + *

If the lock is acquired by the current thread then the lock hold + * count is set to one. + * + *

If the current thread: + * + *

    + * + *
  • has its interrupted status set on entry to this method; or + * + *
  • is {@linkplain Thread#interrupt interrupted} while acquiring + * the lock, + * + *
+ * + * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

In this implementation, as this method is an explicit + * interruption point, preference is given to responding to the + * interrupt over normal or reentrant acquisition of the lock. + * + * @throws InterruptedException if the current thread is interrupted + * + * 除非当前线程被中断,否则获取锁。 + * + * 如果没有其他线程持有该锁,则获取该锁并立即返回,将锁保持计数设置为1。 + * + * 如果当前线程已经持有此锁,则持有计数将增加一,并且该方法将立即返回。 + * + * 如果锁是由另一个线程持有的,则出于线程调度的目的,当前线程将被禁用,并处于休眠状态,直到发生以下两种情况之一: + * - 该锁由当前线程获取 + * - 其他一些线程中断当前线程 + * + * 如果当前线程获取了锁,则锁保持计数将设置为1。 + * + * 如果当前线程: + * - 在进入此方法时已设置其中断状态 + * - 获取锁时被中断 + * 那么将抛出InterruptedException并清除当前线程的中断状态。 + * + * 在此实现中,由于此方法是显式的中断点,因此优先于对中断的响应而不是正常或可重入的锁获取。 + */ + public void lockInterruptibly() throws InterruptedException { + sync.acquireInterruptibly(1); + } + + /** + * Acquires the lock only if it is not held by another thread at the time + * of invocation. + * + *

Acquires the lock if it is not held by another thread and + * returns immediately with the value {@code true}, setting the + * lock hold count to one. Even when this lock has been set to use a + * fair ordering policy, a call to {@code tryLock()} will + * immediately acquire the lock if it is available, whether or not + * other threads are currently waiting for the lock. + * This "barging" behavior can be useful in certain + * circumstances, even though it breaks fairness. If you want to honor + * the fairness setting for this lock, then use + * {@link #tryLock(long, TimeUnit) tryLock(0, TimeUnit.SECONDS) } + * which is almost equivalent (it also detects interruption). + * + *

If the current thread already holds this lock then the hold + * count is incremented by one and the method returns {@code true}. + * + *

If the lock is held by another thread then this method will return + * immediately with the value {@code false}. + * + * @return {@code true} if the lock was free and was acquired by the + * current thread, or the lock was already held by the current + * thread; and {@code false} otherwise + * + * 仅当调用时另一个线程未持有该锁时才获取该锁。 + * + * 如果没有其他线程持有该锁,则获取该锁,并立即返回 true 值,将锁保持计数设置为1。 + * 即使已将此锁设置为使用公平的排序策略,对 tryLock() 的调用也会立即获取该锁(如果有),无论当前是否有其他线程在等待该锁。 + * 即使破坏公平性,这种“讨价还价”的行为在某些情况下还是有用的。 + * 如果要遵守此锁的公平性设置,请使用几乎等效的 tryLock(0,TimeUnit.SECONDS)(它还会检测到中断)。 + * + * 如果当前线程已经持有此锁,则持有计数将增加一,并且该方法返回true。 + * + * 如果锁由另一个线程持有,则此方法将立即返回false值。 + */ + public boolean tryLock() { + return sync.nonfairTryAcquire(1); + } + + /** + * Acquires the lock if it is not held by another thread within the given + * waiting time and the current thread has not been + * {@linkplain Thread#interrupt interrupted}. + * + *

Acquires the lock if it is not held by another thread and returns + * immediately with the value {@code true}, setting the lock hold count + * to one. If this lock has been set to use a fair ordering policy then + * an available lock will not be acquired if any other threads + * are waiting for the lock. This is in contrast to the {@link #tryLock()} + * method. If you want a timed {@code tryLock} that does permit barging on + * a fair lock then combine the timed and un-timed forms together: + * + *

 {@code
+     * if (lock.tryLock() ||
+     *     lock.tryLock(timeout, unit)) {
+     *   ...
+     * }}
+ * + *

If the current thread + * already holds this lock then the hold count is incremented by one and + * the method returns {@code true}. + * + *

If the lock is held by another thread then the + * current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + * + *

    + * + *
  • The lock is acquired by the current thread; or + * + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + * + *
  • The specified waiting time elapses + * + *
+ * + *

If the lock is acquired then the value {@code true} is returned and + * the lock hold count is set to one. + * + *

If the current thread: + * + *

    + * + *
  • has its interrupted status set on entry to this method; or + * + *
  • is {@linkplain Thread#interrupt interrupted} while + * acquiring the lock, + * + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + *

In this implementation, as this method is an explicit + * interruption point, preference is given to responding to the + * interrupt over normal or reentrant acquisition of the lock, and + * over reporting the elapse of the waiting time. + * + * @param timeout the time to wait for the lock + * @param unit the time unit of the timeout argument + * @return {@code true} if the lock was free and was acquired by the + * current thread, or the lock was already held by the current + * thread; and {@code false} if the waiting time elapsed before + * the lock could be acquired + * @throws InterruptedException if the current thread is interrupted + * @throws NullPointerException if the time unit is null + * + * 如果在给定的等待时间内另一个线程未持有该锁并且当前线程尚未中断,则获取该锁。 + * + * 如果没有其他线程持有该锁,则获取该锁,并立即返回 true 值,将锁保持计数设置为 1。 + * 如果已将此锁设置为使用公平的排序策略,则如果任何其他线程正在等待该锁,则不会获取可用锁。这与 tryLock() 方法相反。 + * 如果您想要定时的 tryLock 确实允许对公平锁进行插入,则将定时和非定时形式组合在一起: + * if (lock.tryLock() || + * lock.tryLock(timeout, unit)) { + * ... + * } + * + * 如果当前线程已经持有此锁,则持有计数将增加一,并且该方法返回true。 + * + * 如果该锁由另一个线程持有,则出于线程调度目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一: + * - 该锁由当前线程获取 + * - 其他一些线程中断当前线程 + * - 经过指定的等待时间 + * + * 如果获取了锁,则返回 true 值,并将锁保持计数设置为 1。 + * + * 如果当前线程: + * - 在进入此方法时已设置其中断状态 + * - 获取锁时被中断 + * 那么将抛出 InterruptedException 并清除当前线程的中断状态。 + * + * 如果经过了指定的等待时间,则返回值 false。 如果时间小于或等于零,则该方法将完全不等待。 + * + * 在此实现中,由于此方法是显式的中断点,因此优先于对中断的响应而不是正常或可重入的锁定获取,而是优先报告等待时间的流逝。 + */ + public boolean tryLock(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireNanos(1, unit.toNanos(timeout)); + } + + /** + * Attempts to release this lock. + * + *

If the current thread is the holder of this lock then the hold + * count is decremented. If the hold count is now zero then the lock + * is released. If the current thread is not the holder of this + * lock then {@link IllegalMonitorStateException} is thrown. + * + * @throws IllegalMonitorStateException if the current thread does not + * hold this lock + * + * 尝试释放此锁。 + * + * 如果当前线程是此锁的持有者,则保留计数将减少。 如果保持计数现在为零,则释放锁定。 如果当前线程不是此锁的持有者,则抛出 IllegalMonitorStateException。 + */ + public void unlock() { + sync.release(1); + } + + /** + * Returns a {@link Condition} instance for use with this + * {@link Lock} instance. + * + *

The returned {@link Condition} instance supports the same + * usages as do the {@link Object} monitor methods ({@link + * Object#wait() wait}, {@link Object#notify notify}, and {@link + * Object#notifyAll notifyAll}) when used with the built-in + * monitor lock. + * + *

    + * + *
  • If this lock is not held when any of the {@link Condition} + * {@linkplain Condition#await() waiting} or {@linkplain + * Condition#signal signalling} methods are called, then an {@link + * IllegalMonitorStateException} is thrown. + * + *
  • When the condition {@linkplain Condition#await() waiting} + * methods are called the lock is released and, before they + * return, the lock is reacquired and the lock hold count restored + * to what it was when the method was called. + * + *
  • If a thread is {@linkplain Thread#interrupt interrupted} + * while waiting then the wait will terminate, an {@link + * InterruptedException} will be thrown, and the thread's + * interrupted status will be cleared. + * + *
  • Waiting threads are signalled in FIFO order. + * + *
  • The ordering of lock reacquisition for threads returning + * from waiting methods is the same as for threads initially + * acquiring the lock, which is in the default case not specified, + * but for fair locks favors those threads that have been + * waiting the longest. + * + *
+ * + * @return the Condition object + * + * 返回用于此 Lock 实例的 Condition 实例。 + * + * 当与内置监视器锁定一起使用时,返回的 Condition 实例支持与 Object 监视器方法(wait,notify 和 notifyAll)相同的用法。 + * + * - 如果在调用任何 Condition 的 wait 或 signal 方法时未保持此锁,则将抛出IllegalMonitorStateException。 + * - 当调用 Condition wait 方法时,将释放锁,并在锁返回之前,重新获取该锁,并将锁保持计数恢复到调用该方法时的状态。 + * - 如果线程在等待时被中断,则等待将终止,将抛出 InterruptedException,并清除线程的中断状态。 + * - 等待线程以 FIFO 顺序发出信号。 + * - 从等待方法返回的线程的锁重新获取顺序与最初获取锁的线程的顺序相同(默认情况下未指定),但对于公平锁,优先使用等待时间最长的线程。 + */ + public Condition newCondition() { + return sync.newCondition(); + } + + /** + * Queries the number of holds on this lock by the current thread. + * + *

A thread has a hold on a lock for each lock action that is not + * matched by an unlock action. + * + *

The hold count information is typically only used for testing and + * debugging purposes. For example, if a certain section of code should + * not be entered with the lock already held then we can assert that + * fact: + * + *

 {@code
+     * class X {
+     *   ReentrantLock lock = new ReentrantLock();
+     *   // ...
+     *   public void m() {
+     *     assert lock.getHoldCount() == 0;
+     *     lock.lock();
+     *     try {
+     *       // ... method body
+     *     } finally {
+     *       lock.unlock();
+     *     }
+     *   }
+     * }}
+ * + * @return the number of holds on this lock by the current thread, + * or zero if this lock is not held by the current thread + * + * 查询当前线程对该锁的保持次数。 + * + * 对于每个未与解锁动作匹配的锁定动作,线程都会拥有一个锁。 + * + * 保留计数信息通常仅用于测试和调试目的。 例如,如果不应在已锁定的状态下输入特定的代码段,则可以断言以下事实: + * class X { + * ReentrantLock lock = new ReentrantLock(); + * // ... + * public void m() { + * assert lock.getHoldCount() == 0; + * lock.lock(); + * try { + * // ... method body + * } finally { + * lock.unlock(); + * } + * } + * } + */ + public int getHoldCount() { + return sync.getHoldCount(); + } + + /** + * Queries if this lock is held by the current thread. + * + *

Analogous to the {@link Thread#holdsLock(Object)} method for + * built-in monitor locks, this method is typically used for + * debugging and testing. For example, a method that should only be + * called while a lock is held can assert that this is the case: + * + *

 {@code
+     * class X {
+     *   ReentrantLock lock = new ReentrantLock();
+     *   // ...
+     *
+     *   public void m() {
+     *       assert lock.isHeldByCurrentThread();
+     *       // ... method body
+     *   }
+     * }}
+ * + *

It can also be used to ensure that a reentrant lock is used + * in a non-reentrant manner, for example: + * + *

 {@code
+     * class X {
+     *   ReentrantLock lock = new ReentrantLock();
+     *   // ...
+     *
+     *   public void m() {
+     *       assert !lock.isHeldByCurrentThread();
+     *       lock.lock();
+     *       try {
+     *           // ... method body
+     *       } finally {
+     *           lock.unlock();
+     *       }
+     *   }
+     * }}
+ * + * @return {@code true} if current thread holds this lock and + * {@code false} otherwise + * + * 查询此锁是否由当前线程持有。 + * + * 与内置监视器锁的 Thread.holdsLock(Object) 方法类似,此方法通常用于调试和测试。 例如,仅在持有锁的情况下才应调用的方法可以断言是这种情况: + * class X { + * ReentrantLock lock = new ReentrantLock(); + * // ... + * + * public void m() { + * assert lock.isHeldByCurrentThread(); + * // ... method body + * } + * } + * + * 它还可以用于确保以非可重入方式使用可重入锁,例如: + * class X { + * ReentrantLock lock = new ReentrantLock(); + * // ... + * + * public void m() { + * assert !lock.isHeldByCurrentThread(); + * lock.lock(); + * try { + * // ... method body + * } finally { + * lock.unlock(); + * } + * } + * } + */ + public boolean isHeldByCurrentThread() { + return sync.isHeldExclusively(); + } + + /** + * Queries if this lock is held by any thread. This method is + * designed for use in monitoring of the system state, + * not for synchronization control. + * + * 查询此锁是否由任何线程持有。 此方法设计用于监视系统状态,而不用于同步控制。 + * + * @return {@code true} if any thread holds this lock and + * {@code false} otherwise + */ + public boolean isLocked() { + return sync.isLocked(); + } + + /** + * Returns {@code true} if this lock has fairness set true. + * + * 如果此锁的公平性设置为true,则返回true。 + * + * @return {@code true} if this lock has fairness set true + */ + public final boolean isFair() { + return sync instanceof FairSync; + } + + /** + * Returns the thread that currently owns this lock, or + * {@code null} if not owned. When this method is called by a + * thread that is not the owner, the return value reflects a + * best-effort approximation of current lock status. For example, + * the owner may be momentarily {@code null} even if there are + * threads trying to acquire the lock but have not yet done so. + * This method is designed to facilitate construction of + * subclasses that provide more extensive lock monitoring + * facilities. + * + * 返回当前拥有此锁的线程;如果不拥有,则返回null。 + * 当非所有者的线程调用此方法时,返回值反映当前锁定状态的尽力而为近似。 + * 例如,即使有线程试图获取锁,但所有者尚未拥有,所有者可能暂时为空。 + * 设计此方法是为了便于构造提供更广泛的锁监视功能的子类。 + * + * @return the owner, or {@code null} if not owned + */ + protected Thread getOwner() { + return sync.getOwner(); + } + + /** + * Queries whether any threads are waiting to acquire this lock. Note that + * because cancellations may occur at any time, a {@code true} + * return does not guarantee that any other thread will ever + * acquire this lock. This method is designed primarily for use in + * monitoring of the system state. + * + * 查询是否有任何线程正在等待获取此锁。 + * 请注意,由于取消可能随时发生,因此返回 true 不能保证任何其他线程都将获得此锁。 + * 此方法主要设计用于监视系统状态。 + * + * @return {@code true} if there may be other threads waiting to + * acquire the lock + */ + public final boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); + } + + /** + * Queries whether the given thread is waiting to acquire this + * lock. Note that because cancellations may occur at any time, a + * {@code true} return does not guarantee that this thread + * will ever acquire this lock. This method is designed primarily for use + * in monitoring of the system state. + * + * 查询给定线程是否正在等待获取此锁。 + * 请注意,由于取消可能随时发生,因此返回 true 不能保证此线程将获得此锁。 + * 此方法主要设计用于监视系统状态。 + * + * @param thread the thread + * @return {@code true} if the given thread is queued waiting for this lock + * @throws NullPointerException if the thread is null + */ + public final boolean hasQueuedThread(Thread thread) { + return sync.isQueued(thread); + } + + /** + * Returns an estimate of the number of threads waiting to + * acquire this lock. The value is only an estimate because the number of + * threads may change dynamically while this method traverses + * internal data structures. This method is designed for use in + * monitoring of the system state, not for synchronization + * control. + * + * 返回等待获取此锁的线程数的估计值。 + * 该值只是一个估计值,因为在此方法遍历内部数据结构时,线程数可能会动态变化。 + * 此方法设计用于监视系统状态,而不用于同步控制。 + * + * @return the estimated number of threads waiting for this lock + */ + public final int getQueueLength() { + return sync.getQueueLength(); + } + + /** + * Returns a collection containing threads that may be waiting to + * acquire this lock. Because the actual set of threads may change + * dynamically while constructing this result, the returned + * collection is only a best-effort estimate. The elements of the + * returned collection are in no particular order. This method is + * designed to facilitate construction of subclasses that provide + * more extensive monitoring facilities. + * + * 返回一个包含可能正在等待获取此锁的线程的集合。 + * 因为实际的线程集在构造此结果时可能会动态变化,所以返回的集合只是尽力而为的估计。 + * 返回的集合的元素没有特定的顺序。 + * 设计此方法是为了便于构造子类,以提供更广泛的监视功能。 + * + * @return the collection of threads + */ + protected Collection getQueuedThreads() { + return sync.getQueuedThreads(); + } + + /** + * Queries whether any threads are waiting on the given condition + * associated with this lock. Note that because timeouts and + * interrupts may occur at any time, a {@code true} return does + * not guarantee that a future {@code signal} will awaken any + * threads. This method is designed primarily for use in + * monitoring of the system state. + * + * 查询是否有任何线程正在等待与此锁关联的给定条件。 + * 请注意,由于超时和中断可能随时发生,因此真正的返回并不保证将来的信号会唤醒任何线程。 + * 此方法主要设计用于监视系统状态。 + * + * @param condition the condition + * @return {@code true} if there are any waiting threads + * @throws IllegalMonitorStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + */ + public boolean hasWaiters(Condition condition) { + if (condition == null) + throw new NullPointerException(); + if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) + throw new IllegalArgumentException("not owner"); + return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); + } + + /** + * Returns an estimate of the number of threads waiting on the + * given condition associated with this lock. Note that because + * timeouts and interrupts may occur at any time, the estimate + * serves only as an upper bound on the actual number of waiters. + * This method is designed for use in monitoring of the system + * state, not for synchronization control. + * + * 返回等待与此锁关联的给定条件的线程数的估计值。 + * 请注意,由于超时和中断可能随时发生,因此估算值仅用作实际侍者数的上限。 + * 此方法设计用于监视系统状态,而不用于同步控制。 + * + * @param condition the condition + * @return the estimated number of waiting threads + * @throws IllegalMonitorStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + */ + public int getWaitQueueLength(Condition condition) { + if (condition == null) + throw new NullPointerException(); + if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) + throw new IllegalArgumentException("not owner"); + return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); + } + + /** + * Returns a collection containing those threads that may be + * waiting on the given condition associated with this lock. + * Because the actual set of threads may change dynamically while + * constructing this result, the returned collection is only a + * best-effort estimate. The elements of the returned collection + * are in no particular order. This method is designed to + * facilitate construction of subclasses that provide more + * extensive condition monitoring facilities. + * + * 返回一个包含那些可能正在等待与此锁相关的给定条件的线程的集合。 + * 因为实际的线程集在构造此结果时可能会动态变化,所以返回的集合只是尽力而为的估计。 + * 返回的集合的元素没有特定的顺序。 + * 设计此方法是为了便于构造提供更广泛的状态监视工具的子类。 + * + * @param condition the condition + * @return the collection of threads + * @throws IllegalMonitorStateException if this lock is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this lock + * @throws NullPointerException if the condition is null + */ + protected Collection getWaitingThreads(Condition condition) { + if (condition == null) + throw new NullPointerException(); + if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) + throw new IllegalArgumentException("not owner"); + return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); + } + + /** + * Returns a string identifying this lock, as well as its lock state. + * The state, in brackets, includes either the String {@code "Unlocked"} + * or the String {@code "Locked by"} followed by the + * {@linkplain Thread#getName name} of the owning thread. + * + * 返回标识此锁定及其锁定状态的字符串。 + * 括号中的状态包括字符串"Unlocked"或字符串"Locked by",后跟拥有线程的名称。 + * + * @return a string identifying this lock, as well as its lock state + */ + public String toString() { + Thread o = sync.getOwner(); + return super.toString() + ((o == null) ? + "[Unlocked]" : + "[Locked by thread " + o.getName() + "]"); + } +} +``` + +#### demo 示例 + +```java +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class OtherTest { + private static final Logger logger = LoggerFactory.getLogger(OtherTest.class); + static final Lock lock = new ReentrantLock(); + static final Condition condition = lock.newCondition(); + + public static void main(String[] args) throws InterruptedException { + final Thread t1 = new Thread("Thread 1") { + @Override + public void run() { + lock.lock(); + logger.info(Thread.currentThread().getName() + " 正在运行 ....."); + try { + Thread.sleep(2 * 1000); + logger.info(Thread.currentThread().getName() + " 停止运行, 等待一个 signal "); + condition.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + logger.info(Thread.currentThread().getName() + " 获取一个 signal, 继续执行 "); + lock.unlock(); + } + }; + t1.start(); + Thread.sleep(1000); + + final Thread t2 = new Thread("Thread 2") { + @Override + public void run() { + lock.lock(); + logger.info(Thread.currentThread().getName() + " 正在运行 ....."); + t1.interrupt(); + try { + Thread.sleep(2 * 1000); + } catch (Exception e) { + // do nothing + } + logger.info(Thread.currentThread().getName() + " 发送一个 signal "); + condition.signal(); + logger.info(Thread.currentThread().getName() + " 发送 signal 结束"); + lock.unlock(); + } + }; + t2.start(); + + while (Thread.activeCount() > 1) { + Thread.yield(); + } + } +} +``` + +执行步骤: + +1. 线程 1 开始执行, 获取 lock, 然后开始睡眠 2 秒 +2. 当线程 1 睡眠到 1秒时, 线程 开始执行, 但是 lock 被线程1获取, 所以等待 +3. 线程 1 睡足 2 秒 调用 condition.await() 进行锁的释放, 并且将线程 1 封装成一个 node 放到 condition 的 Condition Queue里面, 等待其他获取锁的线程给他 signal, 或对其进行中断(中断后可以到 Sync Queue里面进而获取锁) +4. 线程 2 获取锁成功, 中断线程 1, 线程被中断后, node 从 Condition Queue 转移到 Sync Queue 里面, 但是 lock 还是被线程 2 获取者, 所以 node 呆在 Sync Queue 里面等待获取 lock +5. 线程 2 睡了 2 秒, 开始用 signal 唤醒 Condition Queue 里面的节点(此时代表线程 1 的 node 已经到 Sync Queue 里面) +6. 线程 2 释放lock, 并且在 Sync Queue 里面进行唤醒等待获取锁的节点 node +7. 线程 1 得到唤醒, 获取锁 +8. 线程 1 释放锁 + +执行结果 + +``` +Connected to the target VM, address: '127.0.0.1:54489', transport: 'socket' +11:33:39.005 [Thread 1] INFO com.OtherTest - Thread 1 正在运行 ..... +11:33:41.012 [Thread 1] INFO com.OtherTest - Thread 1 停止运行, 等待一个 signal +11:33:41.012 [Thread 2] INFO com.OtherTest - Thread 2 正在运行 ..... +11:33:43.016 [Thread 2] INFO com.OtherTest - Thread 2 发送一个 signal +11:33:43.016 [Thread 2] INFO com.OtherTest - Thread 2 发送 signal 结束 +11:33:43.018 [Thread 1] INFO com.OtherTest - Thread 1 获取一个 signal, 继续执行 +java.lang.InterruptedException + at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) + at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048) + at com.OtherTest$1.run(OtherTest.java:24) +Disconnected from the target VM, address: '127.0.0.1:54489', transport: 'socket' + +Process finished with exit code 0 +``` \ No newline at end of file diff --git a/second/week_03/81/Semaphore.md b/second/week_03/81/Semaphore.md new file mode 100644 index 0000000000000000000000000000000000000000..75dc2bfe1020134f9d488c2709a6ffea52a68661 --- /dev/null +++ b/second/week_03/81/Semaphore.md @@ -0,0 +1,846 @@ +#### 简述 + +```java +public class Semaphore implements java.io.Serializable +``` + +Semaphore 实现为一种基于计数的信号量,Semaphore管理着一组虚拟的许可集合,这种许可可以作为某种凭证,来管理资源,在一些资源有限的场景下很有实用性,比如数据库连接,应用可初始化一组数据库连接,然后通过使用Semaphore来管理获取连接的许可,任何线程想要获得一个连接必须首先获得一个许可,然后再凭这个许可获得一个连接,这个许可将持续到这个线程归还了连接。在使用上,任何一个线程都需要通过acquire来获得一个Semaphore许可,这个操作可能会阻塞线程直到成功获得一个许可,因为资源是有限的,所以许可也是有限的,没有获得资源就需要阻塞等待其他线程归还Semaphore,而归还Semaphore操作通过release方法来进行,release会唤醒一个等待在Semaphore上的一个线程来尝试获得许可。如果想要达到一种互斥的效果,比如任何时刻只能有一个线程获得许可,那么就可以初始化Semaphore的数量为1,一个线程获得这个Semaphore之后,任何到来的通过acquire来尝试获得许可的线程都会被阻塞直到这个持有Semaphore的线程调用了release方法来释放Semaphore。 + +通常,用于控制资源访问的信号量应初始化为公平,以确保没有线程因访问资源而挨饿。 当使用信号量进行其他类型的同步控制时,非公平排序的吞吐量优势通常会超过公平考虑。 + +此类还提供方便的方法来一次获取和释放多个许可证。 当在不公平的情况下使用这些方法时,请注意无限期推迟的风险增加。 + +内存一致性影响:在调用"release"方法(例如release())之前,线程中的操作发生在另一个线程中成功执行"acquire"方法(例如acquire())之后的操作之前。 + +#### 源码分析 + +```java +package java.util.concurrent; +import java.util.Collection; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +public class Semaphore implements java.io.Serializable { + private static final long serialVersionUID = -3222578661600680210L; + /** All mechanics via AbstractQueuedSynchronizer subclass */ + private final Sync sync; + + /** + * Synchronization implementation for semaphore. Uses AQS state + * to represent permits. Subclassed into fair and nonfair + * versions. + */ + abstract static class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = 1192457210091910933L; + + //初始化信号量总数量。也就是一共有多少 state 可用于获取 + Sync(int permits) { + setState(permits); + } + + //获取当前剩余信号量数量 + final int getPermits() { + return getState(); + } + + //非公平共享模式 + final int nonfairTryAcquireShared(int acquires) { + //自旋获取指定数量的资源 + for (;;) { + int available = getState(); //获取当前剩余资源量 + int remaining = available - acquires; //减去准备获取后,剩下多少 + if (remaining < 0 || + compareAndSetState(available, remaining)) //如果剩余 > 0,则 CAS state + return remaining; //如果返回 < 0,则会进入等待队列,否则获取成功 + } + } + + //释放资源 + protected final boolean tryReleaseShared(int releases) { + //自旋释放指定数量的资源 + for (;;) { + int current = getState(); //获取当前剩余资源量 + int next = current + releases; //加上要释放的资源 + if (next < current) // overflow + throw new Error("Maximum permit count exceeded"); + if (compareAndSetState(current, next)) //CAS 设置 state + return true; + } + } + + //减少指定数量的资源 + final void reducePermits(int reductions) { + for (;;) { + int current = getState(); //获取当前剩余资源量 + int next = current - reductions; //减去指定数量的资源 + if (next > current) // underflow + throw new Error("Permit count underflow"); + if (compareAndSetState(current, next)) //CAS 设置 state + return; + } + } + + //一次性获取所有资源 + final int drainPermits() { + for (;;) { + int current = getState(); + if (current == 0 || compareAndSetState(current, 0)) //如果剩余资源为 0,则直接返回,否则设置 CAS state 为 0,表示获取所有资源 + return current; + } + } + } + + /** + * NonFair version + */ + //非公平模式 + static final class NonfairSync extends Sync { + private static final long serialVersionUID = -2694183684443567898L; + + NonfairSync(int permits) { + super(permits); + } + + //尝试获取资源 + protected int tryAcquireShared(int acquires) { + return nonfairTryAcquireShared(acquires); + } + } + + /** + * Fair version + */ + //公平模式 + static final class FairSync extends Sync { + private static final long serialVersionUID = 2014338818796000944L; + + FairSync(int permits) { + super(permits); + } + + //尝试获取资源 + protected int tryAcquireShared(int acquires) { + for (;;) { + if (hasQueuedPredecessors()) //由于是公平模式,所以先判断是否队列中,有线程在排队等候资源 + return -1; + int available = getState(); //获取当前资源数 + int remaining = available - acquires; //减去要获取的资源数 + if (remaining < 0 || + compareAndSetState(available, remaining)) //CAS 设置 state + return remaining; //如果剩余资源为负数,则进入队列 + } + } + } + + /** + * Creates a {@code Semaphore} with the given number of + * permits and nonfair fairness setting. + * + * @param permits the initial number of permits available. + * This value may be negative, in which case releases + * must occur before any acquires will be granted. + */ + //默认初始化使用非公平模式,通常情况下首选公平模式 + public Semaphore(int permits) { + sync = new NonfairSync(permits); + } + + /** + * Creates a {@code Semaphore} with the given number of + * permits and the given fairness setting. + * + * @param permits the initial number of permits available. + * This value may be negative, in which case releases + * must occur before any acquires will be granted. + * @param fair {@code true} if this semaphore will guarantee + * first-in first-out granting of permits under contention, + * else {@code false} + */ + //参数设置使用公平模式还是非公平模式 + public Semaphore(int permits, boolean fair) { + sync = fair ? new FairSync(permits) : new NonfairSync(permits); + } + + /** + * Acquires a permit from this semaphore, blocking until one is + * available, or the thread is {@linkplain Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

    + *
  • Some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread. + *
+ * + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@linkplain Thread#interrupt interrupted} while waiting + * for a permit, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + * @throws InterruptedException if the current thread is interrupted + * + * 从此信号量获取许可,阻塞直到可用,否则线程被中断。 + * + * 如果有许可证,则获取许可证并立即返回,从而将可用许可证的数量减少一个。 + * + * 如果没有可用的许可,则出于线程调度的目的,当前线程将被禁用,并处于休眠状态,直到发生以下两种情况之一: + * - 其他一些线程为此信号量调用 release() 方法,接下来将为当前线程分配一个许可 + * - 其他一些线程中断当前线程 + * + * 如果当前线程 + * - 在进入此方法时已设置其中断状态 + * -在等待许可证时被打断 + * 那么将抛出 InterruptedException 并清除当前线程的中断状态。 + */ + //默认使用响应中断的共享模式 + public void acquire() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + /** + * Acquires a permit from this semaphore, blocking until one is + * available. + * + *

Acquires a permit, if one is available and returns immediately, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit. + * + *

If the current thread is {@linkplain Thread#interrupt interrupted} + * while waiting for a permit then it will continue to wait, but the + * time at which the thread is assigned a permit may change compared to + * the time it would have received the permit had no interruption + * occurred. When the thread does return from this method its interrupt + * status will be set. + * + * 从此信号量获取许可,直到可用为止。 + * + * 如果有许可证,则获取许可证并立即返回,从而将可用许可证的数量减少一个。 + * + * 如果没有可用的许可,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到其他某个线程为此信号量调用release()方法,然后为当前线程分配许可。 + * + * 如果当前线程在等待许可时被中断,则它将继续等待,但是与没有发生中断的情况下获得许可的时间相比,为线程分配许可的时间可能会发生变化。 + * 当线程确实从该方法返回时,将设置其中断状态。 + */ + public void acquireUninterruptibly() { + sync.acquireShared(1); + } + + /** + * Acquires a permit from this semaphore, only if one is available at the + * time of invocation. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then this method will return + * immediately with the value {@code false}. + * + *

Even when this semaphore has been set to use a + * fair ordering policy, a call to {@code tryAcquire()} will + * immediately acquire a permit if one is available, whether or not + * other threads are currently waiting. + * This "barging" behavior can be useful in certain + * circumstances, even though it breaks fairness. If you want to honor + * the fairness setting, then use + * {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) } + * which is almost equivalent (it also detects interruption). + * + * @return {@code true} if a permit was acquired and {@code false} + * otherwise + * + * 仅在调用时可用时,才从此信号量获取许可。 + * + * 如果有许可证,则获取许可证,然后立即返回,值为true,将可用许可证的数量减少一个。 + * + * 如果没有可用的许可,则此方法将立即返回false值。 + * + * 即使已将此信号量设置为使用公平的排序策略,如果有可用的调用 tryAcquire() 也会立即获得许可,无论当前是否正在等待其他线程。 + * 即使破坏公平性,这种“讨价还价”的行为在某些情况下还是有用的。 + * 如果要遵守公平性设置,请使用几乎等效的 tryAcquire(0,TimeUnit.SECONDS)(它还会检测到中断)。 + */ + public boolean tryAcquire() { + return sync.nonfairTryAcquireShared(1) >= 0; + } + + /** + * Acquires a permit from this semaphore, if one becomes available + * within the given waiting time and the current thread has not + * been {@linkplain Thread#interrupt interrupted}. + * + *

Acquires a permit, if one is available and returns immediately, + * with the value {@code true}, + * reducing the number of available permits by one. + * + *

If no permit is available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of three things happens: + *

    + *
  • Some other thread invokes the {@link #release} method for this + * semaphore and the current thread is next to be assigned a permit; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + *
  • The specified waiting time elapses. + *
+ * + *

If a permit is acquired then the value {@code true} is returned. + * + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@linkplain Thread#interrupt interrupted} while waiting + * to acquire a permit, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. + * + * @param timeout the maximum time to wait for a permit + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if a permit was acquired and {@code false} + * if the waiting time elapsed before a permit was acquired + * @throws InterruptedException if the current thread is interrupted + * + * 如果在给定的等待时间内可用,并且当前线程未中断,则从此信号量获取许可。 + * + * 如果有许可证,则获取许可证,然后立即返回,值为true,将可用许可证的数量减少一个。 + * + * 如果没有可用的许可,则出于线程调度的目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一: + * - 其他一些线程为此信号量调用 release() 方法,接下来将为当前线程分配一个许可 + * - 其他一些线程中断当前线程 + * - 经过指定的等待时间 + * + * 如果获得许可,则返回true值。 + * + * 如果当前线程 + * - 在进入此方法时已设置其中断状态 + * - 在等待获得许可时被打断 + * 那么将抛出InterruptedException并清除当前线程的中断状态。 + * + * 如果经过了指定的等待时间,则返回值false。 如果时间小于或等于零,则该方法将完全不等待。 + */ + public boolean tryAcquire(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + + /** + * Releases a permit, returning it to the semaphore. + * + *

Releases a permit, increasing the number of available permits by + * one. If any threads are trying to acquire a permit, then one is + * selected and given the permit that was just released. That thread + * is (re)enabled for thread scheduling purposes. + * + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link #acquire}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * 释放许可证,将其返回到信号灯。 + * + * 发放许可证,将可用许可证数量增加一个。 + * 如果有任何线程试图获取许可,则选择一个线程并授予刚刚释放的许可。 + * 出于线程调度目的而启用(重新)该线程。 + * + * 不要求释放许可证的线程必须通过调用 acquire 获取该许可证。 + * 通过在应用程序中编程约定,可以正确使用信号量。 + */ + public void release() { + sync.releaseShared(1); + } + + /** + * Acquires the given number of permits from this semaphore, + * blocking until all are available, + * or the thread is {@linkplain Thread#interrupt interrupted}. + * + *

Acquires the given number of permits, if they are available, + * and returns immediately, reducing the number of available permits + * by the given amount. + * + *

If insufficient permits are available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * one of two things happens: + *

    + *
  • Some other thread invokes one of the {@link #release() release} + * methods for this semaphore, the current thread is next to be assigned + * permits and the number of available permits satisfies this request; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread. + *
+ * + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@linkplain Thread#interrupt interrupted} while waiting + * for a permit, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * Any permits that were to be assigned to this thread are instead + * assigned to other threads trying to acquire permits, as if + * permits had been made available by a call to {@link #release()}. + * + * @param permits the number of permits to acquire + * @throws InterruptedException if the current thread is interrupted + * @throws IllegalArgumentException if {@code permits} is negative + * + * 从该信号量获取给定数量的许可,阻塞直到所有可用,否则线程被中断。 + * + * 如果有可用许可证,则获取给定数量的许可证,然后立即返回,从而将可用许可证的数量减少给定数量。 + * + * 如果没有足够的许可,则出于线程调度的目的,当前线程将被禁用,并处于休眠状态,直到发生以下两种情况之一: + * - 其他一些线程为此信号量调用一种释放方法,接下来将为当前线程分配许可,并且可用许可的数量满足此请求 + * - 其他一些线程中断当前线程 + * + * 如果当前线程 + * - 在进入此方法时已设置其中断状态 + * - 在等待许可证时被打断 + * 那么将抛出 InterruptedException 并清除当前线程的中断状态。 + * 相反,将要分配给该线程的所有许可都分配给其他尝试获取许可的线程,就好像通过调用release()使许可可用。 + */ + public void acquire(int permits) throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireSharedInterruptibly(permits); + } + + /** + * Acquires the given number of permits from this semaphore, + * blocking until all are available. + * + *

Acquires the given number of permits, if they are available, + * and returns immediately, reducing the number of available permits + * by the given amount. + * + *

If insufficient permits are available then the current thread becomes + * disabled for thread scheduling purposes and lies dormant until + * some other thread invokes one of the {@link #release() release} + * methods for this semaphore, the current thread is next to be assigned + * permits and the number of available permits satisfies this request. + * + *

If the current thread is {@linkplain Thread#interrupt interrupted} + * while waiting for permits then it will continue to wait and its + * position in the queue is not affected. When the thread does return + * from this method its interrupt status will be set. + * + * @param permits the number of permits to acquire + * @throws IllegalArgumentException if {@code permits} is negative + * + * 从此信号量获取给定数量的许可,直到所有条件都可用为止都将阻塞。 + * + * 如果有可用许可证,则获取给定数量的许可证,然后立即返回,从而将可用许可证的数量减少给定数量。 + * + * 如果没有足够的许可,则当前线程将出于线程调度目的而被禁用,并处于休眠状态,直到某个其他线程为此信号量调用一种释放方法为止,接下来将为当前线程分配许可,并且可用许可的数量满足此请求。 + * + * 如果当前线程在等待许可时中断,则它将继续等待,并且其在队列中的位置不受影响。 当线程确实从该方法返回时,将设置其中断状态。 + */ + public void acquireUninterruptibly(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireShared(permits); + } + + /** + * Acquires the given number of permits from this semaphore, only + * if all are available at the time of invocation. + * + *

Acquires the given number of permits, if they are available, and + * returns immediately, with the value {@code true}, + * reducing the number of available permits by the given amount. + * + *

If insufficient permits are available then this method will return + * immediately with the value {@code false} and the number of available + * permits is unchanged. + * + *

Even when this semaphore has been set to use a fair ordering + * policy, a call to {@code tryAcquire} will + * immediately acquire a permit if one is available, whether or + * not other threads are currently waiting. This + * "barging" behavior can be useful in certain + * circumstances, even though it breaks fairness. If you want to + * honor the fairness setting, then use {@link #tryAcquire(int, + * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) } + * which is almost equivalent (it also detects interruption). + * + * @param permits the number of permits to acquire + * @return {@code true} if the permits were acquired and + * {@code false} otherwise + * @throws IllegalArgumentException if {@code permits} is negative + * + * 仅在调用时所有可用的条件下,从此信号量获取给定数量的许可。 + * + * 如果有可用许可证,则获取给定数量的许可证,并立即返回,值为true,将可用许可证的数量减少给定数量。 + * + * 如果没有足够的许可证,则此方法将立即返回false值,并且可用许可证的数量不变。 + * + * 即使已将此信号量设置为使用公平的排序策略,对 tryAcquire 的调用也会立即获得许可(如果有许可),而无论当前是否正在等待其他线程。 + * 即使破坏公平性,这种“讨价还价”的行为在某些情况下还是有用的。 + * 如果要遵守公平性设置,请使用几乎等效的 tryAcquire(permits,0,TimeUnit.SECONDS)(它还会检测到中断)。 + */ + public boolean tryAcquire(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + return sync.nonfairTryAcquireShared(permits) >= 0; + } + + /** + * Acquires the given number of permits from this semaphore, if all + * become available within the given waiting time and the current + * thread has not been {@linkplain Thread#interrupt interrupted}. + * + *

Acquires the given number of permits, if they are available and + * returns immediately, with the value {@code true}, + * reducing the number of available permits by the given amount. + * + *

If insufficient permits are available then + * the current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + *

    + *
  • Some other thread invokes one of the {@link #release() release} + * methods for this semaphore, the current thread is next to be assigned + * permits and the number of available permits satisfies this request; or + *
  • Some other thread {@linkplain Thread#interrupt interrupts} + * the current thread; or + *
  • The specified waiting time elapses. + *
+ * + *

If the permits are acquired then the value {@code true} is returned. + * + *

If the current thread: + *

    + *
  • has its interrupted status set on entry to this method; or + *
  • is {@linkplain Thread#interrupt interrupted} while waiting + * to acquire the permits, + *
+ * then {@link InterruptedException} is thrown and the current thread's + * interrupted status is cleared. + * Any permits that were to be assigned to this thread, are instead + * assigned to other threads trying to acquire permits, as if + * the permits had been made available by a call to {@link #release()}. + * + *

If the specified waiting time elapses then the value {@code false} + * is returned. If the time is less than or equal to zero, the method + * will not wait at all. Any permits that were to be assigned to this + * thread, are instead assigned to other threads trying to acquire + * permits, as if the permits had been made available by a call to + * {@link #release()}. + * + * @param permits the number of permits to acquire + * @param timeout the maximum time to wait for the permits + * @param unit the time unit of the {@code timeout} argument + * @return {@code true} if all permits were acquired and {@code false} + * if the waiting time elapsed before all permits were acquired + * @throws InterruptedException if the current thread is interrupted + * @throws IllegalArgumentException if {@code permits} is negative + * + * 如果所有信号都在给定的等待时间内可用,并且当前线程未中断,则从此信号量获取给定数量的许可。 + * + * 如果有可用许可证,则获取给定数量的许可证,然后立即返回,值为true,将可用许可证的数量减少给定数量。 + * + * 如果没有足够的许可,则出于线程调度的目的,当前线程将被禁用,并处于休眠状态,直到发生以下三种情况之一: + * - 其他一些线程为此信号量调用一种释放方法,接下来将为当前线程分配许可,并且可用许可的数量满足此请求 + * - 其他一些线程中断当前线程 + * - 经过指定的等待时间 + * + * 如果获得许可,则返回true值。 + * + * 如果当前线程 + * - 在进入此方法时已设置其中断状态 + * - 在等待获得许可时被打断 + * 那么将抛出 InterruptedException 并清除当前线程的中断状态。 + * 相反,将要分配给该线程的所有许可,都分配给其他尝试获取许可的线程,就好像通过调用 release()使许可可用。 + * + * 如果经过了指定的等待时间,则返回值false。 + * 如果时间小于或等于零,则该方法将完全不等待。 + * 相反,将要分配给该线程的所有许可,都分配给其他尝试获取许可的线程,就好像通过调用 release() 使许可可用。 + */ + public boolean tryAcquire(int permits, long timeout, TimeUnit unit) + throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); + } + + /** + * Releases the given number of permits, returning them to the semaphore. + * + *

Releases the given number of permits, increasing the number of + * available permits by that amount. + * If any threads are trying to acquire permits, then one + * is selected and given the permits that were just released. + * If the number of available permits satisfies that thread's request + * then that thread is (re)enabled for thread scheduling purposes; + * otherwise the thread will wait until sufficient permits are available. + * If there are still permits available + * after this thread's request has been satisfied, then those permits + * are assigned in turn to other threads trying to acquire permits. + * + *

There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link Semaphore#acquire acquire}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @param permits the number of permits to release + * @throws IllegalArgumentException if {@code permits} is negative + * + * 释放给定数量的许可证,将其返回到信号灯。 + * + * 释放给定数量的许可证,将可用许可证的数量增加该数量。 + * 如果有任何线程试图获取许可,则选择一个线程并给出刚刚释放的许可。 + * 如果可用许可的数量满足该线程的请求,则出于线程调度目的而(重新)启用该线程。 + * 否则,线程将等待,直到有足够的许可可用为止。 + * 如果在满足该线程的请求之后仍然有可用的许可,则将这些许可依次分配给其他尝试获取许可的线程。 + * + * 不要求释放许可证的线程必须通过调用 acquire 获取该许可证。通过在应用程序中编程约定,可以正确使用信号量。 + */ + public void release(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.releaseShared(permits); + } + + /** + * Returns the current number of permits available in this semaphore. + * + *

This method is typically used for debugging and testing purposes. + * + * @return the number of permits available in this semaphore + * + * 返回此信号量中当前可用的许可数量。 + * + * 此方法通常用于调试和测试目的。 + */ + public int availablePermits() { + return sync.getPermits(); + } + + /** + * Acquires and returns all permits that are immediately available. + * + * @return the number of permits acquired + * + * 获取并返回所有立即可用的许可证。 + */ + public int drainPermits() { + return sync.drainPermits(); + } + + /** + * Shrinks the number of available permits by the indicated + * reduction. This method can be useful in subclasses that use + * semaphores to track resources that become unavailable. This + * method differs from {@code acquire} in that it does not block + * waiting for permits to become available. + * + * @param reduction the number of permits to remove + * @throws IllegalArgumentException if {@code reduction} is negative + * + * 通过指示的减少量缩小可用许可证的数量。 + * 此方法在使用信号量跟踪变得不可用的资源的子类中很有用。 + * 此方法与获取方法的不同之处在于,它不会阻止等待许可证可用。 + */ + protected void reducePermits(int reduction) { + if (reduction < 0) throw new IllegalArgumentException(); + sync.reducePermits(reduction); + } + + /** + * Returns {@code true} if this semaphore has fairness set true. + * + * @return {@code true} if this semaphore has fairness set true + * + * 如果此信号量的公平性设置为true,则返回true。 + */ + public boolean isFair() { + return sync instanceof FairSync; + } + + /** + * Queries whether any threads are waiting to acquire. Note that + * because cancellations may occur at any time, a {@code true} + * return does not guarantee that any other thread will ever + * acquire. This method is designed primarily for use in + * monitoring of the system state. + * + * @return {@code true} if there may be other threads waiting to + * acquire the lock + * + * 查询是否有任何线程正在等待获取。 + * 请注意,由于取消可能随时发生,因此真正的回报并不保证任何其他线程都会获得。 + * 此方法主要设计用于监视系统状态。 + */ + public final boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); + } + + /** + * Returns an estimate of the number of threads waiting to acquire. + * The value is only an estimate because the number of threads may + * change dynamically while this method traverses internal data + * structures. This method is designed for use in monitoring of the + * system state, not for synchronization control. + * + * @return the estimated number of threads waiting for this lock + * + * 返回等待获取的线程数的估计值。 + * 该值只是一个估计值,因为在此方法遍历内部数据结构时,线程数可能会动态变化。 + * 此方法设计用于监视系统状态,而不用于同步控制。 + */ + public final int getQueueLength() { + return sync.getQueueLength(); + } + + /** + * Returns a collection containing threads that may be waiting to acquire. + * Because the actual set of threads may change dynamically while + * constructing this result, the returned collection is only a best-effort + * estimate. The elements of the returned collection are in no particular + * order. This method is designed to facilitate construction of + * subclasses that provide more extensive monitoring facilities. + * + * @return the collection of threads + * + * 返回一个包含可能正在等待获取的线程的集合。 + * 因为实际的线程集在构造此结果时可能会动态变化,所以返回的集合只是尽力而为的估计。 + * 返回的集合的元素没有特定的顺序。 + * 设计此方法是为了便于构造子类,以提供更广泛的监视功能。 + */ + protected Collection getQueuedThreads() { + return sync.getQueuedThreads(); + } + + /** + * Returns a string identifying this semaphore, as well as its state. + * The state, in brackets, includes the String {@code "Permits ="} + * followed by the number of permits. + * + * @return a string identifying this semaphore, as well as its state + */ + public String toString() { + return super.toString() + "[Permits = " + sync.getPermits() + "]"; + } +} +``` + +#### demo 示例 + +```java +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; + +public class SemaphoreDemo { + + static class Resource { + + public Object getResource() { + return new Object(); + } + + } + + static class Pool { + private static final int MAX_AVAILABLE = 100; + private final int availableSemaphore; + private Semaphore available; + private Resource[] items; + private boolean[] used; + + public Pool() { + this(MAX_AVAILABLE); + } + + public Pool(int available) { + this(available, false); + } + + public Pool(int available, boolean fairMode) { + this.availableSemaphore = available; + this.available = new Semaphore(available, fairMode); + this.items = new Resource[availableSemaphore]; + for (int i = 0; i < this.availableSemaphore; ++i) { + items[i] = new Resource(); + } + used = new boolean[availableSemaphore]; + } + + public int getAvailableSemaphore() { + return this.availableSemaphore; + } + + public Resource getItem() throws InterruptedException { + available.acquire(); + return getNextAvailableItem(); + } + + public void putItem(Resource obj) { + if (markAsUnused(obj)) + available.release(); + } + + private synchronized Resource getNextAvailableItem() { + for (int i = 0; i < availableSemaphore; ++i) { + if (!used[i]) { + used[i] = true; + return items[i]; + } + } + return null; + } + + private synchronized boolean markAsUnused(Resource obj) { + for (int i = 0; i < availableSemaphore; ++i) { + if (items[i] == obj) { + if (used[i]) { + used[i] = false; + return true; + } + return false; + } + } + return false; + } + } + + public static void main(String[] args) { + Pool pool = new Pool(3, true); //初始化 3 个信号量资源 + ExecutorService executorService = Executors.newFixedThreadPool(10); + for (int i = 0; i < 10; ++i) { //初始化 10 个线程,并发获取这 3 个信号量资源 + executorService.execute(() -> { + Resource rs = null; + try { + rs = pool.getItem(); + Object obj = rs.getResource(); + Thread.sleep(2000); //mock do something with obj + System.out.println(Thread.currentThread().getName() + "正在运行"); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + pool.putItem(rs); + } + }); + } + executorService.shutdown(); + } +} + +``` + +我们初始化了三个信号量资源,items 数组中有三个 Resource 对象,Semaphore 的 state 数值初始为 3. + +当 10 个线程并发获取这三个资源时,就会被限制为最多同时只能有三个线程获取到资源,其他的会挂起进入队列。 + +通过打印运行结果,可以观察到,每次只有三个线程正在打印运行结果。