diff --git a/week_03/26/AQS.md b/week_03/26/AQS.md new file mode 100644 index 0000000000000000000000000000000000000000..c051e7591b469a98f6ed77df566f2e4098b06143 --- /dev/null +++ b/week_03/26/AQS.md @@ -0,0 +1,518 @@ + +# AbstractQueuedSynchronizer + +## 简单说明 +队列同步器(AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件(信号量、事件等)的基础框架类.JDK中许多并发工具类的内部实现都依赖于AQS。 + +AQS的主要使用方式是继承它作为一个内部辅助类实现同步原语,它可以简化你的并发工具的内部实现,屏蔽同步状态管理、线程的排队、等待与唤醒等底层操作。 + +AQS设计基于模板方法模式,需要继承同步器并且重写指定的方法,将其组合在并发组件的实现中,调用同步器的模板方法,模板方法会调用使用者重写的方法。 +## 成员变量 +``` Java +//同步队列的头结点 +private transient volatile Node head; + +//同步队列的尾结点 +private transient volatile Node tail; + +//同步状态 +private volatile int state; + +//获取同步状态 +protected final int getState() { + return state; +} + +//设置同步状态 +protected final void setState(int newState) { + state = newState; +} + +//以CAS方式设置同步状态 +protected final boolean compareAndSetState(int expect, int update) { + return unsafe.compareAndSwapInt(this, stateOffset, expect, update); +} +``` +AQS的三个成员变量都使用了volatile关键字进行修饰,这就确保了多个线程对它的修改都是内存可见的。 + +整个类的核心就是这个同步状态,可以看到同步状态其实就是一个int型的变量,同步状态决定了当前锁对象是否允许继续被占用或者,也可以根据锁状态来判断当前同步锁被多少个线程占据了。 + +当然锁的状态根据不同子类的不同具体需求而有所不同,例如在ReentrantLock中,state等于0表示锁是开的,state大于0表示锁是锁着的,且其大小正比于一个线程重复占据一个重入锁的此数,而在Semaphore中,state大于0表示锁是开的,state等于0表示锁是锁着的 +## 同步、条件队列 +AbstractQueuedSynchronizer内部其实有两个排队区,一个是同步队列,一个是条件队列。 + +同步队列只有一条,而条件队列可以有多条。其中唯一的同步队列的管理者是AQS对象本身,而条件队列由AQS内部的Conditon接口对象管理,每一个Condition对象都对应着一个由其管理的条件等待队列。同步队列的结点分别持有前后结点的引用(双向队列),而条件队列的结点只有一个指向后继结点的引用(向后指向的单向队列)。 + +每个节点包含一个线程,线程在获取锁失败后首先进入同步队列排队,而想要进入条件队列该线程必须持有锁才行。所以节点是线程的存储之处,节点和线程在某些语境下是通用的。 + +### +```Java +//同步队列的节点 +static final class Node { + + static final Node SHARED = new Node(); //表示当前线程以共享模式持有锁 + + static final Node EXCLUSIVE = null; //表示当前线程以独占模式持有锁 + + static final int CANCELLED = 1; //表示当前结点已经取消获取锁 + + static final int SIGNAL = -1; //表示后继结点的线程需要运行 + + static final int CONDITION = -2; //表示当前结点在条件队列中排队 + + static final int PROPAGATE = -3; //表示后继结点可以直接获取锁 + + volatile int waitStatus; //表示当前结点的等待状态 + + volatile Node prev; //表示同步队列中的前继结点 + + volatile Node next; //表示同步队列中的后继结点 + + volatile Thread thread; //当前结点持有的线程引用 + + Node nextWaiter; //表示条件队列中的后继结点,条件队列中节点没有前继节点 + + //当前结点状态是否是共享模式 + final boolean isShared() { + return nextWaiter == SHARED; + } + + //返回当前结点的前继结点 + final Node predecessor() throws NullPointerException { + Node p = prev; + if (p == null) { + throw new NullPointerException(); + } else { + return p; + } + } + + //构造器1 + Node() {} + + //构造器2, 默认用这个构造器 + Node(Thread thread, Node mode) { + //注意持有模式是赋值给nextWaiter + this.nextWaiter = mode; + this.thread = thread; + } + + //构造器3, 只在条件队列中用到 + Node(Thread thread, int waitStatus) { + this.waitStatus = waitStatus; + this.thread = thread; + } +} +``` +Node代表同步队列和条件队列中的一个结点,它是AbstractQueuedSynchronizer的内部类。 + +Node有很多属性,比如持有模式,等待状态,同步队列中的前继和后继,以及条件队列中的后继引用等等。 + +每个节点都存储着指向一个线程对象的引用变量,所以一定程度上我们完全可以把节点理解为线程对象 + +## 加锁模式 + +AQS的内部类Node定义了两个节点Node常量SHARED和EXCLUSIVE,他们分别标识 AQS队列中等待线程的锁获取模式。 + +java并发包提供的加锁模式分为独占锁和共享锁: + +独占锁:每次只能有一个线程能持有锁,就是以独占方式实现的互斥锁。独占锁是一种悲观保守的加锁策略,它避免了读/读冲突,如果某个只读线程获取锁,则其他读线程都只能等待,这种情况下就限制了不必要的并发性,因为读操作并不会影响数据的一致性。具有独占锁功能的子类,它必须实现tryAcquire、tryRelease、isHeldExclusively等 + +独占锁获取锁时,设置节点模式为Node.EXCLUSIVE: +```Java +public final void acquire(int arg) { + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); + } +``` + +共享锁:则允许多个线程同时获取锁,并发访问共享资源,如:ReadWriteLock。共享锁则是一种乐观锁,它放宽了加锁策略,允许多个执行读操作的线程同时访问共享资源。 java的并发包中提供了ReadWriteLock,读-写锁。它允许一个资源可以被多个读操作访问,或者被一个 写操作访问,但读写操作不能同时进行。共享锁功能的子类,必须实现tryAcquireShared和tryReleaseShared等方法,带有Shared后缀的方法都是支持共享锁加锁的语义。 + +共享锁获取锁,节点模式则为Node.SHARED: +```Java +private void doAcquireShared(int arg) { + final Node node = addWaiter(Node.SHARED); + boolean failed = true; + ..... + } +``` +## 结点的等待状态 +```Java + volatile int waitStatus; //表示当前结点的等待状态 +``` + +#### CANCELLED :当这个线程在排队过程中已经打算放弃了,它就会将自己座位上的牌子设置为CANCELLED,此状态的旧节点在新节点遍历向前找时会被清理出队列。 + +#### SIGNAL :状态为SIGNAL的线程在执行完自己的代码后,退出线程前,回去唤醒下一个在等待队列中的线程/节点。只有保证前面节点的状态为SIGNAL,当前节点才能够保证被唤醒; + +#### CONDITION:表示该线程在条件队列中排队; + +#### PROPAGATE:提醒后面来的线程可以直接获取锁,这个状态只在共享模式用到,后面单独讲共享模式的时候会讲到。 +## 获取独占锁 +```Java +/** + * 获取独占锁,对中断不敏感。 + * 首先尝试获取一次锁,如果成功,则返回; + * 否则会把当前线程包装成Node插入到队列中,在队列中会检测是否为head的直接后继,并尝试获取锁, + * 如果获取失败,则会通过LockSupport阻塞当前线程,直至被释放锁的线程唤醒或者被中断,随后再次尝试获取锁,如此反复。 + */ +public final void acquire(int arg) { + if (!tryAcquire(arg) && + acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) + selfInterrupt(); +} + +/** + * 在队列中新增一个节点。 + */ +private Node addWaiter(Node mode) { + Node node = new Node(Thread.currentThread(), mode); + Node pred = tail; + // 快速尝试 + if (pred != null) { + node.prev = pred; + // 通过CAS在队尾插入当前节点 + if (compareAndSetTail(pred, node)) { + pred.next = node; + return node; + } + } + // 初始情况或者在快速尝试失败后插入节点 + enq(node); + return node; +} + +/** + * 通过循环+CAS在队列中成功插入一个节点后返回。 + */ +private Node enq(final Node node) { + for (;;) { + Node t = tail; + // 初始化head和tail + if (t == null) { + if (compareAndSetHead(new Node())) + tail = head; + } else { + /* + * AQS的精妙就是体现在很多细节的代码,比如需要用CAS往队尾里增加一个元素 + * 此处的else分支是先在CAS的if前设置node.prev = t,而不是在CAS成功之后再设置。 + * 一方面是基于CAS的双向链表插入目前没有完美的解决方案,另一方面这样子做的好处是: + * 保证每时每刻tail.prev都不会是一个null值,否则如果node.prev = t + * 放在下面if的里面,会导致一个瞬间tail.prev = null,这样会使得队列不完整。 + */ + node.prev = t; + // CAS设置tail为node,成功后把老的tail也就是t连接到node。 + if (compareAndSetTail(t, node)) { + t.next = node; + return t; + } + } + } +} + +/** + * 在队列中的节点通过此方法获取锁,对中断不敏感。 + */ +final boolean acquireQueued(final Node node, int arg) { + boolean failed = true; + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + /* + * 检测当前节点前驱是否head,这是试获取锁的资格。 + * 如果是的话,则调用tryAcquire尝试获取锁, + * 成功,则将head置为当前节点。 + */ + if (p == head && tryAcquire(arg)) { + setHead(node); + p.next = null; // help GC + failed = false; + return interrupted; + } + /* + * 如果未成功获取锁则根据前驱节点判断是否要阻塞。 + * 如果阻塞过程中被中断,则置interrupted标志位为true。 + * shouldParkAfterFailedAcquire方法在前驱状态不为SIGNAL的情况下都会循环重试获取锁。 + */ + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + if (failed) + cancelAcquire(node); + } +} + +/** + * 根据前驱节点中的waitStatus来判断是否需要阻塞当前线程。 + */ +private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { + int ws = pred.waitStatus; + if (ws == Node.SIGNAL) + /* + * 前驱节点设置为SIGNAL状态,在释放锁的时候会唤醒后继节点, + * 所以后继节点(也就是当前节点)现在可以阻塞自己。 + */ + return true; + if (ws > 0) { + /* + * 前驱节点状态为取消,向前遍历,更新当前节点的前驱为往前第一个非取消节点。 + * 当前线程会之后会再次回到循环并尝试获取锁。 + */ + do { + node.prev = pred = pred.prev; + } while (pred.waitStatus > 0); + pred.next = node; + } else { + /** + * 等待状态为0或者PROPAGATE(-3),设置前驱的等待状态为SIGNAL, + * 并且之后会回到循环再次重试获取锁。 + */ + compareAndSetWaitStatus(pred, ws, Node.SIGNAL); + } + return false; +} + + +/** + * 该方法实现某个node取消获取锁。 + */ +private void cancelAcquire(Node node) { + + if (node == null) + return; + + node.thread = null; + + // 遍历并更新节点前驱,把node的prev指向前部第一个非取消节点。 + Node pred = node.prev; + while (pred.waitStatus > 0) + node.prev = pred = pred.prev; + + // 记录pred节点的后继为predNext,后续CAS会用到。 + Node predNext = pred.next; + + // 直接把当前节点的等待状态置为取消,后继节点即便也在cancel可以跨越node节点。 + node.waitStatus = Node.CANCELLED; + + /* + * 如果CAS将tail从node置为pred节点了 + * 则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node的联系。 + * 这样一来就断开了pred与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。 + * 由于node没有后继节点,所以这种情况到这里整个cancel就算是处理完毕了。 + * + * 这里的CAS更新pred的next即使失败了也没关系,说明有其它新入队线程或者其它取消线程更新掉了。 + */ + if (node == tail && compareAndSetTail(node, pred)) { + compareAndSetNext(pred, predNext, null); + } else { + // 如果node还有后继节点,这种情况要做的事情是把pred和后继非取消节点拼起来。 + int ws; + if (pred != head && + ((ws = pred.waitStatus) == Node.SIGNAL || + (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && + pred.thread != null) { + Node next = node.next; + /* + * 如果node的后继节点next非取消状态的话,则用CAS尝试把pred的后继置为node的后继节点 + * 这里if条件为false或者CAS失败都没关系,这说明可能有多个线程在取消,总归会有一个能成功的。 + */ + if (next != null && next.waitStatus <= 0) + compareAndSetNext(pred, predNext, next); + } else { + /* + * 这时说明pred == head或者pred状态取消或者pred.thread == null + * 在这些情况下为了保证队列的活跃性,需要去唤醒一次后继线程。 + * 举例来说pred == head完全有可能实际上目前已经没有线程持有锁了, + * 自然就不会有释放锁唤醒后继的动作。如果不唤醒后继,队列就挂掉了。 + * + * 这种情况下看似由于没有更新pred的next的操作,队列中可能会留有一大把的取消节点。 + * 实际上不要紧,因为后继线程唤醒之后会走一次试获取锁的过程, + * 失败的话会走到shouldParkAfterFailedAcquire的逻辑。 + * 那里面的if中有处理前驱节点如果为取消则维护pred/next,踢掉这些取消节点的逻辑。 + */ + unparkSuccessor(node); + } + + /* + * 取消节点的next之所以设置为自己本身而不是null, + * 是为了方便AQS中Condition部分的isOnSyncQueue方法, + * 判断一个原先属于条件队列的节点是否转移到了同步队列。 + * + * 因为同步队列中会用到节点的next域,取消节点的next也有值的话, + * 可以断言next域有值的节点一定在同步队列上。 + * + * 在GC层面,和设置为null具有相同的效果。 + */ + node.next = node; + } +} + +/** + * 唤醒后继线程。 + */ +private void unparkSuccessor(Node node) { + int ws = node.waitStatus; + // 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁。 + if (ws < 0) + compareAndSetWaitStatus(node, ws, 0); + + Node s = node.next; + /* + * 这里的逻辑就是如果node.next存在并且状态不为取消,则直接唤醒s即可 + * 否则需要从tail开始向前找到node之后最近的非取消节点。 + * + * 这里为什么要从tail开始向前查找也是值得琢磨的: + * 如果读到s == null,不代表node就为tail,参考addWaiter以及enq函数中的我的注释。 + * 不妨考虑到如下场景: + * 1. node某时刻为tail + * 2. 有新线程通过addWaiter中的if分支或者enq方法添加自己 + * 3. compareAndSetTail成功 + * 4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了! + */ + if (s == null || s.waitStatus > 0) { + s = null; + for (Node t = tail; t != null && t != node; t = t.prev) + if (t.waitStatus <= 0) + s = t; + } + if (s != null) + LockSupport.unpark(s.thread); +} +``` +## 释放独占锁 +```Java +public final boolean release(int arg) { + if (tryRelease(arg)) { + /* + * 此时的head节点可能有3种情况: + * 1. null (AQS的head延迟初始化+无竞争的情况) + * 2. 当前线程在获取锁时new出来的节点通过setHead设置的 + * 3. 由于通过tryRelease已经完全释放掉了独占锁,有新的节点在acquireQueued中获取到了独占锁,并设置了head + + * 第三种情况可以再分为两种情况: + * (一)时刻1:线程A通过acquireQueued,持锁成功,set了head + * 时刻2:线程B通过tryAcquire试图获取独占锁失败失败,进入acquiredQueued + * 时刻3:线程A通过tryRelease释放了独占锁 + * 时刻4:线程B通过acquireQueued中的tryAcquire获取到了独占锁并调用setHead + * 时刻5:线程A读到了此时的head实际上是线程B对应的node + * (二)时刻1:线程A通过tryAcquire直接持锁成功,head为null + * 时刻2:线程B通过tryAcquire试图获取独占锁失败失败,入队过程中初始化了head,进入acquiredQueued + * 时刻3:线程A通过tryRelease释放了独占锁,此时线程B还未开始tryAcquire + * 时刻4:线程A读到了此时的head实际上是线程B初始化出来的傀儡head + */ + Node h = head; + // head节点状态不会是CANCELLED,所以这里h.waitStatus != 0相当于h.waitStatus < 0 + if (h != null && h.waitStatus != 0) + // 唤醒后继线程,此函数在acquire中已经分析过,不再列举说明 + unparkSuccessor(h); + return true; + } + return false; +} +``` +## 获取共享锁 +```Java +public final void acquireShared(int arg) { + if (tryAcquireShared(arg) < 0) + doAcquireShared(arg); +} + +private void doAcquireShared(int arg) { + final Node node = addWaiter(Node.SHARED); + boolean failed = true; + try { + boolean interrupted = false; + for (;;) { + final Node p = node.predecessor(); + if (p == head) { + int r = tryAcquireShared(arg); + // 一旦共享获取成功,设置新的头结点,并且唤醒后继线程 + if (r >= 0) { + setHeadAndPropagate(node, r); + p.next = null; // help GC + if (interrupted) + selfInterrupt(); + failed = false; + return; + } + } + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + interrupted = true; + } + } finally { + if (failed) + cancelAcquire(node); + } +} + +/** + * 这个函数做的事情有两件: + * 1. 在获取共享锁成功后,设置head节点 + * 2. 根据调用tryAcquireShared返回的状态以及节点本身的等待状态来判断是否要需要唤醒后继线程。 + */ +private void setHeadAndPropagate(Node node, int propagate) { + // 把当前的head封闭在方法栈上,用以下面的条件检查。 + Node h = head; + setHead(node); + /* + * propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一。 + * h.waitStatus为SIGNAL或者PROPAGATE时也根据node的下一个节点共享来决定是否传播唤醒, + * 这里为什么不能只用propagate > 0来决定是否可以传播在本文下面的思考问题中有相关讲述。 + */ + if (propagate > 0 || h == null || h.waitStatus < 0 || + (h = head) == null || h.waitStatus < 0) { + Node s = node.next; + if (s == null || s.isShared()) + doReleaseShared(); + } +} + +/** + * 这是共享锁中的核心唤醒函数,主要做的事情就是唤醒下一个线程或者设置传播状态。 + * 后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。 + * 这个函数的作用是保障在acquire和release存在竞争的情况下,保证队列中处于等待状态的节点能够有办法被唤醒。 + */ +private void doReleaseShared() { + /* + * 以下的循环做的事情就是,在队列存在后继线程的情况下,唤醒后继线程; + * 或者由于多线程同时释放共享锁由于处在中间过程,读到head节点等待状态为0的情况下, + * 虽然不能unparkSuccessor,但为了保证唤醒能够正确稳固传递下去,设置节点状态为PROPAGATE。 + * 这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。 + */ + for (;;) { + Node h = head; + // 如果队列中存在后继线程。 + if (h != null && h != tail) { + int ws = h.waitStatus; + if (ws == Node.SIGNAL) { + if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) + continue; + unparkSuccessor(h); + } + // 如果h节点的状态为0,需要设置为PROPAGATE用以保证唤醒的传播。 + else if (ws == 0 && + !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) + continue; + } + // 检查h是否仍然是head,如果不是的话需要再进行循环。 + if (h == head) + break; + } +} +``` +## 释放共享锁 +```Java +public final boolean releaseShared(int arg) { + if (tryReleaseShared(arg)) { + // doReleaseShared的实现上面获取共享锁已经介绍 + doReleaseShared(); + return true; + } + return false; +} +``` diff --git "a/week_03/26/Java\345\206\205\345\255\230\346\250\241\345\236\213.md" "b/week_03/26/Java\345\206\205\345\255\230\346\250\241\345\236\213.md" new file mode 100644 index 0000000000000000000000000000000000000000..895d84fb23e78f67af10c0e63e4e37d4d8c3c629 --- /dev/null +++ "b/week_03/26/Java\345\206\205\345\255\230\346\250\241\345\236\213.md" @@ -0,0 +1,49 @@ +# Java内存模型 + +## 什么是内存模型 +为了保证共享内存的正确性(可见性、有序性、原子性),内存模型定义了共享内存系统中多线程程序读写操作行为的规范。通过这些规则来规范对内存的读写操作,从而保证指令执行的正确性。它与处理器有关、与缓存有关、与并发有关、与编译器也有关。他解决了CPU多级缓存、处理器优化、指令重排等导致的内存访问问题,保证了并发场景下的一致性、原子性和有序性。 + +## Java内存区域 +Java虚拟机在运行程序时会把其自动管理的内存划分为方法区、Java堆、程序计数器、虚拟机栈、本地方法栈,每个区域都有其用途以及创建销毁的时机。其中方法区、Java堆是所有线程共享的数据区域,而程序计数器、虚拟机栈、本地方法栈是每个线程的私有数据区域。 + +方法区(Method Area):方法区属于线程共享的内存区域,又称Non-Heap(非堆),主要用于存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等数据,根据Java 虚拟机规范的规定,当方法区无法满足内存分配需求时,将抛出OutOfMemoryError 异常。值得注意的是在方法区中存在一个叫运行时常量池(Runtime Constant Pool)的区域,它主要用于存放编译器生成的各种字面量和符号引用,这些内容将在类加载后存放到运行时常量池中,以便后续使用。 + +Java堆(Java Heap):Java 堆也是属于线程共享的内存区域,它在虚拟机启动时创建,是Java 虚拟机所管理的内存中最大的一块,主要用于存放对象实例,几乎所有的对象实例都在这里分配内存,注意Java 堆是垃圾收集器管理的主要区域,因此很多时候也被称做GC 堆,如果在堆中没有内存满足实例分配需求,并且堆也无法再扩展时,将会抛出OutOfMemoryError 异常。 + +程序计数器(Program Counter Register):属于线程私有的数据区域,是一小块内存空间,主要代表当前线程所执行的字节码行号指示器。字节码解释器工作时,通过改变这个计数器的值来选取下一条需要执行的字节码指令,分支、循环、跳转、异常处理、线程恢复等基础功能都需要依赖这个计数器来完成。 + +虚拟机栈(Java Virtual Machine Stacks):属于线程私有的数据区域,与线程同时创建,总数与线程关联,代表Java方法执行的内存模型。每个方法执行时都会创建一个栈桢来存储方法的的变量表、操作数栈、动态链接方法、返回值、返回地址等信息。每个方法从调用直结束就对于一个栈桢在虚拟机栈中的入栈和出栈过程。 + +本地方法栈(Native Method Stacks):本地方法栈属于线程私有的数据区域,这部分主要与虚拟机用到的 Native 方法相关 +## Java内存模型 +Java内存模型(即Java Memory Model,简称JMM)本身是一种抽象的概念,并不真实存在,它描述的是一组规则或规范,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式。 + +Java线程之间的通信采用的是过共享内存模型。JVM中运行的每个线程都拥有自己的线程栈,线程栈包含了当前线程执行的方法调用相关信息,我们也把它称作调用栈。随着代码的不断执行,调用栈会不断变化。 + +线程栈还包含了当前方法的所有本地变量信息。一个线程只能读取自己的线程栈,也就是说,线程中的本地变量对其它线程是不可见的。即使两个线程执行的是同一段代码,它们也会各自在自己的线程栈中创建本地变量,因此,每个线程中的本地变量都会有自己的版本。 + +一个本地变量如果是原始类型,那么它会被完全存储到栈区。 +一个本地变量也有可能是一个对象的引用,这种情况下,这个本地引用会被存储到栈中,但是对象本身仍然存储在堆区。 + +对于一个对象的成员方法,这些方法中包含本地变量,仍需要存储在栈区,即使它们所属的对象在堆区。 +对于一个对象的成员变量,不管它是原始类型还是包装类型,都会被存储到堆区。 + +Static类型的变量以及类本身相关信息都会随着类本身存储在堆区。 + +堆中的对象可以被多线程共享。如果一个线程获得一个对象的应用,它便可访问这个对象的成员变量。如果两个线程同时调用了同一个对象的同一个方法,那么这两个线程便可同时访问这个对象的成员变量,但是对于本地变量,每个线程都会拷贝一份到自己的线程栈中。 +## happens-before 原则 +程序顺序原则,即在一个线程内必须保证语义串行性,也就是说按照代码顺序执行。 + +锁规则 解锁(unlock)操作必然发生在后续的同一个锁的加锁(lock)之前,也就是说,如果对于一个锁解锁后,再加锁,那么加锁的动作必须在解锁动作之后(同一个锁)。 + +volatile规则 volatile变量的写,先发生于读,这保证了volatile变量的可见性,简单的理解就是,volatile变量在每次被线程访问时,都强迫从主内存中读该变量的值,而当该变量发生变化时,又会强迫将最新的值刷新到主内存,任何时刻,不同的线程总是能够看到该变量的最新值。 + +线程启动规则 线程的start()方法先于它的每一个动作,即如果线程A在执行线程B的start方法之前修改了共享变量的值,那么当线程B执行start方法时,线程A对共享变量的修改对线程B可见。 + +线程终止规则 线程的所有操作先于线程的终结,Thread.join()方法的作用是等待当前执行的线程终止。假设在线程B终止之前,修改了共享变量,线程A从线程B的join方法成功返回后,线程B对共享变量的修改将对线程A可见。 + +线程中断规则 对线程 interrupt()方法的调用先行发生于被中断线程的代码检测到中断事件的发生,可以通过Thread.interrupted()方法检测线程是否中断。 + +对象终结规则 对象的构造函数执行,结束先于finalize()方法 + +传递性 A先于B ,B先于C 那么A必然先于C。 diff --git a/week_03/26/ReentrantLock .md b/week_03/26/ReentrantLock .md new file mode 100644 index 0000000000000000000000000000000000000000..8b521e60e80af344a42d39d20709ce043c1f1cf7 --- /dev/null +++ b/week_03/26/ReentrantLock .md @@ -0,0 +1,185 @@ + +# ReentrantLock + +## 简单说明 +synchronized关键字和ReentrantLock锁都是重入锁,可重入锁是指当一个线程获取到锁后,此线程还可继续获得这把锁,在此线程释放这把锁前其他线程则不可获得这边锁。相比synchronized关键字,ReentrantLock锁具有锁获取超时和获取锁响应中断的特点。ReentrantLock锁还分公平锁和非公平锁,公平锁模式是按线程调用加锁的先后排队顺序获取锁,非公平锁模式是已经在排队中的线程按顺序获取锁,但是新来的线程会和排队中的线程进行竞争,并不保证先排先获取锁。 + +ReentrantLock类内部总共存在Sync、NonfairSync、FairSync三个类,NonfairSync与FairSync类继承自Sync类,Sync类继承自AbstractQueuedSynchronizer抽象类 +### 类说明 +```Java +public class ReentrantLock implements Lock, java.io.Serializable { + // 序列号 + private static final long serialVersionUID = 7373984872572414699L; + // 同步队列 + private final Sync sync; +} +``` +```Java +public ReentrantLock() { + // 默认非公平策略 + sync = new NonfairSync(); + } +``` +```Java +public ReentrantLock(boolean fair) { + sync = fair ? new FairSync() : new NonfairSync(); + } +``` + +### Sync类 +```Java +abstract static class Sync extends AbstractQueuedSynchronizer { + // 序列号 + private static final long serialVersionUID = -5179523762034025860L; + + // 获取锁 + abstract void lock(); + + // 非公平方式获取 + final boolean nonfairTryAcquire(int acquires) { + // 当前线程 + final Thread current = Thread.currentThread(); + // 获取状态 + int c = getState(); + if (c == 0) { // 表示没有线程正在竞争该锁 + if (compareAndSetState(0, acquires)) { // 比较并设置状态成功,状态0表示锁没有被占用 + // 设置当前线程独占 + setExclusiveOwnerThread(current); + return true; // 成功 + } + } + else if (current == getExclusiveOwnerThread()) { // 当前线程拥有该锁 + int nextc = c + acquires; // 增加重入次数 + if (nextc < 0) + throw new Error("Maximum lock count exceeded"); + // 设置状态 + setState(nextc); + // 成功 + return true; + } + // 失败 + return false; + } + + // 试图在共享模式下获取对象状态,此方法应该查询是否允许它在共享模式下获取对象状态,如果允许,则获取它 + protected final boolean tryRelease(int releases) { + int c = getState() - releases; + if (Thread.currentThread() != getExclusiveOwnerThread()) // 当前线程不为独占线程 + throw new IllegalMonitorStateException(); // 抛出异常 + // 释放标识 + boolean free = false; + if (c == 0) { + free = true; + // 已经释放,清空独占 + setExclusiveOwnerThread(null); + } + // 设置标识 + setState(c); + return free; + } + + // 判断资源是否被当前线程占有 + protected final boolean isHeldExclusively() { + return getExclusiveOwnerThread() == Thread.currentThread(); + } + + // 新生一个条件 + final ConditionObject newCondition() { + return new ConditionObject(); + } + + // 返回资源的占用线程 + final Thread getOwner() { + return getState() == 0 ? null : getExclusiveOwnerThread(); + } + // 返回状态 + final int getHoldCount() { + return isHeldExclusively() ? getState() : 0; + } + + // 资源是否被占用 + final boolean isLocked() { + return getState() != 0; + } + + // 自定义反序列化逻辑 + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + setState(0); + } + } +``` +### NonfairSync类 +NonfairSync类继承了Sync类,表示采用非公平策略获取锁,其实现了Sync类中抽象的lock方法。从lock方法的源码可知,每一次都尝试获取锁,而并不会按照公平等待的原则进行等待,让等待时间最久的线程获得锁。 +```Java +// 非公平锁 + static final class NonfairSync extends Sync { + // 版本号 + private static final long serialVersionUID = 7316153563782823691L; + + // 获得锁 + final void lock() { + if (compareAndSetState(0, 1)) // 比较并设置状态成功,状态0表示锁没有被占用 + // 把当前线程设置独占了锁 + setExclusiveOwnerThread(Thread.currentThread()); + else // 锁已经被占用,或者set失败 + // 以独占模式获取对象,忽略中断 + acquire(1); + } + + protected final boolean tryAcquire(int acquires) { + return nonfairTryAcquire(acquires); + } + } +``` +### FairSyn类 +```Java +// 公平锁 + static final class FairSync extends Sync { + // 版本序列化 + private static final long serialVersionUID = -3000897897090466540L; + + final void lock() { + // 以独占模式获取对象,忽略中断 + acquire(1); + } + + /** + * Fair version of tryAcquire. Don't grant access unless + * recursive call or no waiters or is first. + */ + // 尝试公平获取锁 + protected final boolean tryAcquire(int acquires) { + // 获取当前线程 + final Thread current = Thread.currentThread(); + // 获取状态 + int c = getState(); + if (c == 0) { // 状态为0 + if (!hasQueuedPredecessors() && + compareAndSetState(0, acquires)) { // 不存在已经等待更久的线程并且比较并且设置状态成功 + // 设置当前线程独占 + setExclusiveOwnerThread(current); + return true; + } + } + else if (current == getExclusiveOwnerThread()) { // 状态不为0,即资源已经被线程占据 + // 下一个状态 + int nextc = c + acquires; + if (nextc < 0) // 超过了int的表示范围 + throw new Error("Maximum lock count exceeded"); + // 设置状态 + setState(nextc); + return true; + } + return false; + } + } +``` +FairSync类也继承了Sync类,表示采用公平策略获取锁,其实现了Sync类中的抽象lock方法。 + +当资源空闲时,它总是会先判断sync队列(AbstractQueuedSynchronizer中的数据结构)是否有等待时间更长的线程,如果存在,则将该线程加入到等待队列的尾部,实现了公平获取原则。 + +其中,FairSync类的lock的方法调用可以看出只要资源被其他线程占用,该线程就会添加到sync queue中的尾部,而不会先尝试获取资源。 + +这也是和Nonfair最大的区别,Nonfair每一次都会尝试去获取资源,如果此时该资源恰好被释放,则会被当前线程获取,这就造成了不公平的现象,当获取不成功,再加入队列尾部。 diff --git a/week_03/26/Semaphore.md b/week_03/26/Semaphore.md new file mode 100644 index 0000000000000000000000000000000000000000..5aa22ce129bb5c05f6ca0c516acf7a84e9438d4d --- /dev/null +++ b/week_03/26/Semaphore.md @@ -0,0 +1,289 @@ + +# Semaphore + +## 简单说明 +Semaphore是一种基于计数的信号量。它可以设定一个阈值,基于此,多个线程竞争获取许可信号,做完自己的申请后归还,超过阈值后,线程申请许可信号将会被阻塞。 + +Semaphore内部主要通过AQS(AbstractQueuedSynchronizer)实现线程的管理。Semaphore有两个构造函数,参数permits表示许可数,它最后传递给了AQS的state值。 + +线程在运行时首先获取许可,如果成功,许可数就减1,线程运行,当线程运行结束就释放许可,许可数就加1。如果许可数为0,则获取失败,线程位于AQS的等待队列中,它会被其它释放许可的线程唤醒。 + +在创建Semaphore对象的时候还可以指定它的公平性。一般常用非公平的信号量,非公平信号量是指在获取许可时先尝试获取许可,而不必关心是否已有需要获取许可的线程位于等待队列中,如果获取失败,才会入列。而公平的信号量在获取许可时首先要查看等待队列中是否已有线程,如果有则入列。 + +Semaphore可以用来构建一些对象池,资源池之类的,比如数据库连接池,我们也可以创建计数为1的Semaphore,将其作为一种类似互斥锁的机制,这也叫二元信号量,表示两种互斥状态。 +## 获取信号量的方法: +acquire()本质调用:sync.acquireSharedInterruptibly(1) + +acquire(int permits)本质调用:sync.acquireSharedInterruptibly(permits) + +acquireUninterruptibly()本质调用:sync.acquireShared(1) + +acquireUninterruptibly(int permits)本质调用:sync.acquireShared(permits); + +acquire()方法就相当于acquire(1),acquireUninterruptibly同理,只不过一种响应中断,一种不响应中断。 + +除了其中的tryAcquireShared(arg)由子类实现外,其他的都由AQS实现。 +## tryAcquireShared +```Java +protected int tryAcquireShared(int acquires) { + return nonfairTryAcquireShared(acquires); +} +final int nonfairTryAcquireShared(int acquires) { + for (;;) { + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } +} +``` +#### tryAcquireShared返回值的含义: +如果该值小于0,则代表当前线程获取共享锁失败 + +如果该值大于0,则代表当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁的行为很可能成功 + +如果该值等于0,则代表当前线程获取共享锁成功,但是接下来其他线程尝试获取共享锁的行为会失败 + +这里的返回值其实代表的是剩余的信号量的值,如果为负值则说明信号量不够了。 + +与一般的tryAcquire逻辑不同,Semaphore的tryAcquire逻辑是一个自旋操作,因为Semaphore是共享锁,同一时刻可能有多个线程来修改这个值,所以我们必须使用自旋 + CAS来避免线程冲突。 + +该方法退出的唯一条件是成功的修改了state值,并返回state的剩余值。如果剩下的信号量不够了,则就不需要进行CAS操作,直接返回剩余值。所以其实tryAcquireShared返回的不是当前剩余的信号量的值,而是如果扣去acquires之后,当前将要剩余的信号量的值,如果这个“将要”剩余的值比0小,则是不会发生扣除操作的。 + +## Semaphore类定义 +```Java +public class Semaphore implements java.io.Serializable { + + private final Sync sync; + + // 继承自AQS实现抽象类Sync,作为NonfairSync和FairSync的基类。 + abstract static class Sync extends AbstractQueuedSynchronizer { + private static final long serialVersionUID = 1192457210091910933L; + + Sync(int permits) { + setState(permits); + } + + final int getPermits() { + return getState(); + } + + final int nonfairTryAcquireShared(int acquires) { + for (;;) { + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } + } + + protected final boolean tryReleaseShared(int releases) { + for (;;) { + int current = getState(); + int next = current + releases; + if (next < current) // overflow + throw new Error("Maximum permit count exceeded"); + if (compareAndSetState(current, next)) + return true; + } + } + + final void reducePermits(int reductions) { + for (;;) { + int current = getState(); + int next = current - reductions; + if (next > current) // underflow + throw new Error("Permit count underflow"); + if (compareAndSetState(current, next)) + return; + } + } + + final int drainPermits() { + for (;;) { + int current = getState(); + if (current == 0 || compareAndSetState(current, 0)) + return current; + } + } + } + + // 非公平锁类定义 + static final class NonfairSync extends Sync { + private static final long serialVersionUID = -2694183684443567898L; + + NonfairSync(int permits) { + super(permits); + } + + protected int tryAcquireShared(int acquires) { + return nonfairTryAcquireShared(acquires); + } + } + + // 公平锁类定义 + static final class FairSync extends Sync { + private static final long serialVersionUID = 2014338818796000944L; + + FairSync(int permits) { + super(permits); + } + + protected int tryAcquireShared(int acquires) { + for (;;) { + if (hasQueuedPredecessors()) + return -1; + // 判断是否还能获取锁,通过递减全局计数state来实现 + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } + } + } + + // 构造函数 + public Semaphore(int permits) { + sync = new NonfairSync(permits); + } + + // 构造函数 + public Semaphore(int permits, boolean fair) { + sync = fair ? new FairSync(permits) : new NonfairSync(permits); + } +``` +Semaphore类内部的Sync继承自AQS,作为Semaphore的公平锁和非公平锁的基类。 + +Semaphore类内部的NonfairSync继承自Sync类,通过非公平的方法加锁和解锁。 + +Semaphore类内部的FairSync继承自Sync类,通过公平的方法加锁和解锁。 + +Semaphore的构造函数创建FairSync或NonfairSync对象赋值给Sync。 + +## Semaphore加锁过程 + +```Java + public void acquire() throws InterruptedException { + sync.acquireSharedInterruptibly(1); + } + + + public final void acquireSharedInterruptibly(int arg) + throws InterruptedException { + if (Thread.interrupted()) + throw new InterruptedException(); + if (tryAcquireShared(arg) < 0) + doAcquireSharedInterruptibly(arg); + } + + + protected int tryAcquireShared(int acquires) { + for (;;) { + if (hasQueuedPredecessors()) + return -1; + // 通过递减锁全局技术变量state来判定是否能获取锁 + // 值小于0说明获锁失败,否则代表获锁成功 + int available = getState(); + int remaining = available - acquires; + if (remaining < 0 || + compareAndSetState(available, remaining)) + return remaining; + } + } + + + private void doAcquireSharedInterruptibly(int arg) + throws InterruptedException { + final Node node = addWaiter(Node.SHARED); + boolean failed = true; + try { + for (;;) { + final Node p = node.predecessor(); + if (p == head) { + int r = tryAcquireShared(arg); + if (r >= 0) { + setHeadAndPropagate(node, r); + p.next = null; // help GC + failed = false; + return; + } + } + if (shouldParkAfterFailedAcquire(p, node) && + parkAndCheckInterrupt()) + throw new InterruptedException(); + } + } finally { + if (failed) + cancelAcquire(node); + } + } +``` +Semaphore加锁过程基于AQS实现的。 + +Semaphore加锁过程步骤是尝试获取锁,如果获锁成功线程继续执行,获锁失败就挂起当前线程。 + +Semaphore加锁过程因为公平锁和非公平锁略有不同,但是大流程是一致的。 + +tryAcquireShared()方法是尝试的操作,doAcquireSharedInterruptibly()是获锁失败的操作。 + +tryAcquireShared()方法通过递减锁全局技术变量state来判定是否能获取锁。 +## Semaphore解锁过程 + + public void release() { + sync.releaseShared(1); + } + + + public final boolean releaseShared(int arg) { + if (tryReleaseShared(arg)) { + doReleaseShared(); + return true; + } + return false; + } + + + protected final boolean tryReleaseShared(int releases) { + for (;;) { + // 释放锁,通过累加全局计数state来实现 + int current = getState(); + int next = current + releases; + if (next < current) // overflow + throw new Error("Maximum permit count exceeded"); + if (compareAndSetState(current, next)) + return true; + } + } + + + private void doReleaseShared() { + for (;;) { + Node h = head; + if (h != null && h != tail) { + int ws = h.waitStatus; + if (ws == Node.SIGNAL) { + if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) + continue; // loop to recheck cases + unparkSuccessor(h); + } + else if (ws == 0 && + !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) + continue; // loop on failed CAS + } + if (h == head) // loop if head changed + break; + } + } + +Semaphore解锁过程基于AQS实现的。 + +Semaphore加锁过程步骤是尝试释放锁,释放成功后就唤醒其他等待线程。 + +Semaphore的tryReleaseShared()方法尝试释放锁,doReleaseShared()方法唤醒休眠等待线程。 + +Semaphore的tryReleaseShared()方法通过递增锁的全局计数state来实现。 + + diff --git a/week_03/26/Synchronized.md b/week_03/26/Synchronized.md new file mode 100644 index 0000000000000000000000000000000000000000..0ab26288ab749a8fa9740deed17b0e7aed7ac781 --- /dev/null +++ b/week_03/26/Synchronized.md @@ -0,0 +1,100 @@ +# synchronized + +## 简单说明 +synchronized是实现线程同步的基本手段,然而底层实现还是通过锁机制来保证,对于被synchronized修饰的区域每次只有一个线程可以访问。synchronized通过锁机制的实现,满足了原子性,可见性和有序性,是并发编程正确执行的有效保障,而volatile只保证了可见性和有序性(禁止指令重排) + +synchronized可以修饰范围的包括:方法级别,代码块级别;而实际加锁的目标包括:对象锁(普通变量,静态变量),类锁 + +同步不仅仅保证互斥访问,同步还保证当前线程在同步块前和同步块中,对内存的写操作对于其他访问相同同步块(使用了同一个monitor)的线程是可见的。当我们退出了同步块,会释放monitor,并且将缓存数据刷新到内存,这样当前线程的写操作对于其他线程是可见的,当我们进入同步快之前,会获取monitor,并且使得当前处理器的缓存失效,从而读取数据必须从内存中重新加载,这样我们就可以看到其他线程在同步块中写操作 +## 通常用法 +``` Java + /** + * 对象锁,代码级别,同一对象争用该锁,this为SynMethod实例,synchronized的锁绑定在this对象上 + */ + public void method1() { + synchronized (this) { + for (int i = 0; i < 5; i++) { + System.out.println(Thread.currentThread().getName() + " synchronized loop " + i); + } + } + } + + /** + * 对象锁,方法级别,同一对象争用该锁,普通(非静态)方法,synchronized的锁绑定在调用该方法的对象上,与上一个写法含义一致 + */ + public synchronized void method2() { + for (int i = 0; i < 5; i++) { + System.out.println(Thread.currentThread().getName() + " synchronized loop " + i); + } + } + + /** + * 对象锁,代码级别,同一类争用该锁,绑定在staticLockObj上,不同SynMethod实例,拥有同一个staticLockObj对象 + */ + public void method3() { + synchronized (staticLockObj) { + for (int i = 0; i < 5; i++) { + System.out.println(Thread.currentThread().getName() + " synchronized loop " + i); + } + } + } + + /** + * 类锁,代码级别,同一类争用该锁 + */ + public void method4() { + synchronized (SynMethod.class) { + for (int i = 0; i < 5; i++) { + System.out.println(Thread.currentThread().getName() + " synchronized loop " + i); + } + } + } + + /** + * 类锁,方法级别,同一类争用该锁,synchronized的锁绑定在SynMethod.class上 + */ + public static synchronized void staticMethod() { + for (int i = 0; i < 5; i++) { + System.out.println(Thread.currentThread().getName() + " synchronized loop " + i); + } + } +``` +注意事项: + +接口方法时不能使用synchronized关键字; + +构造方法不能使用synchronized关键字,但可以使用synchronized代码块进行同步; + +synchronized关键字无法继承; + +如果在父类中的某个方法使用了synchronized关键字,而在子类中覆盖了这个方法,在子类中的这个方法默认情况下并不是同步的。 + + +#### 子类方法同步的解决方案 +1)子类方法也加上synchronized 关键字 + +2)子类方法中调用父类同步的方法,例如:使用 super.xxxMethod()调用父类方法 + +##### 同步的场景只用三个,一个是SynMethod实例(可以多个),SynMethod的静态对象(共享)和SynMethod类(一个),只要是在同一个对象上同步,这个对象可以是实例对象,可以是静态对象,可以是类对象,那么就可以实现同步效果 +## 实现原理 +方法级别的synchronized同步使用了monitorenter和monitorexit这个同步命令 + +Synchronized的语义底层是通过一个monitor的对象来完成,其实wait/notify等方法也依赖于monitor对象,所以只有在同步的块或者方法中才能调用wait/notify等方法,否则会抛出java.lang.IllegalMonitorStateException的异常的原因。 + +对于方法级别的同步,在flag标志中多了ACC_SYNCHRONIZED标示符,用于标记整个方法的同步,JVM在执行该方法前会读取该标记符,从而调用monitorentor命令,在退出方法时调用monitorexit命令,从而达到同步的效果 +### monitorenter +每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下: + +1、如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。 + +2、如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1. + +3.如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。 +#### monitorexit +执行monitorexit的线程必须是objectref所对应的monitor的所有者。指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。 +## synchronized影响性能的原因: +1、加锁解锁操作需要额外操作; + +2、互斥同步对性能最大的影响是阻塞的实现,因为阻塞涉及到的挂起线程和恢复线程的操作都需要转入内核态中完成(用户态与内核态的切换的性能代价是比较大的) + + diff --git a/week_03/26/Volatile.md b/week_03/26/Volatile.md new file mode 100644 index 0000000000000000000000000000000000000000..5463e7074cf44cc7f9a1017659d67ac7e7bf71fb --- /dev/null +++ b/week_03/26/Volatile.md @@ -0,0 +1,120 @@ +# volatile + +## volatile 的特性 +保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。(实现可见性) + +禁止进行指令重排序。(实现有序性) + +volatile 只能保证对单次读/写的原子性。i++ 这种操作不能保证原子性。 + +## volatile 可见性 + +volatile 变量的内存可见性是基于内存屏障(Memory Barrier)实现。内存屏障,又称内存栅栏,是一个 CPU 指令。 + +在程序运行时,为了提高执行性能,编译器和处理器会对指令进行重排序,JMM 为了保证在不同的编译器和 CPU 上有相同的结果,通过插入特定类型的内存屏障来禁止特定类型的编译器重排序和处理器重排序,插入一条内存屏障会告诉编译器和 CPU:不管什么指令都不能和这条 Memory Barrier 指令重排序。 + +为了提高处理速度,处理器不直接和内存进行通信,而是先将系统内存的数据读到内部缓存后再进行操作,但操作完不知道何时会写到内存。 + +如果对声明了 volatile 的变量进行写操作,JVM 就会向处理器发送一条 lock 前缀的指令,将这个变量所在缓存行的数据写回到系统内存。 + +为了保证各个处理器的缓存是一致的,实现了缓存一致性协议(MESI),每个处理器通过嗅探在总线上传播的数据来检查自己缓存的值是不是过期了,当处理器发现自己缓存行对应的内存地址被修改,就会将当前处理器的缓存行设置成无效状态,当处理器对这个数据进行修改操作的时候,会重新从系统内存中把数据读到处理器缓存里。 + +所有多核处理器下还会完成:3)当处理器发现本地缓存失效后,就会从内存中重读该变量数据,即可以获取当前最新值。 + +volatile 变量通过这样的机制就使得每个线程都能获得该变量的最新值。 + +#### LOCK指令 +将当前处理器缓存行的数据写回到系统内存。 + +写回内存的操作会使在其他 CPU 里缓存了该内存地址的额数据无效。 + +在 Pentium 和早期的 IA-32 处理器中,lock 前缀会使处理器执行当前指令时产生一个 LOCK# 信号,会对总线进行锁定,其它 CPU 对内存的读写请求都会被阻塞,直到锁释放。 + +后来的处理器,加锁操作是由高速缓存锁代替总线锁来处理。因为锁总线的开销比较大,锁总线期间其他 CPU 没法访问内存。这种场景多缓存的数据一致通过缓存一致性协议(MESI)来保证。 + + +#### 缓存一致性 + +缓存是分段(line)的,一个段对应一块存储空间,称之为缓存行,它是 CPU 缓存中可分配的最小存储单元,大小 32 字节、64 字节、128 字节不等,这与 CPU 架构有关,通常来说是 64 字节。 + +LOCK# 因为锁总线效率太低,因此使用了多组缓存。为了使其行为看起来如同一组缓存那样。因而设计了 缓存一致性协议。缓存一致性协议有多种,但是日常处理的大多数计算机设备都属于 " 嗅探(snooping)" 协议。 + +所有内存的传输都发生在一条共享的总线上,而所有的处理器都能看到这条总线。 + +缓存本身是独立的,但是内存是共享资源,所有的内存访问都要经过仲裁(同一个指令周期中,只有一个 CPU 缓存可以读写内存)。 + +CPU 缓存不仅仅在做内存传输的时候才与总线打交道,而是不停在嗅探总线上发生的数据交换,跟踪其他缓存在做什么。 + +当一个缓存代表它所属的处理器去读写内存时,其它处理器都会得到通知,它们以此来使自己的缓存保持同步。 + +只要某个处理器写内存,其它处理器马上知道这块内存在它们的缓存段中已经失效。 +## volatile 有序性 +### volatile 的 happens-before +happens-before 规则中有一条是 volatile 变量规则:对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读。 +``` Java +//假设线程A执行writer方法,线程B执行reader方法 +class VolatileExample { + int a = 0; + volatile boolean flag = false; + + public void writer() { + a = 1; // 1 线程A修改共享变量 + flag = true; // 2 线程A写volatile变量 + } + + public void reader() { + if (flag) { // 3 线程B读同一个volatile变量 + int i = a; // 4 线程B读共享变量 + …… + } + } +} +``` +根据 happens-before 规则,上面过程会建立 3 类 happens-before 关系。 + +根据程序次序规则:1 happens-before 2 且 3 happens-before 4。 + +根据 volatile 规则:2 happens-before 3。 + +根据 happens-before 的传递性规则:1 happens-before 4 +### volatile 禁止重排序 +为了性能优化,JMM 在不改变正确语义的前提下,会允许编译器和处理器对指令序列进行重排序。JMM 提供了内存屏障阻止这种重排序。 + +Java 编译器会在生成指令系列时在适当的位置会插入内存屏障指令来禁止特定类型的处理器重排序。 + +为了实现 volatile 内存语义时,编译器在生成字节码时,会在指令序列中插入内存屏障来禁止特定类型的处理器重排序。 + +对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎是不可能的,为此,JMM 采取了保守的策略。 + +在每个 volatile 写操作的前面插入一个 StoreStore 屏障。 + +在每个 volatile 写操作的后面插入一个 StoreLoad 屏障。 + +在每个 volatile 读操作的后面插入一个 LoadLoad 屏障。 + +在每个 volatile 读操作的后面插入一个 LoadStore 屏障。 + +volatile 写是在前面和后面分别插入内存屏障,而 volatile 读操作是在后面插入两个内存屏障。 + +内存屏障 说明 + +StoreStore 屏障 禁止上面的普通写和下面的 volatile 写重排序。 + +StoreLoad 屏障 防止上面的 volatile 写与下面可能有的 volatile 读/写重排序。 + +LoadLoad 屏障 禁止下面所有的普通读操作和上面的 volatile 读重排序。 + +LoadStore 屏障 禁止下面所有的普通写操作和上面的 volatile 读重排序。 + +## volatile 的应用场景 + +使用 volatile 必须具备的条件:对变量的写操作不依赖于当前值。该变量没有包含在具有其他变量的不变式中。 + +只有在状态真正独立于程序内其他内容时才能使用 volatile。 + + + + + + + diff --git "a/week_03/26/\345\210\206\345\270\203\345\274\217\351\224\201.md" "b/week_03/26/\345\210\206\345\270\203\345\274\217\351\224\201.md" new file mode 100644 index 0000000000000000000000000000000000000000..0d9dfdbcc9013119b49c3b07a143fc3a7d20323e --- /dev/null +++ "b/week_03/26/\345\210\206\345\270\203\345\274\217\351\224\201.md" @@ -0,0 +1,143 @@ + +# 分布式锁 + +## 简单说明 +分布式与单机情况下最大的不同在于其不是多线程而是多进程。 + +多线程由于可以共享堆内存,因此可以简单的采取内存作为标记存储位置。而进程之间甚至可能都不在同一台物理机上,因此需要将标记存储在一个所有进程都能看到的地方。 + +当在分布式模型下,数据可能只有一份,此时需要利用锁的技术控制某一时刻修改数据的进程数。 + +与单机模式下的锁不同,分布式锁不仅需要保证进程可见,还需要考虑进程与锁之间的网络问题。(分布式情况下之所以问题变得复杂,主要就是需要考虑到网络的延时和不可靠) + +分布式锁还是可以将标记存在内存,只是该内存不是某个进程分配的内存而是公共内存如Redis、Memcache。至于利用数据库、文件等做锁与单机的实现是一样的,只要保证标记能互斥就行。 + +#### 分布式锁的特点: + +  1.排他性: 只有一个线程能获取到。 + +  2.阻塞性: 其他未抢到的线程阻塞,直到锁释放出来,在抢。 + +  3.可重入性:线程获得锁后,后续是否可重复获取该锁。 +### 常用的分布式锁实现技术 +#### 基于数据库实现分布式锁 + +原理:要实现分布式锁,最简单的方式可能就是直接创建一张锁表,然后通过操作该表中的数据来实现了。 + +当我们要锁住某个方法或资源时,我们就在该表中增加一条记录,想要释放锁的时候就删除这条记录。 + +优点: + +1、直接借助数据库,容易理解。 + +缺点: + +1、会有各种各样的问题,在解决问题的过程中会使整个方案变得越来越复杂。 + +2、操作数据库需要一定的开销,性能问题需要考虑。 + +3、使用数据库的行级锁并不一定靠谱,尤其是当我们的锁表并不大的时候。 + + +#### 基于缓存实现分布式锁 + +原理:使用缓存来代替数据库来实现分布式锁,这个可以提供更好的性能,同时,很多缓存服务都是集群部署的,可以避免单点问题。并且很多缓存服务都提供了可以用来实现分布式锁的方法,比如Tair的put方法,redis的setnx方法等。并且,这些缓存服务也都提供了对数据的过期自动删除的支持,可以直接设置超时时间来控制锁的释放。 + +##### 1.加锁 + +最简单的方法是使用setnx命令。key是锁的唯一标识,按业务来决定命名。比如想要给一种商品的秒杀活动加锁,可以给key命名为 “lock_sale_商品ID” 。而value设置成什么呢?我们可以姑且设置成1。 + +加锁伪代码: + +setnx(key,1) + +当一个线程执行setnx返回1,说明key原本不存在,该线程成功得到了锁;当一个线程执行setnx返回0,说明key已经存在,该线程抢锁失败 + +##### 2.解锁 + +有加锁就得有解锁。当得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入。释放锁的最简单方式是执行del指令,释放锁之后,其他线程就可以继续执行setnx命令来获得锁 + +解锁伪代码: + +del(key) +##### 3.锁超时 + +如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。 + +所以,setnx的key必须设置一个超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。setnx不支持超时参数,所以需要额外的指令, + +锁超时伪代码: + +expire(key, 30) + +#### 优点: + +1、性能好,实现起来较为方便。 + +#### 缺点: + +1、通过超时时间来控制锁的失效时间并不是十分的靠谱。 + + +### 基于zookeeper实现分布式锁 +Zookeeper的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做Znode +#### Znode四种类型 + +#### 1.持久节点 (PERSISTENT) + +默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在 。 + +#### 2.持久节点顺序节点(PERSISTENT_SEQUENTIAL) + +所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号: +#### 3.临时节点(EPHEMERAL) + +和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除: + +#### 4.临时顺序节点(EPHEMERAL_SEQUENTIAL)  + +临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。 +#### 原理 +Zookeeper分布式锁应用了临时顺序节点。 + +#### 获取锁 + +首先,在Zookeeper当中创建一个持久节点ParentLock。当第一个客户端想要获得锁时,需要在ParentLock这个节点下面创建一个临时顺序节点 Lock1。 + +之后,Client1查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock1是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。 + +这时候,如果再有一个客户端 Client2 前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock2。 + +Client2查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。 + +于是,Client2向排序仅比它靠前的节点Lock1注册Watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态。 + +这时候,如果还有一个客户端Client3前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock3。 + +Client3查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的。 + +于是,Client3向排序仅比它靠前的节点Lock2注册Watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态。 + +这样一来,Client1得到了锁,Client2监听了Lock1,Client3监听了Lock2。这恰恰形成了一个等待队列,很像是Java当中ReentrantLock所依赖的AQS(AbstractQueuedSynchronizer)。 +#### 释放锁 +释放锁分为两种情况: +#### 1.任务完成,客户端显式释放 + +当任务完成时,Client1会显示调用删除节点Lock1的指令。 + +#### 2.任务执行过程中,客户端崩溃 + +获得锁的Client1在任务执行过程中,如果崩溃了,则会断开与Zookeeper服务端的链接。根据临时节点的特性,相关联的节点Lock1会随之自动删除。 + +由于Client2一直监听着Lock1的存在状态,当Lock1节点被删除,Client2会立刻收到通知。这时候Client2会再次查询ParentLock下面的所有节点,确认自己创建的节点Lock2是不是目前最小的节点。如果是最小,则Client2顺理成章获得了锁。 + +#### 优点: + +1、有效的解决单点问题,不可重入问题,非阻塞问题以及锁无法释放的问题。实现起来较为简单。 + +#### 缺点: + +1、性能上不如使用缓存实现分布式锁。 需要对ZK的原理有所了解。 +  + +