diff --git a/week_03/54/AQS-054.md b/week_03/54/AQS-054.md new file mode 100644 index 0000000000000000000000000000000000000000..9b09fcf771dcdf78bd51cd2515fc122aa39023f4 --- /dev/null +++ b/week_03/54/AQS-054.md @@ -0,0 +1,494 @@ +##AbstractQueuedSynchronizer + +AbstractQueuedSynchronizer(简写为AQS)继承AbstractOwnableSynchronizer类 +```java +public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer +implements java.io.Serializable { +``` +####AQS成员 +```java +/** + * Head of the wait queue, lazily initialized. Except for + * initialization, it is modified only via method setHead. Note: + * If head exists, its waitStatus is guaranteed not to be + * CANCELLED. + * 等待队列的头,延迟初始化。除了初始化,它只通过方法setHead进行修改。注意:如果头部存在,它的等待状态就不保证 + */ +private transient volatile Node head; + +/** + * Tail of the wait queue, lazily initialized. Modified only via + * method enq to add new wait node. + * 等待队列的尾部,延迟初始化。仅通过方法enq修改以添加新的等待节点。 + */ +private transient volatile Node tail; + +/** + * The synchronization state.同步状态 + */ +private volatile int state; +``` +```java +//节点的状态值和Node类的初始化 +static final class Node { + /** Marker to indicate a node is waiting in shared mode */ + static final Node SHARED = new Node(); + /** Marker to indicate a node is waiting in exclusive mode */ + static final Node EXCLUSIVE = null; + + /** waitStatus value to indicate thread has cancelled */ + static final int CANCELLED = 1; + /** waitStatus value to indicate successor's thread needs unparking */ + static final int SIGNAL = -1; + /** waitStatus value to indicate thread is waiting on condition */ + static final int CONDITION = -2; + /** + * waitStatus value to indicate the next acquireShared should + * unconditionally propagate + */ + static final int PROPAGATE = -3; + + /** + * Status field, taking on only the values: + * SIGNAL: The successor of this node is (or will soon be) + * blocked (via park), so the current node must + * unpark its successor when it releases or + * cancels. To avoid races, acquire methods must + * first indicate they need a signal, + * then retry the atomic acquire, and then, + * on failure, block. + * CANCELLED: This node is cancelled due to timeout or interrupt. + * Nodes never leave this state. In particular, + * a thread with cancelled node never again blocks. + * CONDITION: This node is currently on a condition queue. + * It will not be used as a sync queue node + * until transferred, at which time the status + * will be set to 0. (Use of this value here has + * nothing to do with the other uses of the + * field, but simplifies mechanics.) + * PROPAGATE: A releaseShared should be propagated to other + * nodes. This is set (for head node only) in + * doReleaseShared to ensure propagation + * continues, even if other operations have + * since intervened. + * 0: None of the above + * + * The values are arranged numerically to simplify use. + * Non-negative values mean that a node doesn't need to + * signal. So, most code doesn't need to check for particular + * values, just for sign. + * + * The field is initialized to 0 for normal sync nodes, and + * CONDITION for condition nodes. It is modified using CAS + * (or when possible, unconditional volatile writes). + */ + volatile int waitStatus; + + /** + * Link to predecessor node that current node/thread relies on + * for checking waitStatus. Assigned during enqueuing, and nulled + * out (for sake of GC) only upon dequeuing. Also, upon + * cancellation of a predecessor, we short-circuit while + * finding a non-cancelled one, which will always exist + * because the head node is never cancelled: A node becomes + * head only as a result of successful acquire. A + * cancelled thread never succeeds in acquiring, and a thread only + * cancels itself, not any other node. + */ + volatile Node prev; + + /** + * Link to the successor node that the current node/thread + * unparks upon release. Assigned during enqueuing, adjusted + * when bypassing cancelled predecessors, and nulled out (for + * sake of GC) when dequeued. The enq operation does not + * assign next field of a predecessor until after attachment, + * so seeing a null next field does not necessarily mean that + * node is at end of queue. However, if a next field appears + * to be null, we can scan prev's from the tail to + * double-check. The next field of cancelled nodes is set to + * point to the node itself instead of null, to make life + * easier for isOnSyncQueue. + */ + volatile Node next; + + /** + * The thread that enqueued this node. Initialized on + * construction and nulled out after use. + */ + volatile Thread thread; + + /** + * Link to next node waiting on condition, or the special + * value SHARED. Because condition queues are accessed only + * when holding in exclusive mode, we just need a simple + * linked queue to hold nodes while they are waiting on + * conditions. They are then transferred to the queue to + * re-acquire. And because conditions can only be exclusive, + * we save a field by using special value to indicate shared + * mode. + */ + Node nextWaiter; + + /** + * Returns true if node is waiting in shared mode. + */ + final boolean isShared() { + return nextWaiter == SHARED; + } + + /** + * Returns previous node, or throws NullPointerException if null. + * Use when predecessor cannot be null. The null check could + * be elided, but is present to help the VM. + * + * @return the predecessor of this node + */ + final Node predecessor() throws NullPointerException { + Node p = prev; + if (p == null) + throw new NullPointerException(); + else + return p; + } + + Node() { // Used to establish initial head or SHARED marker + } + + Node(Thread thread, Node mode) { // Used by addWaiter + this.nextWaiter = mode; + this.thread = thread; + } + + Node(Thread thread, int waitStatus) { // Used by Condition + this.waitStatus = waitStatus; + this.thread = thread; + } +} +``` +####构造方法 +```java +/** + * Creates a new {@code AbstractQueuedSynchronizer} instance + * with initial synchronization state of zero. 会初始化state为0 + */ +protected AbstractQueuedSynchronizer() { } +``` +####主要方法 + +**state** +```java +/** 获取state值 + * Returns the current value of synchronization state. + * This operation has memory semantics of a {@code volatile} read. + * @return current state value + */ +protected final int getState() { + return state; +} +``` +```java +/** 设置一个state值 + * Sets the value of synchronization state. + * This operation has memory semantics of a {@code volatile} write. + * @param newState the new state value + */ +protected final void setState(int newState) { + state = newState; +} +``` +```java +/** CAS操作设置state值,返回结果是boolean,利用了Unsafe类 + * Atomically sets synchronization state to the given updated + * value if the current state value equals the expected value. + * This operation has memory semantics of a {@code volatile} read + * and write. + * + * @param expect the expected value + * @param update the new value + * @return {@code true} if successful. False return indicates that the actual + * value was not equal to the expected value. + */ +protected final boolean compareAndSetState(int expect, int update) { + // See below for intrinsics setup to support this + return unsafe.compareAndSwapInt(this, stateOffset, expect, update); +} +``` +**其他方法** +```java + /** + * Creates and enqueues node for current thread and given mode.为当前线程和给定模式创建和排队节点 + * + * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared + * @return the new node + */ +private Node addWaiter(Node mode) { + Node node = new Node(Thread.currentThread(), mode);//新建一个新的节点,把当前线程实例放入里面 + // Try the fast path of enq; backup to full enq on failure + Node pred = tail;//获取尾部 + if (pred != null) { + node.prev = pred; + //设置尾部的指针 + if (compareAndSetTail(pred, node)) { + pred.next = node; + return node; + } + } + enq(node); + return node; +} +``` +```java +/** + * Inserts node into queue, initializing if necessary. See picture above. + * @param node the node to insert + * @return node's predecessor + */ +private Node enq(final Node node) { + for (;;) { + Node t = tail; + //表示队列为空 + if (t == null) { // Must initialize + //设置头部指针 + if (compareAndSetHead(new Node())) + tail = head; + } else { + node.prev = t; + //设置尾部指针 + if (compareAndSetTail(t, node)) { + t.next = node; + return t; + } + } + } +} +``` +```java +/** + * Sets head of queue to be node, thus dequeuing. Called only by + * acquire methods. Also nulls out unused fields for sake of GC + * and to suppress unnecessary signals and traversals. + * + * @param node the node + */ + //设置头部节点 +private void setHead(Node node) { + head = node; + node.thread = null; + node.prev = null; +} +``` +```java +/** + * Acquires in exclusive mode, ignoring interrupts. Implemented + * by invoking at least once {@link #tryAcquire}, + * returning on success. Otherwise the thread is queued, possibly + * repeatedly blocking and unblocking, invoking {@link + * #tryAcquire} until success. This method can be used + * to implement method {@link Lock#lock}. + * + * @param arg the acquire argument. This value is conveyed to + * {@link #tryAcquire} but is otherwise uninterpreted and + * can represent anything you like. + */ + //尝试获取线程里面的 +public final void acquire(int arg) { + //tryAcquire尝试获取锁失败 + //acquireQueued 尝试从队列里面获取节点 addWaiter添加节点到尾部 + //selfInterrupt中断 + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); +} +``` +```java +//以独占不间断模式获取已在中的线程排队。用于条件等待方法以及获取 +/** + * Acquires in exclusive uninterruptible mode for thread already in + * queue. Used by condition wait methods as well as acquire. + * + * @param node the node + * @param arg the acquire argument + * @return {@code true} if interrupted while waiting + */ +final boolean acquireQueued(final Node node, int arg) { + //初始化 failed为true + boolean failed = true; + try { + //默认不会中断 + boolean interrupted = false; + //无限循环获取锁 + for (;;) { + //获取当前节点的前节点 这里没有就会抛出空指针异常 + final Node p = node.predecessor(); + //判断是否为头节点 并获取锁,如果是将当前节点设置为头节点,然后返回false,结束循环 + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return interrupted; + } + //判断是否阻塞当前线程 + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + //失败则取消 + if (failed) + cancelAcquire(node); + } +} +``` +```java +/** waitStatus value to indicate thread has cancelled */ +static final int CANCELLED = 1;//waitStatus值,指示线程已取消 +/** waitStatus value to indicate successor's thread needs unparking */ +static final int SIGNAL = -1;//waitStatus值,指示后续线程需要断开连接 +/** waitStatus value to indicate thread is waiting on condition */ +static final int CONDITION = -2;//waitStatus值,指示线程正在等待条件 +/** + * waitStatus value to indicate the next acquireShared should + * unconditionally propagate + */ +static final int PROPAGATE = -3;//指示下一个acquireShared应无条件传播 +``` +```java +/** + * Checks and updates status for a node that failed to acquire. + * Returns true if thread should block. This is the main signal + * control in all acquire loops. Requires that pred == node.prev. + * + * @param pred node's predecessor holding status + * @param node the node + * @return {@code true} if thread should block + */ +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int ws = pred.waitStatus;//获取前节点等待状态 + if (ws == Node.SIGNAL) + /* + * This node has already set status asking a release + * to signal it, so it can safely park. + */ + return true; + if (ws > 0) { + //表示线程已取消 + /* + * Predecessor was cancelled. Skip over predecessors and + * indicate retry.跳过前置任务并指示重试 + */ + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + //表示状态为CONDITION或者PROPAGATE(初始化状态) + /* + * waitStatus must be 0 or PROPAGATE. Indicate that we + * need a signal, but don't park yet. Caller will need to + * retry to make sure it cannot acquire before parking. + */ + //需要线程为signal,不是中断,用CAS不断去比较前节点的当前等待状态和SIGNAL + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; +} +``` +```java +/** + * Cancels an ongoing attempt to acquire. + * + * @param node the node + */ +private void cancelAcquire(Node node) { + // Ignore if node doesn't exist + //节点不存在 + if (node == null) + return; + + node.thread = null; + + // Skip cancelled predecessors + Node pred = node.prev; + //跳过已被取消的节点 找出<=0的节点 + while (pred.waitStatus > 0) + node.prev = pred = pred.prev; + + // predNext is the apparent node to unsplice. CASes below will + // fail if not, in which case, we lost race vs another cancel + // or signal, so no further action is necessary. + Node predNext = pred.next; + + // Can use unconditional write instead of CAS here. + // After this atomic step, other Nodes can skip past us. + // Before, we are free of interference from other threads. + node.waitStatus = Node.CANCELLED;状态设置为取消 + + // If we are the tail, remove ourselves.//如果为尾节点,就移除 + if (node == tail && compareAndSetTail(node, pred)) { + compareAndSetNext(pred, predNext, null); + } else { + // If successor needs signal, try to set pred's next-link + // so it will get one. Otherwise wake it up to propagate. + int ws; + //pred != head 前节点不是头节点 + //(ws = pred.waitStatus) == Node.SIGNAL 把前节点的stuts赋值给ws 判断是否为-1 + //(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) 如果是为-2或者-3 用CAS改成-1 + //pred.thread != null 前节点线程不为null + if (pred != head && + ((ws = pred.waitStatus) == Node.SIGNAL || + (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && + pred.thread != null) { + Node next = node.next; + //判断后节点不为null和status<=0就用cas把前节点和后节点连接上 + if (next != null && next.waitStatus <= 0) + compareAndSetNext(pred, predNext, next); + } else { + //节点不是在末尾,或者前节点为头节点或者为null + //就启动继任者线程 + unparkSuccessor(node); + } + + node.next = node; // help GC + } +} +``` +```java +/** + * Wakes up node's successor, if one exists.如果存在的话,就唤醒节点的继承者。 + * + * @param node the node + */ +private void unparkSuccessor(Node node) { + /* + * If status is negative (i.e., possibly needing signal) try + * to clear in anticipation of signalling. (如果状态为负(即,可能需要信号),则尝试在收到信号之前清除) + * It is OK if this fails or if status is changed by waiting thread. + * 如果失败或状态由等待线程更改就可以 + */ + int ws = node.waitStatus;//获取等待状态 + if (ws < 0) + //更改状态为0 + compareAndSetWaitStatus(node, ws, 0); + + /* + * Thread to unpark is held in successor, which is normally + * just the next node. But if cancelled or apparently null, + * traverse backwards from tail to find the actual + * non-cancelled successor. + * 要断开连接的线程保存在后续节点中,后者通常只是下一个节点。 + * 但如果被取消或明显无效,则从尾部向后遍历以找到实际的未被取消的继承者 + */ + Node s = node.next; + //线程为null或者取消状态 + if (s == null || s.waitStatus > 0) { + s = null; + //找到后面小于等于0的节点启动它 + for (Node t = tail; t != null && t != node; t = t.prev) + if (t.waitStatus <= 0) + s = t; + } + if (s != null) + LockSupport.unpark(s.thread); +} +``` diff --git "a/week_03/54/JAVA\345\206\205\345\255\230\346\250\241\345\236\213-054.md" "b/week_03/54/JAVA\345\206\205\345\255\230\346\250\241\345\236\213-054.md" new file mode 100644 index 0000000000000000000000000000000000000000..ad9623c524c19901c5e2cd616cd1502d75b819cc --- /dev/null +++ "b/week_03/54/JAVA\345\206\205\345\255\230\346\250\241\345\236\213-054.md" @@ -0,0 +1,58 @@ +##Java内存模型 + +参考链接:https://www.jianshu.com/p/8a58d8335270 +https://mp.weixin.qq.com/s/jownTN--npu3o8B4c3sbeA + +####JMM(Java内存模型) +JMM定义了JVM在计算机内存的工作方式。JVM是整个计算机虚拟模型,所以JVM是隶属于JMM的。 +从抽象的角度来看,JMM定义了线程和主内存之间的抽象关系: +线程之间的共享变量存储在主内存(Main Memory)中,每个线程都有一个私有的本地内存(Local Memory), +本地内存中存储了该线程以读/写共享变量的副本。本地内存是JMM的一个抽象概念,并不真实存在。 +它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。 + + +####支持Java内存模型的基础原理 +#####1.指令重排序 +为了优化程序性能,编译器和处理器会对指令进行重排序。 +但是在JMM确保不同的编译器和处理器平台上面,通过插入特定的Memory Barrier(内存屏障)来禁止重排序, +为上层内存提供一致的内存可见性。 +#####2.数据依赖 +如果两个操作访问同一个变量,其中一个为写操作,此时这两个操作之间存在依赖性。 +编译器和处理器不会进行重排序。 +#####3.as-if-serial +不管怎么重排序,都不能影响单线程下面执行的结果,编译器、处理器、runtime都必须遵守as-if-serial原则 +#####4.内存屏障(Memory Barrier) +通过内存屏障可以禁止重排序,它其实是一个CPU指令: +1.保证特定操作执行的顺序; +2.影响某些内存的可见性。 + +#####5.happens-before(先行发生原则) +A先行B发生,A的操作所产生的影响能被B所感应到,这种影响包括修改了共享内存中变量的值、发送了消息、调用了方法等。 +Java内存模型中定义的先行发生原则: +1.程序次序原则 +在一个线程内,按照程序书写的顺序执行,书写在前面的操作先行发生于书写在后面的操作, +准确地讲是控制流顺序而不是代码顺序,因为要考虑分支、循环等情况。 +2.监视器锁定原则 +一个unlock操作先行发生于后面对同一个锁的lock操作。 +3.volatile原则 +对一个volatile变量的写操作先行发生于后面对该变量的读操作。 +4.线程启动原则 +对线程的start()操作先行发生于线程内的任何操作。 +5.线程终止原则 +线程中的所有操作先行发生于检测到线程终止,可以通过Thread.join()、Thread.isAlive()的返回值检测线程是否已经终止。 +6.线程中断原则 +对线程的interrupt()的调用先行发生于线程的代码中检测到中断事件的发生,可以通过Thread.interrupted()方法检测是否发生中断。 +7.对象终结原则 +一个对象的初始化完成(构造方法执行结束)先行发生于它的finalize()方法的开始。 +8.传递性原则 +如果操作A先行发生于操作B,操作B先行发生于操作C,那么操作A先行发生于操作C。 +这里说的“先行发生”与“时间上的先发生”没有必然的关系。 + +####Java内存模型带来的问题 +#####1.可见性问题 +对于在CPU缓存里面对变量进行修改,没有flush到主内存中,对其他CPU线程不可见。 +Java利用volatile关键字或者加锁可以解决 +#####2.竞争现象 +对于多个线程同时请求某一个资源,我们可以用synchronized来解决这个问题 + + diff --git a/week_03/54/ReentrantLock-054.md b/week_03/54/ReentrantLock-054.md new file mode 100644 index 0000000000000000000000000000000000000000..37713c6cf1d29c382e2d7971d9c69d428b69afae --- /dev/null +++ b/week_03/54/ReentrantLock-054.md @@ -0,0 +1,461 @@ +##ReentrantLock + +####构造方法 +```java +/** + * Creates an instance of {@code ReentrantLock}. + * This is equivalent to using {@code ReentrantLock(false)}. + */ +public ReentrantLock() { + sync = new NonfairSync(); +} +``` +```java +/** + * Creates an instance of {@code ReentrantLock} with the + * given fairness policy. + * + * @param fair {@code true} if this lock should use a fair ordering policy + */ +public ReentrantLock(boolean fair) { + sync = fair ? new FairSync() : new NonfairSync(); +} +``` + +上面两者的区别就是传参和没有传参需要选择公平锁和非公平锁 + +####主要实现类以及方法 +核心方法继承了AQS类 +```java +/** + * Base of synchronization control for this lock. Subclassed + * into fair and nonfair versions below. Uses AQS state to + * represent the number of holds on the lock. + */ +abstract static class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = -5179523762034025860L; + + /** + * Performs {@link Lock#lock}. The main reason for subclassing + * is to allow fast path for nonfair version. + */ + abstract void lock(); + + /** + * Performs non-fair tryLock. tryAcquire is implemented in + * subclasses, but both need nonfair try for trylock method. + */ + final boolean nonfairTryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + if (compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; + } + + protected final boolean tryRelease(int releases) { + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + if (c == 0) { + free = true; + setExclusiveOwnerThread(null); + } + setState(c); + return free; + } + + protected final boolean isHeldExclusively() { + // While we must in general read state before owner, + // we don't need to do so to check if current thread is owner + return getExclusiveOwnerThread() == Thread.currentThread(); + } + + final ConditionObject newCondition() { + return new ConditionObject(); + } + + // Methods relayed from outer class + + final Thread getOwner() { + return getState() == 0 ? null : getExclusiveOwnerThread(); + } + + final int getHoldCount() { + return isHeldExclusively() ? getState() : 0; + } + + final boolean isLocked() { + return getState() != 0; + } + + /** + * Reconstitutes the instance from a stream (that is, deserializes it). + */ + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + setState(0); // reset to unlocked state + } +} +``` + +#####非公平锁 +```java +/** + * Sync object for non-fair locks + */ +static final class NonfairSync extends Sync { + private static final long serialVersionUID = 7316153563782823691L; + + /** + * Performs lock. Try immediate barge, backing up to normal + * acquire on failure. + */ + final void lock() { + //上锁 + //以CAS方式把state变为1 + if (compareAndSetState(0, 1)) + setExclusiveOwnerThread(Thread.currentThread());//获取锁成功,标记当前线程为锁的持有者,然后直接返回 + else + acquire(1);//失败 + } + + protected final boolean tryAcquire(int acquires) { + //非公平获取 + return nonfairTryAcquire(acquires); + } +} +``` +AQS里面的方法 +```java +/** + * Acquires in exclusive mode, ignoring interrupts. Implemented + * by invoking at least once {@link #tryAcquire}, + * returning on success. Otherwise the thread is queued, possibly + * repeatedly blocking and unblocking, invoking {@link + * #tryAcquire} until success. This method can be used + * to implement method {@link Lock#lock}. + * + * @param arg the acquire argument. This value is conveyed to + * {@link #tryAcquire} but is otherwise uninterpreted and + * can represent anything you like. + */ +public final void acquire(int arg) { + //三个判断 第一个尝试获取该锁 会默认抛出UnsupportedOperationException异常, + //第二个 获取当前线程的头节点状态 + //第三个表示中断 + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); +} +``` +```java +//以独占不间断模式获取已在中的线程排队。用于条件等待方法以及获取 +/** + * Acquires in exclusive uninterruptible mode for thread already in + * queue. Used by condition wait methods as well as acquire. + * + * @param node the node + * @param arg the acquire argument + * @return {@code true} if interrupted while waiting + */ +final boolean acquireQueued(final Node node, int arg) { + //初始化 failed为true + boolean failed = true; + try { + //默认不会中断 + boolean interrupted = false; + //无限循环获取锁 + for (;;) { + //获取当前节点的前节点 这里没有就会抛出空指针异常 + final Node p = node.predecessor(); + //判断是否为头节点 并获取锁,如果是将当前节点设置为头节点,然后返回false,结束循环 + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return interrupted; + } + //判断是否阻塞当前线程 + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + //失败则取消 + if (failed) + cancelAcquire(node); + } +} +``` +```java +/** waitStatus value to indicate thread has cancelled */ +static final int CANCELLED = 1;//waitStatus值,指示线程已取消 +/** waitStatus value to indicate successor's thread needs unparking */ +static final int SIGNAL = -1;//waitStatus值,指示后续线程需要断开连接 +/** waitStatus value to indicate thread is waiting on condition */ +static final int CONDITION = -2;//waitStatus值,指示线程正在等待条件 +/** + * waitStatus value to indicate the next acquireShared should + * unconditionally propagate + */ +static final int PROPAGATE = -3;//指示下一个acquireShared应无条件传播 +``` +```java +/** + * Checks and updates status for a node that failed to acquire. + * Returns true if thread should block. This is the main signal + * control in all acquire loops. Requires that pred == node.prev. + * + * @param pred node's predecessor holding status + * @param node the node + * @return {@code true} if thread should block + */ +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int ws = pred.waitStatus;//获取前节点等待状态 + if (ws == Node.SIGNAL) + /* + * This node has already set status asking a release + * to signal it, so it can safely park. + */ + return true; + if (ws > 0) { + //表示线程已取消 + /* + * Predecessor was cancelled. Skip over predecessors and + * indicate retry.跳过前置任务并指示重试 + */ + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + //表示状态为CONDITION或者PROPAGATE(初始化状态) + /* + * waitStatus must be 0 or PROPAGATE. Indicate that we + * need a signal, but don't park yet. Caller will need to + * retry to make sure it cannot acquire before parking. + */ + //需要线程为signal,不是中断,用CAS不断去比较前节点的当前等待状态和SIGNAL + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; +} +``` +```java +/** + * Sets the thread that currently owns exclusive access. + * A {@code null} argument indicates that no thread owns access. + * This method does not otherwise impose any synchronization or + * {@code volatile} field accesses. + * @param thread the owner thread + */ +protected final void setExclusiveOwnerThread(Thread thread) { + exclusiveOwnerThread = thread; +} +``` +```java +/** + * Performs non-fair tryLock. tryAcquire is implemented in + * subclasses, but both need nonfair try for trylock method. + */ +final boolean nonfairTryAcquire(int acquires) { + //当前线程 + final Thread current = Thread.currentThread(); + //获取state + int c = getState(); + //表示没有被任何线程持有 + if (c == 0) { + //利用CAS更新state值 + if (compareAndSetState(0, acquires)) { + //设置当前线程为锁的持有者 + setExclusiveOwnerThread(current); + return true;//获取成功,非重入 + } + } + //如果当前线程为锁的持有者,更新state获取次数,说明该锁被重入了 + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) // overflow 表示溢出 + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false;//获取失败 +} +``` +#####公平锁 +```java + /** + * Sync object for fair locks + */ +static final class FairSync extends Sync { + private static final long serialVersionUID = -3000897897090466540L; + + final void lock() { + //尝试获取锁,少了非重入获取锁的方法 + acquire(1); + } + + /** + * Fair version of tryAcquire. Don't grant access unless + * recursive call or no waiters or is first. + */ + protected final boolean tryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + //没有锁的情况 判断线程的优先级 CAS替换state的值 设置当前线程为锁的持有者 + if (!hasQueuedPredecessors() && + compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { + //存在锁,计算state次数 + int nextc = c + acquires; + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; + } +} +``` +```java +/** + * Queries whether any threads have been waiting to acquire longer + * than the current thread. + * + *
An invocation of this method is equivalent to (but may be + * more efficient than): + *
{@code
+ * getFirstQueuedThread() != Thread.currentThread() &&
+ * hasQueuedThreads()}
+ *
+ * Note that because cancellations due to interrupts and + * timeouts may occur at any time, a {@code true} return does not + * guarantee that some other thread will acquire before the current + * thread. Likewise, it is possible for another thread to win a + * race to enqueue after this method has returned {@code false}, + * due to the queue being empty. + * + *
This method is designed to be used by a fair synchronizer to + * avoid barging. + * Such a synchronizer's {@link #tryAcquire} method should return + * {@code false}, and its {@link #tryAcquireShared} method should + * return a negative value, if this method returns {@code true} + * (unless this is a reentrant acquire). For example, the {@code + * tryAcquire} method for a fair, reentrant, exclusive mode + * synchronizer might look like this: + * + *
{@code
+ * protected boolean tryAcquire(int arg) {
+ * if (isHeldExclusively()) {
+ * // A reentrant acquire; increment hold count
+ * return true;
+ * } else if (hasQueuedPredecessors()) {
+ * return false;
+ * } else {
+ * // try to acquire normally
+ * }
+ * }}
+ *
+ * @return {@code true} if there is a queued thread preceding the
+ * current thread, and {@code false} if the current thread
+ * is at the head of the queue or the queue is empty
+ * @since 1.7
+ */
+public final boolean hasQueuedPredecessors() {
+ //判断是否有优先级的锁
+ // The correctness of this depends on head being initialized
+ // before tail and on head.next being accurate if the current
+ // thread is first in queue.
+ Node t = tail; // Read fields in reverse initialization order
+ Node h = head;
+ Node s;
+ return h != t &&
+ ((s = h.next) == null || s.thread != Thread.currentThread());
+}
+```
+**释放**
+```java
+protected final boolean tryRelease(int releases) {
+ int c = getState() - releases;//重置state
+ //判断当前线程和锁的持有线程,不等抛出IllegalMonitorStateException异常
+ if (Thread.currentThread() != getExclusiveOwnerThread())
+ throw new IllegalMonitorStateException();
+ boolean free = false;//释放成功标志
+ if (c == 0) {
+ //如果state次数相等,释放线程
+ free = true;//更改释放成功标志
+ setExclusiveOwnerThread(null);
+ }
+ setState(c);设置state状态
+ return free;
+}
+```
+```java
+//获取线程失败的加入同步队列
+ /**
+ * Creates and enqueues node for current thread and given mode.为当前线程和给定模式创建和排队节点
+ *
+ * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
+ * @return the new node
+ */
+private Node addWaiter(Node mode) {
+ Node node = new Node(Thread.currentThread(), mode);//新建一个新的节点,把当前线程实例放入里面
+ // Try the fast path of enq; backup to full enq on failure
+ Node pred = tail;//获取尾部
+ if (pred != null) {
+ node.prev = pred;
+ //设置尾部的指针
+ if (compareAndSetTail(pred, node)) {
+ pred.next = node;
+ return node;
+ }
+ }
+ enq(node);
+ return node;
+}
+```
+```java
+/**
+ * Inserts node into queue, initializing if necessary. See picture above.
+ * @param node the node to insert
+ * @return node's predecessor
+ */
+private Node enq(final Node node) {
+ for (;;) {
+ Node t = tail;
+ //表示队列为空
+ if (t == null) { // Must initialize
+ //设置头部指针
+ if (compareAndSetHead(new Node()))
+ tail = head;
+ } else {
+ node.prev = t;
+ //设置尾部指针
+ if (compareAndSetTail(t, node)) {
+ t.next = node;
+ return t;
+ }
+ }
+ }
+}
+```
diff --git a/week_03/54/Semaphore-054.md b/week_03/54/Semaphore-054.md
new file mode 100644
index 0000000000000000000000000000000000000000..11329b52951d7ef2b47f56743ff118893e1de320
--- /dev/null
+++ b/week_03/54/Semaphore-054.md
@@ -0,0 +1,279 @@
+## Semaphore
+
+### 简介
+Semaphore(信号量),内部维护一组许可证,通过acquire方法获取,如果获取不到则阻塞;
+通过release释放许可,即添加许可证。
+Semaphore也是基于AQS来实现的,分为公平和非公平
+
+#### 构造方法
+```java
+/**
+ * Creates a {@code Semaphore} with the given number of
+ * permits and nonfair fairness setting.
+ *
+ * @param permits the initial number of permits available.
+ * This value may be negative, in which case releases
+ * must occur before any acquires will be granted.
+ */
+public Semaphore(int permits) {
+ sync = new NonfairSync(permits);
+}
+```
+```java
+/**
+ * Creates a {@code Semaphore} with the given number of
+ * permits and the given fairness setting.
+ *
+ * @param permits the initial number of permits available.
+ * This value may be negative, in which case releases
+ * must occur before any acquires will be granted.
+ * @param fair {@code true} if this semaphore will guarantee
+ * first-in first-out granting of permits under contention,
+ * else {@code false}
+ */
+public Semaphore(int permits, boolean fair) {
+ sync = fair ? new FairSync(permits) : new NonfairSync(permits);
+}
+```
+通过上面的方法可以看出需要传入一个int型的permits,表示许可证书数量
+boolean类型的fair是否为公平
+
+#### 内部类
+都是基于下面的Sync内部类实现的 通过继承AQS
+```java
+ /**
+ * Synchronization implementation for semaphore. Uses AQS state
+ * to represent permits. Subclassed into fair and nonfair
+ * versions.
+ */
+abstract static class Sync extends AbstractQueuedSynchronizer {
+ private static final long serialVersionUID = 1192457210091910933L;
+
+ Sync(int permits) {
+ setState(permits);//设置许可证书数量
+ }
+
+ final int getPermits() {
+ return getState();//获取许可证书数量
+ }
+
+ //循环尝试获取剩下的资源
+ final int nonfairTryAcquireShared(int acquires) {
+
+ for (;;) {
+ int available = getState();//获取可用的数量
+ int remaining = available - acquires;//得到剩下的数量
+ //remaining < 0 表示资源被占满了,可用为0,
+ //compareAndSetState(available, remaining) 通过CAS来更新可用的资源
+ if (remaining < 0 ||
+ compareAndSetState(available, remaining))
+ return remaining;
+ }
+ }
+ //释放资源 跟nonfairTryAcquireShared方法相反的逻辑
+ protected final boolean tryReleaseShared(int releases) {
+ for (;;) {
+ int current = getState();
+ int next = current + releases;
+ if (next < current) // overflow
+ throw new Error("Maximum permit count exceeded");
+
+ if (compareAndSetState(current, next))
+ return true;
+ }
+ }
+
+ //减少许可证书数量
+ final void reducePermits(int reductions) {
+ for (;;) {
+ int current = getState();
+ int next = current - reductions;
+ if (next > current) // underflow
+ throw new Error("Permit count underflow");
+ if (compareAndSetState(current, next))
+ return;
+ }
+ }
+
+ //销毁许可证书
+ final int drainPermits() {
+ for (;;) {
+ int current = getState();
+ if (current == 0 || compareAndSetState(current, 0))
+ return current;
+ }
+ }
+}
+```
+#### 非公平
+```java
+/**
+ * NonFair version
+ */
+static final class NonfairSync extends Sync {
+ private static final long serialVersionUID = -2694183684443567898L;
+
+ NonfairSync(int permits) {
+ super(permits);
+ }
+
+ //调用Sync的nonfairTryAcquireShared方法
+ protected int tryAcquireShared(int acquires) {
+ return nonfairTryAcquireShared(acquires);
+ }
+}
+```
+#### 公平
+```java
+ /**
+ * Fair version
+ */
+static final class FairSync extends Sync {
+ private static final long serialVersionUID = 2014338818796000944L;
+
+ FairSync(int permits) {
+ super(permits);
+ }
+
+ protected int tryAcquireShared(int acquires) {
+ for (;;) {
+ //判断线程是否有更高级别的线程
+ if (hasQueuedPredecessors())
+ return -1;
+ //这里跟Sync的nonfairTryAcquireShared方法一样
+ int available = getState();
+ int remaining = available - acquires;
+ if (remaining < 0 ||
+ compareAndSetState(available, remaining))
+ return remaining;
+ }
+ }
+}
+```
+#### 释放
+
+```java
+/**
+ * Releases the given number of permits, returning them to the semaphore.
+ *
+ * Releases the given number of permits, increasing the number of + * available permits by that amount. + * If any threads are trying to acquire permits, then one + * is selected and given the permits that were just released. + * If the number of available permits satisfies that thread's request + * then that thread is (re)enabled for thread scheduling purposes; + * otherwise the thread will wait until sufficient permits are available. + * If there are still permits available + * after this thread's request has been satisfied, then those permits + * are assigned in turn to other threads trying to acquire permits. + * + *
There is no requirement that a thread that releases a permit must + * have acquired that permit by calling {@link Semaphore#acquire acquire}. + * Correct usage of a semaphore is established by programming convention + * in the application. + * + * @param permits the number of permits to release + * @throws IllegalArgumentException if {@code permits} is negative + */ +//释放多个许可证书 +public void release(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.releaseShared(permits); +} +``` + +#### 获取 +```java +/** + * Acquires the given number of permits from this semaphore, only + * if all are available at the time of invocation. + * + *
Acquires the given number of permits, if they are available, and + * returns immediately, with the value {@code true}, + * reducing the number of available permits by the given amount. + * + *
If insufficient permits are available then this method will return + * immediately with the value {@code false} and the number of available + * permits is unchanged. + * + *
Even when this semaphore has been set to use a fair ordering + * policy, a call to {@code tryAcquire} will + * immediately acquire a permit if one is available, whether or + * not other threads are currently waiting. This + * "barging" behavior can be useful in certain + * circumstances, even though it breaks fairness. If you want to + * honor the fairness setting, then use {@link #tryAcquire(int, + * long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) } + * which is almost equivalent (it also detects interruption). + * + * @param permits the number of permits to acquire + * @return {@code true} if the permits were acquired and + * {@code false} otherwise + * @throws IllegalArgumentException if {@code permits} is negative + */ +public boolean tryAcquire(int permits) { + //小于0 抛出IllegalArgumentException异常 + if (permits < 0) throw new IllegalArgumentException(); + //返回Sync类nonfairTryAcquireShared执行结果 + return sync.nonfairTryAcquireShared(permits) >= 0; +} +``` +```java +/** + * Acquires the given number of permits from this semaphore, if all + * become available within the given waiting time and the current + * thread has not been {@linkplain Thread#interrupt interrupted}. + * + *
Acquires the given number of permits, if they are available and + * returns immediately, with the value {@code true}, + * reducing the number of available permits by the given amount. + * + *
If insufficient permits are available then + * the current thread becomes disabled for thread scheduling + * purposes and lies dormant until one of three things happens: + *
If the permits are acquired then the value {@code true} is returned. + * + *
If the current thread: + *
If the specified waiting time elapses then the value {@code false}
+ * is returned. If the time is less than or equal to zero, the method
+ * will not wait at all. Any permits that were to be assigned to this
+ * thread, are instead assigned to other threads trying to acquire
+ * permits, as if the permits had been made available by a call to
+ * {@link #release()}.
+ *
+ * @param permits the number of permits to acquire
+ * @param timeout the maximum time to wait for the permits
+ * @param unit the time unit of the {@code timeout} argument
+ * @return {@code true} if all permits were acquired and {@code false}
+ * if the waiting time elapsed before all permits were acquired
+ * @throws InterruptedException if the current thread is interrupted
+ * @throws IllegalArgumentException if {@code permits} is negative
+ */
+//可以设置一个等待的时间,在指定时间没有返回false否则返回true
+public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ if (permits < 0) throw new IllegalArgumentException();
+ return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
+}
+```
+
diff --git a/week_03/54/synchronized-054.md b/week_03/54/synchronized-054.md
new file mode 100644
index 0000000000000000000000000000000000000000..77d1390d204c2a9bf28c12319cbb4ecf3330d34b
--- /dev/null
+++ b/week_03/54/synchronized-054.md
@@ -0,0 +1,40 @@
+##Synchronized
+
+参考链接:
+https://blog.csdn.net/tongdanping/article/details/79647337#1%E3%80%81%E9%94%81%E5%8D%87%E7%BA%A7
+https://mp.weixin.qq.com/s/RT7VreIh9PU03HhE3WSLjg
+
+####场景
+1.直接在当前对象上加锁
+2.在方法上面加锁
+3.创建一个Object对象,对这个对象加锁
+
+####原理
+**实现原理**
+monitorenter和monitorexit来隐式使用Lock和Unlock指令。
+monitorrenter尝试获取对象的锁,如果对这个对象没有被锁定,或者当前线程已经拥有了这个对象的锁,就把锁的计数器+1
+monitorexit对应就是-1,当计数器减少为0时锁就释放了
+Synchronized是可以保证原子性、可见性和有序性的,且是一个非公平锁。
+就类而言:
+锁定的类的对象,从常量池加载了两次类对象,分别存储在本地变量0和本地变量1中,解锁就是先解锁变量1,然后再是变量0,
+实际上变量0和1都是指向同一个对象,所以synchronized是可重入的。
+
+**锁升级**
+在jdk1.6以前都是重量级锁,在1.6以后做了优化,增加了偏向锁、轻量级锁、锁粗化、锁消除、适应性自旋等操作。
+CAS无锁->偏向锁->轻量级锁->重量级锁
+1.由于大多数线程都是不存在竞争的,为了降低获取锁的代价,引入了偏向锁。
+2.先比较线程id是否一致,如果一致,无需使用CAS加锁、解锁;如果不一致,则查看记录的线程Id是否存活,如果没有存活,被重置成无锁状态,
+其他线程可以竞争为偏向锁,如果存活查找该线程的栈桢信息,如果需要继续保持这个对象,升级为轻量级锁。
+(偏向锁是默认开启的,而且开始时间一般是比应用程序启动慢几秒,如果不想有这个延迟,那么可以使用-XX:BiasedLockingStartUpDelay=0;
+如果不想要偏向锁,那么可以通过-XX:-UseBiasedLocking = false来设置)
+3.当其他线程自旋尝试获取锁还在等待,又出现线程来竞争这个锁对象,轻量级锁升级为重量级锁。
+
+**锁粗化**
+按理来说,同步块的作用范围应该尽可能小,仅在共享数据的实际作用域中才进行同步,这样做的目的是为了使需要同步的操作数量尽可能缩小,缩短阻塞时间,如果存在锁竞争,那么等待锁的线程也能尽快拿到锁。
+但是加锁解锁也需要消耗资源,如果存在一系列的连续加锁解锁操作,可能会导致不必要的性能损耗。
+锁粗化就是将多个连续的加锁、解锁操作连接在一起,扩展成一个范围更大的锁,避免频繁的加锁解锁操作
+
+**锁消除**
+Java虚拟机在JIT编译时(可以简单理解为当某段代码即将第一次被执行时进行编译,又称即时编译),通过对运行上下文的扫描,
+经过逃逸分析,去除不可能存在共享资源竞争的锁,通过这种方式消除没有必要的锁,可以节省毫无意义的请求锁时间
+
diff --git a/week_03/54/volatile-054.md b/week_03/54/volatile-054.md
new file mode 100644
index 0000000000000000000000000000000000000000..57b96011ed6eb189f818d33b455f56234d142908
--- /dev/null
+++ b/week_03/54/volatile-054.md
@@ -0,0 +1,38 @@
+##volatile底层分析
+
+参考链接:
+https://blog.csdn.net/ljheee/article/details/82317448
+
+####一.volatile的作用
+1.内存可见性
+(可见性指在多线程环境中,共享变量对每个线程来说都是内存可见的,volatile直接把值刷新到主内存中)
+2.禁止指令重排序
+
+####二.指令重排序
+#####1.编译器重排序
+编译器在不改变单线程程序语义的前提下,可以重新排语句的执行顺序
+#####2.处理器重排序
+如果不存在数据依赖性,处理器可以改变语句的执行顺序
+
+####三.编译器
+#####基本构造
+语法分析、词法分析、语义分析、中间代码生成、指令优化、目标代码产生
+#####第一阶段
+编译器优化,发生在编译阶段,就Java而言,就是编译生成class文件的时候,
+对编译生成的中间代码进行一次指令优化
+#####第二阶段
+处理器优化,跟每个硬件厂商的实现有关
+
+####四.jvm的volatile如何禁止重排序
+在加入volatile关键字的时候,会多出一个Lock前缀指令,在汇编的时候也是加了一个lock指令前缀,
+当有一个lock指令前缀,在多线程环境下可以互斥的使用这个内存地址,当指令执行完毕,
+这个锁定动作才会消失。
+
+
+
+
+
+
+
+
+
diff --git "a/week_03/54/\345\210\206\345\270\203\345\274\217\351\224\201-054.md" "b/week_03/54/\345\210\206\345\270\203\345\274\217\351\224\201-054.md"
new file mode 100644
index 0000000000000000000000000000000000000000..2838e41741b91d99a751251c495fea066a1bcd45
--- /dev/null
+++ "b/week_03/54/\345\210\206\345\270\203\345\274\217\351\224\201-054.md"
@@ -0,0 +1,38 @@
+##分布式锁
+参考链接:
+https://www.jianshu.com/p/a1ebab8ce78a
+
+####造成原因
+多个分布式系统中,多个进程相互干扰
+
+####分布式锁具备条件
+在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行;
+高可用的获取锁与释放锁;
+高性能的获取锁与释放锁;
+具备可重入特性(可理解为重新进入,由多于一个任务并发使用,而不必担心数据错误);
+具备锁失效机制,防止死锁;
+具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败;
+
+####实现方式
+#####1.数据库
+加锁:往表中添加一条记录
+释放锁:删除记录
+多次请求到数据库,数据库只能保证一个成功。
+#####2.Redis
+Redis是单线程,Redis里面自带分布式锁
+setnx方法和expire实现//在设置时间的时候程序崩溃会出现死锁的情况
+2.6.1以后有set方法set(key,value,NX,PX,time)
+NX表示key不存在进行set操作,存在则不作任何操作
+PX表示设置过期时间
+time表示过期时间
+#####3.Zk
+基于ZK的最终一致性实现的(临时有序节点)
+请求进来时在对应的目录下面生成唯一一个瞬时有序节点
+判断获取锁的方式:判断有序节点最小的一个
+释放锁:删除节点
+
+####几种实现对比
+性能:数据库=zk