diff --git a/week_03/55/AQS-55.md b/week_03/55/AQS-55.md new file mode 100644 index 0000000000000000000000000000000000000000..a0805b0c2951faa9062410b69f9be202c1a5b179 --- /dev/null +++ b/week_03/55/AQS-55.md @@ -0,0 +1,281 @@ +AQS类源码分析 +AbstractQueuedSynchronizer(AQS)是JDK中实现并发编程的核心,平时我们工作中经常用到的ReentrantLock,CountDownLatch等都是基于它来实现的。 + + AQS类中维护了一个双向链表(FIFO队列),队列中的每个元素都用一个Node表示,我们可以看到,Node类中有几个静态常量表示的状态: + + +static final class Node { + /** Marker to indicate a node is waiting in shared mode */ + static final Node SHARED = new Node(); + /** Marker to indicate a node is waiting in exclusive mode */ + static final Node EXCLUSIVE = null; + + /** waitStatus value to indicate thread has cancelled */ + static final int CANCELLED = 1; + /** waitStatus value to indicate successor's thread needs unparking */ + static final int SIGNAL = -1; + /** waitStatus value to indicate thread is waiting on condition */ + static final int CONDITION = -2; + static final int PROPAGATE = -3; + volatile int waitStatus; + volatile Node prev; + + volatile Node next; + + volatile Thread thread; + + Node nextWaiter; + + final boolean isShared() { + return nextWaiter == SHARED; + } + + final Node predecessor() throws NullPointerException { + Node p = prev; + if (p == null) + throw new NullPointerException(); + else + return p; + } + + Node() { + } + + Node(Thread thread, Node mode) { + this.nextWaiter = mode; + this.thread = thread; + } + + Node(Thread thread, int waitStatus) { + this.waitStatus = waitStatus; + this.thread = thread; + } + } + + 此外,AQS中通过一个state的volatile变量表示同步状态。 + + 那么AQS是如何通过队列实现锁操作的呢? + +一.获取锁操作 + 下面的是AQS中执行获取锁的代码: + + +public final void acquire(int arg) { + /**通过tryAcquire获取锁,如果成功获取到锁直接终止(selfInterrupt),否则将当前线程插入队列 + * 这里的Node.EXCLUSIVE表示创建一个独占模式的节点 + */ + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); + } + + 然而实际上,AQS中并没有实现上面的tryAcquire(arg)方法,具体获取锁的操作需要由其子类比如ReentrantLock中的Sync实现: + + + + +protected final boolean tryAcquire(int acquires) { + //取到当前线程 + final Thread current = Thread.currentThread(); + //获取到state值(前文提到) + int c = getState(); + //state为0标识当前没有线程占有锁 + //如果队列中前面没有元素(因为是公平锁的原因,非公平锁中不进行判断,如果state为0直接获取到锁),CAS修改当前值 + if (c == 0) { + if (!hasQueuedPredecessors() && + compareAndSetState(0, acquires)) { + //标识当前线程成功获取锁 + setExclusiveOwnerThread(current); + return true; + } + } + //state不为0,且占有锁的线程是当前线程(这里涉及到一个可重入锁的概念) + else if (current == getExclusiveOwnerThread()) { + //增加重入次数 + int nextc = c + acquires; + //如果次数值溢出,抛出异常 + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + //如果锁已经被其它线程占用,获取锁失败 + return false; + } + + 上面的代码注释中提到了可重入锁的概念,可重入锁又叫递归锁,简单来讲就是已经获取到锁的线程还可以再次获取到同一个锁,我们通常使用的syschronized操作,ReentrantLock都属于可重入锁。自旋锁则不属于可重入锁。 + + 下面我们再看一下如果tryAcquire失败,AQS是如何处理的: + + +private Node addWaiter(Node mode) { + //创建一个队列的Node + Node node = new Node(Thread.currentThread(), mode); + //获取当前队列尾部 + Node pred = tail; + if (pred != null) { + //CAS操作尝试插入Node到等待队列,这里只尝试一次 + node.prev = pred; + if (compareAndSetTail(pred, node)) { + pred.next = node; + return node; + } + } + //如果添加失败,enq这里会做自旋操作,知道插入成功。 + enq(node); + return node; + } + + +//自旋操作添加元素到队列尾部 +private Node enq(final Node node) { + for (;;) { + //获取尾节点 + Node t = tail; + //如果尾节点为空,说明当前队列是空,需要初始化队列 + if (t == null) { + //初始化当前队列 + if (compareAndSetHead(new Node())) + tail = head; + } else { + //通过CAS操作插入Node,设置Node为队列的尾节点,并返回Node + node.prev = t; + if (compareAndSetTail(t, node)) { + t.next = node; + return t; + } + } + } + } + + +/** +* 如果插入的节点前面是head,尝试获取锁, +*/ +final boolean acquireQueued(final Node node, int arg) { + boolean failed = true; + try { + boolean interrupted = false; + //自旋操作 + for (;;) { + //获取当前插入节点的前置节点 + final Node p = node.predecessor(); + //前置节点是head,尝试获取锁 + if (p == head && tryAcquire(arg)) { + //设置head为当前节点,表示获取锁成功 + setHead(node); + p.next = null; // help GC + failed = false; + return interrupted; + } + //是否挂起当前线程,如果是,则挂起线程 + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + if (failed) + cancelAcquire(node); + } + } + + 上面的代码有些复杂,这里解释一下,之前的addWaiter代码已经将node加入了等待队列,所以这里需要让节点队列中挂起,等待唤醒。队列的head节点代表的是当前占有锁的节点,首先判断插入的node的前置节点是否是head,如果是,尝试获取锁(tryAcquire),如果获取成功则将head设置为当前节点;如果获取失败需要判断是否挂起当前线程。 + + + +/** +* 判断是否可以挂起当前线程 +*/ +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + //ws为node前置节点的状态 + int ws = pred.waitStatus; + if (ws == Node.SIGNAL) +       //如果前置节点状态为SIGNAL,当前节点可以挂起 + return true; + if (ws > 0) { + + //通过循环跳过所有的CANCELLED节点,找到一个正常的节点,将当前节点排在它后面 + + //GC会将这些CANCELLED节点回收 + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + //将前置节点的状态修改为SIGNAL + + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; + } + + + +//通过LockSupport挂起线程,等待唤醒 +private final boolean parkAndCheckInterrupt() { + LockSupport.park(this); + return Thread.interrupted(); +} + + + + +二.释放锁操作 + 有了获取锁的基础,再来看释放锁的源码就比较容易了,下面的代码执行的是AQS中释放锁的操作: + + +//释放锁的操作 +public final boolean release(int arg) + //尝试释放锁,这里tryRelease同样由子类实现,如果失败直接返回false + if (tryRelease(arg)) { + Node h = head; + if (h != null && h.waitStatus != 0) + unparkSuccessor(h); + return true; + } + return false; + } + + 下面的代码是尝试释放锁的操作: + + + protected final boolean tryRelease(int releases) { + //获取state值,释放一定值 + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + //如果差是0,表示锁已经完全释放 + if (c == 0) { + free = true; + //下面设置为null表示当前没有线程占用锁 + setExclusiveOwnerThread(null); + } + //如果c不是0表示锁还没有完全释放,修改state值 + setState(c); + return free; + } + + 释放锁后,还需要唤醒队列中的一个后继节点: + + +private void unparkSuccessor(Node node) { + //将当前节点的状态修改为0 + int ws = node.waitStatus; + if (ws < 0) + compareAndSetWaitStatus(node, ws, 0); + //从队列里找出下一个需要唤醒的节点 + //首先是直接后继 + Node s = node.next; + //如果直接后继为空或者它的waitStatus大于0(已经放弃获取锁了),我们就遍历整个队列, + //获取第一个需要唤醒的节点 + if (s == null || s.waitStatus > 0) { + s = null; + for (Node t = tail; t != null && t != node; t = t.prev) + if (t.waitStatus <= 0) + s = t; + } + if (s != null) + //将节点唤醒 + LockSupport.unpark(s.thread); + } diff --git "a/week_03/55/Java\345\206\205\345\255\230\346\250\241\345\236\213-55.md" "b/week_03/55/Java\345\206\205\345\255\230\346\250\241\345\236\213-55.md" new file mode 100644 index 0000000000000000000000000000000000000000..33b38a94ce5b1a0ec6d789db8f93da9c73664f87 --- /dev/null +++ "b/week_03/55/Java\345\206\205\345\255\230\346\250\241\345\236\213-55.md" @@ -0,0 +1,166 @@ +Java内存模型 +JMM是Java虚拟机规范定义的,用于屏蔽掉Java程序在各种不同的硬件和操作系统对内存的访问的差异,这样就可以实现Java程序在各种不同的平台上都能达到内存访问的一致性. +虽然Java程序所有的运行都是在虚拟机中,涉及到的内存等信息都是虚拟机的一部分,但实际也是物理机的,只不过是虚拟机作为最外层的容器统一做了处理。虚拟机的内存模型,以及多线程的场景下与物理机的情况是很相似的,可以类比参考。 +Java内存模型的主要目标是定义程序中变量的访问规则。即在虚拟机中将变量存储到主内存或者将变量从主内存取出这样的底层细节。需要注意的是这里的变量跟我们写java程序中的变量不是完全等同的。这里的变量是指实例字段,静态字段,构成数组对象的元素,但是不包括局部变量和方法参数(因为这是线程私有的)。这里可以简单的认为主内存是java虚拟机内存区域中的堆,局部变量和方法参数是在虚拟机栈中定义的。但是在堆中的变量如果在多线程中都使用,就涉及到了堆和不同虚拟机栈中变量的值的一致性问题了。 +Java内存模型中涉及到的概念有: + +主内存:java虚拟机规定所有的变量(不是程序中的变量)都必须在主内存中产生,为了方便理解,可以认为是堆区。可以与前面说的物理机的主内存相比,只不过物理机的主内存是整个机器的内存,而虚拟机的主内存是虚拟机内存中的一部分。 +工作内存:java虚拟机中每个线程都有自己的工作内存,该内存是线程私有的为了方便理解,可以认为是虚拟机栈。可以与前面说的高速缓存相比。线程的工作内存保存了线程需要的变量在主内存中的副本。虚拟机规定,线程对主内存变量的修改必须在线程的工作内存中进行,不能直接读写主内存中的变量。不同的线程之间也不能相互访问对方的工作内存。如果线程之间需要传递变量的值,必须通过主内存来作为中介进行传递。 +这里需要说明一下:主内存、工作内存与java内存区域中的java堆、虚拟机栈、方法区并不是一个层次的内存划分。这两者是基本上是没有关系的,上文只是为了便于理解,做的类比 +image.png +工作内存与主内存交互 +物理机高速缓存和主内存之间的交互有协议,同样的,java内存中线程的工作内存和主内存的交互是由java虚拟机定义了如下的8种操作来完成的,每种操作必须是原子性的(double和long类型在某些平台有例外,参考volatile详解和非原子性协定) +java虚拟机中主内存和工作内存交互,就是一个变量如何从主内存传输到工作内存中,如何把修改后的变量从工作内存同步回主内存。 + +lock(锁定):作用于主内存的变量,一个变量在同一时间只能一个线程锁定,该操作表示这条线成独占这个变量 +unlock(解锁):作用于主内存的变量,表示这个变量的状态由处于锁定状态被释放,这样其他线程才能对该变量进行锁定 +read(读取):作用于主内存变量,表示把一个主内存变量的值传输到线程的工作内存,以便随后的load操作使用 +load(载入):作用于线程的工作内存的变量,表示把read操作从主内存中读取的变量的值放到工作内存的变量副本中(副本是相对于主内存的变量而言的) +use(使用):作用于线程的工作内存中的变量,表示把工作内存中的一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令时就会执行该操作 +assign(赋值):作用于线程的工作内存的变量,表示把执行引擎返回的结果赋值给工作内存中的变量,每当虚拟机遇到一个给变量赋值的字节码指令时就会执行该操作 +store(存储):作用于线程的工作内存中的变量,把工作内存中的一个变量的值传递给主内存,以便随后的write操作使用 +write(写入):作用于主内存的变量,把store操作从工作内存中得到的变量的值放入主内存的变量中 +如果要把一个变量从主内存传输到工作内存,那就要顺序的执行read和load操作,如果要把一个变量从工作内存回写到主内存,就要顺序的执行store和write操作。对于普通变量,虚拟机只是要求顺序的执行,并没有要求连续的执行,所以如下也是正确的。对于两个线程,分别从主内存中读取变量a和b的值,并不一样要read a; load a; read b; load b; 也会出现如下执行顺序:read a; read b; load b; load a; (对于volatile修饰的变量会有一些其他规则,后边会详细列出),对于这8中操作,虚拟机也规定了一系列规则,在执行这8中操作的时候必须遵循如下的规则: + +不允许read和load、store和write操作之一单独出现,也就是不允许从主内存读取了变量的值但是工作内存不接收的情况,或者不允许从工作内存将变量的值回写到主内存但是主内存不接收的情况 +不允许一个线程丢弃最近的assign操作,也就是不允许线程在自己的工作线程中修改了变量的值却不同步/回写到主内存 +不允许一个线程回写没有修改的变量到主内存,也就是如果线程工作内存中变量没有发生过任何assign操作,是不允许将该变量的值回写到主内存 +变量只能在主内存中产生,不允许在工作内存中直接使用一个未被初始化的变量,也就是没有执行load或者assign操作。也就是说在执行use、store之前必须对相同的变量执行了load、assign操作 +一个变量在同一时刻只能被一个线程对其进行lock操作,也就是说一个线程一旦对一个变量加锁后,在该线程没有释放掉锁之前,其他线程是不能对其加锁的,但是同一个线程对一个变量加锁后,可以继续加锁,同时在释放锁的时候释放锁次数必须和加锁次数相同。 +对变量执行lock操作,就会清空工作空间该变量的值,执行引擎使用这个变量之前,需要重新load或者assign操作初始化变量的值 +不允许对没有lock的变量执行unlock操作,如果一个变量没有被lock操作,那也不能对其执行unlock操作,当然一个线程也不能对被其他线程lock的变量执行unlock操作 +对一个变量执行unlock之前,必须先把变量同步回主内存中,也就是执行store和write操作 +当然,最重要的还是如开始所说,这8个动作必须是原子的,不可分割的。 +针对volatile修饰的变量,会有一些特殊规定。 + +volatile修饰的变量的特殊规则 +关键字volatile可以说是java虚拟机中提供的最轻量级的同步机制。java内存模型对volatile专门定义了一些特殊的访问规则。这些规则有些晦涩拗口,先列出规则,然后用更加通俗易懂的语言来解释: +假定T表示一个线程,V和W分别表示两个volatile修饰的变量,那么在进行read、load、use、assign、store和write操作的时候需要满足如下规则: + +只有当线程T对变量V执行的前一个动作是load,线程T对变量V才能执行use动作;同时只有当线程T对变量V执行的后一个动作是use的时候线程T对变量V才能执行load操作。所以,线程T对变量V的use动作和线程T对变量V的read、load动作相关联,必须是连续一起出现。也就是在线程T的工作内存中,每次使用变量V之前必须从主内存去重新获取最新的值,用于保证线程T能看得见其他线程对变量V的最新的修改后的值。 +只有当线程T对变量V执行的前一个动作是assign的时候,线程T对变量V才能执行store动作;同时只有当线程T对变量V执行的后一个动作是store的时候,线程T对变量V才能执行assign动作。所以,线程T对变量V的assign操作和线程T对变量V的store、write动作相关联,必须一起连续出现。也即是在线程T的工作内存中,每次修改变量V之后必须立刻同步回主内存,用于保证线程T对变量V的修改能立刻被其他线程看到。 +假定动作A是线程T对变量V实施的use或assign动作,动作F是和动作A相关联的load或store动作,动作P是和动作F相对应的对变量V的read或write动作;类似的,假定动作B是线程T对变量W实施的use或assign动作,动作G是和动作B相关联的load或store动作,动作Q是和动作G相对应的对变量W的read或write动作。如果动作A先于B,那么P先于Q。也就是说在同一个线程内部,被volatile修饰的变量不会被指令重排序,保证代码的执行顺序和程序的顺序相同。 +总结上面三条规则,前面两条可以概括为:volatile类型的变量保证对所有线程的可见性。第三条为:volatile类型的变量禁止指令重排序优化。 + +valatile类型的变量保证对所有线程的可见性 +可见性是指当一个线程修改了这个变量的值,新值(修改后的值)对于其他线程来说是立即可以得知的。正如上面的前两条规则规定,volatile类型的变量每次值被修改了就立即同步回主内存,每次使用时就需要从主内存重新读取值。返回到前面对普通变量的规则中,并没有要求这一点,所以普通变量的值是不会立即对所有线程可见的。 +误解:volatile变量对所有线程是立即可见的,所以对volatile变量的所有修改(写操作)都立刻能反应到其他线程中。或者换句话说:volatile变量在各个线程中是一致的,所以基于volatile变量的运算在并发下是线程安全的。 +这个观点的论据是正确的,但是根据论据得出的结论是错误的,并不能得出这样的结论。volatile的规则,保证了read、load、use的顺序和连续行,同理assign、store、write也是顺序和连续的。也就是这几个动作是原子性的,但是对变量的修改,或者对变量的运算,却不能保证是原子性的。如果对变量的修改是分为多个步骤的,那么多个线程同时从主内存拿到的值是最新的,但是经过多步运算后回写到主内存的值是有可能存在覆盖情况发生的。如下代码的例子: +public class VolatileTest { + public static volatile int race = 0; + public static void increase() { + race++ + } + + private static final int THREADS_COUNT = 20; + + public void static main(String[] args) { + Thread[] threads = new Thread[THREADS_COUNT); + for (int = 0; i < THREADS_COUNT; i++) { + threads[i] = new Thread(new Runnable(){ + @Override + public void run() { + for (int j = 0; j < 10000; j++) { + increase(); + } + } + }); + threads[i].start(); + } + while (Thread.activeCount() > 1) { + Thread.yield(); + } + System.out.println(race); + } +} +代码就是对volatile类型的变量启动了20个线程,每个线程对变量执行1w次加1操作,如果volatile变量并发操作没有问题的话,那么结果应该是输出20w,但是结果运行的时候每次都是小于20w,这就是因为race++操作不是原子性的,是分多个步骤完成的。假设两个线程a、b同时取到了主内存的值,是0,这是没有问题的,在进行++操作的时候假设线程a执行到一半,线程b执行完了,这时线程b立即同步给了主内存,主内存的值为1,而线程a此时也执行完了,同步给了主内存,此时的值仍然是1,线程b的结果被覆盖掉了。 + +volatile变量禁止指令重排序优化 +普通的变量仅仅会保证在该方法执行的过程中,所有依赖赋值结果的地方都能获取到正确的结果,但不能保证变量赋值的操作顺序和程序代码的顺序一致。因为在一个线程的方法执行过程中无法感知到这一点,这也就是java内存模型中描述的所谓的“线程内部表现为串行的语义”。 +也就是在单线程内部,我们看到的或者感知到的结果和代码顺序是一致的,即使代码的执行顺序和代码顺序不一致,但是在需要赋值的时候结果也是正确的,所以看起来就是串行的。但实际结果有可能代码的执行顺序和代码顺序是不一致的。这在多线程中就会出现问题。 +看下面的伪代码举例: +Map configOptions; +char[] configText; +//volatile类型bianliang +volatile boolean initialized = false; + +//假设以下代码在线程A中执行 +//模拟读取配置信息,读取完成后认为是初始化完成 +configOptions = new HashMap(); +configText = readConfigFile(fileName); +processConfigOptions(configText, configOptions); +initialized = true; + +//假设以下代码在线程B中执行 +//等待initialized为true后,读取配置信息进行操作 +while ( !initialized) { + sleep(); +} +doSomethingWithConfig(); +如果initialiezd是普通变量,没有被volatile修饰,那么线程A执行的代码的修改初始化完成的结果initialized = true就有可能先于之前的三行代码执行,而此时线程B发现initialized为true了,就执行doSomethingWithConfig()方法,但是里面的配置信息都是null的,就会出现问题了。 +现在initialized是volatile类型变量,保证禁止代码重排序优化,那么就可以保证initialized = true执行的时候,前边的三行代码一定执行完成了,那么线程B读取的配置文件信息就是正确的。 + +跟其他保证并发安全的工具相比,volatile的性能确实会好一些。在某些情况下,volatile的同步机制性能要优于锁(使用synchronized关键字或者java.util.concurrent包中的锁)。但是现在由于虚拟机对锁的不断优化和实行的许多消除动作,很难有一个量化的比较。 +与自己相比,就可以确定一个原则:volatile变量的读操作和普通变量的读操作几乎没有差异,但是写操作会性能差一些,慢一些,因为要在本地代码中插入许多内存屏障指令来禁止指令重排序,保证处理器不发生代码乱序执行行为。 + +long和double变量的特殊规则 +Java内存模型要求对主内存和工作内存交换的八个动作是原子的,正如章节开头所讲,对long和double有一些特殊规则。八个动作中lock、unlock、read、load、use、assign、store、write对待32位的基本数据类型都是原子操作,对待long和double这两个64位的数据,java虚拟机规范对java内存模型的规定中特别定义了一条相对宽松的规则:允许虚拟机将没有被volatile修饰的64位数据的读写操作划分为两次32位的操作来进行,也就是允许虚拟机不保证对64位数据的read、load、store和write这4个动作的操作是原子的。这也就是我们常说的long和double的非原子性协定(Nonautomic Treatment of double and long Variables)。 + +并发内存模型的实质 +Java内存模型围绕着并发过程中如何处理原子性、可见性和顺序性这三个特征来设计的。 + +原子性(Automicity) +由Java内存模型来直接保证原子性的变量操作包括read、load、use、assign、store、write这6个动作,虽然存在long和double的特例,但基本可以忽律不计,目前虚拟机基本都对其实现了原子性。如果需要更大范围的控制,lock和unlock也可以满足需求。lock和unlock虽然没有被虚拟机直接开给用户使用,但是提供了字节码层次的指令monitorenter和monitorexit对应这两个操作,对应到java代码就是synchronized关键字,因此在synchronized块之间的代码都具有原子性。 + +可见性 +可见性是指一个线程修改了一个变量的值后,其他线程立即可以感知到这个值的修改。正如前面所说,volatile类型的变量在修改后会立即同步给主内存,在使用的时候会从主内存重新读取,是依赖主内存为中介来保证多线程下变量对其他线程的可见性的。 +除了volatile,synchronized和final也可以实现可见性。synchronized关键字是通过unlock之前必须把变量同步回主内存来实现的,final则是在初始化后就不会更改,所以只要在初始化过程中没有把this指针传递出去也能保证对其他线程的可见性。 + +有序性 +有序性从不同的角度来看是不同的。单纯单线程来看都是有序的,但到了多线程就会跟我们预想的不一样。可以这么说:如果在本线程内部观察,所有操作都是有序的;如果在一个线程中观察另一个线程,所有的操作都是无序的。前半句说的就是“线程内表现为串行的语义”,后半句值得是“指令重排序”现象和主内存与工作内存之间同步存在延迟的现象。 +保证有序性的关键字有volatile和synchronized,volatile禁止了指令重排序,而synchronized则由“一个变量在同一时刻只能被一个线程对其进行lock操作”来保证。 + +总体来看,synchronized对三种特性都有支持,虽然简单,但是如果无控制的滥用对性能就会产生较大影响。 + +先行发生原则 +如果Java内存模型中所有的有序性都要依靠volatile和synchronized来实现,那是不是非常繁琐。Java语言中有一个“先行发生原则”,是判断数据是否存在竞争、线程是否安全的主要依据。 + +什么是先行发生原则 +先行发生原则是Java内存模型中定义的两个操作之间的偏序关系。比如说操作A先行发生于操作B,那么在B操作发生之前,A操作产生的“影响”都会被操作B感知到。这里的影响是指修改了内存中的共享变量、发送了消息、调用了方法等。个人觉得更直白一些就是有可能对操作B的结果有影响的都会被B感知到,对B操作的结果没有影响的是否感知到没有太大关系。 + +Java内存模型自带先行发生原则有哪些 +程序次序原则 +在一个线程内部,按照代码的顺序,书写在前面的先行发生与后边的。或者更准确的说是在控制流顺序前面的先行发生与控制流后面的,而不是代码顺序,因为会有分支、跳转、循环等。 +管程锁定规则 +一个unlock操作先行发生于后面对同一个锁的lock操作。这里必须注意的是对同一个锁,后面是指时间上的后面 +volatile变量规则 +对一个volatile变量的写操作先行发生与后面对这个变量的读操作,这里的后面是指时间上的先后顺序 +线程启动规则 +Thread对象的start()方法先行发生与该线程的每个动作。当然如果你错误的使用了线程,创建线程后没有执行start方法,而是执行run方法,那此句话是不成立的,但是如果这样其实也不是线程了 +线程终止规则 +线程中的所有操作都先行发生与对此线程的终止检测,可以通过Thread.join()和Thread.isAlive()的返回值等手段检测线程是否已经终止执行 +线程中断规则 +对线程interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测到是否有中断发生。 +对象终结规则 +一个对象的初始化完成先行发生于他的finalize方法的执行,也就是初始化方法先行发生于finalize方法 +传递性 +如果操作A先行发生于操作B,操作B先行发生于操作C,那么操作A先行发生于操作C。 +看一个例子: + +private int value = 0; +public void setValue(int value) { + this.value = value; +} +public int getValue() { + return this.value; +} +如果有两个线程A和B,A先调用setValue方法,然后B调用getValue方法,那么B线程执行方法返回的结果是什么? +我们去对照先行发生原则一个一个对比。首先是程序次序规则,这里是多线程,不在一个线程中,不适用;然后是管程锁定规则,这里没有synchronized,自然不会发生lock和unlock,不适用;后面对于线程启动规则、线程终止规则、线程中断规则也不适用,这里与对象终结规则、传递性规则也没有关系。所以说B返回的结果是不确定的,也就是说在多线程环境下该操作不是线程安全的。 +如何修改呢,一个是对get/set方法加入synchronized 关键字,可以使用管程锁定规则;要么对value加volatile修饰,可以使用volatile变量规则。 +通过上面的例子可知,一个操作时间上先发生并不代表这个操作先行发生,那么一个操作先行发生是不是代表这个操作在时间上先发生?也不是,如下面的例子: + +int i = 2; +int j = 1; +在同一个线程内,对i的赋值先行发生于对j赋值的操作,但是代码重排序优化,也有可能是j的赋值先发生,我们无法感知到这一变化。 + +所以,综上所述,时间先后顺序与先行发生原则之间基本没有太大关系。我们衡量并发安全的问题的时候不要受到时间先后顺序的干扰,一切以先行发生原则为准。 diff --git a/week_03/55/ReentrantLock-55.md b/week_03/55/ReentrantLock-55.md new file mode 100644 index 0000000000000000000000000000000000000000..617a2866738ebc74e9906c7888e4dc0fc49a721e --- /dev/null +++ b/week_03/55/ReentrantLock-55.md @@ -0,0 +1,820 @@ +ReentrantLock源码分析 +ReentrantLock 介绍 +一个可重入的互斥锁,它具有与使用{synchronized}方法和语句访问的隐式监视器锁相同的基本行为和语义,但它具有可扩展的能力。 + +一个ReentrantLock会被最后一次成功锁定(lock)的线程拥有,在还没解锁(unlock)之前。当锁没有被其他线程拥有的话,一个线程执行『lock』方法将会返回,获取锁成功。一个方法将会立即的返回,如果当前线程已经拥有了这个锁。可以使用『isHeldByCurrentThread』和『getHoldCount』来检查当前线程是否持有锁。 + +这个类的构造方法会接受一个可选的“fairness”参数。当该参数设置为true时,在发生多线程竞争时,锁更倾向将使用权授予给最长等待时间的线程。另外,锁不保证任何特定的访问顺序。程序在多线程情况下使用公平锁来访问的话可能表现出较低的吞吐量(如,较慢;经常慢很多)与比使用默认设置相比,但是在获取锁上有较小的时间差异,并保证不会有饥饿(线程)。然而需要注意的是,公平锁并不保证线程调度的公平性。(也就是说,即使使用公平锁,也无法确保线程调度器是公平的。如果线程调度器选择忽略一个线程,而该线程为了这个锁已经等待了很长时间,那么就没有机会公平地处理这个锁了) +还需要注意的是,没有时间参数的『tryLock()』方法是没有信誉的公平设置。它将会成功如果锁是可获取的,即便有其他线程正在等待获取锁。 + +除了对Lock接口的实现外,这个类还定义了一系列的public和protected方法用于检测lock的state。这些方法中的某些方法仅用于检测和监控。 + +这个类的序列化行为同lock内置的行为是一样的:一个反序列化的锁的状态(state)是未锁定的(unlocked),无论它序列化时的状态(state)是什么。 + +这个锁支持同一个线程最大递归获取锁2147483647(即,Integer.MAX_VALUE)次。如果尝试获取锁的次数操作了这个限制,那么一个Error获取lock方法中抛出。 + + +AbstractQueuedSynchronizer +ReentrantLock的公平锁和非公平锁都是基于AbstractQueuedSynchronizer(AQS)实现的。ReentrantLock使用的是AQS的排他锁模式,由于AQS除了排他锁模式还有共享锁模式,本文仅对ReentrantLock涉及到的排他锁模式部分的内容进行介绍,关于共享锁模式的部分会在 CountDownLatch 源码浅析一文中介绍。 + +AQS提供一个框架用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和同步器(信号量,事件等)。这个类被设计与作为一个有用的基类,一个依赖单一原子值为代表状态的多种同步器的基类。子类必须将修改这个状态值的方法定义为受保护的方法,并且该方法会根据对象(即,AbstractQueuedSynchronizer子类)被获取和释放的方式来定义这个状态。根据这些,这个类的其他方法实现所有排队和阻塞的机制。子类能够维护其他的状态属性,但是只有使用『getState』方法、『setState』方法以及『compareAndSetState』方法来原子性的修改 int 状态值的操作才能遵循相关同步性。 + +等待队列节点类 ——— Node +等待队列是一个CLH锁队列的变体。CLH通常被用于自旋锁(CLH锁是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。)。我们用它来代替阻塞同步器,但是使用相同的基本策略,该策略是持有一些关于一个线程在它前驱节点的控制信息。一个“status”字段在每个节点中用于保持追踪是否一个线程需要被阻塞。一个节点会得到通知当它的前驱节点被释放时。队列中的每一个节点都作为一个持有单一等待线程的特定通知风格的监视器。状态字段不会控制线程是否被授予锁等。一个线程可能尝试去获取锁如果它在队列的第一个。但是首先这并不保证成功,它只是给与了竞争的权力(也就是说,队列中第一个线程尝试获取锁时,并不保证一定能得到锁,它只是有竞争锁的权力而已)。所以当前被释放的竞争者线程可能需要重新等待获取锁。 +(这里说的"队列中的第一个的线程"指的时,从队列头开始往下的节点中,第一个node.thread != null的线程。因为,AQS队列的head节点是一个虚节点,不是有个有效的等待节点,因此head节点的thread是为null的。) + +为了排队进入一个CLH锁,你可以原子性的拼接节点到队列中作为一个新的队尾;对于出队,你只要设置头字段。(即,入队操作时新的节点会排在CLH锁队列的队尾,而出队操作就是将待出队的node设置为head。由此可见,在AQS中维护的这个等待队列,head是一个无效的节点。初始化时head是一个new Node()节点;在后期的操作中,需要出队的节点就会设置到head中。) + + +------+ prev +-----+ +-----+ + head | | <---- | | <---- | | tail + +------+ +-----+ +-----+ +插入到一个CLH队列的请求只是一个对“tail”的单个原子操作,所以有一个简单的从未入队到入队的原子分割点。类似的,出队调用只需要修改“head”。然而,节点需要更多的工作来确定他们的后继者是谁,部分是为了处理由于超时和中断而导致的可能的取消。 +(也就是说,一个node的后继节点不一定就是node.next,因为队列中的节点可能因为超时或中断而取消了,而这些取消的节点此时还没被移除队列(也许正在移除队列的过程中),而一个node的后继节点指的是一个未被取消的有效节点,因此在下面的操作中你就会发现,在寻找后继节点时,寻找的都是当前节点后面第一个有效节点,即非取消节点。) + +“prev”(前驱)连接(原始的CLH锁是不使用前驱连接的),主要用于处理取消。如果一个节点被取消了,它的后驱(通常)会重连接到一个未被取消的前驱。 + +另外我们使用“next”连接去实现阻塞机制。每个节点的线程ID被它们自己的节点所持有,所以前驱节点通知下一个节点可以被唤醒,这是通过遍历下一个链接(即,next字段)来确定需要唤醒的线程。后继节点的决定必须同‘新入队的节点在设置它的前驱节点的“next”属性操作(即,新入队节点为newNode,在newNode的前驱节点preNewNode进行preNewNode.next = newNode操作)’产生竞争。一个解决方法是必要的话当一个节点的后继看起来是空的时候,从原子更新“tail”向前检测。(或者换句话说,next链接是一个优化,所以我们通常不需要反向扫描。) + +取消引入了对基本算法的一些保守性。当我们必须为其他节点的取消轮询时,我们不需要留意一个取消的节点是在我们节点的前面还是后面。它的处理方式是总是根据取消的节点唤醒其后继节点,允许它们去连接到一个新的前驱节点,除非我们能够标识一个未被取消的前驱节点来完成这个责任。 + + +waitStatus +volatile int waitStatus; +状态属性,只有如下值: +① SIGNAL: +static final int SIGNAL = -1; +这个节点的后继(或者即将被阻塞)被阻塞(通过park阻塞)了,所以当前节点需要唤醒它的后继当它被释放或者取消时。为了避免竞争,获取方法必须首先表示他们需要一个通知信号,然后再原子性的尝试获取锁,如果失败,则阻塞。 +也就是说,在获取锁的操作中,需要确保当前node的preNode的waitStatus状态值为’SIGNAL’,才可以被阻塞,当获取锁失败时。(『shouldParkAfterFailedAcquire』方法的用意就是这) +② CANCELLED: +static final int CANCELLED = 1; +这个节点由于超时或中断被取消了。节点不会离开(改变)这个状态。尤其,一个被取消的线程不再会被阻塞了。 +③ CONDITION: +static final int CONDITION = -2; +这个节点当前在一个条件队列中。它将不会被用于当做一个同步队列的节点直到它被转移到同步队列中,转移的同时状态值(waitStatus)将会被设置为0。(这里使用这个值将不会做任何事情与该字段其他值对比,只是为了简化机制)。 +④ PROPAGATE: +static final int PROPAGATE = -3; +一个releaseShared操作必须被广播给其他节点。(只有头节点的)该值会在doReleaseShared方法中被设置去确保持续的广播,即便其他操作的介入。 +⑤ 0:不是上面的值的情况。 +这个值使用数值排列以简化使用。非负的值表示该节点不需要信号(通知)。因此,大部分代码不需要去检查这个特殊的值,只是为了标识。 +对于常规的节点该字段会被初始化为0,竞争节点该值为CONDITION。这个值使用CAS修改(或者可能的话,无竞争的volatile写)。 + + +prev +volatile Node prev +连接到前驱节点,当前节点/线程依赖与这个节点waitStatus的检测。分配发生在入队时,并在出队时清空(为了GC)。并且,一个前驱的取消,我们将短路当发现一个未被取消的节点时,未被取消的节点总是存在因为头节点不能被取消:只有在获取锁操作成功的情况下一个节点才会成为头节点。一个被取消的线程绝不会获取成功,一个线程只能被它自己取消,不能被其他线程取消。 + + +next +volatile Node next +连接到后继的节点,该节点是当前的节点/线程释放唤醒的节点。分配发生在入队时,在绕过取消的前驱节点时进行调整,并在出队列时清空(为了GC的缘故)。一个入队操作(enq)不会被分配到前驱节点的next字段,直到tail成功指向当前节点之后(通过CAS来将tail指向当前节点。『enq』方法实现中,会先将node.prev = oldTailNode;在需要在CAS成功之后,即tail = node之后,再将oldTailNode.next = node;),所以当看到next字段为null时并不意味着当前节点是队列的尾部了。无论如何,如果一个next字段显示为null,我们能够从队列尾向前扫描进行复核。被取消的节点的next字段会被设置为它自己,而不是一个null,这使得isOnSyncQueue方法更简单。 + + +thread +volatile Thread thread +这个节点的入队线程。在构建时初始化,在使用完后清除。 + + +nextWaiter +Node nextWaiter +链接下一个等待条件的节点,或者一个指定的SHARED值。因为只有持有排他锁时能访问条件队列,所以我们只需要一个简单的单链表来维持正在等待条件的节点。它们接下来会被转换到队列中以去重新获取锁。因为只有排他锁才有conditions,所以我们使用给一个特殊值保存的字段来表示共享模式。 +也就是说,nextWaiter用于在排他锁模式下表示正在等待条件的下一个节点,因为只有排他锁模式有conditions;所以在共享锁模式下,我们使用’SHARED’这个特殊值来表示该字段。 + + +源码分析 +初始化 +初始化 ———— 公平锁: +ReentrantLock lock = new ReentrantLock(true) +初始化 ———— 非公平锁: +ReentrantLock lock = new ReentrantLock() + +或 + +ReentrantLock lock = new ReentrantLock(false) + + +lock +public void lock() { + sync.lock(); +} +获取锁。 +如果其他线程没有持有锁的话,获取锁并且立即返回,设置锁被持有的次数为1. +如果当前线程已经持有锁了,那么只有锁的次数加1,并且方法立即返回。 +如果其他线程持有了锁,那么当前线程会由于线程调度变得不可用,并处于休眠状态直到当前线程获取到锁,此时当前线程持有锁的次数被设置为1次。 + +公平锁『lock()』方法的实现: +『FairSync#lock()』 +final void lock() { + acquire(1); +} +调用『acquire』在再次尝试获取锁失败的情况下,会将当前线程入队至等待队列。该方法会在成功获取锁的情况下才会返回。因此该方法是可能导致阻塞的(线程挂起)。 + + +非公平锁『lock()』方法的实现: +『NonfairSync#lock()』 +final void lock() { + if (compareAndSetState(0, 1)) + setExclusiveOwnerThread(Thread.currentThread()); + else + acquire(1); +} +① 尝试获取锁,若『compareAndSetState(0, 1)』操作成功(这步操作有两层意思。第一,当前state为0,说明当前锁没有被任何线程持有;第二,原子性的将state从’0’修改为’1’成功,说明当前线程成功获取了这个锁),则说明当前线程成功获取锁。那么设置锁的持有者为当前线程(『setExclusiveOwnerThread(Thread.currentThread())』)。 +那么此时,AQS state为1,锁的owner为当前线程。结束方法。 +② 如果获取锁失败,则调用『acquire』在再次尝试获取锁失败的情况下,会将当前线程入队至等待队列。该方法会在成功获取锁的情况下才会返回。因此该方法是可能导致阻塞的(线程挂起)。 + + +公共方法『AbstractQueuedSynchronizer#acquire』 +public final void acquire(int arg) { + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); +} +在排他模式下尝试获,忽略中断。该方法的实现至少执行一次『tryAcquire』方法,如果成功获取锁则返回。否则,线程将入队等待队列中,可能会重复阻塞和解除阻塞,以及调用『tryAcquire』直到成功获取锁。这个方法能被用于实现『Lock#lock』 +① 执行『tryAcquire』来尝试获取锁,如果成功(即,返回true)。则返回true退出方法;否则到第②步 +② 执行『acquireQueued(addWaiter(Node.EXCLUSIVE), arg)』 +『addWaiter(Node.EXCLUSIVE)』:为当前线程创建一个排他模式的Node,并将这个节点加入等待队列尾部。 +『acquireQueued』:已经入队的节点排队等待获取锁。 +③ 如果在尝试获取锁的过程中发现线程被标志位了中断。因为是通过『Thread.interrupted()』方法来检测的当前线程是否有被标志位中断,该方法会清除中断标志,所以如果线程在尝试获取锁的过程中发现被标识为了中断,则需要重新调用『Thread.currentThread().interrupt();』重新将中断标志置位。 +该方法是排他模式下获取锁的方法,并且该方法忽略中断,也就说中断不会导致该方法的结束。首先,会尝试通过不公平的方式立即抢占该锁(『tryAcquire』),如果获取锁成功,则结束方法。否则,将当前线程加入到等待获取锁的队列中,如果当前线程还未入队的话。此后就需要在队列中排队获取锁了,而这就不同于前面非公平的方式了,它会根据FIFO的公平方式来尝试获取这个锁。而这个方法会一直“阻塞”直到成功获取到锁了才会返回。注意,这里的“阻塞”并不是指线程一直被挂起这,它可能被唤醒,然后同其他线程(比如,那么尝试非公平获取该锁的线程)竞争这个锁,如果失败,它会继续被挂起,等待被唤醒,再重新尝试获取锁,直到成功。 +同时注意,关于中断的操作。因为该方法是不可中断的方法,因此若在该方法的执行过程中线程被标志位了中断,我们需要确保这个标志位不会因为方法的调用而被清除,也就是我们不处理中断,但是外层的逻辑可能会对中断做相关的处理,我们不应该影响中断的状态,即,“私自”在不处理中断的情况下将中断标志清除了。 + + +先继续来看公平锁和非公平锁对『tryAcquire』方法的实现 +tryAcquire 这类型的方法都不会导致阻塞(即,线程挂起)。它会尝试获取锁,如果失败就返回false。 + +FairSync#tryAcquire +protected final boolean tryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + if (!hasQueuedPredecessors() && + compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; +} +公平版本的『tryAcquire』方法。当前线程没有权限获取锁,除非递归调用到没有等待者了,或者当前线程就是第一个尝试获取锁的线程(即,等待队列中没有等待获取锁的线程)。 +① 获取AQS state,即,当前锁被获取的次数。如果为’0’,则说明当前锁没有被任何线程获取,那么执行『hasQueuedPredecessors』方法判断当前线程之前是否有比它等待更久准备获取锁的线程: +a)如果有则方法结束,返回false; +b)如果没有,则说明当前线程前面没有另一个比它等待更久的时间在等待获取这个锁的线程,则尝试通过CAS的方式让当前的线程获取锁。如果成功则设置持有锁的线程为当前线程(『setExclusiveOwnerThread(current)』),然后方法结束返回true。 +② 如果AQS state > 0,则说明当前锁已经被某个线程所持有了,那么判断这个持有锁的线程是否就是当前线程(『current == getExclusiveOwnerThread()』),如果是的话,尝试进行再次获取这个锁(ReentrantLock是一个可重入的锁)如果获取锁的次数没有超过上限的话(即,c + acquires > 0),则更新state的值为最终该锁被当前线程获取的次数,然后方法结束返回true;否则,如果当前线程获取这个锁的次数超过了上限则或抛出Error异常。再者如果当前线程不是持有锁的线程,则方法结束返回false。 + + +『AbstractQueuedSynchronizer#hasQueuedPredecessors』 + +public final boolean hasQueuedPredecessors() { + // The correctness of this depends on head being initialized + // before tail and on head.next being accurate if the current + // thread is first in queue. + Node t = tail; // Read fields in reverse initialization order + Node h = head; + Node s; + return h != t && + ((s = h.next) == null || s.thread != Thread.currentThread()); +} +查询是否有线程已经比当前线程等待更长的时间在等待获取锁。 +这个方法的调用等价于(但更高效与): +『getFirstQueuedThread() != Thread.currentThread() && hasQueuedThreads()』 +即,可见如下任一情况,则说明当前线程前面没有比它等待更久的需要获取锁的线程: +a)当队列中第一个等待获取锁的线程是当前线程时 +b)等待队列为空时。即,当前没有其他线程等待获取锁。 +注意,因为中断和超时导致的取消可能发生在任何时候,该方法返回‘true’不能保证其他线程会比当前线程更早获得到锁。同样,由于队列是空的,在当前方法返回‘false’之后,另一个线程可能会赢得一个入队竞争。 +这个方法被涉及用于一个公平的同步器,以避免闯入。如果这个方法返回’true’,那么像是一个(公平)同步器的『tryAcquire』方法应该返回’false’,以及『tryAcquireShared』方法需要返回一个负数值(除非这是一个可重入的锁,因为可重入锁,获取锁的结果还需要判断当前线程是否就是已经获取锁的线程了,如果是,则在没有超过同一线程可获取锁的次数上限的情况下,当前线程可以再次获取这个锁)。比如,一个公平的、可重入的、排他模式下的『tryAcquire』方法,可能看起来像是这样的: + +返回: +a)true:如果当前线程前面有排队等待的线程 +b)false:如果当前线程是第一个等待获取锁的线程(即,一般就是head.next);或者等待队列为空。 + +该方法的正确性依赖于head在tail之前被初始化,以及head.next的精确性,如果当前线程是队列中第一个等待获取锁的线程的时候。 +① tail节点的获取一定先于head节点的获取。因为head节点的初始化在tail节点之前,那么基于当前的tail值,你一定能获取到有效的head值。这么做能保证接下来流程的正确性。举个反例来说明这么做的必要性:如果你按『Node h = head; Node t = tail;』的顺序来对h、t进行赋值,那么可能出现你在操作这两步的时候有其他的线程正在执行队列的初始化操作,那么就可能的带一个『h==null』,而『tail!=null』的情况(这种情况下,是不对的,因为tail!=null的话,head一定也不为null了),这使得『h != t』判断为true,认为当下是一个非空的等待队列,那么接着执行『s = h.next』就会抛出NPE异常了。但是当『Node t = tail; Node h = head;』按初始化相反的顺序来赋值的话,则不会有问题,因为我们保证了在tail取值的情况下,head的正确性。我们接下看下面的步骤,来说明为什么这么做就可以了。 +② 在获取完t、h之后,我们接着先判断『h != t』,该判断的用意在于,判断当前的队列是否为空。如果为true则说明,当前队列非空。如果为false 则说明当前队列为空,为空的话,方法就直接结束了,并返回false。 +但是请注意,当『h != t』为true时,其实是有两种情况的: +a)当tail和head都非空时,说明此时等待队列已经完成了初始化,head和tail都指向了其队列的头和队列的尾。 +b)当“tail==null”同时“head != null”,则说明,此时队列正在被其他线程初始化,当前我们获取的h、t是初始化未完成的中间状态。但是没关系,下面的流程会对此请进行判断。 +③ 当『h != t』返回’true’的话,继续判断『(s = h.next) == null || s.thread != Thread.currentThread()』。这里的两个判断分别对应了两种情况: +a)『(s = h.next) == null』返回’true’,则说明当获取的h、t为初始化的中间状态,因为第一个线程入队的时候,会先初始化队列,然后才对head的next值进行赋值,所以我们需要“s = h.next”是否为null进行判断,如果为’null’,则说明当前等待队列正在被初始化,并且有一个线程正在入队的操作中。所以此时方法直接结束,并且返回true。 +b)如果『h != t』并且『(s = h.next) != null』,则说明当前线程已经被初始化好了,并且等待队列中的第一个等待获取锁的线程也已经入队了。那么接着我们就判断这个在等待队列中第一个等待获取锁的线程是不是当前线程『s.thread != Thread.currentThread()』,如果是的话,方法结束并返回false,表示当前线程前面没有比它等待更久获取这个锁的线程了;否则方法结束返回true,表示当前线程前面有比它等待更久希望获取这个锁的线程。 + + +『AbstractQueuedSynchronizer#addWaiter』 +private Node addWaiter(Node mode) { + Node node = new Node(Thread.currentThread(), mode); + // Try the fast path of enq; backup to full enq on failure + Node pred = tail; + if (pred != null) { + node.prev = pred; + if (compareAndSetTail(pred, node)) { + pred.next = node; + return node; + } + } + enq(node); + return node; +} +根据给定的模式创建当前线程的节点,并将创建好的节点入队(加入等待队列尾部)。 +首先在队列非空的情况下会尝试一次快速入队,也就是通过尝试一次CAS操作入队,如果CAS操作失败,则调用enq方法进行“自旋+CAS”方法将创建好的节点加入队列尾。 +在排他模式下,将节点加入到锁的同步队列时,Node的mode(即,waitStatus)为’EXCLUSIVE’。waitStatus是用于在排他锁模式下当节点处于条件队列时表示下一个等待条件的节点,所以在加入到锁的同步队列中(而非条件队列),我们使用’EXCLUSIVE’这个特殊值来表示该字段。本文主要围绕共享锁模式的介绍,就不对其进行展开了,关于排他锁的内容会在“ReentrantLock源码解析”一文中介绍。 + + +NonfairSync#tryAcquire +protected final boolean tryAcquire(int acquires) { + return nonfairTryAcquire(acquires); +} +对父类AQS tryAcquire方法的重写。调用『nonfairTryAcquire(acquires)』方法,非公平的尝试获取这个可重入的排他锁 + + +『nonfairTryAcquire』 + +final boolean nonfairTryAcquire(int acquires) { + final Thread current = Thread.currentThread(); + int c = getState(); + if (c == 0) { + if (compareAndSetState(0, acquires)) { + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { + int nextc = c + acquires; + if (nextc < 0) // overflow + throw new Error("Maximum lock count exceeded"); + setState(nextc); + return true; + } + return false; +} +执行不公平的『tryLock』。『tryAcquire』在子类中实现,但是都需要不公平的尝试在『tryLock』方法中。 +① 获取state值,如果为’0’,则说明当前没有线程占用锁,那么调用CAS来尝试将state的值从0修改为’acquires’的值, +a)如果成功则说明当前线程成功获取到了这个不公平锁,那么通过『setExclusiveOwnerThread(current)』方法来标志当前线程为持有锁的线程,方法结束,返回true; +b)如果失败,则说明有其他线程先获取到了这个锁,那么当前线程获取锁失败。方法结束,返回false。 +② "state != 0",则说明当前锁已经被某个线程所持有了,那么判断当前的线程是否就是持有锁的那个线程(『if (current == getExclusiveOwnerThread())』)。 +a)如果持有锁的线程就是当前线程,因为ReentrantLock是一个可重入的锁,所以接下来继续判断预计递归获取锁的次数是否超过了限制的值(即,“nextc < 0”则说明预计递归获取锁的次数超过了限制值Integer.MAX_VALUE了),那么会抛出“Error”异常;否则将当前state的值设置为最新获取锁的次数(注意,这里不需要使用CAS的方式来修改state了,因为能操作到这里的一定就是当前持有锁的线程了,因此是不会发送多线程竞争的情况)。然后方法结束,返回true; +b)如果持有锁的线程不是当前线程,那么当前线程获取锁失败。方法结束,返回false。 + + +『FairSync#lock』 VS 『NonfairSync#lock』 +a)在公平锁的模式下,所有获取锁的线程必须是按照调用lock方法先后顺序来决定的,严格的说当有多个线程同时尝试获取同一个锁时,多个线程最终获取锁的先后顺序是由入队等待队列的顺序决定的,当然,第一个获取锁的线程是无需入队的,等待队列是用于存储那些尝试获取锁失败后的节点。并且按照FIFO的顺序让队列中的节点依次获取锁。 +b)在非公平模式下,当执行lock时,无论当前等待队列中是否有等待获取锁的线程了,当前线程都会尝试直接去获取锁。 +👆两点从『FairSync#lock』与『NonfairSync#lock』 实现的不同,以及『FairSync#tryAcquire』与『NonfairSync#tryAcquire』方法实现的不同中都能表现出来。 +对于非公平锁:首先会尝试立即抢占获取锁(若锁当前没有被任何线程持有时,并且此时它会和当前同时首次尝试获取该锁的线程以及等待队列中尝试获取该锁的线程竞争),如果获取锁失败,则会被入队到等待队列中,此时就需要排队等待获取锁了。 + +在排他锁模式下,等待队列中的第一个等待获取锁的线程(即,前驱节点是head节点的节点),仅代表这个节点当前有竞争获取锁的权力,并不代表它会成功获取锁。因为它可能会同非公平获取锁的操作产生竞争。 + + +继续回到『AbstractQueuedSynchronizer#acquire』方法,继续展开里面的实现。我们接着看『acquireQueued』方法是如何实现将已经入队的节点排队等待获取锁的。 +『AbstractQueuedSynchronizer#acquireQueued』 + +final boolean acquireQueued(final Node node, int arg) { + boolean failed = true; + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return interrupted; + } + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + if (failed) + cancelAcquire(node); + } +} +用于已经入队的线程在排他不可中断模式下获取锁。同时也被用于给条件(condition)的wait方法来获取锁。 +该方法不会因为中断的发送而返回,只有在获取锁的情况下才会返回,但是如果在等待获取锁的过程中,当前线程被标识为了中断,则在方法返回的时候返回true,否则方法返回是返回false。 +① 获取当前节点的前驱节点,如果前驱节点是head节点,则说明当前节点是队列中第一个等待获取锁的节点,那么就执行『tryAcquire』方法尝试获取排他锁,如果获取排他锁成功(即,tryAcquire方法返回true)则调用『setHead(node)』将当前节点设置为头节点。然后将p(即,旧的head节点)的next置null,有助于p被垃圾收集器收集。然后标识failed为false。结束方法调用,返回interrupted(该字段表示在等待获取锁的过程中,当前线程是否有被表示为中断了)。 +② 如果当前节点的前驱节点不是head节点,说明该节点前面已经有等待着获取这个排他锁的节点;或者当前节点的前驱节点是head节点,但是当前节点获取锁失败了,那么执行『shouldParkAfterFailedAcquire』方法,若该方法返回true,则说明本次获取排他锁失败需要阻塞/挂起当前线程,那么就调用『LockSupport.park(this);』将当前线程挂起,直到被唤醒,并且若挂起期间该线程被标志为了中断状态,则将interrupted标识为true。 +③ 当当前节点经过多次唤醒与挂起,终于成功获取锁后,则退出方法,并返回当前线程是否有被中断的标志。如果当前节点因为某些原因没有成功获取到锁,却要结束该方法了,那么调用『cancelAcquire(node)』方法将当前节点从等待队列中移除。因为方法结束了,说明当前节点不会被操作再去尝试获取锁了,那么就不应该作为一个有效节点放在等待队列中,应该被标识为无效的节点后从队列中移除。 + + +『AbstractQueuedSynchronizer#setHead』 + +private void setHead(Node node) { + head = node; + node.thread = null; + node.prev = null; +} +将node设置为队列的头节点,当它出队时。该方法只能被获取方法调用。仅将无用字段清空(即,置为null)以便于GC并废除不必要的通知和递归。 + + +『AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire』 + +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int ws = pred.waitStatus; + if (ws == Node.SIGNAL) + /* + * This node has already set status asking a release + * to signal it, so it can safely park. + */ + return true; + if (ws > 0) { + /* + * Predecessor was cancelled. Skip over predecessors and + * indicate retry. + */ + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + /* + * waitStatus must be 0 or PROPAGATE. Indicate that we + * need a signal, but don't park yet. Caller will need to + * retry to make sure it cannot acquire before parking. + */ + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; +} +检查并修改一个节点的状态,当该节点获取锁失败时。返回true如果线程需要阻塞。这是主要的信号通知控制在所有的获取锁循环中。要求’pred’ == ‘node.prev’ +① 如果pred.waitStatus == Node.SIGNAL。则说明node的前驱节点已经被要求去通知释放它的后继节点,所以node可以安全的被挂起(park)。然后,退出方法,返回true。 +② 如果pred.waitStatus > 0。则说明node的前驱节点被取消了。那么跳过这个前驱节点并重新标志一个有效的前驱节点(即,waitStatus <= 0 的节点可作为有效的前驱节点),然后,退出方法,返回false。 +③ 其他情况下,即pred.waitStatus为’0’或’PROPAGATE’。表示我们需要一个通知信号(即,当前的node需要唤醒的通知),但是当前还不能挂起node。调用『compareAndSetWaitStatus(pred, ws, Node.SIGNAL)』方法通过CAS的方式来修改前驱节点的waitStatus为“SIGNAL”。退出方法,返回false。 +我们需要一个通知信号,主要是因为当前线程要被挂起了(park)。而如果waitStatus已经是’SIGNAL’的话就无需修改,直接挂起就好,而如果waitStatus是’CANCELLED’的话,说明prev已经被取消了,是个无效节点了,那么无需修改这个无效节点的waitStatus,而是需要先找到一个有效的prev。因此,剩下的情况就只有当waitStatus为’0’和’PROPAGAET’了(注意,waitStatus为’CONDITION’是节点不在等待队列中,所以当下情况waitStatus不可能为’CONDITION’),这是我们需要将prev的waitStatus使用CAS的方式修改为’SIGNAL’,而且只有修改成功的情况下,当前的线程才能安全被挂起。 +还值得注意的时,因此该方法的CAS操作都是没有自旋的,所以当它操作完CAS后都会返回false,在外层的方法中会使用自旋,当发现返回的是false时,会再次调用该方法,以检查保证有当前node有一个有效的prev,并且其waitStatus为’SIGNAL’,在此情况下当前的线程才会被挂起(park)。 + + +unlock +public void unlock() { + sync.release(1); +} +尝试去释放这个锁。 +如果当前线程是持有这个锁的线程,那么将持有次数减少1。如果释放后当前的锁被持有的次数为0,那么锁被释放。如果当前线程不是持有锁的线程,那么抛出“IllegalMonitorStateException”异常。 + + +『AbstractQueuedSynchronizer#release』 + +public final boolean release(int arg) { + if (tryRelease(arg)) { + Node h = head; + if (h != null && h.waitStatus != 0) + unparkSuccessor(h); + return true; + } + return false; +} +排他模式下的释放。该方法的实现通过解除一个或多个线程的的阻塞,如果『tryRelase』方法返回true的话。 +该方法能够被用于实现『Lock#unlock』。 +① 调用『tryRelease(arg)』方法来尝试设置状态值来反映一个排他模式下的释放。如果操作成功,则进入步骤[2];否则,如果操作失败,则方法结束,返回false。 +② 在成功释放给定的状态值后,获取等待队列的头节点。如果头节点不为null并且头节点的waitStatus!=0(头节点的waitStatus要么是’0’,要么是’SIGNAL’),那么执行『unparkSuccessor(h)』来唤醒头节点的后继节点。(节点被唤醒后,会继续acquireQueued方法中流程) +③ 只要『tryRelease(arg)』释放操作成功,无论是否需要唤醒头结点的后继节点,方法结束都会返回true。 + + +『tryRelease』 + +protected final boolean tryRelease(int releases) { + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) + throw new IllegalMonitorStateException(); + boolean free = false; + if (c == 0) { + free = true; + setExclusiveOwnerThread(null); + } + setState(c); + return free; +} +尝试通过设置状态值来反映一个在排他模式下的释放操作。 +①『Thread.currentThread() != getExclusiveOwnerThread()』如果执行释放操作的线程不是持有锁的线程,那么直接抛出“IllegalMonitorStateException”异常,方法结束;否则 +② 如果执行当前释放操作的线程是持有锁的线程,那么 +a)计算新state的值,即当前释放操作后state的值,如果state为0,则说明当前线程完成释放了对该锁的持有,那么将锁的持有者重置为null(即,『setExclusiveOwnerThread(null)』)。然后通过『setState(c);』将AQS的state值设置为这个新的state值(即,0),结束方法,返回true,表示该锁现在没有线程持有,可以被重新获取。 +b)如果新state的值不为0(即,大于0),则说明当前的线程并未完成释放该锁(因为reentrantLock是一个可重入的锁,所以一个线程可以多次获取这锁,而state值就表示这一线程获取锁的次数),那么通过『setState(c);』将AQS的state值设置为这个新的state值,结束方法,返回false。 +可见对于『tryRelease』方法,释放锁操作失败是通过抛出“IllegalMonitorStateException”异常来表示的。该方法无论返回‘true’还是‘false’都表示本次的释放操作完成了。返回‘true’表示的是锁已经被当前线程完全释放了,其他线程可以继续争夺这个锁了,在完全释放锁的时候也会将锁中持有者字段重新置null;返回‘false’表示的是当前释放操作完成后,该线程还继续持有这该锁,此时其他线程是无法获取到这个锁的。 +同时,我们可以知道,释放操作只能有持有锁的线程来完成,因此对于AQS state字段(一个volatile字段)的修改,不需要使用CAS来完成,只需要直接设置修改就好。 + + +『AbstractQueuedSynchronizer#unparkSuccessor』 + +private void unparkSuccessor(Node node) { + /* + * If status is negative (i.e., possibly needing signal) try + * to clear in anticipation of signalling. It is OK if this + * fails or if status is changed by waiting thread. + */ + int ws = node.waitStatus; + if (ws < 0) + compareAndSetWaitStatus(node, ws, 0); + + /* + * Thread to unpark is held in successor, which is normally + * just the next node. But if cancelled or apparently null, + * traverse backwards from tail to find the actual + * non-cancelled successor. + */ + Node s = node.next; + if (s == null || s.waitStatus > 0) { + s = null; + for (Node t = tail; t != null && t != node; t = t.prev) + if (t.waitStatus <= 0) + s = t; + } + if (s != null) + LockSupport.unpark(s.thread); +} +唤醒后继节点,如果存在的话 +① 如果状态值是负数,则在预期发信号通知时清除这个负数状态值。如果状态被等待的线程修改了或者清除负数状态值失败是允许。 +② 后继节点的线程被唤醒,后继节点通常就是下一个节点。但是如果下一个节点被取消了或者下一个节点为null,则从队列尾(tail)往前遍历去找真实的未取消的后继节点。 +『(s == null || s.waitStatus > 0)』:说明下一个节点为null或被取消了(waitStatus允许的状态值中,只有’CANCELLED’是>0的)。那么,就从队列尾(tail)开始向前遍历,获取第一个非空且未被取消的节点。如果存在这样的一个后继节点的话(即,“s != null”),则执行『LockSupport.unpark(s.thread);』操作来唤醒这个节点的线程,此时等待队列中第一个等待的线程就会被重新启动,流程会回到『acquireQueued』方法,该线程会重新重试获取该锁,如果成功acquireQueued方法返回,否则线程会再次被挂起,等待下次唤醒后再去再去竞争获取锁。 + +Q:关于node的waitStatus为’CANCELLED’的情况? +A:关于node的waitStatus为’CANCELLED’的情况:比如,当这个node被中断了,或者设置的超时时间到了,那么说明这个线程获取锁失败,那么此时就应该将其设置为cancelled,因为如果该线程还需要获取锁的话,会重新调用获取锁的方法,而获取锁的方法就是创建一个新的node的。所以,那么线程获取锁失败的时候就会将这个node的waitStatus设置为’CANCELLED’,一个被取消的线程绝不会获取锁成功,一个线程只能被它自己取消,不能被其他线程取消。 + +Q:关于node为null的情况? +A:关于node为null的情况:比如,一个入队操作(enq)不会被分配到前驱节点的next字段,直到tail成功指向当前节点之后(通过CAS来将tail指向当前节点。『enq』方法实现中,会先将node.prev = oldTailNode;在需要在CAS成功之后,即tail = node之后,再将oldTailNode.next = node;),所以当看到next字段为null时并不意味着当前节点是队列的尾部了。无论如何,如果一个next字段显示为null,我们能够从队列尾向前扫描进行复核。 + +Q:对于ReentrantLock无论是公平锁还是非公平锁,在入队时waitStatus都是什么?? +能确定的是从条件等待队列转移到锁的同步队列的时候,节点的waitStatus是’0’。 +A:无论是公平锁还是非公平锁,在构建一个node的时候,waitStatus都是默认值’0’。然后在将node入队到锁的等待队列中后就会执行『acquireQueued』来等待获取锁,而该方法会修改当前节点的前驱节点的waitStatus(即,『shouldParkAfterFailedAcquire(p, node)』方法)。在当前节点无法获取锁的时候需要被挂起前会将其前驱节点的waitStatus设置为’Node.SIGNAL’。这样在释放操作中(『release』),如果释放后发现锁的state为’0’,则说明锁当前可以被其他线程获取了,那么就会获取锁的等待队列的head节点,如果head节点的waitStatus!=0(即,head的waitStatus为’Node.SIGNAL’或’Node.PROPAGATE’,其中’Node.PROPAGATE’是共享模式下head节点的waitStatus可能的值,在排他模式下,head节点的waitStatus是’Node.SIGNAL’或’0’),那么说明head节点后面有等待唤醒获取锁的线程,那么调用『unparkSuccessor』方法来唤醒head节点的后继节点。 + +在排他锁模式下,head节点的waitStatus不是在该节点被设置为head节点的时候修改的。而是如果有节点入队到等待队列中,并且此时该节点无法获取锁,那么会将其前驱节点的waitStatus设置为’Node.SIGNAL’后,该节点对应的线程就被挂起了。所以也就是说,如果head节点后还有节点等待获取锁,那么此时head节点的waitStatus自然会使’Node.SIGNAL’,这是在head节点的后继节点入队后等待获取锁的过程中设置的。而将一个节点设置为head节点,仅是将该节点赋值给head节点,并将thread和prev属性会被置null。 + + +ConditionObject ———— 条件对象 +条件对象用来管理那些已经获得了一个锁,但是却不能做有用工作的线程。 + +Condition实现是AbstractQueuedSynchronizer作为一个Lock实现的基础。 +该类的方法文档描述了机制,而不是从锁和条件(Condition)用户的观点来指定行为规范。该类所暴露的版本通常需要伴随着依赖于描述相关AbstractQueuedSynchronizer的条件语义的文档。 +这个类是可序列化的,但是所有字段都是transient的,所以反序列化的conditions没有等待者。 + +注意,关于在ConditionObject中的描述,若无特殊说明“等待队列”均指“条件等待队列”,同锁的等待队列不同! + +条件等待队列中有效节点的waitStatus只能是“Node.CONDITION”,这说明,如果发现条件等待队列中的节点waitStatus!=“Node.CONDITION”,则说明这个节点被取消等待条件了,那么应该将其出条件等待队列中移除。 + +// 等待队列的头节点 +private transient Node firstWaiter; + +// 等待队列的尾节点 +private transient Node lastWaiter; + + +条件对象初始化 +Condition condition = lock.newCondition() +『newCondition』 + +public Condition newCondition() { + return sync.newCondition(); +} +返回一个用于这个Lock实例的Condition实例。 +返回的Condition实例与内置的监视器锁一起使用时,支持同Object监控器方法(『Object#wait()』、『Object#notify』、『Object#notifyAll』)相同的用法。 +如果这个锁没有被持有的话任何Condition等待(『Condition#await()』)或者通知(『Condition#signal』)方法被调用,那么一个“IllegalMonitorStateException”异常将会抛出。(也就说是说,Condition是用在锁已经被持有的情况下) + +当一个condition的等待方法被调用(『Condition#await()』),锁会被释放,并且在这个方法返回之前,锁会被重新获取并且锁的持有次数会重新存储为这个方法被调用到时候的值。 +如果一个线程在等待期间被中断了,那么等待将会结束,一个“InterruptedException”异常将会抛出,并且线程的中断状态将被清除。 +等待线程被以先进先出(FIFO)的顺序被通知。 + + +等待获得锁的线程和调用await方法的线程存在本质上的不同。一旦一个线程调用await方法,它进入该条件的等待集。当锁可用时,该线程不能马上解除阻塞。相反,它处于阻塞状态,直到另一个线程调用同一条件上的signalAll方法时为止。这一调用重新激活因为这一条件而等待的所有线程。当这些线程从等待集当中移出时,它们再次成为可运行的,调度器将再次激活它们。同时,它们将试图重新进入该对象。一旦锁成为可用的,它们中的某个将从await调用返回,获得该锁并从被阻塞的地方继续执行。 + + +await +『AbstractQueuedSynchronizer#await』 +public final void await() throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + Node node = addConditionWaiter(); + int savedState = fullyRelease(node); + int interruptMode = 0; + while (!isOnSyncQueue(node)) { + LockSupport.park(this); + if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) + break; + } + if (acquireQueued(node, savedState) && interruptMode != THROW_IE) + interruptMode = REINTERRUPT; + if (node.nextWaiter != null) // clean up if cancelled + unlinkCancelledWaiters(); + if (interruptMode != 0) + reportInterruptAfterWait(interruptMode); +} +实现可中断的条件等待。 +1.如果当前线程被中断了,则抛出“InterruptedException”异常。 +2.保存通过『getState』返回的锁的状态(state) +3.调用『release』方法,使用保存的state作为参数,如果该方法失败则抛出“IllegalMonitorStateException”异常 +4.线程阻塞直到收到唤醒通知或者线程被中断 +5.通过调用使用保存状态为参数的指定版本的『acquire』方法来重新获取锁。 +6.如果在步骤4中线程在堵塞的时候被中断了,那么抛出“InterruptedException”异常。 + +① 首先一进入该方法会先判断当前线程是否被标志为了中断状态,如果是则抛出“InterruptedException”异常,并且中断状态被清除。 +② 调用『addConditionWaiter』添加一个新的等待节点到条件等待队列中。 +③ 调用『fullyRelease(node)』使用锁当前的状态值执行释放,并返回这个状态值。(即,该方法调用完后,并执行成功的话,那么此时其他线程可以去获取这个锁了,可见await方法会使当前线程放弃对锁的持有。同时返回锁在释放前的状态值。) +④ 自旋判断创建的等待节点是否在所的同步队列中了,如果没有(则说明节点还未被信号通知,以从条件等待队列中转移到锁的同步队列中),则执行『LockSupport.park(this);』挂起当前线程,线程会一直被挂起直到被信号通知唤醒(『signal()』或『signalAll()』方法会将节点从条件等待队列转移到锁的同步队列中,并根据加入到同步队列后得到的前驱节点的waitStatus,可能会去唤醒当前线程;或者当锁的同步等待队列中的线程依次获取锁并释放后,直到轮到当前线程成为同步队列中第一个等待获取锁的线程时,当前线程或被唤醒)。接着判断线程是否发生中断,如果发送中断,则退出自旋;否则继续自旋重新执行本步骤的流程,直至新创建的等待节点被转移到锁的同步队列中。 +⑤ 执行『acquireQueued(node, savedState)』方法在排他模式下从等待队列中的线程获取锁,并且在获取锁后将锁的状态值设置为给定’savedState’(由于’savedState’就是上面因条件等待而释放锁前该线程获取锁的次数,而在该线程重新获取锁,继续await之后的流程时,保持了该线程在await之前持有锁的状态)。并且该方法会在获取锁的情况下才会返回: +a)若在等待获取锁的过程中,当前线程被标识为了中断,则在方法返回的时候返回true;接着判断interruptMode是否等于“THROW_IE”,如果为true,则说明节点的等待在得到唤醒通知之前就被取消了,此时interruptMode为“THROW_IE”;否则interruptMode!=THROW_IE,则说明节点的等待在得到唤醒通知之后才被取消了,那么设置interruptMode为“REINTERRUPT”,继续步骤[6] +b)若在等待获取锁的过程中,当前线程未被标识为中断,则继续步骤[6] +(这里一个正常的未被中断的流程就是,await的节点对应的线程会在步骤[4]被挂起,然后在某一个时刻因为signalAll()方法调用,该节点被转移到了锁的等待队列中。然后当该线程为锁的等待队列中第一个等待获取锁的线程时,会被它的前驱节点唤醒,此时节点被唤醒,判断得到已经在等待队列中了,那么结束步骤[4]的自旋,进入的步骤[5],调用『acquireQueued(node, savedState)』尝试获取锁,此时节点已经具有获取锁的权限了,如果成功获取锁流程继续,否则节点会被再次挂起,acquireQueued方法会阻塞直到当前线程获取锁的情况下返回。) +⑥ 如果node节点的nextWaiter非null,那么执行『unlinkCancelledWaiters();』来清除等待队列中被取消的节点。 +因为,如果node节点是通过signal/signalAll信号通知而从条件等待队列转移到锁的同步队列的话,那么node的nextWaiter是为null(在signal/signalAll方法中会将该字段置为null);否则如果是因为中断而将节点从条件等待队列转移到锁的同步队列的话,此时nextWaiter是不会被重置的,它依旧指向该节点在条件等待队列中的下一个节点。 +⑦ 如果中断模式标志不为’0’(即,“interruptMode != 0”),则根据给定的中断模式(interruptMode)在等待结束后报告中断(『reportInterruptAfterWait(interruptMode)』) + +因此,从『await()』方法中,我们可以得知下面几点: + +创建一个条件等待节点,并加入条件等待队列尾部。 +彻底释放当前线程所持有锁(因为,首先只有在持有锁的情况下才可以执行await操作,再者ReentrantLock是一个可重入的锁,因此同一个线程可以多次获取锁),这样锁就可以被其他线程获取。但会记录这个线程在彻底释放锁之前持有该锁的次数(即,锁的state值) +在该线程再次获取该锁时,会将锁的state设置为释放之前的值。即,从await()条件等待返回的时候,当前线程对锁持有的状态同await()等待条件之前是一致的。 +节点从条件等待队列中转移到锁的同步队列是两种情况: +a)收到了信号通知,即signal/signalAll +b)在未收到信号通知之前,检测到了当前线程被中断的标志。 +在当前线程重新获取到锁,准备从await方法返回的时候,await方法的返回也分两种情况: +a)在条件等待中的节点是通过signal/signalAll信号通知转移到锁的同步队列的,然后再在同步队列中根据FIFO的顺序来重新获取到了该锁。那么此时await方法正常返回。(在信号通知之后线程可能被标志位中断,但这不影响方法的正常返回) +b)在条件等待中节点是因为当前线程被标志为了中断而将其转移到了锁的同步队列中,这样在当前线程再次重新获取锁时,方法会异常返回,即抛出“InterruptedException”异常。 + + +接下来对,『await』中的源码细节进一步展开 +『AbstractQueuedSynchronizer#addConditionWaiter』 +private Node addConditionWaiter() { + Node t = lastWaiter; + // If lastWaiter is cancelled, clean out. + if (t != null && t.waitStatus != Node.CONDITION) { + unlinkCancelledWaiters(); + t = lastWaiter; + } + Node node = new Node(Thread.currentThread(), Node.CONDITION); + if (t == null) + firstWaiter = node; + else + t.nextWaiter = node; + lastWaiter = node; + return node; +} +添加一个新的等待者到等待队列中。 +①『Node t = lastWaiter;』:获取等待队列的中的最后一个节点, +②『t != null && t.waitStatus != Node.CONDITION』如果这个节点被取消了,那么调用『unlinkCancelledWaiters();』方法将等待队列中被取消的节点移除。并重新获取等待队列中的最后一个节点(『t = lastWaiter』) +③ 为当前线程创建一个waitStatus为“Node.CONDITION”的节点。 +④ 将新创建好的节点加入到等待队列的尾部: +a)如当前等待队列为空(即,上面获取的t为null,也就是说,当等待队列尾指针为null时,则说明此时等待队列为空)那么需要先初始化firstWaiter,将其指向这个新创建的节点。然后将lastWaiter也指向这个新创建的节点。此时等待队列中只有一个节点,firstWaiter和lastWaiter都指向这个节点。 +b)将等待队列中最后一个节点的next属性指向当前这个新创建的节点,然后将lastWaiter指向当前这个新创建的节点。 +⑤ 返回新创建的等待节点。 + + +『AbstractQueuedSynchronizer#fullyRelease』 + +final int fullyRelease(Node node) { + boolean failed = true; + try { + int savedState = getState(); + if (release(savedState)) { + failed = false; + return savedState; + } else { + throw new IllegalMonitorStateException(); + } + } finally { + if (failed) + node.waitStatus = Node.CANCELLED; + } +} +使用当前的状态值执行释放;返回保存的状态值。 +若操作失败,则取消节点,并抛出异常。 +①『int savedState = getState();』获取当前锁的状态值 +② 使用获取的状态值执行释放操作『release(savedState)』,如果操作成功,则方法结束,返回释放使用的保存的状态值;如果操作失败,则抛出“IllegalMonitorStateException”异常,并取消node节点,即,将节点node的waitStatus设置为“Node.CANCELLED”。 + + +『AbstractQueuedSynchronizer#isOnSyncQueue』 + +final boolean isOnSyncQueue(Node node) { + if (node.waitStatus == Node.CONDITION || node.prev == null) + return false; + if (node.next != null) // If has successor, it must be on queue + return true; + /* + * node.prev can be non-null, but not yet on queue because + * the CAS to place it on queue can fail. So we have to + * traverse from tail to make sure it actually made it. It + * will always be near the tail in calls to this method, and + * unless the CAS failed (which is unlikely), it will be + * there, so we hardly ever traverse much. + */ + return findNodeFromTail(node); +} +返回true,如果一个节点总是初始化于条件队列中,并且当前在同步队列中等待获取锁。 +① 如果node的waitStatus为“Node.CONDITION”或者node的prev为null,则说明node节点当前还没有入队同步队列,方法结束,返回false;否则步骤[2] +② 接着判断『if (node.next != null)』,如果为true,则说明node已经入队完毕,则方法结束,返回true。否则步骤[3] +③ 调用『findNodeFromTail(node)』从同步队列尾开始寻找节点。此时,node.prev非null,但是由于通过CAS将节点入队的操作可能失败导致当前节点还未在同步队列中(即,节点入队操作还未完成)。所以我们需要从同步队列尾部开始向前遍历以明确该节点是否在同步队列中。在这种方法的调用中,节点总是靠近尾部,除非CAS失败(不太可能),否则节点将在同步队列尾部附近,所以我们几乎不会经历很多遍历。 + + +『AbstractQueuedSynchronizer#findNodeFromTail』 + +private boolean findNodeFromTail(Node node) { + Node t = tail; + for (;;) { + if (t == node) + return true; + if (t == null) + return false; + t = t.prev; + } +} +从同步队列尾向前查询节点,如果节点在同步队列中,则返回true。 +仅在『isOnSyncQueue』方法内调用该方法。 +从锁的等待队列尾部开始向前遍历,如果找到node节点则返回true;否则遍历完整个等待队列也就没法找到node节点,则返回false。 + + +『AbstractQueuedSynchronizer#checkInterruptWhileWaiting』 + +private int checkInterruptWhileWaiting(Node node) { + return Thread.interrupted() ? + (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : + 0; +} +用于检测中断,如果中断在信号通知之前发生则返回“THROW_IE”,若中断在信号通知之后发生则返回“REINTERRUPT”,或者如果没有被中断则返回“0”。 + + +『AbstractQueuedSynchronizer#transferAfterCancelledWait』 + +final boolean transferAfterCancelledWait(Node node) { + if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { + enq(node); + return true; + } + /* + * If we lost out to a signal(), then we can't proceed + * until it finishes its enq(). Cancelling during an + * incomplete transfer is both rare and transient, so just + * spin. + */ + while (!isOnSyncQueue(node)) + Thread.yield(); + return false; +} +如果需要的话,在取消等待后,将节点转移到同步队列。如果线程在被信号通知之前取消等待了则返回true。 +① 通过CAS的方式将节点的状态从“Node.CONDITION”修改为“0”,如果成功,则说明节点此时还没有收到信号通知,此时将节点的waitStatus从“Node.CONDITION”修改为“0”就是在被信号通知前取消了节点对条件的等待,接着调用『enq(node)』将节点入队到锁的等待队列中,并结束方法,返回true。 +② CAS操作失败,则说明该等待条件的节点被其他线程信号通知了(一般是signalAll),那么自旋调用『isOnSyncQueue(node)』以确保节点入队(锁的等待队列)完成后退出自旋(因为取消等待条件期间一个未完成的转换是罕见且瞬间的时期,所以使用自旋即可)。然后方法结束,返回false。 +也就是说,首先该方法会确保node从条件等待队列转移到锁的同步队列中。node是因为该方法的执行而从条件等待队列转移到锁的同步队列的话,则返回true;否则如果node是因为signal/signalAll信号通知而从条件等待队列转移到锁的同步队列的话,则返回false。 + + +『AbstractQueuedSynchronizer#enq』 + +private Node enq(final Node node) { + for (;;) { + Node t = tail; + if (t == null) { // Must initialize + if (compareAndSetHead(new Node())) + tail = head; + } else { + node.prev = t; + if (compareAndSetTail(t, node)) { + t.next = node; + return t; + } + } + } +} +使用自旋锁的方式(自旋+CAS)插入节点到等待队列,如果等待队列为空则初始化队列。 +初始化队列:创建一个空节点(即,new Node()),将head和tail都指向这个节点。 +然后才是将我们待插入的节点插入,即:emptyNode -> newNode. head指向emptyNode,tail指向newNode。 + + +『AbstractQueuedSynchronizer#unlinkCancelledWaiters』 + +private void unlinkCancelledWaiters() { + Node t = firstWaiter; + Node trail = null; + while (t != null) { + Node next = t.nextWaiter; + if (t.waitStatus != Node.CONDITION) { + t.nextWaiter = null; + if (trail == null) + firstWaiter = next; + else + trail.nextWaiter = next; + if (next == null) + lastWaiter = trail; + } + else + trail = t; + t = next; + } +} +从等待队列中解除被取消等待节点的连接。该方法仅在持有锁的时候调用。这个方法调用发生在取消发生在等待条件期间,并根据一个新的等待节点插入时lastWaiter看起来已经被取消了。这个方法需要去避免垃圾的滞留在没有信号通知的时候。所以即便它可能需要一个完全遍历,这仅会在超时和取消发生在缺少通知的情况下发生。它会遍历所有的节点而非停止在一个指定的目标,以便在取消风暴期间不需要多次重新遍历就可以将所有的垃圾节点解除链接。 +该方法会从firstWaiter开始遍历整个等待队列,将被取消(即,waitStatus != Node.CONDITION)的节点从等待队列中移除。 +①『Node t = firstWaiter;』:获取等待队列的头结点。 +② 从头节点开始遍历等待队列。 +③『Node next = t.nextWaiter;』获取当前遍历节点的下一个节点。 +④ 如果当前节点被取消了(即,『t.waitStatus != Node.CONDITION』),那么将当前节点的next字段置null(便于垃圾回收)。然后判断『trail == null』,如果为true,则说明目前是头节点被取消了,那么设置『firstWaiter=next』,即当前节点的下一个节点。此时,next节点可能是一个有效节点,也可能是一个被取消的节点(如果是被取消的节点,会在下一次循环的时候再次重新设置firstWaiter),也可能是一个null(如果为null,接下来就会退出循环,说明等待队列为空了);如果『trail == null』为false,则说明此遍历到的被取消的节点不是头节点,并且trail指向了遍历到目前为止等待队列中最后一个有效的等待节点,那么执行『trail.nextWaiter = next;』以将当前正在被遍历的节点从等待队列中解除连接。接着判断『next == null』,若为true,则说明当前遍历的被取消的节点是等待队列的最后一个节点,那么执行『lastWaiter = trail;』将lastWaiter指向最后一个有效的等待节点。 +⑤ 如果当前节点没有被取消(即,『t.waitStatus == Node.CONDITION』),那么将trail置为t,这说明了trail指向了在遍历等待队列过程中的最后一个有效的等待节点。 +⑥ 将t置为next,即当前遍历节点的下一个节点。继续步骤[3],直至整个等待队列节点都遍历完(即,next为null)。 + + +signal/signalAll +** signal** +public final void signal() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + Node first = firstWaiter; + if (first != null) + doSignal(first); +} +将当前的条件等待队列中将等待时间最长的线程移动到锁的等待队列中,如果存在这么一个线程的话。 +① 判断执行该方法的当前线程是否是持有排他锁的线程,如果不是则抛出“IllegalMonitorStateException”异常。 +② 当执行该方法的线程是持有排他锁的线程时,获取条件等待队列中的第一个等待节点,若这个节点不为null,则执行『doSignal(first)』来信号通知这个节点。 +注意,因为条件等待节点是按照FIFO的顺序操作节点的,也就是新的等待节点总是会添加对队列尾部,所以队列头节点就是等待最长时间的节点。 + + +『AbstractQueuedSynchronizer#doSignal』 + +private void doSignal(Node first) { + do { + if ( (firstWaiter = first.nextWaiter) == null) + lastWaiter = null; + first.nextWaiter = null; + } while (!transferForSignal(first) && + (first = firstWaiter) != null); +} +删除并转移节点,直到命中一个未被取消的节点或者节点为null(节点为null,说明等待队列中已经没有一个有效的节点了,即等待队列要么为空,要么等待队列中的节点都是被取消的节点)。 +① 根据给定的first节点,为起始遍历直至获取第一个有效等待节点,并信号通知该节点。 +② 将当前节点的下一个等待节点(nextWaiter)设置为firstWaiter,然后判断firstWaiter是否为null,如果为null则说明当前节点已经是条件等待队列中的最后一个节点了,那么将lastWaiter也置为null。 +③ 将当前遍历节点的nextWaiter置为null(以便于当前节点在方法结束后被垃圾收集器回收) +④ 执行『transferForSignal』将节点从条件等待队列转移到同步队列队列中,如果操作成功,则当前循环结束,方法返回;如果操作失败,那么继续从头节点开始循环步骤[2],直到成功转移一个节点或者条件等待队列为空为止。 + + +signalAll +public final void signalAll() { + if (!isHeldExclusively()) + throw new IllegalMonitorStateException(); + Node first = firstWaiter; + if (first != null) + doSignalAll(first); +} +将条件等待队列中的所有线程转移到锁的等待队列中。 +① 判断执行该方法的当前线程是否是持有排他锁的线程,如果不是则抛出“IllegalMonitorStateException”异常。 +② 当执行该方法的线程是持有排他锁的线程时,获取条件等待队列中的第一个等待节点,若这个节点不为null,则执行『doSignalAll(first)』来信号通知所有的节点。 + + +『AbstractQueuedSynchronizer#doSignalAll』 + +private void doSignalAll(Node first) { + lastWaiter = firstWaiter = null; + do { + Node next = first.nextWaiter; + first.nextWaiter = null; + transferForSignal(first); + first = next; + } while (first != null); +} +删除并转移所有的节点。 +① 将lastWaiter和firstWaiter置为null +② 从给定的节点为起始,开始遍历节点,调用『transferForSignal(first);』来将节点从条件等待队列中转移到锁的等待队列中。 + + +『isHeldExclusively』 + +protected final boolean isHeldExclusively() { + // While we must in general read state before owner, + // we don't need to do so to check if current thread is owner + return getExclusiveOwnerThread() == Thread.currentThread(); +} +判断执行该方法的当前线程是否是持有排他锁的线程。 +如下情况,返回’true’: +a)执行该方法的线程就是持有排他锁的线程。 +如下情况,返回’false’: +a)执行该方法的线程不是持有排他锁的线程。 +b)当前排他锁没有被任何线程所持有。 + + +『AbstractQueuedSynchronizer#transferForSignal』 + +final boolean transferForSignal(Node node) { + /* + * If cannot change waitStatus, the node has been cancelled. + */ + if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) + return false; + + /* + * Splice onto queue and try to set waitStatus of predecessor to + * indicate that thread is (probably) waiting. If cancelled or + * attempt to set waitStatus fails, wake up to resync (in which + * case the waitStatus can be transiently and harmlessly wrong). + */ + Node p = enq(node); + int ws = p.waitStatus; + if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) + LockSupport.unpark(node.thread); + return true; +} +从条件等待队列转移一个节点到同步队列中。如果成功返回true +返回: +a)true:成功从条件队列中转移一个节点到同步队列中 +b)false:在释放信号通知之前,该节点被取消了。 +① 通过CAS的方式将需要转移的节点的状态从“Node.CONDITION”修改为“0”。如果CAS操作失败,则说明这个节点的已经被取消了。那么方法结束,返回false。 +② 将修改完状态后的节点加入到锁等待队列中(『enq(node)』),并得到加入到等待队列后,当前节点的前驱节点。 +③ 若前驱节点的"waitStatus > 0”(即,waitStatus为“CANCELLED”)或者通过CAS的方式将前驱节点的waitStatus修改为“SIGNAL”失败,则调用『LockSupport.unpark(node.thread);』将当前线程唤醒(唤醒后的线程会继续await中被挂起之后的流程)。 +④ 否则,"waitStatus <= 0”并且通过CAS成功将前驱节点的waitStatus修改为了“SIGNAL”,以此来标识当前线程正在等待获取锁。 + + +『signal』vs『signalAll』 +signal解除的是条件等待队列中第一个有效的节点(即,节点的waitStatus为“CONDITION”),这比解除所有线程的阻塞更加有效,但也存在危险。如果signal的线程发现自己仍然不能运行,那么它再次被阻塞(await)。如果没有其他线程再次调用signal,那么系统就死锁了。 +signal/signalAll方法本质上只是将条件等待队列中的节点转移到锁的同步队列中。因此,不能任务signal/signalAll方法调用后就会使得线程获取锁,线程什么时候获取锁,就是根据锁的同步队列FIFO的顺序来决定的,只有同步队列中的第一个线程才有权利去争夺获取锁。 diff --git a/week_03/55/Semaphore-55.md b/week_03/55/Semaphore-55.md new file mode 100644 index 0000000000000000000000000000000000000000..3e3b069fb7a9790670c180372c5f9c4bda042679 --- /dev/null +++ b/week_03/55/Semaphore-55.md @@ -0,0 +1,178 @@ +Semaphore源码分析 +类介绍 +Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。比如控制用户的访问量,同一时刻只允许1000个用户同时使用系统,如果超过1000个并发,则需要等待。 + +使用场景 +比如模拟一个停车场停车信号,假设停车场只有两个车位,一开始两个车位都是空的。这时如果同时来了两辆车,看门人允许它们进入停车场,然后放下车拦。以后来的车必须在入口等待,直到停车场中有车辆离开。这时,如果有一辆车离开停车场,看门人得知后,打开车拦,放入一辆,如果又离开一辆,则又可以放入一辆,如此往复。 + +public class SemaphoreDemo { + private static Semaphore s = new Semaphore(2); + public static void main(String[] args) { + ExecutorService pool = Executors.newCachedThreadPool(); + pool.submit(new ParkTask("1")); + pool.submit(new ParkTask("2")); + pool.submit(new ParkTask("3")); + pool.submit(new ParkTask("4")); + pool.submit(new ParkTask("5")); + pool.submit(new ParkTask("6")); + pool.shutdown(); + } + + static class ParkTask implements Runnable { + private String name; + public ParkTask(String name) { + this.name = name; + } + @Override + public void run() { + try { + s.acquire(); + System.out.println("Thread "+this.name+" start..."); + TimeUnit.SECONDS.sleep(new Random().nextInt(10)); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + s.release(); + } + } + } +} +Semaphore 源码分析 +Semaphore 通过使用内部类Sync继承AQS来实现。 +支持公平锁和非公平锁。内部使用的AQS的共享锁。 +具体实现可参考 AbstractQueuedSynchronizer 源码分析 + +Semaphore 的结构如下: + + +Semaphore 类结构 +Semaphore构造 +public Semaphore(int permits) { + sync = new NonfairSync(permits); +} + +public Semaphore(int permits, boolean fair) { + sync = fair ? new FairSync(permits) : new NonfairSync(permits); +} +构造方法指定信号量的许可数量,默认采用的是非公平锁,也只可以指定为公平锁。 +permits赋值给AQS中的state变量。 + +acquire:可响应中断的获得信号量 +public void acquire() throws InterruptedException { + sync.acquireSharedInterruptibly(1); +} + +public void acquire(int permits) throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireSharedInterruptibly(permits); +} +获得信号量方法,这两个方法支持 Interrupt中断机制,可使用acquire() 方法每次获取一个信号量,也可以使用acquire(int permits) 方法获取指定数量的信号量 。 + +acquire:不可响应中断的获取信号量 +public void acquireUninterruptibly() { + sync.acquireShared(1); +} + +public void acquireUninterruptibly(int permits) { + if (permits < 0) throw new IllegalArgumentException(); + sync.acquireShared(permits); +} +这两个方法不响应Interrupt中断机制,其它功能同acquire方法机制。 + +tryAcquire 方法,尝试获得信号量 +public boolean tryAcquire() { + return sync.nonfairTryAcquireShared(1) >= 0; +} + +public boolean tryAcquire(long timeout, TimeUnit unit) + throws InterruptedException { + return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); +} + +public boolean tryAcquire(int permits, long timeout, TimeUnit unit) + throws InterruptedException { + if (permits < 0) throw new IllegalArgumentException(); + return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); +} +尝试获得信号量有三个方法。 + +尝试获取信号量,如果获取成功则返回true,否则马上返回false,不会阻塞当前线程。 +尝试获取信号量,如果在指定的时间内获得信号量,则返回true,否则返回false +尝试获取指定数量的信号量,如果在指定的时间内获得信号量,则返回true,否则返回false。 +release 释放信号量 +public void release() { + sync.releaseShared(1); +} +调用AQS中的releaseShared方法,使得state每次减一来控制信号量。 + +availablePermits方法,获取当前剩余的信号量数量 +public int availablePermits() { + return sync.getPermits(); +} + +//=========Sync类======== +final int getPermits() { + return getState(); + } +该方法返回AQS中state变量的值,当前剩余的信号量个数 + +drainPermits方法 +public int drainPermits() { + return sync.drainPermits(); +} + +//=========Sync类======== +final int drainPermits() { + for (;;) { + int current = getState(); + if (current == 0 || compareAndSetState(current, 0)) + return current; + } +} +获取并返回立即可用的所有许可。Sync类的drainPermits方法,获取1个信号量后将可用的信号量个数置为0。例如总共有10个信号量,已经使用了5个,再调用drainPermits方法后,可以获得一个信号量,剩余4个信号量就消失了,总共可用的信号量就变成6个了。 + +reducePermits 方法 +protected void reducePermits(int reduction) { + if (reduction < 0) throw new IllegalArgumentException(); + sync.reducePermits(reduction); +} + +//=========Sync类======== +final void reducePermits(int reductions) { + for (;;) { + int current = getState(); + int next = current - reductions; + if (next > current) // underflow + throw new Error("Permit count underflow"); + if (compareAndSetState(current, next)) + return; + } +} +该方法是protected 方法,减少信号量个数 + +判断AQS等待队列中是否还有Node +public final boolean hasQueuedThreads() { + return sync.hasQueuedThreads(); +} + +//=========AbstractQueuedSynchronizer类======== +public final boolean hasQueuedThreads() { + //头结点不等于尾节点就说明链表中还有元素 + return head != tail; +} +getQueuedThreads方法 +protected Collection getQueuedThreads() { + return sync.getQueuedThreads(); +} + +//=========AbstractQueuedSynchronizer类======== +public final Collection getQueuedThreads() { + ArrayList list = new ArrayList(); + for (Node p = tail; p != null; p = p.prev) { + Thread t = p.thread; + if (t != null) + list.add(t); + } + return list; +} +该方法获取AQS中等待队列中所有未获取信号量的线程相关的信息(等待获取信号量的线程相关信息)。 diff --git a/week_03/55/synchronized-55.md b/week_03/55/synchronized-55.md new file mode 100644 index 0000000000000000000000000000000000000000..533edadebc93bffd253af6cf11b340d4f023c279 --- /dev/null +++ b/week_03/55/synchronized-55.md @@ -0,0 +1,162 @@ +synchronized +前言 +一、synchronized的特性 +1.1 原子性 +1.2 可见性 +1.3 有序性 +1.4 可重入性 +二、synchronized的用法 +三、synchronized锁的实现 +3.1 同步方法 +3.2 同步代码块 +四、synchronized锁的底层实现 +五、JVM对synchronized的优化 +5.1 锁膨胀 +5.1.1 偏向锁 +5.1.2 轻量级锁 +5.1.3 重量级锁 +5.2 锁消除 +5.3 锁粗化 +5.4 自旋锁与自适应自旋锁 +结语 +前言 +如果某一个资源被多个线程共享,为了避免因为资源抢占导致资源数据错乱,我们需要对线程进行同步,那么synchronized就是实现线程同步的关键字,可以说在并发控制中是必不可少的部分,今天就来看一下synchronized的使用和底层原理。 + +一、synchronized的特性 +1.1 原子性 +所谓原子性就是指一个操作或者多个操作,要么全部执行并且执行的过程不会被任何因素打断,要么就都不执行。 + +在Java中,对基本数据类型的变量的读取和赋值操作是原子性操作,即这些操作是不可被中断的,要么执行,要么不执行。但是像i++、i+=1等操作字符就不是原子性的,它们是分成读取、计算、赋值几步操作,原值在这些步骤还没完成时就可能已经被赋值了,那么最后赋值写入的数据就是脏数据,无法保证原子性。 + +被synchronized修饰的类或对象的所有操作都是原子的,因为在执行操作之前必须先获得类或对象的锁,直到执行完才能释放,这中间的过程无法被中断(除了已经废弃的stop()方法),即保证了原子性。 + +注意!面试时经常会问比较synchronized和volatile,它们俩特性上最大的区别就在于原子性,volatile不具备原子性。 + +1.2 可见性 +可见性是指多个线程访问一个资源时,该资源的状态、值信息等对于其他线程都是可见的。 + +synchronized和volatile都具有可见性,其中synchronized对一个类或对象加锁时,一个线程如果要访问该类或对象必须先获得它的锁,而这个锁的状态对于其他任何线程都是可见的,并且在释放锁之前会将对变量的修改刷新到主存当中,保证资源变量的可见性,如果某个线程占用了该锁,其他线程就必须在锁池中等待锁的释放。 + +而volatile的实现类似,被volatile修饰的变量,每当值需要修改时都会立即更新主存,主存是共享的,所有线程可见,所以确保了其他线程读取到的变量永远是最新值,保证可见性。 + +1.3 有序性 +有序性值程序执行的顺序按照代码先后执行。 + +synchronized和volatile都具有有序性,Java允许编译器和处理器对指令进行重排,但是指令重排并不会影响单线程的顺序,它影响的是多线程并发执行的顺序性。synchronized保证了每个时刻都只有一个线程访问同步代码块,也就确定了线程执行同步代码块是分先后顺序的,保证了有序性。 + +1.4 可重入性 +synchronized和ReentrantLock都是可重入锁。当一个线程试图操作一个由其他线程持有的对象锁的临界资源时,将会处于阻塞状态,但当一个线程再次请求自己持有对象锁的临界资源时,这种情况属于重入锁。通俗一点讲就是说一个线程拥有了锁仍然还可以重复申请锁。 + +二、synchronized的用法 +synchronized可以修饰静态方法、成员函数,同时还可以直接定义代码块,但是归根结底它上锁的资源只有两类:一个是对象,一个是类。 + +先看看下面的代码(初学者看到先不要晕,后面慢慢讲解): + + +首先我们知道被static修饰的静态方法、静态属性都是归类所有,同时该类的所有实例对象都可以访问。但是普通成员属性、成员方法是归实例化的对象所有,必须实例化之后才能访问,这也是为什么静态方法不能访问非静态属性的原因。我们明确了这些属性、方法归哪些所有之后就可以理解上面几个synchronized的锁到底是加给谁的了。 + +首先看第一个synchronized所加的方法是add1(),该方法没有被static修饰,也就是说该方法是归实例化的对象所有,那么这个锁就是加给Test1类所实例化的对象。 + +然后是add2()方法,该方法是静态方法,归Test1类所有,所以这个锁是加给Test1类的。 + +最后是method()方法中两个同步代码块,第一个代码块所锁定的是Test1.class,通过字面意思便知道该锁是加给Test1类的,而下面那个锁定的是instance,这个instance是Test1类的一个实例化对象,自然它所上的锁是给instance实例化对象的。 + +弄清楚这些锁是上给谁的就应该很容易懂synchronized的使用啦,只要记住要进入同步方法或同步块必须先获得相应的锁才行。那么我下面再列举出一个非常容易进入误区的代码,看看你是否真的理解了上面的解释。 + + +上面的简单意思就是用两个线程分别对i加100万次,理论结果应该是200万,而且我还加了synchronized锁住了add方法,保证了其线程安全性。可是!!!我无论运行多少次都是小于200万的,为什么呢? + +原因就在于synchronized加锁的函数,这个方法是普通成员方法,那么锁就是加给对象的,但是在创建线程时却new了两个Test2实例,也就是说这个锁是给这两个实例加的锁,并没有达到同步的效果,所以才会出现错误。至于为什么小于200万,要理解i++的过程就明白了,我之前写了一篇文章讲解过这个过程,请阅读:详谈Java中的CAS操作 + +三、synchronized锁的实现 +synchronized有两种形式上锁,一个是对方法上锁,一个是构造同步代码块。他们的底层实现其实都一样,在进入同步代码之前先获取锁,获取到锁之后锁的计数器+1,同步代码执行完锁的计数器-1,如果获取失败就阻塞式等待锁的释放。只是他们在同步块识别方式上有所不一样,从class字节码文件可以表现出来,一个是通过方法flags标志,一个是monitorenter和monitorexit指令操作。 + +3.1 同步方法 +首先来看在方法上上锁,我们就新定义一个同步方法然后进行反编译,查看其字节码: + + + +可以看到在add方法的flags里面多了一个ACC_SYNCHRONIZED标志,这标志用来告诉JVM这是一个同步方法,在进入该方法之前先获取相应的锁,锁的计数器加1,方法结束后计数器-1,如果获取失败就阻塞住,知道该锁被释放。 + +如果看不懂字节码指令的朋友可以先阅读我之前写的两篇文章,了解一下class的结构: + +详解Class类文件的结构(上) +详解Class类文件的结构(下) +3.2 同步代码块 +我们新定义一个同步代码块,编译出class字节码,然后找到method方法所在的指令块,可以清楚的看到其实现上锁和释放锁的过程,截图如下: + + + +从反编译的同步代码块可以看到同步块是由monitorenter指令进入,然后monitorexit释放锁,在执行monitorenter之前需要尝试获取锁,如果这个对象没有被锁定,或者当前线程已经拥有了这个对象的锁,那么就把锁的计数器加1。当执行monitorexit指令时,锁的计数器也会减1。当获取锁失败时会被阻塞,一直等待锁被释放。 + +但是为什么会有两个monitorexit呢?其实第二个monitorexit是来处理异常的,仔细看反编译的字节码,正常情况下第一个monitorexit之后会执行goto指令,而该指令转向的就是23行的return,也就是说正常情况下只会执行第一个monitorexit释放锁,然后返回。而如果在执行中发生了异常,第二个monitorexit就起作用了,它是由编译器自动生成的,在发生异常时处理异常然后释放掉锁。 + +四、synchronized锁的底层实现 +在理解锁实现原理之前先了解一下Java的对象头和Monitor,在JVM中,对象是分成三部分存在的:对象头、实例数据、对其填充。 + + +实例数据和对其填充与synchronized无关,这里简单说一下(我也是阅读《深入理解Java虚拟机》学到的,读者可仔细阅读该书相关章节学习)。实例数据存放类的属性数据信息,包括父类的属性信息,如果是数组的实例部分还包括数组的长度,这部分内存按4字节对齐;对其填充不是必须部分,由于虚拟机要求对象起始地址必须是8字节的整数倍,对齐填充仅仅是为了使字节对齐。 + +对象头是我们需要关注的重点,它是synchronized实现锁的基础,因为synchronized申请锁、上锁、释放锁都与对象头有关。对象头主要结构是由Mark Word 和 Class Metadata Address组成,其中Mark Word存储对象的hashCode、锁信息或分代年龄或GC标志等信息,Class Metadata Address是类型指针指向对象的类元数据,JVM通过该指针确定该对象是哪个类的实例。 + +锁也分不同状态,JDK6之前只有两个状态:无锁、有锁(重量级锁),而在JDK6之后对synchronized进行了优化,新增了两种状态,总共就是四个状态:无锁状态、偏向锁、轻量级锁、重量级锁,其中无锁就是一种状态了。锁的类型和状态在对象头Mark Word中都有记录,在申请锁、锁升级等过程中JVM都需要读取对象的Mark Word数据。 + +每一个锁都对应一个monitor对象,在HotSpot虚拟机中它是由ObjectMonitor实现的(C++实现)。每个对象都存在着一个monitor与之关联,对象与其monitor之间的关系有存在多种实现方式,如monitor可以与对象一起创建销毁或当线程试图获取对象锁时自动生成,但当一个monitor被某个线程持有后,它便处于锁定状态。 + +ObjectMonitor() { + _header = NULL; + _count = 0; //锁计数器 + _waiters = 0, + _recursions = 0; + _object = NULL; + _owner = NULL; + _WaitSet = NULL; //处于wait状态的线程,会被加入到_WaitSet + _WaitSetLock = 0 ; + _Responsible = NULL ; + _succ = NULL ; + _cxq = NULL ; + FreeNext = NULL ; + _EntryList = NULL ; //处于等待锁block状态的线程,会被加入到该列表 + _SpinFreq = 0 ; + _SpinClock = 0 ; + OwnerIsThread = 0 ; + } +该段摘自:https://blog.csdn.net/javazejian/article/details/72828483 + ObjectMonitor中有两个队列_WaitSet和_EntryList,用来保存ObjectWaiter对象列表(每个等待锁的线程都会被封装ObjectWaiter对象),_owner指向持有ObjectMonitor对象的线程,当多个线程同时访问一段同步代码时,首先会进入_EntryList 集合,当线程获取到对象的monitor 后进入 _Owner 区域并把monitor中的owner变量设置为当前线程同时monitor中的计数器count加1,若线程调用 wait() 方法,将释放当前持有的monitor,owner变量恢复为null,count自减1,同时该线程进入 WaitSe t集合中等待被唤醒。若当前线程执行完毕也将释放monitor(锁)并复位变量的值,以便其他线程进入获取monitor(锁)。 + monitor对象存在于每个Java对象的对象头中(存储的指针的指向),synchronized锁便是通过这种方式获取锁的,也是为什么Java中任意对象可以作为锁的原因,同时也是notify/notifyAll/wait等方法存在于顶级对象Object中的原因(关于这点稍后还会进行分析) + +五、JVM对synchronized的优化 +从最近几个jdk版本中可以看出,Java的开发团队一直在对synchronized优化,其中最大的一次优化就是在jdk6的时候,新增了两个锁状态,通过锁消除、锁粗化、自旋锁等方法使用各种场景,给synchronized性能带来了很大的提升。 + +5.1 锁膨胀 +上面讲到锁有四种状态,并且会因实际情况进行膨胀升级,其膨胀方向是:无锁——>偏向锁——>轻量级锁——>重量级锁,并且膨胀方向不可逆。 + +5.1.1 偏向锁 +一句话总结它的作用:减少统一线程获取锁的代价。在大多数情况下,锁不存在多线程竞争,总是由同一线程多次获得,那么此时就是偏向锁。 + +核心思想: + +如果一个线程获得了锁,那么锁就进入偏向模式,此时Mark Word的结构也就变为偏向锁结构,当该线程再次请求锁时,无需再做任何同步操作,即获取锁的过程只需要检查Mark Word的锁标记位为偏向锁以及当前线程ID等于Mark Word的ThreadID即可,这样就省去了大量有关锁申请的操作。 + +5.1.2 轻量级锁 +轻量级锁是由偏向锁升级而来,当存在第二个线程申请同一个锁对象时,偏向锁就会立即升级为轻量级锁。注意这里的第二个线程只是申请锁,不存在两个线程同时竞争锁,可以是一前一后地交替执行同步块。 + +5.1.3 重量级锁 +重量级锁是由轻量级锁升级而来,当同一时间有多个线程竞争锁时,锁就会被升级成重量级锁,此时其申请锁带来的开销也就变大。 + +重量级锁一般使用场景会在追求吞吐量,同步块或者同步方法执行时间较长的场景。 + +5.2 锁消除 +消除锁是虚拟机另外一种锁的优化,这种优化更彻底,在JIT编译时,对运行上下文进行扫描,去除不可能存在竞争的锁。比如下面代码的method1和method2的执行效率是一样的,因为object锁是私有变量,不存在所得竞争关系。 + + +5.3 锁粗化 +锁粗化是虚拟机对另一种极端情况的优化处理,通过扩大锁的范围,避免反复加锁和释放锁。比如下面method3经过锁粗化优化之后就和method4执行效率一样了。 + + +5.4 自旋锁与自适应自旋锁 +轻量级锁失败后,虚拟机为了避免线程真实地在操作系统层面挂起,还会进行一项称为自旋锁的优化手段。 + +自旋锁:许多情况下,共享数据的锁定状态持续时间较短,切换线程不值得,通过让线程执行循环等待锁的释放,不让出CPU。如果得到锁,就顺利进入临界区。如果还不能获得锁,那就会将线程在操作系统层面挂起,这就是自旋锁的优化方式。但是它也存在缺点:如果锁被其他线程长时间占用,一直不释放CPU,会带来许多的性能开销。 + +自适应自旋锁:这种相当于是对上面自旋锁优化方式的进一步优化,它的自旋的次数不再固定,其自旋的次数由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定,这就解决了自旋锁带来的缺点。 diff --git a/week_03/55/volatile-55.md b/week_03/55/volatile-55.md new file mode 100644 index 0000000000000000000000000000000000000000..9133faf23a2bdc40d0f6c51f2a8e2a1dd5ff3f83 --- /dev/null +++ b/week_03/55/volatile-55.md @@ -0,0 +1,90 @@ +volatile +volatile的定义及其实现 +定义:如果一个字段被声明成volatile,那么java线程内存模型将确保所有线程看到的这个变量的值都是一致的. + +从它的定义当中咱们也可以了解到volatile具有可见性的特性.但它具体是如何保证其可见性的呢? + +先看一段JIT编译器生成的汇编指令 + +//Java代码如下 +instance = new Singleton(); //这里instance是volatile变量 +//反汇编后 +0x01a3de1d: movb $0x0,0x1104800(%esi); +0x01a3de24: lock add1 $0x0,(%esp); +有volatile修饰的变量在进行写操作时会出现第二行反汇编代码,重点在lock这个指令.它有两个目的: + +立即回写当前处理器缓存行的值到内存. +其他所有cpu缓存了该地址的数据将会失效. +这里大家也许会有疑问,有没有可能存在多个cpu一起回写数据? + +答案是不会的.虽然cpu鼓励多个处理器可以有竞争,但是总线会对竞争做出裁决,只会有一个cpu获取优先权.其他处理器会被总线禁止,处于阻塞状态.如下图: + + + +对于第二点,其他cpu缓存该地址的数据失效后想要再次使用的话就必须得从主内存中重新读取,这样就能保证再次执行计算时所获取的值是最新的,也可以认为所有CPU的缓存是一致的,这也就证明了volatile修饰的字段是可见的. + +可见性不代表在并发下是安全的 +这里咱们先引进一段代码: + +/** + * volatile 变量自增运算 + * + * @author mars + */ +public class VolatileTest { + public static volatile int count = 0; + + public static void increase() { + count++; + } + + private static final int THREAD_COUNTS = 20; + + public static void main(String[] args) throws InterruptedException { + CountDownLatch latch = new CountDownLatch(THREAD_COUNTS); + Thread[] threads = new Thread[THREAD_COUNTS]; + for (int j = 0; j < THREAD_COUNTS; j++) { + threads[j] = new Thread(new Runnable() { + @Override + public void run() { + for (int i = 0; i < 10000; i++) { + increase(); + } + latch.countDown(); + } + }); + threads[j].start(); + } + //等待所有的线程执行结束 + latch.await(); + + System.out.println(count); + } +} +这段代码供发起了20个线程,对count变量进行了10000次自增操作,如果volatile修饰的字段在并发下是安全的话,讲道理最终结果都会是200000,但经过测试发现,每次的输出结果都会不一样.但具体是什么原因造成的? + +其实最主要的问题是出在increase()这个自增方法上,这个操作不是一个原子操作,也就是不是一步就能操作完成的,其中会经历count值入栈,add,出栈,到操作线程缓存,最终到内存等等一系列步骤.当A线程其执行这些指令时,B线程正好将数据同步到了主内存中,此时A线程栈顶的数据就会变成过期数据,然后A线程就会将较小的值同步到主内存中. + + +如何正确的运用volatile +要想运用好volatile修饰符,需要保证运用场景符合下述规则: + +运算结果不依赖变量的当前值. +该变量不需要和其他变量共同参与约束. +例如使用volatile变量来控制并发就很合适: + +volatile boolean shutdownWork; + + public void shutdowm(){ + shutdownWork = true; + } + + public void doWork(){ + while (!shutdownWork){ + //execute task + } + } +上面这段代码运行结果并无需依赖shutdownWork的值,但是只要shutdownWork的值一旦经过改变,便会立即被其他所有线程所感知,然后停止执行任务. + +小知识点 +在多处理器下,为了保证各个处理器的缓存是一致的,处理器会使用嗅探技术来保证它的内部缓存,系统内存和其他处理器的缓存的数据在总线上保持一致.如果通过嗅探检测到其他处理器打算写内存地址,而这个地址当前处于共享状态,那么正在嗅探的处理器将使它的缓存无效,在下次访问相同的内存地址时,强制执行缓存行填充,也就是从内存中重新读取该内存地址指向的值. diff --git "a/week_03/55/\345\210\206\345\270\203\345\274\217\351\224\201-55.md" "b/week_03/55/\345\210\206\345\270\203\345\274\217\351\224\201-55.md" new file mode 100644 index 0000000000000000000000000000000000000000..ffc258ad2ca181b8caf3bf2d07002ba1443f961c --- /dev/null +++ "b/week_03/55/\345\210\206\345\270\203\345\274\217\351\224\201-55.md" @@ -0,0 +1,186 @@ +分布式锁 +大多数互联网系统都是分布式部署的,分布式部署确实能带来性能和效率上的提升,但为此,我们就需要多解决一个分布式环境下,数据一致性的问题。 + + + +当某个资源在多系统之间,具有共享性的时候,为了保证大家访问这个资源数据是一致的,那么就必须要求在同一时刻只能被一个客户端处理,不能并发的执行,否者就会出现同一时刻有人写有人读,大家访问到的数据就不一致了。 + + + +一、我们为什么需要分布式锁? + + + +在单机时代,虽然不需要分布式锁,但也面临过类似的问题,只不过在单机的情况下,如果有多个线程要同时访问某个共享资源的时候,我们可以采用线程间加锁的机制,即当某个线程获取到这个资源后,就立即对这个资源进行加锁,当使用完资源之后,再解锁,其它线程就可以接着使用了。例如,在JAVA中,甚至专门提供了一些处理锁机制的一些API(synchronize/Lock等) + + + +但是到了分布式系统的时代,这种线程之间的锁机制,就没作用了,系统可能会有多份并且部署在不同的机器上,这些资源已经不是在线程之间共享了,而是属于进程之间共享的资源。 + + + +因此,为了解决这个问题,我们就必须引入「分布式锁」。 + + + +分布式锁,是指在分布式的部署环境下,通过锁机制来让多客户端互斥的对共享资源进行访问。 + + + +分布式锁要满足哪些要求呢? + + + +排他性:在同一时间只会有一个客户端能获取到锁,其它客户端无法同时获取 + +避免死锁:这把锁在一段有限的时间之后,一定会被释放(正常释放或异常释放) + +高可用:获取或释放锁的机制必须高可用且性能佳 + +讲完了背景和理论,那我们接下来再看一下分布式锁的具体分类和实际运用。 + + + +二、分布式锁的实现方式有哪些? + + + +目前主流的有三种,从实现的复杂度上来看,从上往下难度依次增加: + + + +基于数据库实现 + +基于Redis实现 + +基于ZooKeeper实现 + +无论哪种方式,其实都不完美,依旧要根据咱们业务的实际场景来选择。 + + + +基于数据库实现: + +基于数据库来做分布式锁的话,通常有两种做法: + +基于数据库的乐观锁 + +基于数据库的悲观锁 + + + +我们先来看一下如何基于「乐观锁」来实现: + + + +乐观锁机制其实就是在数据库表中引入一个版本号(version)字段来实现的。 + + + +当我们要从数据库中读取数据的时候,同时把这个version字段也读出来,如果要对读出来的数据进行更新后写回数据库,则需要将version加1,同时将新的数据与新的version更新到数据表中,且必须在更新的时候同时检查目前数据库里version值是不是之前的那个version,如果是,则正常更新。如果不是,则更新失败,说明在这个过程中有其它的进程去更新过数据了。 + + + +下面找图举例, + + + + +如图,假设同一个账户,用户A和用户B都要去进行取款操作,账户的原始余额是2000,用户A要去取1500,用户B要去取1000,如果没有锁机制的话,在并发的情况下,可能会出现余额同时被扣1500和1000,导致最终余额的不正确甚至是负数。但如果这里用到乐观锁机制,当两个用户去数据库中读取余额的时候,除了读取到2000余额以外,还读取了当前的版本号version=1,等用户A或用户B去修改数据库余额的时候,无论谁先操作,都会将版本号加1,即version=2,那么另外一个用户去更新的时候就发现版本号不对,已经变成2了,不是当初读出来时候的1,那么本次更新失败,就得重新去读取最新的数据库余额。 + + + +通过上面这个例子可以看出来,使用「乐观锁」机制,必须得满足: + +(1)锁服务要有递增的版本号version + +(2)每次更新数据的时候都必须先判断版本号对不对,然后再写入新的版本号 + + + +我们再来看一下如何基于「悲观锁」来实现: + +悲观锁也叫作排它锁,在Mysql中是基于 for update 来实现加锁的,例如: + +//锁定的方法-伪代码publicbooleanlock(){    connection.setAutoCommit(false)for(){        result =select*fromuserwhereid =100forupdate;if(result){//结果不为空,//则说明获取到了锁returntrue;        }//没有获取到锁,继续获取sleep(1000);    }returnfalse;}//释放锁-伪代码connection.commit(); + +上面的示例中,user表中,id是主键,通过 for update 操作,数据库在查询的时候就会给这条记录加上排它锁。 + +(需要注意的是,在InnoDB中只有字段加了索引的,才会是行级锁,否者是表级锁,所以这个id字段要加索引) + +当这条记录加上排它锁之后,其它线程是无法操作这条记录的。 + +那么,这样的话,我们就可以认为获得了排它锁的这个线程是拥有了分布式锁,然后就可以执行我们想要做的业务逻辑,当逻辑完成之后,再调用上述释放锁的语句即可。 + + + +基于Redis实现 + + + +基于Redis实现的锁机制,主要是依赖redis自身的原子操作,例如: + +SETuser_key user_value NX PX100 + + + +redis从2.6.12版本开始,SET命令才支持这些参数: + + + +NX:只在在键不存在时,才对键进行设置操作,SET key value NX 效果等同于 SETNX key value  + + + +PX millisecond:设置键的过期时间为millisecond毫秒,当超过这个时间后,设置的键会自动失效 + + + +上述代码示例是指,当redis中不存在user_key这个键的时候,才会去设置一个user_key键,并且给这个键的值设置为 user_value,且这个键的存活时间为100ms + + + +为什么这个命令可以帮我们实现锁机制呢? + + + +因为这个命令是只有在某个key不存在的时候,才会执行成功。那么当多个进程同时并发的去设置同一个key的时候,就永远只会有一个进程成功。 + + + +当某个进程设置成功之后,就可以去执行业务逻辑了,等业务逻辑执行完毕之后,再去进行解锁。 + + + +解锁很简单,只需要删除这个key就可以了,不过删除之前需要判断,这个key对应的value是当初自己设置的那个。 + + + +另外,针对redis集群模式的分布式锁,可以采用redis的Redlock机制。 + + + +基于ZooKeeper实现 + + + +其实基于ZooKeeper,就是使用它的临时有序节点来实现的分布式锁。 + + + +原理就是:当某客户端要进行逻辑的加锁时,就在zookeeper上的某个指定节点的目录下,去生成一个唯一的临时有序节点, 然后判断自己是否是这些有序节点中序号最小的一个,如果是,则算是获取了锁。如果不是,则说明没有获取到锁,那么就需要在序列中找到比自己小的那个节点,并对其调用exist()方法,对其注册事件监听,当监听到这个节点被删除了,那就再去判断一次自己当初创建的节点是否变成了序列中最小的。如果是,则获取锁,如果不是,则重复上述步骤。 + + + +当释放锁的时候,只需将这个临时节点删除即可。 + + +如图,locker是一个持久节点,node_1/node_2/…/node_n 就是上面说的临时节点,由客户端client去创建的。 + + + +client_1/client_2/…/clien_n 都是想去获取锁的客户端。以client_1为例,它想去获取分布式锁,则需要跑到locker下面去创建临时节点(假如是node_1)创建完毕后,看一下自己的节点序号是否是locker下面最小的,如果是,则获取了锁。如果不是,则去找到比自己小的那个节点(假如是node_2),找到后,就监听node_2,直到node_2被删除,那么就开始再次判断自己的node_1是不是序列中最小的,如果是,则获取锁,如果还不是,则继续找一下一个节点。 + + + +以上,就讲完了为什么我们需要分布式锁这个技术,以及分布式锁中常见的三种机制。