From 3a8b7efecc620a40a265de1fae2faaeea1fa4967 Mon Sep 17 00:00:00 2001 From: A-b Date: Sun, 29 Dec 2019 21:46:45 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AC=AC=E4=B8=89=E5=91=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week_03/17/AbstractQueuedSynchronizer | 1650 +++++++++++++++++ week_03/17/ReentrantLock | 293 +++ week_03/17/Semaphore | 247 +++ week_03/17/synchronized | 51 + week_03/17/volatile | 21 + ...6\205\345\255\230\346\250\241\345\236\213" | 32 + 6 files changed, 2294 insertions(+) create mode 100644 week_03/17/AbstractQueuedSynchronizer create mode 100644 week_03/17/ReentrantLock create mode 100644 week_03/17/Semaphore create mode 100644 week_03/17/synchronized create mode 100644 week_03/17/volatile create mode 100644 "week_03/17/\345\206\205\345\255\230\346\250\241\345\236\213" diff --git a/week_03/17/AbstractQueuedSynchronizer b/week_03/17/AbstractQueuedSynchronizer new file mode 100644 index 0000000..73b3e2b --- /dev/null +++ b/week_03/17/AbstractQueuedSynchronizer @@ -0,0 +1,1650 @@ +##AbstractQueuedSynchronizer--AQS + 1.继承了AbstractOwnableSynchronizer + Thread exclusiveOwnerThread; + AbstractOwnableSynchronizer(); + //设置线程独占 + setExclusiveOwnerThread(Thread thread); + //获取独占线程 + getExclusiveOwnerThread() + +##源码分析 + + public abstract class AbstractQueuedSynchronizer + extends AbstractOwnableSynchronizer + implements java.io.Serializable { + + private static final long serialVersionUID = 7373984972572414691L; + + //无参构造器--初始状态为0 + protected AbstractQueuedSynchronizer() { } + //一个节点-----在后面对获取锁顺序方面有用 + static final class Node { + + + + /** Marker to indicate a node is waiting in shared mode */ + static final Node SHARED = new Node(); + /** Marker to indicate a node is waiting in exclusive mode */ + static final Node EXCLUSIVE = null; + /** waitStatus value to indicate thread has cancelled */ + static final int CANCELLED = 1; + /** waitStatus value to indicate successor's thread needs unparking */ + static final int SIGNAL = -1; + /** waitStatus value to indicate thread is waiting on condition */ + static final int CONDITION = -2; + /** + * waitStatus value to indicate the next acquireShared should + * unconditionally propagate + */ + static final int PROPAGATE = -3; + + + + //有五种标记 + INITIAL = 0---初始状态 + CANCELLED = 1---取消状态(不排队了) + SIGNAL = -1---释放锁了/或者取消了,(将会通知下一个节点) + CONDITION = -2---条件锁标记(当满足我需要的条件,我就可以去排队获取锁了) + PROPAGATE = -3 + 以下是源码的解释 + + /** + * Status field, taking on only the values: + * SIGNAL: The successor of this node is (or will soon be) + * blocked (via park), so the current node must + * unpark its successor when it releases or + * cancels. To avoid races, acquire methods must + * first indicate they need a signal, + * then retry the atomic acquire, and then, + * on failure, block. + * CANCELLED: This node is cancelled due to timeout or interrupt. + * Nodes never leave this state. In particular, + * a thread with cancelled node never again blocks. + * CONDITION: This node is currently on a condition queue. + * It will not be used as a sync queue node + * until transferred, at which time the status + * will be set to 0. (Use of this value here has + * nothing to do with the other uses of the + * field, but simplifies mechanics.) + * PROPAGATE: A releaseShared should be propagated to other + * nodes. This is set (for head node only) in + * doReleaseShared to ensure propagation + * continues, even if other operations have + * since intervened. + * 0: None of the above + * + * The values are arranged numerically to simplify use. + * Non-negative values mean that a node doesn't need to + * signal. So, most code doesn't need to check for particular + * values, just for sign. + * + * The field is initialized to 0 for normal sync nodes, and + * CONDITION for condition nodes. It is modified using CAS + * (or when possible, unconditional volatile writes). + */ + volatile int waitStatus; + //前驱节点(可以理解为前面的那个人-----因为排队机制) + volatile Node prev; + + //后面的节点 + volatile Node next; + + //拿到这个锁的线程 + volatile Thread thread; + + //条件锁的--条件节点 + Node nextWaiter; + + //共享模式 + final boolean isShared() { + return nextWaiter == SHARED; + } + + //获取前一个节点 + final Node predecessor() throws NullPointerException { + //前一个节点 + Node p = prev; + if (p == null) + throw new NullPointerException(); + else + return p; + } + + Node() { // Used to establish initial head or SHARED marker + } + //用于同步队列添加节点 + Node(Thread thread, Node mode) { // Used by addWaiter + this.nextWaiter = mode; + this.thread = thread; + } + //用于条件中 + Node(Thread thread, int waitStatus) { // Used by Condition + this.waitStatus = waitStatus; + this.thread = thread; + } + } + + //等待队列头 + private transient volatile Node head; + + //等待队列尾 + private transient volatile Node tail; + + //同步状态 + private volatile int state; + + //获取同步状态---共享变量状态 + protected final int getState() { + return state; + } + + //设置同步状态 + protected final void setState(int newState) { + state = newState; + } + + //修改最底层上的数据 + protected final boolean compareAndSetState(int expect, int update) { + // See below for intrinsics setup to support this + return unsafe.compareAndSwapInt(this, stateOffset, expect, update); + } + + // Queuing utilities + + /** + * The number of nanoseconds for which it is faster to spin + * rather than to use timed park. A rough estimate suffices + * to improve responsiveness with very short timeouts. + */ + static final long spinForTimeoutThreshold = 1000L; + + //插入一个节点 + private Node enq(final Node node) { + for (;;) { + //旧队列中最后的节点 + Node t = tail; + //如果没有节点 + if (t == null) { // Must initialize + //初始化一个,并且该节点就是头节点 + if (compareAndSetHead(new Node())) + tail = head; + } else { + //将旧队列最后的节点设置为当前节点的前一个节点 + node.prev = t; + if (compareAndSetTail(t, node)) { + //前一个节点的后节点设置为现在这个节点 + t.next = node; + //返回之前的节点 + return t; + } + } + } + } + + //把节点添加到队列中 + private Node addWaiter(Node mode) { + Node node = new Node(Thread.currentThread(), mode); + // Try the fast path of enq; backup to full enq on failure + Node pred = tail; + if (pred != null) { + node.prev = pred; + if (compareAndSetTail(pred, node)) { + pred.next = node; + return node; + } + } + enq(node); + return node; + } + + //设置头 + private void setHead(Node node) { + head = node; + node.thread = null; + node.prev = null; + } + + //如果节点后面还有节点,则唤醒操作 + private void unparkSuccessor(Node node) { + /* + * If status is negative (i.e., possibly needing signal) try + * to clear in anticipation of signalling. It is OK if this + * fails or if status is changed by waiting thread. + */ + //获取节点的状态 + int ws = node.waitStatus; + //上面有5种状态----- + if (ws < 0) + compareAndSetWaitStatus(node, ws, 0); + + /* + * Thread to unpark is held in successor, which is normally + * just the next node. But if cancelled or apparently null, + * traverse backwards from tail to find the actual + * non-cancelled successor. + */ + // 当前下一个节点 + Node s = node.next; + //没有下一个节点,或者下个节点已经取消了 + if (s == null || s.waitStatus > 0) { + s = null; + //尾节点; 该节点不为空并且不是头节点;该节点的前置节点 + for (Node t = tail; t != null && t != node; t = t.prev) + //从队列最后向前查找,最靠前的的状态不为取消的 + if (t.waitStatus <= 0) + s = t; + } + if (s != null) + //取消线程挂起-----唤醒操作 + LockSupport.unpark(s.thread); + } + + //释放共享 + private void doReleaseShared() { + /* + * Ensure that a release propagates, even if there are other + * in-progress acquires/releases. This proceeds in the usual + * way of trying to unparkSuccessor of head if it needs + * signal. But if it does not, status is set to PROPAGATE to + * ensure that upon release, propagation continues. + * Additionally, we must loop in case a new node is added + * while we are doing this. Also, unlike other uses of + * unparkSuccessor, we need to know if CAS to reset status + * fails, if so rechecking. + */ + for (;;) { + //获得头节点 + Node h = head; + //有队列,并且不止一个节点 + if (h != null && h != tail) { + //当前节点的状态 + int ws = h.waitStatus; + //是等待唤醒状态 + if (ws == Node.SIGNAL) { + //把共享变量变成无人占用的状态 + if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) + continue; // loop to recheck cases + //唤醒队列中 最靠前不是取消的节点--去获取共享变量 + unparkSuccessor(h); + } + else if (ws == 0 && + !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) + continue; // loop on failed CAS + } + //只有一个节点 跳出释放 + if (h == head) // loop if head changed + break; + } + } + + //设置队列头并传播 + private void setHeadAndPropagate(Node node, int propagate) { + // 记录之前的队列头 + Node h = head; // Record old head for check below + //把新的节点设置为队列头 + setHead(node); + /* + * Try to signal next queued node if: + * Propagation was indicated by caller, + * or was recorded (as h.waitStatus either before + * or after setHead) by a previous operation + * (note: this uses sign-check of waitStatus because + * PROPAGATE status may transition to SIGNAL.) + * and + * The next node is waiting in shared mode, + * or we don't know, because it appears null + * + * The conservatism in both of these checks may cause + * unnecessary wake-ups, but only when there are multiple + * racing acquires/releases, so most need signals now or soon + * anyway. + */ + if (propagate > 0 || h == null || h.waitStatus < 0 || + (h = head) == null || h.waitStatus < 0) { + Node s = node.next; + if (s == null || s.isShared()) + //释放共享 + doReleaseShared(); + } + } + + // Utilities for various versions of acquire + + + //不排队了--出列 + private void cancelAcquire(Node node) { + // Ignore if node doesn't exist + if (node == null) + return; + //不和线程挂钩了 + node.thread = null; + + // Skip cancelled predecessors + Node pred = node.prev; + //一直找一个前面没有被取消的节点 + 这个时候pred一定不是被取消的 + 可以理解为,出列,但是要补齐位置,让前后连接起来 + while (pred.waitStatus > 0) + node.prev = pred = pred.prev; + + // predNext is the apparent node to unsplice. CASes below will + // fail if not, in which case, we lost race vs another cancel + // or signal, so no further action is necessary. + + + //记录一下以及新占位节点的下一个节点 + Node predNext = pred.next; + + // Can use unconditional write instead of CAS here. + // After this atomic step, other Nodes can skip past us. + // Before, we are free of interference from other threads. + + //将将要出列的节点设置为取消 + node.waitStatus = Node.CANCELLED; + + // If we are the tail, remove ourselves. + + //如果要出列的是最后一个节点,并且把新结尾的节点已经替换了 + if (node == tail && compareAndSetTail(node, pred)) { + //把最新结尾节点的的后一个节点设置为null---表示我就是最后一个节点了 + compareAndSetNext(pred, predNext, null); + } else { + // If successor needs signal, try to set pred's next-link + // so it will get one. Otherwise wake it up to propagate. + int ws; + //如果node既不是tail,又不是head的后继节点 + //则将node的前继节点的waitStatus置为SIGNAL + //并使node的前继节点指向node的后继节点 + if (pred != head && + //状态是要唤醒的下一个节点的 + ((ws = pred.waitStatus) == Node.SIGNAL || + (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && + pred.thread != null) { + Node next = node.next; + if (next != null && next.waitStatus <= 0) + //将当前节点的前一个节点指向当前节点的后一个节点 + compareAndSetNext(pred, predNext, next); + } else { + //如果node是head的后继节点,则直接唤醒node的后继节点 + unparkSuccessor(node); + } + + node.next = node; // help GC + } + } + + //线程是否应该阻塞 + private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + //上一个节点的等待状态 + int ws = pred.waitStatus; + //如果前面那个是等待唤醒状态 + if (ws == Node.SIGNAL) + /* + * This node has already set status asking a release + * to signal it, so it can safely park. + */ + //当前节点需要阻塞---因为前面的还没有执行完 + return true; + //前面的节点已经取消了 + if (ws > 0) { + /* + * Predecessor was cancelled. Skip over predecessors and + * indicate retry. + */ + //把当前节点前面的已经取消的节点 全部剔除 + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + /* + * waitStatus must be 0 or PROPAGATE. Indicate that we + * need a signal, but don't park yet. Caller will need to + * retry to make sure it cannot acquire before parking. + */ + //设置为等待唤醒 + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; + } + + //中断当前线程 + static void selfInterrupt() { + Thread.currentThread().interrupt(); + } + + //线程挂起,并且检查是否已经中断 + private final boolean parkAndCheckInterrupt() { + LockSupport.park(this); + return Thread.interrupted(); + } + + /* + * Various flavors of acquire, varying in exclusive/shared and + * control modes. Each is mostly the same, but annoyingly + * different. Only a little bit of factoring is possible due to + * interactions of exception mechanics (including ensuring that we + * cancel if tryAcquire throws exception) and other control, at + * least not without hurting performance too much. + */ + + //节点获取锁 + final boolean acquireQueued(final Node node, int arg) { + //标识 + boolean failed = true; + try { + //中断标识 + boolean interrupted = false; + //无线循环 + for (;;) { + //当前节点的前一个节点 + final Node p = node.predecessor(); + //是头节点(该它去获取锁了),并且获取锁成功(具体是看那个模板用的这个方法) + if (p == head && tryAcquire(arg)) { + //当前节点作为头节点 + setHead(node); + p.next = null; // help GC + failed = false; + return interrupted; + } + //不是头节点 就看看是不是需要阻塞,并且线程挂起和中断 + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + if (failed) + cancelAcquire(node); + } + } + + //一直获取 + private void doAcquireInterruptibly(int arg) + throws InterruptedException { + //获取添加到队列中的 + //Node.EXCLUSIVE是null,即使插入了一个null在尾节点 + final Node node = addWaiter(Node.EXCLUSIVE); + boolean failed = true; + try { + for (;;) { + //获取当前null前面一个节点 + final Node p = node.predecessor(); + //一直加锁直到上锁成功 + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return; + } + //不是头节点的 就判断一下要不要线程挂起 + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + throw new InterruptedException(); + } + } finally { + if (failed) + cancelAcquire(node); + } + } + + //排它定时获取 + private boolean doAcquireNanos(int arg, long nanosTimeout) + throws InterruptedException { + if (nanosTimeout <= 0L) + return false; + //一个时间范围 + final long deadline = System.nanoTime() + nanosTimeout; + final Node node = addWaiter(Node.EXCLUSIVE); + boolean failed = true; + try { + for (;;) { + final Node p = node.predecessor(); + //头节点获取成功 + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return true; + } + nanosTimeout = deadline - System.nanoTime(); + if (nanosTimeout <= 0L) + return false; + //线程挂起 + if (shouldParkAfterFailedAcquire(p, node) && + nanosTimeout > spinForTimeoutThreshold) + LockSupport.parkNanos(this, nanosTimeout); + if (Thread.interrupted()) + throw new InterruptedException(); + } + } finally { + //取消这个节点 + if (failed) + cancelAcquire(node); + } + } + //获取锁,头节点就直到获取成功为止,其他节点,判断是否等待,等待就挂起 + private void doAcquireShared(int arg) { + final Node node = addWaiter(Node.SHARED); + boolean failed = true; + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + if (p == head) { + int r = tryAcquireShared(arg); + if (r >= 0) { + setHeadAndPropagate(node, r); + p.next = null; // help GC + if (interrupted) + selfInterrupt(); + failed = false; + return; + } + } + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + if (failed) + cancelAcquire(node); + } + } + + //不断获取锁,抛出中断异常,有些业务是需要这个异常 + private void doAcquireSharedInterruptibly(int arg) + throws InterruptedException { + final Node node = addWaiter(Node.SHARED); + boolean failed = true; + try { + for (;;) { + final Node p = node.predecessor(); + if (p == head) { + int r = tryAcquireShared(arg); + if (r >= 0) { + setHeadAndPropagate(node, r); + p.next = null; // help GC + failed = false; + return; + } + } + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + throw new InterruptedException(); + } + } finally { + if (failed) + cancelAcquire(node); + } + } + + //一段时间内不断获取 + private boolean doAcquireSharedNanos(int arg, long nanosTimeout) + throws InterruptedException { + if (nanosTimeout <= 0L) + return false; + final long deadline = System.nanoTime() + nanosTimeout; + final Node node = addWaiter(Node.SHARED); + boolean failed = true; + try { + for (;;) { + final Node p = node.predecessor(); + if (p == head) { + int r = tryAcquireShared(arg); + if (r >= 0) { + setHeadAndPropagate(node, r); + p.next = null; // help GC + failed = false; + return true; + } + } + nanosTimeout = deadline - System.nanoTime(); + if (nanosTimeout <= 0L) + return false; + if (shouldParkAfterFailedAcquire(p, node) && + nanosTimeout > spinForTimeoutThreshold) + LockSupport.parkNanos(this, nanosTimeout); + if (Thread.interrupted()) + throw new InterruptedException(); + } + } finally { + if (failed) + cancelAcquire(node); + } + } + + // Main exported methods + + //尝试获取锁,具体实现有子类去实现 + protected boolean tryAcquire(int arg) { + throw new UnsupportedOperationException(); + } + + //尝试释放 + protected boolean tryRelease(int arg) { + throw new UnsupportedOperationException(); + } + //共享模式获取 + protected int tryAcquireShared(int arg) { + throw new UnsupportedOperationException(); + } + + + protected boolean tryReleaseShared(int arg) { + throw new UnsupportedOperationException(); + } + + + protected boolean isHeldExclusively() { + throw new UnsupportedOperationException(); + } + + //获取失败就排队 + public final void acquire(int arg) { + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); + } + + //一直获取 + public final void acquireInterruptibly(int arg) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (!tryAcquire(arg)) + doAcquireInterruptibly(arg); + } + + //一段时间里不管获取 + public final boolean tryAcquireNanos(int arg, long nanosTimeout) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + return tryAcquire(arg) || + doAcquireNanos(arg, nanosTimeout); + } + + //释放 + public final boolean release(int arg) { + if (tryRelease(arg)) { + Node h = head; + if (h != null && h.waitStatus != 0) + unparkSuccessor(h); + return true; + } + return false; + } + + + public final void acquireShared(int arg) { + if (tryAcquireShared(arg) < 0) + doAcquireShared(arg); + } + + public final void acquireSharedInterruptibly(int arg) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (tryAcquireShared(arg) < 0) + doAcquireSharedInterruptibly(arg); + } + + + public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + return tryAcquireShared(arg) >= 0 || + doAcquireSharedNanos(arg, nanosTimeout); + } + + + public final boolean releaseShared(int arg) { + if (tryReleaseShared(arg)) { + doReleaseShared(); + return true; + } + return false; + } + + // Queue inspection methods + + + public final boolean hasQueuedThreads() { + return head != tail; + } + + + public final boolean hasContended() { + return head != null; + } + + public final Thread getFirstQueuedThread() { + // handle only fast path, else relay + return (head == tail) ? null : fullGetFirstQueuedThread(); + } + + /** + * Version of getFirstQueuedThread called when fastpath fails + */ + private Thread fullGetFirstQueuedThread() { + /* + * The first node is normally head.next. Try to get its + * thread field, ensuring consistent reads: If thread + * field is nulled out or s.prev is no longer head, then + * some other thread(s) concurrently performed setHead in + * between some of our reads. We try this twice before + * resorting to traversal. + */ + Node h, s; + Thread st; + if (((h = head) != null && (s = h.next) != null && + s.prev == head && (st = s.thread) != null) || + ((h = head) != null && (s = h.next) != null && + s.prev == head && (st = s.thread) != null)) + return st; + + /* + * Head's next field might not have been set yet, or may have + * been unset after setHead. So we must check to see if tail + * is actually first node. If not, we continue on, safely + * traversing from tail back to head to find first, + * guaranteeing termination. + */ + + Node t = tail; + Thread firstThread = null; + while (t != null && t != head) { + Thread tt = t.thread; + if (tt != null) + firstThread = tt; + t = t.prev; + } + return firstThread; + } + + /** + * Returns true if the given thread is currently queued. + * + *

This implementation traverses the queue to determine + * presence of the given thread. + * + * @param thread the thread + * @return {@code true} if the given thread is on the queue + * @throws NullPointerException if the thread is null + */ + public final boolean isQueued(Thread thread) { + if (thread == null) + throw new NullPointerException(); + for (Node p = tail; p != null; p = p.prev) + if (p.thread == thread) + return true; + return false; + } + + /** + * Returns {@code true} if the apparent first queued thread, if one + * exists, is waiting in exclusive mode. If this method returns + * {@code true}, and the current thread is attempting to acquire in + * shared mode (that is, this method is invoked from {@link + * #tryAcquireShared}) then it is guaranteed that the current thread + * is not the first queued thread. Used only as a heuristic in + * ReentrantReadWriteLock. + */ + final boolean apparentlyFirstQueuedIsExclusive() { + Node h, s; + return (h = head) != null && + (s = h.next) != null && + !s.isShared() && + s.thread != null; + } + + /** + * Queries whether any threads have been waiting to acquire longer + * than the current thread. + * + *

An invocation of this method is equivalent to (but may be + * more efficient than): + *

 {@code
+     * getFirstQueuedThread() != Thread.currentThread() &&
+     * hasQueuedThreads()}
+ * + *

Note that because cancellations due to interrupts and + * timeouts may occur at any time, a {@code true} return does not + * guarantee that some other thread will acquire before the current + * thread. Likewise, it is possible for another thread to win a + * race to enqueue after this method has returned {@code false}, + * due to the queue being empty. + * + *

This method is designed to be used by a fair synchronizer to + * avoid barging. + * Such a synchronizer's {@link #tryAcquire} method should return + * {@code false}, and its {@link #tryAcquireShared} method should + * return a negative value, if this method returns {@code true} + * (unless this is a reentrant acquire). For example, the {@code + * tryAcquire} method for a fair, reentrant, exclusive mode + * synchronizer might look like this: + * + *

 {@code
+     * protected boolean tryAcquire(int arg) {
+     *   if (isHeldExclusively()) {
+     *     // A reentrant acquire; increment hold count
+     *     return true;
+     *   } else if (hasQueuedPredecessors()) {
+     *     return false;
+     *   } else {
+     *     // try to acquire normally
+     *   }
+     * }}
+ * + * @return {@code true} if there is a queued thread preceding the + * current thread, and {@code false} if the current thread + * is at the head of the queue or the queue is empty + * @since 1.7 + */ + public final boolean hasQueuedPredecessors() { + // The correctness of this depends on head being initialized + // before tail and on head.next being accurate if the current + // thread is first in queue. + Node t = tail; // Read fields in reverse initialization order + Node h = head; + Node s; + return h != t && + ((s = h.next) == null || s.thread != Thread.currentThread()); + } + + + // Instrumentation and monitoring methods + + /** + * Returns an estimate of the number of threads waiting to + * acquire. The value is only an estimate because the number of + * threads may change dynamically while this method traverses + * internal data structures. This method is designed for use in + * monitoring system state, not for synchronization + * control. + * + * @return the estimated number of threads waiting to acquire + */ + public final int getQueueLength() { + int n = 0; + for (Node p = tail; p != null; p = p.prev) { + if (p.thread != null) + ++n; + } + return n; + } + + /** + * Returns a collection containing threads that may be waiting to + * acquire. Because the actual set of threads may change + * dynamically while constructing this result, the returned + * collection is only a best-effort estimate. The elements of the + * returned collection are in no particular order. This method is + * designed to facilitate construction of subclasses that provide + * more extensive monitoring facilities. + * + * @return the collection of threads + */ + public final Collection getQueuedThreads() { + ArrayList list = new ArrayList(); + for (Node p = tail; p != null; p = p.prev) { + Thread t = p.thread; + if (t != null) + list.add(t); + } + return list; + } + + /** + * Returns a collection containing threads that may be waiting to + * acquire in exclusive mode. This has the same properties + * as {@link #getQueuedThreads} except that it only returns + * those threads waiting due to an exclusive acquire. + * + * @return the collection of threads + */ + public final Collection getExclusiveQueuedThreads() { + ArrayList list = new ArrayList(); + for (Node p = tail; p != null; p = p.prev) { + if (!p.isShared()) { + Thread t = p.thread; + if (t != null) + list.add(t); + } + } + return list; + } + + /** + * Returns a collection containing threads that may be waiting to + * acquire in shared mode. This has the same properties + * as {@link #getQueuedThreads} except that it only returns + * those threads waiting due to a shared acquire. + * + * @return the collection of threads + */ + public final Collection getSharedQueuedThreads() { + ArrayList list = new ArrayList(); + for (Node p = tail; p != null; p = p.prev) { + if (p.isShared()) { + Thread t = p.thread; + if (t != null) + list.add(t); + } + } + return list; + } + + /** + * Returns a string identifying this synchronizer, as well as its state. + * The state, in brackets, includes the String {@code "State ="} + * followed by the current value of {@link #getState}, and either + * {@code "nonempty"} or {@code "empty"} depending on whether the + * queue is empty. + * + * @return a string identifying this synchronizer, as well as its state + */ + public String toString() { + int s = getState(); + String q = hasQueuedThreads() ? "non" : ""; + return super.toString() + + "[State = " + s + ", " + q + "empty queue]"; + } + + + // Internal support methods for Conditions + + /** + * Returns true if a node, always one that was initially placed on + * a condition queue, is now waiting to reacquire on sync queue. + * @param node the node + * @return true if is reacquiring + */ + final boolean isOnSyncQueue(Node node) { + if (node.waitStatus == Node.CONDITION || node.prev == null) + return false; + if (node.next != null) // If has successor, it must be on queue + return true; + /* + * node.prev can be non-null, but not yet on queue because + * the CAS to place it on queue can fail. So we have to + * traverse from tail to make sure it actually made it. It + * will always be near the tail in calls to this method, and + * unless the CAS failed (which is unlikely), it will be + * there, so we hardly ever traverse much. + */ + return findNodeFromTail(node); + } + + /** + * Returns true if node is on sync queue by searching backwards from tail. + * Called only when needed by isOnSyncQueue. + * @return true if present + */ + private boolean findNodeFromTail(Node node) { + Node t = tail; + for (;;) { + if (t == node) + return true; + if (t == null) + return false; + t = t.prev; + } + } + + /** + * Transfers a node from a condition queue onto sync queue. + * Returns true if successful. + * @param node the node + * @return true if successfully transferred (else the node was + * cancelled before signal) + */ + final boolean transferForSignal(Node node) { + /* + * If cannot change waitStatus, the node has been cancelled. + */ + if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) + return false; + + /* + * Splice onto queue and try to set waitStatus of predecessor to + * indicate that thread is (probably) waiting. If cancelled or + * attempt to set waitStatus fails, wake up to resync (in which + * case the waitStatus can be transiently and harmlessly wrong). + */ + Node p = enq(node); + int ws = p.waitStatus; + if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) + LockSupport.unpark(node.thread); + return true; + } + + /** + * Transfers node, if necessary, to sync queue after a cancelled wait. + * Returns true if thread was cancelled before being signalled. + * + * @param node the node + * @return true if cancelled before the node was signalled + */ + final boolean transferAfterCancelledWait(Node node) { + if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { + enq(node); + return true; + } + /* + * If we lost out to a signal(), then we can't proceed + * until it finishes its enq(). Cancelling during an + * incomplete transfer is both rare and transient, so just + * spin. + */ + while (!isOnSyncQueue(node)) + Thread.yield(); + return false; + } + + /** + * Invokes release with current state value; returns saved state. + * Cancels node and throws exception on failure. + * @param node the condition node for this wait + * @return previous sync state + */ + final int fullyRelease(Node node) { + boolean failed = true; + try { + int savedState = getState(); + if (release(savedState)) { + failed = false; + return savedState; + } else { + throw new IllegalMonitorStateException(); + } + } finally { + if (failed) + node.waitStatus = Node.CANCELLED; + } + } + + // Instrumentation methods for conditions + + /** + * Queries whether the given ConditionObject + * uses this synchronizer as its lock. + * + * @param condition the condition + * @return {@code true} if owned + * @throws NullPointerException if the condition is null + */ + public final boolean owns(ConditionObject condition) { + return condition.isOwnedBy(this); + } + + /** + * Queries whether any threads are waiting on the given condition + * associated with this synchronizer. Note that because timeouts + * and interrupts may occur at any time, a {@code true} return + * does not guarantee that a future {@code signal} will awaken + * any threads. This method is designed primarily for use in + * monitoring of the system state. + * + * @param condition the condition + * @return {@code true} if there are any waiting threads + * @throws IllegalMonitorStateException if exclusive synchronization + * is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this synchronizer + * @throws NullPointerException if the condition is null + */ + public final boolean hasWaiters(ConditionObject condition) { + if (!owns(condition)) + throw new IllegalArgumentException("Not owner"); + return condition.hasWaiters(); + } + + /** + * Returns an estimate of the number of threads waiting on the + * given condition associated with this synchronizer. Note that + * because timeouts and interrupts may occur at any time, the + * estimate serves only as an upper bound on the actual number of + * waiters. This method is designed for use in monitoring of the + * system state, not for synchronization control. + * + * @param condition the condition + * @return the estimated number of waiting threads + * @throws IllegalMonitorStateException if exclusive synchronization + * is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this synchronizer + * @throws NullPointerException if the condition is null + */ + public final int getWaitQueueLength(ConditionObject condition) { + if (!owns(condition)) + throw new IllegalArgumentException("Not owner"); + return condition.getWaitQueueLength(); + } + + /** + * Returns a collection containing those threads that may be + * waiting on the given condition associated with this + * synchronizer. Because the actual set of threads may change + * dynamically while constructing this result, the returned + * collection is only a best-effort estimate. The elements of the + * returned collection are in no particular order. + * + * @param condition the condition + * @return the collection of threads + * @throws IllegalMonitorStateException if exclusive synchronization + * is not held + * @throws IllegalArgumentException if the given condition is + * not associated with this synchronizer + * @throws NullPointerException if the condition is null + */ + public final Collection getWaitingThreads(ConditionObject condition) { + if (!owns(condition)) + throw new IllegalArgumentException("Not owner"); + return condition.getWaitingThreads(); + } + + /** + * Condition implementation for a {@link + * AbstractQueuedSynchronizer} serving as the basis of a {@link + * Lock} implementation. + * + *

Method documentation for this class describes mechanics, + * not behavioral specifications from the point of view of Lock + * and Condition users. Exported versions of this class will in + * general need to be accompanied by documentation describing + * condition semantics that rely on those of the associated + * {@code AbstractQueuedSynchronizer}. + * + *

This class is Serializable, but all fields are transient, + * so deserialized conditions have no waiters. + */ + public class ConditionObject implements Condition, java.io.Serializable { + private static final long serialVersionUID = 1173984872572414699L; + /** First node of condition queue. */ + private transient Node firstWaiter; + /** Last node of condition queue. */ + private transient Node lastWaiter; + + /** + * Creates a new {@code ConditionObject} instance. + */ + public ConditionObject() { } + + // Internal methods + + /** + * Adds a new waiter to wait queue. + * @return its new wait node + */ + private Node addConditionWaiter() { + Node t = lastWaiter; + // If lastWaiter is cancelled, clean out. + if (t != null && t.waitStatus != Node.CONDITION) { + unlinkCancelledWaiters(); + t = lastWaiter; + } + Node node = new Node(Thread.currentThread(), Node.CONDITION); + if (t == null) + firstWaiter = node; + else + t.nextWaiter = node; + lastWaiter = node; + return node; + } + + /** + * Removes and transfers nodes until hit non-cancelled one or + * null. Split out from signal in part to encourage compilers + * to inline the case of no waiters. + * @param first (non-null) the first node on condition queue + */ + private void doSignal(Node first) { + do { + if ( (firstWaiter = first.nextWaiter) == null) + lastWaiter = null; + first.nextWaiter = null; + } while (!transferForSignal(first) && + (first = firstWaiter) != null); + } + + /** + * Removes and transfers all nodes. + * @param first (non-null) the first node on condition queue + */ + private void doSignalAll(Node first) { + lastWaiter = firstWaiter = null; + do { + Node next = first.nextWaiter; + first.nextWaiter = null; + transferForSignal(first); + first = next; + } while (first != null); + } + + /** + * Unlinks cancelled waiter nodes from condition queue. + * Called only while holding lock. This is called when + * cancellation occurred during condition wait, and upon + * insertion of a new waiter when lastWaiter is seen to have + * been cancelled. This method is needed to avoid garbage + * retention in the absence of signals. So even though it may + * require a full traversal, it comes into play only when + * timeouts or cancellations occur in the absence of + * signals. It traverses all nodes rather than stopping at a + * particular target to unlink all pointers to garbage nodes + * without requiring many re-traversals during cancellation + * storms. + */ + private void unlinkCancelledWaiters() { + Node t = firstWaiter; + Node trail = null; + while (t != null) { + Node next = t.nextWaiter; + if (t.waitStatus != Node.CONDITION) { + t.nextWaiter = null; + if (trail == null) + firstWaiter = next; + else + trail.nextWaiter = next; + if (next == null) + lastWaiter = trail; + } + else + trail = t; + t = next; + } + } + + // public methods + + /** + * Moves the longest-waiting thread, if one exists, from the + * wait queue for this condition to the wait queue for the + * owning lock. + * + * @throws IllegalMonitorStateException if {@link #isHeldExclusively} + * returns {@code false} + */ + public final void signal() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + Node first = firstWaiter; + if (first != null) + doSignal(first); + } + + /** + * Moves all threads from the wait queue for this condition to + * the wait queue for the owning lock. + * + * @throws IllegalMonitorStateException if {@link #isHeldExclusively} + * returns {@code false} + */ + public final void signalAll() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + Node first = firstWaiter; + if (first != null) + doSignalAll(first); + } + + /** + * Implements uninterruptible condition wait. + *

    + *
  1. Save lock state returned by {@link #getState}. + *
  2. Invoke {@link #release} with saved state as argument, + * throwing IllegalMonitorStateException if it fails. + *
  3. Block until signalled. + *
  4. Reacquire by invoking specialized version of + * {@link #acquire} with saved state as argument. + *
+ */ + public final void awaitUninterruptibly() { + Node node = addConditionWaiter(); + int savedState = fullyRelease(node); + boolean interrupted = false; + while (!isOnSyncQueue(node)) { + LockSupport.park(this); + if (Thread.interrupted()) + interrupted = true; + } + if (acquireQueued(node, savedState) || interrupted) + selfInterrupt(); + } + + /* + * For interruptible waits, we need to track whether to throw + * InterruptedException, if interrupted while blocked on + * condition, versus reinterrupt current thread, if + * interrupted while blocked waiting to re-acquire. + */ + + /** Mode meaning to reinterrupt on exit from wait */ + private static final int REINTERRUPT = 1; + /** Mode meaning to throw InterruptedException on exit from wait */ + private static final int THROW_IE = -1; + + /** + * Checks for interrupt, returning THROW_IE if interrupted + * before signalled, REINTERRUPT if after signalled, or + * 0 if not interrupted. + */ + private int checkInterruptWhileWaiting(Node node) { + return Thread.interrupted() ? + (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : + 0; + } + + /** + * Throws InterruptedException, reinterrupts current thread, or + * does nothing, depending on mode. + */ + private void reportInterruptAfterWait(int interruptMode) + throws InterruptedException { + if (interruptMode == THROW_IE) + throw new InterruptedException(); + else if (interruptMode == REINTERRUPT) + selfInterrupt(); + } + + /** + * Implements interruptible condition wait. + *
    + *
  1. If current thread is interrupted, throw InterruptedException. + *
  2. Save lock state returned by {@link #getState}. + *
  3. Invoke {@link #release} with saved state as argument, + * throwing IllegalMonitorStateException if it fails. + *
  4. Block until signalled or interrupted. + *
  5. Reacquire by invoking specialized version of + * {@link #acquire} with saved state as argument. + *
  6. If interrupted while blocked in step 4, throw InterruptedException. + *
+ */ + public final void await() throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + Node node = addConditionWaiter(); + int savedState = fullyRelease(node); + int interruptMode = 0; + while (!isOnSyncQueue(node)) { + LockSupport.park(this); + if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) + break; + } + if (acquireQueued(node, savedState) && interruptMode != THROW_IE) + interruptMode = REINTERRUPT; + if (node.nextWaiter != null) // clean up if cancelled + unlinkCancelledWaiters(); + if (interruptMode != 0) + reportInterruptAfterWait(interruptMode); + } + + /** + * Implements timed condition wait. + *
    + *
  1. If current thread is interrupted, throw InterruptedException. + *
  2. Save lock state returned by {@link #getState}. + *
  3. Invoke {@link #release} with saved state as argument, + * throwing IllegalMonitorStateException if it fails. + *
  4. Block until signalled, interrupted, or timed out. + *
  5. Reacquire by invoking specialized version of + * {@link #acquire} with saved state as argument. + *
  6. If interrupted while blocked in step 4, throw InterruptedException. + *
+ */ + public final long awaitNanos(long nanosTimeout) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + Node node = addConditionWaiter(); + int savedState = fullyRelease(node); + final long deadline = System.nanoTime() + nanosTimeout; + int interruptMode = 0; + while (!isOnSyncQueue(node)) { + if (nanosTimeout <= 0L) { + transferAfterCancelledWait(node); + break; + } + if (nanosTimeout >= spinForTimeoutThreshold) + LockSupport.parkNanos(this, nanosTimeout); + if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) + break; + nanosTimeout = deadline - System.nanoTime(); + } + if (acquireQueued(node, savedState) && interruptMode != THROW_IE) + interruptMode = REINTERRUPT; + if (node.nextWaiter != null) + unlinkCancelledWaiters(); + if (interruptMode != 0) + reportInterruptAfterWait(interruptMode); + return deadline - System.nanoTime(); + } + + /** + * Implements absolute timed condition wait. + *
    + *
  1. If current thread is interrupted, throw InterruptedException. + *
  2. Save lock state returned by {@link #getState}. + *
  3. Invoke {@link #release} with saved state as argument, + * throwing IllegalMonitorStateException if it fails. + *
  4. Block until signalled, interrupted, or timed out. + *
  5. Reacquire by invoking specialized version of + * {@link #acquire} with saved state as argument. + *
  6. If interrupted while blocked in step 4, throw InterruptedException. + *
  7. If timed out while blocked in step 4, return false, else true. + *
+ */ + public final boolean awaitUntil(Date deadline) + throws InterruptedException { + long abstime = deadline.getTime(); + if (Thread.interrupted()) + throw new InterruptedException(); + Node node = addConditionWaiter(); + int savedState = fullyRelease(node); + boolean timedout = false; + int interruptMode = 0; + while (!isOnSyncQueue(node)) { + if (System.currentTimeMillis() > abstime) { + timedout = transferAfterCancelledWait(node); + break; + } + LockSupport.parkUntil(this, abstime); + if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) + break; + } + if (acquireQueued(node, savedState) && interruptMode != THROW_IE) + interruptMode = REINTERRUPT; + if (node.nextWaiter != null) + unlinkCancelledWaiters(); + if (interruptMode != 0) + reportInterruptAfterWait(interruptMode); + return !timedout; + } + + /** + * Implements timed condition wait. + *
    + *
  1. If current thread is interrupted, throw InterruptedException. + *
  2. Save lock state returned by {@link #getState}. + *
  3. Invoke {@link #release} with saved state as argument, + * throwing IllegalMonitorStateException if it fails. + *
  4. Block until signalled, interrupted, or timed out. + *
  5. Reacquire by invoking specialized version of + * {@link #acquire} with saved state as argument. + *
  6. If interrupted while blocked in step 4, throw InterruptedException. + *
  7. If timed out while blocked in step 4, return false, else true. + *
+ */ + public final boolean await(long time, TimeUnit unit) + throws InterruptedException { + long nanosTimeout = unit.toNanos(time); + if (Thread.interrupted()) + throw new InterruptedException(); + Node node = addConditionWaiter(); + int savedState = fullyRelease(node); + final long deadline = System.nanoTime() + nanosTimeout; + boolean timedout = false; + int interruptMode = 0; + while (!isOnSyncQueue(node)) { + if (nanosTimeout <= 0L) { + timedout = transferAfterCancelledWait(node); + break; + } + if (nanosTimeout >= spinForTimeoutThreshold) + LockSupport.parkNanos(this, nanosTimeout); + if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) + break; + nanosTimeout = deadline - System.nanoTime(); + } + if (acquireQueued(node, savedState) && interruptMode != THROW_IE) + interruptMode = REINTERRUPT; + if (node.nextWaiter != null) + unlinkCancelledWaiters(); + if (interruptMode != 0) + reportInterruptAfterWait(interruptMode); + return !timedout; + } + + // support for instrumentation + + /** + * Returns true if this condition was created by the given + * synchronization object. + * + * @return {@code true} if owned + */ + final boolean isOwnedBy(AbstractQueuedSynchronizer sync) { + return sync == AbstractQueuedSynchronizer.this; + } + + /** + * Queries whether any threads are waiting on this condition. + * Implements {@link AbstractQueuedSynchronizer#hasWaiters(ConditionObject)}. + * + * @return {@code true} if there are any waiting threads + * @throws IllegalMonitorStateException if {@link #isHeldExclusively} + * returns {@code false} + */ + protected final boolean hasWaiters() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + for (Node w = firstWaiter; w != null; w = w.nextWaiter) { + if (w.waitStatus == Node.CONDITION) + return true; + } + return false; + } + + /** + * Returns an estimate of the number of threads waiting on + * this condition. + * Implements {@link AbstractQueuedSynchronizer#getWaitQueueLength(ConditionObject)}. + * + * @return the estimated number of waiting threads + * @throws IllegalMonitorStateException if {@link #isHeldExclusively} + * returns {@code false} + */ + protected final int getWaitQueueLength() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + int n = 0; + for (Node w = firstWaiter; w != null; w = w.nextWaiter) { + if (w.waitStatus == Node.CONDITION) + ++n; + } + return n; + } + + /** + * Returns a collection containing those threads that may be + * waiting on this Condition. + * Implements {@link AbstractQueuedSynchronizer#getWaitingThreads(ConditionObject)}. + * + * @return the collection of threads + * @throws IllegalMonitorStateException if {@link #isHeldExclusively} + * returns {@code false} + */ + protected final Collection getWaitingThreads() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + ArrayList list = new ArrayList(); + for (Node w = firstWaiter; w != null; w = w.nextWaiter) { + if (w.waitStatus == Node.CONDITION) { + Thread t = w.thread; + if (t != null) + list.add(t); + } + } + return list; + } + } + + /** + * Setup to support compareAndSet. We need to natively implement + * this here: For the sake of permitting future enhancements, we + * cannot explicitly subclass AtomicInteger, which would be + * efficient and useful otherwise. So, as the lesser of evils, we + * natively implement using hotspot intrinsics API. And while we + * are at it, we do the same for other CASable fields (which could + * otherwise be done with atomic field updaters). + */ + private static final Unsafe unsafe = Unsafe.getUnsafe(); + private static final long stateOffset; + private static final long headOffset; + private static final long tailOffset; + private static final long waitStatusOffset; + private static final long nextOffset; + + static { + try { + stateOffset = unsafe.objectFieldOffset + (AbstractQueuedSynchronizer.class.getDeclaredField("state")); + headOffset = unsafe.objectFieldOffset + (AbstractQueuedSynchronizer.class.getDeclaredField("head")); + tailOffset = unsafe.objectFieldOffset + (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); + waitStatusOffset = unsafe.objectFieldOffset + (Node.class.getDeclaredField("waitStatus")); + nextOffset = unsafe.objectFieldOffset + (Node.class.getDeclaredField("next")); + + } catch (Exception ex) { throw new Error(ex); } + } + + /** + * CAS head field. Used only by enq. + */ + private final boolean compareAndSetHead(Node update) { + return unsafe.compareAndSwapObject(this, headOffset, null, update); + } + + /** + * CAS tail field. Used only by enq. + */ + private final boolean compareAndSetTail(Node expect, Node update) { + return unsafe.compareAndSwapObject(this, tailOffset, expect, update); + } + + /** + * CAS waitStatus field of a node. + */ + private static final boolean compareAndSetWaitStatus(Node node, + int expect, + int update) { + return unsafe.compareAndSwapInt(node, waitStatusOffset, + expect, update); + } + + /** + * CAS next field of a node. + */ + private static final boolean compareAndSetNext(Node node, + Node expect, + Node update) { + return unsafe.compareAndSwapObject(node, nextOffset, expect, update); + } +} + + diff --git a/week_03/17/ReentrantLock b/week_03/17/ReentrantLock new file mode 100644 index 0000000..d3fc968 --- /dev/null +++ b/week_03/17/ReentrantLock @@ -0,0 +1,293 @@ +##ReentrantLock---重入锁(公平和非公平) + 1.实现Lock + //锁 + void lock(); + //可中断锁 + void lockInterruptibly() throws InterruptedException; + //尝试加锁 + boolean tryLock(); + //一个时间段内尝试加锁 + boolean tryLock(long time, TimeUnit unit) throws InterruptedException; + //释放锁 + void unlock(); + //条件锁 + Condition newCondition(); +##理解 + 1.公平锁,先看下有人在用锁么,如果没有去去检查一下有没有等待加锁的节点 + 没有节点,就拿走锁,有节点就去后面排队 + 如果排队获取锁成功,就把自己的这个节点变成老大,之前的那个老大删掉,返回。 + 如果排队获取锁失败,就阻塞在这里, + 如果当前节点前面一位老哥已经取消了就把前面所有取消的,剔除队伍。 + 如果前面都在等待唤醒排队中,那自己也就老老实实在等待唤醒的队伍中。 + 2.非公平锁,直接上锁,如果上面已经有锁了,就在旁边等(可以理解为插队到等待加锁的队伍的第一个),这样他一来就2次机会。 +##源码分析 + + public class ReentrantLock implements Lock, java.io.Serializable { + private static final long serialVersionUID = 7373984872572414699L; + + /** Synchronizer providing all implementation mechanics */ + private final Sync sync; + + //模板模式 + abstract static class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = -5179523762034025860L; + + + abstract void lock(); + + //非公平锁 + final boolean nonfairTryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + if (compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; + } + //尝试释放 + protected final boolean tryRelease(int releases) { + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + if (c == 0) { + free = true; + setExclusiveOwnerThread(null); + } + setState(c); + return free; + } + //当前线程是不是独占了 + protected final boolean isHeldExclusively() { + // While we must in general read state before owner, + // we don't need to do so to check if current thread is owner + return getExclusiveOwnerThread() == Thread.currentThread(); + } + + final ConditionObject newCondition() { + return new ConditionObject(); + } + + // Methods relayed from outer class + //获取状态 + final Thread getOwner() { + return getState() == 0 ? null : getExclusiveOwnerThread(); + } + + final int getHoldCount() { + return isHeldExclusively() ? getState() : 0; + } + + final boolean isLocked() { + return getState() != 0; + } + + /** + * Reconstitutes the instance from a stream (that is, deserializes it). + */ + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + setState(0); // reset to unlocked state + } + } + + //非公平锁 + static final class NonfairSync extends Sync { + private static final long serialVersionUID = 7316153563782823691L; + + /** + * Performs lock. Try immediate barge, backing up to normal + * acquire on failure. + */ + final void lock() { + //直接上底层改 + if (compareAndSetState(0, 1)) + //设置为当前线程独占 + setExclusiveOwnerThread(Thread.currentThread()); + else + //如果直接修改失败的话,去排队 + + acquire(1); + } + + protected final boolean tryAcquire(int acquires) { + return nonfairTryAcquire(acquires); + } + } + + //公平锁 + static final class FairSync extends Sync { + private static final long serialVersionUID = -3000897897090466540L; + + final void lock() { + acquire(1); + } + + /** + * Fair version of tryAcquire. Don't grant access unless + * recursive call or no waiters or is first. + */ + protected final boolean tryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + //获取共享变量的状态 + int c = getState(); + //没有被使用 + if (c == 0) { + //检查队列 + if (!hasQueuedPredecessors() && + //尝试修改 + compareAndSetState(0, acquires)) { + //设置独占 + setExclusiveOwnerThread(current); + return true; + } + } + //检查是不是当前线程独占 + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; + } + } + + //初始化 + public ReentrantLock() { + sync = new NonfairSync(); + } + + //选择初始化 + public ReentrantLock(boolean fair) { + sync = fair ? new FairSync() : new NonfairSync(); + } + + + public void lock() { + sync.lock(); + } + + //可中断加锁 + public void lockInterruptibly() throws InterruptedException { + sync.acquireInterruptibly(1); + } + + //尝试加锁(非公平加锁) + public boolean tryLock() { + return sync.nonfairTryAcquire(1); + } + + //释放锁 + public void unlock() { + sync.release(1); + } + + //条件锁 + public Condition newCondition() { + return sync.newCondition(); + } + + //占用次数 + public int getHoldCount() { + return sync.getHoldCount(); + } + + + public boolean isHeldByCurrentThread() { + return sync.isHeldExclusively(); + } + + //查询锁是否被上锁了 + public boolean isLocked() { + return sync.isLocked(); + } + + /** + * Returns {@code true} if this lock has fairness set true. + * + * @return {@code true} if this lock has fairness set true + */ + public final boolean isFair() { + return sync instanceof FairSync; + } + + //返回拥有锁的线程 + protected Thread getOwner() { + return sync.getOwner(); + } + + //查询是否还有线程在等改锁 + public final boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); + } + + //这个线程是否在等这个锁 + public final boolean hasQueuedThread(Thread thread) { + return sync.isQueued(thread); + } + + //队列长度 + public final int getQueueLength() { + return sync.getQueueLength(); + } + + //等待该锁的所有线程 + protected Collection getQueuedThreads() { + return sync.getQueuedThreads(); + } + + //AQS中的方法 + public boolean hasWaiters(Condition condition) { + if (condition == null) + throw new NullPointerException(); + if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) + throw new IllegalArgumentException("not owner"); + return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition); + } + + //AQS中的方法---返回正在等待的线程数 + public int getWaitQueueLength(Condition condition) { + if (condition == null) + throw new NullPointerException(); + if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) + throw new IllegalArgumentException("not owner"); + return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition); + } + + //AQS中的方法--返回在等待条件的线程 + protected Collection getWaitingThreads(Condition condition) { + if (condition == null) + throw new NullPointerException(); + if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject)) + throw new IllegalArgumentException("not owner"); + return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition); + } + + /** + * Returns a string identifying this lock, as well as its lock state. + * The state, in brackets, includes either the String {@code "Unlocked"} + * or the String {@code "Locked by"} followed by the + * {@linkplain Thread#getName name} of the owning thread. + * + * @return a string identifying this lock, as well as its lock state + */ + public String toString() { + Thread o = sync.getOwner(); + return super.toString() + ((o == null) ? + "[Unlocked]" : + "[Locked by thread " + o.getName() + "]"); + } + } diff --git a/week_03/17/Semaphore b/week_03/17/Semaphore new file mode 100644 index 0000000..3942b4d --- /dev/null +++ b/week_03/17/Semaphore @@ -0,0 +1,247 @@ +##Semaphore----信号量 + 1.可以理解为锁次数的管理员 + 2.加一次锁 就消耗一次,释放一次就增加一次。 + 3.通俗的来说,可以理解为,锁的限流。 + +##代码中也存在NonfairSync,FairSync + 1.所有这个也是有公平和非公平的说法。 + +##源码分析 + + { + private static final long serialVersionUID = -3222578661600680210L; + /** All mechanics via AbstractQueuedSynchronizer subclass */ + private final Sync sync; + + /** + * Synchronization implementation for semaphore. Uses AQS state + * to represent permits. Subclassed into fair and nonfair + * versions. + */ + abstract static class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = 1192457210091910933L; + //初始化,把许可次数放进去 + Sync(int permits) { + setState(permits); + } + //获取一个许可 + final int getPermits() { + return getState(); + } + //默认非公平 + final int nonfairTryAcquireShared(int acquires) { + for (;;) { + int available = getState(); + //用剩余的许可总数减去马上用的这个 + int remaining = available - acquires; + //如果小于0或者已经更改成功了就返回 + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } + } + //归还许可 + protected final boolean tryReleaseShared(int releases) { + for (;;) { + int current = getState(); + int next = current + releases; + if (next < current) // overflow + throw new Error("Maximum permit count exceeded"); + if (compareAndSetState(current, next)) + return true; + } + } + //减少许可证 + final void reducePermits(int reductions) { + for (;;) { + //当前剩余许可证 + int current = getState(); + int next = current - reductions; + if (next > current) // underflow + throw new Error("Permit count underflow"); + //把许可次数换成减少之后的 + if (compareAndSetState(current, next)) + return; + } + } + //销毁许可 + final int drainPermits() { + for (;;) { + int current = getState(); + if (current == 0 || compareAndSetState(current, 0)) + return current; + } + } + } + + //非公平获取一个许可 + static final class NonfairSync extends Sync { + private static final long serialVersionUID = -2694183684443567898L; + + NonfairSync(int permits) { + super(permits); + } + + protected int tryAcquireShared(int acquires) { + return nonfairTryAcquireShared(acquires); + } + } + + //公平的获取一个许可-----检查一下同步队列 + static final class FairSync extends Sync { + private static final long serialVersionUID = 2014338818796000944L; + + FairSync(int permits) { + super(permits); + } + + protected int tryAcquireShared(int acquires) { + for (;;) { + if (hasQueuedPredecessors()) + return -1; + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } + } + } + + //指定许可次数 + public Semaphore(int permits) { + sync = new NonfairSync(permits); + } + + //指定许可次数,并且指定公平和非公平 + public Semaphore(int permits, boolean fair) { + sync = fair ? new FairSync(permits) : new NonfairSync(permits); + } + + //获取一个许可--可以中断,失败就要去排队 + public void acquire() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + //获取一个许可--失败就排队 + public void acquireUninterruptibly() { + sync.acquireShared(1); + } + + //可中断获取许可成功/失败,如果失败,不排队 + public boolean tryAcquire() { + return sync.nonfairTryAcquireShared(1) >= 0; + } + + //获取许可,失败就在一段时间里获取,超时false,不在队伍中 + public boolean tryAcquire(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + } + + //释放许可-- + public void release() { + sync.releaseShared(1); + } + + //获取多个许可--可中断 + public void acquire(int permits) throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireSharedInterruptibly(permits); + } + //一次获取多个许可--不断尝试 + public void acquireUninterruptibly(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireShared(permits); + } + + //一次获取多个-只是一次 + public boolean tryAcquire(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + return sync.nonfairTryAcquireShared(permits) >= 0; + } + //尝试获取多个--一段时间内没有获取到则返回 + public boolean tryAcquire(int permits, long timeout, TimeUnit unit) + throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); + } + + //释放多个许可 + public void release(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.releaseShared(permits); + } + + //获取许可次数 + public int availablePermits() { + return sync.getPermits(); + } + + //销毁剩余的许可次数, + public int drainPermits() { + return sync.drainPermits(); + } + + //减少许可次数 + protected void reducePermits(int reduction) { + if (reduction < 0) throw new IllegalArgumentException(); + sync.reducePermits(reduction); + } + //判断非公平 + public boolean isFair() { + return sync instanceof FairSync; + } + + /** + * Queries whether any threads are waiting to acquire. Note that + * because cancellations may occur at any time, a {@code true} + * return does not guarantee that any other thread will ever + * acquire. This method is designed primarily for use in + * monitoring of the system state. + * + * @return {@code true} if there may be other threads waiting to + * acquire the lock + */ + public final boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); + } + + /** + * Returns an estimate of the number of threads waiting to acquire. + * The value is only an estimate because the number of threads may + * change dynamically while this method traverses internal data + * structures. This method is designed for use in monitoring of the + * system state, not for synchronization control. + * + * @return the estimated number of threads waiting for this lock + */ + public final int getQueueLength() { + return sync.getQueueLength(); + } + + /** + * Returns a collection containing threads that may be waiting to acquire. + * Because the actual set of threads may change dynamically while + * constructing this result, the returned collection is only a best-effort + * estimate. The elements of the returned collection are in no particular + * order. This method is designed to facilitate construction of + * subclasses that provide more extensive monitoring facilities. + * + * @return the collection of threads + */ + protected Collection getQueuedThreads() { + return sync.getQueuedThreads(); + } + + /** + * Returns a string identifying this semaphore, as well as its state. + * The state, in brackets, includes the String {@code "Permits ="} + * followed by the number of permits. + * + * @return a string identifying this semaphore, as well as its state + */ + public String toString() { + return super.toString() + "[Permits = " + sync.getPermits() + "]"; + } +} \ No newline at end of file diff --git a/week_03/17/synchronized b/week_03/17/synchronized new file mode 100644 index 0000000..b837d5d --- /dev/null +++ b/week_03/17/synchronized @@ -0,0 +1,51 @@ +##synchronized----同步 + 1.我们都知道保证同步可以用这个实现。 + 及对变量加锁,操作,解锁 + 加锁会保证只有一个线程在用---及中间不会有其他干扰,是原子的 + 解锁的时候,会将新的数据放到主内存在,是可见的 + 加锁的过程是同步队列排队进行的,所以也是有序的。 + 2.是个可重入锁,非公平的锁 + 3.偏向锁,轻量锁,重量锁 + 解释:偏向锁是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。 + 轻量级锁,是指当锁是偏向锁时,被另一个线程所访问,偏向锁会升级为轻量级锁,这个线程会通过自旋的方式尝试获取锁,不会阻塞,提高性能。 + 重量级锁,是指当锁是轻量级锁时,当自旋的线程自旋了一定的次数后,还没有获取到锁,就会进入阻塞状态,该锁升级为重量级锁,重量级锁会使其他线程阻塞,性能降低 + + 注意:多个synchronized只有锁的是同一个对象,它们之间的代码才是同步的,这一点在使用synchronized的时候一定要注意. + synchronized只是jvm级别的锁(就是都放在一台tomcat上是没有问题的), + 但是,现在有很多都采用分布式的,这个就满足不了,就会用到分布式锁。 + + +##分布式锁 + 1.通过之前对锁的理解,多个线程维护同一个共享变量,依据该变量则可以保证。 + + 那么分布式我们可以理解为几个线程(一台服务器看成一个线程),共同去维护同一个变量就可以实现了。 + + 可以利用: + 1.mysql数据库主键唯一性。 + // 加锁、释放锁必须使用同一个session。 + // 会单独开一个session来维护这个,所有增加开销以及数据库的压力。 + 2.redis的key唯一性,涉及方法setex方法,以及它的lau脚本 + 推荐使用redisson,能解决,加锁和释放之间的原子性,锁的唯一性,以及锁续命的效果 + 缺点:集群模式下会在所有master节点执行加锁命令,大部分(2N+1)成功了则获得锁,节点越多,加锁的过程越慢; + 高并发情况下,未获得锁的线程会睡眠重试,如果同一把锁竞争非常激烈,会占用非常多的系统资源 + + + 3.ZK临时有序节点 + 因为ZK相同的节点只能创建一个。 + 有序节点可以解决,惊群效应; + 大家根据自己创建的顺序去依次获取,最后一个用完之后,检查后面没有了,就把锁删掉 + 和公平锁的一样的道理,排队获取。 + 有现有的轮子curator可以使用,又互斥锁,读写锁、多重锁、信号量 + + 缺点: 加锁会频繁地“写”zookeeper,增加zookeeper的压力 + 写zookeeper的时候会在集群进行同步,节点数越多,同步越慢,获取锁的过程越慢; + 依赖zookeeper,如果本系统没有依赖的话,相对增加了复杂度 + + 惊群效应:都监听了加锁那个老哥有没有释放锁,如果释放了,一窝蜂的就去抢锁,但是又只有一个能创建成功 + 就会产生不必要的开销。 + + + + + + \ No newline at end of file diff --git a/week_03/17/volatile b/week_03/17/volatile new file mode 100644 index 0000000..8d614e2 --- /dev/null +++ b/week_03/17/volatile @@ -0,0 +1,21 @@ +##volatile---可见性,有序性 + 1.保证了可见性。 + 被volatile修饰的变量,只要一被改动,其他的线程就都知道了。 + 2.禁止重排序 + 在JVM中,有时候会提速,你所有的变量,可能不会按照你的代码那样顺序的做 + 例如:int a=1; int b=2; + 我们会觉得 应该是先初始a=1,再b=2。但是JVM的时候,有可能是先b,再a的。 + 这种被成为代码重新排序。 + 实现方法:内存屏障 + 内存屏障的作用: + (1)阻止屏障两侧的指令重排序; + (2)强制把写缓冲区/高速缓存中的数据回写到主内存,让缓存中相应的数据失效 + (个人理解可见性)在我们主内存外面还有一层消息总线触发机制,类似与关卡。 + 其他线程和这个消息总线之间,有类似与发布订阅的关系。所以当我们其中一个线程对这个变量做了修改,并且 + 写回主内存的时候,就回触发该机制,其他线程就会再去主内存中获取这个新的值。 +##缺点---不能保证原子性 + 因为我们程序中的lang和double是高32+低32组成的,所以建议将共享的这两个类型用volatile修饰(因为它能保证单次读写的原子性) + 这两个类是由2次操作完成的 + + + \ No newline at end of file diff --git "a/week_03/17/\345\206\205\345\255\230\346\250\241\345\236\213" "b/week_03/17/\345\206\205\345\255\230\346\250\241\345\236\213" new file mode 100644 index 0000000..e9498de --- /dev/null +++ "b/week_03/17/\345\206\205\345\255\230\346\250\241\345\236\213" @@ -0,0 +1,32 @@ +##java内存模型 + 1.组成 + 1.主内存 + 2.多个工作线程---多个工作内存 + 类似: + 主 | 工作内存--------------java线程 + | 工作内存--------------java线程 + 内 | 工作内存--------------java线程 + | 工作内存--------------java线程 + 存 | 工作内存--------------java线程 + 3.存的东西 + 主内存是被用来java线程之间的数据交互,因为java线程之间是不能访问的,数据都是通过主内存交互。 + 工作内存,会用来存一些局部变量,和方法的。 + +##内存区域 + 1.虚拟机栈,本地方法栈,PC寄存器,方法区,堆 + + 2.还有一个元空间(1.8之前用的java内存,1.8开始是直接在主内存上申请了一块) + +##GC + 1.过程 + 年轻代 老年代 本地内存 + Eden------S0----S1,oldMe, 元空间 + 1.当Eden园满了之后会触发minorGC,这个时候会将Eden中还存在的对象 移动S区域。 + 2.这个对象会在S0-S1周而复始(minorGC一次,对象年龄+1),当这个对象的年龄在15岁的时候,会放到老年代去。 + 3.如果老年代也满了,就会触发FullGC(如果有比较大的对象,会直接放到老年代中) + + 对象的年龄是放在对象头中的 + + + + -- Gitee