From 9f0358553ca7d8de1cd938cc08f78e850df018a8 Mon Sep 17 00:00:00 2001 From: FIve <5332795+qylningque@user.noreply.gitee.com> Date: Sun, 5 Jan 2020 13:05:58 +0800 Subject: [PATCH] 060-week 04 --- week_04/60/ArrayBlockingQueue-60.md | 297 ++++++++++++++ week_04/60/ConcurrentHashMap-60.md | 477 ++++++++++++++++++++++ week_04/60/ConcurrentLinkedQueue-60.md | 531 +++++++++++++++++++++++++ week_04/60/CopyOnWriteArrayList-60.md | 212 ++++++++++ week_04/60/DelayQueue-60.md | 137 +++++++ 5 files changed, 1654 insertions(+) create mode 100644 week_04/60/ArrayBlockingQueue-60.md create mode 100644 week_04/60/ConcurrentHashMap-60.md create mode 100644 week_04/60/ConcurrentLinkedQueue-60.md create mode 100644 week_04/60/CopyOnWriteArrayList-60.md create mode 100644 week_04/60/DelayQueue-60.md diff --git a/week_04/60/ArrayBlockingQueue-60.md b/week_04/60/ArrayBlockingQueue-60.md new file mode 100644 index 0000000..982e04e --- /dev/null +++ b/week_04/60/ArrayBlockingQueue-60.md @@ -0,0 +1,297 @@ +阻塞队列(BlockingQueue)是一个支持两个附加操作的一种特殊队列。这两个附加的操作是: + +- 在队列为空时,获取元素的线程会等待队列变为非空。 +- 当队列满时,存储元素的线程会等待队列可用。 + +阻塞队列经常用于`生产者和消费者`的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素,所谓容器,就是临界区,是为了将生产者和消费者进行解耦而加入的。 + +ArrayBlockingQueue,一个由数组实现的有界阻塞队列。该队列采用FIFO的原则对元素进行排序添加的。 + +ArrayBlockingQueue为有界且固定,其大小在构造时由构造函数来决定,确认之后就不能再改变了。ArrayBlockingQueue支持对等待的生产者线程和使用者线程进行排序的可选公平策略,但是在默认情况下不保证线程公平的访问,在构造时可以选择公平策略(fair = true)。公平性通常会降低吞吐量,但是减少了可变性和避免了“不平衡性”。 + +# 1. ArrayBlockingQueue原理分析 + +## 1.1 主要属性 + +```java + public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, Serializable { + private static final long serialVersionUID = -817911632652898426L; + // 使用数组存储元素 + final Object[] items; + + // 取元素的指针 + int takeIndex; + + // 放元素的指针 + int putIndex; + + // 元素数量 + int count; + + // 保证并发访问的锁 + final ReentrantLock lock; + + // 非空条件 + private final Condition notEmpty; + + // 非满条件 + private final Condition notFull; + transient ArrayBlockingQueue.Itrs itrs; + } +``` + +通过属性我们可以得出以下几个重要信息: + +(1)利用数组存储元素; + +(2)通过放指针和取指针来标记下一次操作的位置; + +(3)利用重入锁来保证并发安全; + +## 1.2 主要构造方法 + +```java + public ArrayBlockingQueue(int capacity) { + this(capacity, false); + } + + public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + // 初始化数组 + this.items = new Object[capacity]; + // 创建重入锁及两个条件 + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); + } +``` + +通过构造方法我们可以得出以下两个结论: + +(1)ArrayBlockingQueue 初始化时必须传入容量,也就是数组的大小; + +(2)可以通过构造方法控制重入锁的类型是公平锁还是非公平锁; + +## 1.3 入队 + +ArrayBlockingQueue 提供了诸多方法,可以将元素加入队列尾部。 + +- add(E e) :将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException +- offer(E e) : 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false +- offer(E e, long timeout, TimeUnit unit) : 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间 +- put(E e) : 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间 + +```java + public boolean add(E e) { + // 调用父类的add(e)方法 + return super.add(e); + } + + // super.add(e) + public boolean add(E e) { + // 调用offer(e)如果成功返回true,如果失败抛出异常 + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); + } + + public boolean offer(E e) { + // 元素不可为空 + checkNotNull(e); + final ReentrantLock lock = this.lock; + // 加锁 + lock.lock(); + try { + if (count == items.length) + // 如果数组满了就返回false + return false; + else { + // 如果数组没满就调用入队方法并返回true + enqueue(e); + return true; + } + } finally { + // 解锁 + lock.unlock(); + } + } + + public void put(E e) throws InterruptedException { + checkNotNull(e); + final ReentrantLock lock = this.lock; + // 加锁,如果线程中断了抛出异常 + lock.lockInterruptibly(); + try { + // 如果数组满了,使用notFull等待 + // notFull等待的意思是说现在队列满了 + // 只有取走一个元素后,队列才不满 + // 然后唤醒notFull,然后继续现在的逻辑 + // 这里之所以使用while而不是if + // 是因为有可能多个线程阻塞在lock上 + // 即使唤醒了可能其它线程先一步修改了队列又变成满的了 + // 这时候需要再次等待 + while (count == items.length) + notFull.await(); + // 入队 + enqueue(e); + } finally { + // 解锁 + lock.unlock(); + } + } + + public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { + checkNotNull(e); + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + // 加锁 + lock.lockInterruptibly(); + try { + // 如果数组满了,就阻塞nanos纳秒 + // 如果唤醒这个线程时依然没有空间且时间到了就返回false + while (count == items.length) { + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + // 入队 + enqueue(e); + return true; + } finally { + // 解锁 + lock.unlock(); + } + } + + private void enqueue(E x) { + final Object[] items = this.items; + // 把元素直接放在放指针的位置上 + items[putIndex] = x; + // 如果放指针到数组尽头了,就返回头部 + if (++putIndex == items.length) + putIndex = 0; + // 数量加1 + count++; + // 唤醒notEmpty,因为入队了一个元素,所以肯定不为空了 + notEmpty.signal(); + } +``` + +## 1.4 出队 + +ArrayBlockingQueue 提供的出队方法如下: + +- poll() : 获取并移除此队列的头,如果此队列为空,则返回 null +- poll(long timeout, TimeUnit unit) : 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要) +- remove(Object o) : 从此队列中移除指定元素的单个实例(如果存在) +- take() : 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要) + +```java + public E remove() { + // 调用poll()方法出队 + E x = poll(); + if (x != null) + // 如果有元素出队就返回这个元素 + return x; + else + // 如果没有元素出队就抛出异常 + throw new NoSuchElementException(); + } + + public E poll() { + final ReentrantLock lock = this.lock; + // 加锁 + lock.lock(); + try { + // 如果队列没有元素则返回null,否则出队 + return (count == 0) ? null : dequeue(); + } finally { + lock.unlock(); + } + } + + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + // 加锁 + lock.lockInterruptibly(); + try { + // 如果队列无元素,则阻塞等待在条件notEmpty上 + while (count == 0) + notEmpty.await(); + // 有元素了再出队 + return dequeue(); + } finally { + // 解锁 + lock.unlock(); + } + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + // 加锁 + lock.lockInterruptibly(); + try { + // 如果队列无元素,则阻塞等待nanos纳秒 + // 如果下一次这个线程获得了锁但队列依然无元素且已超时就返回null + while (count == 0) { + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + return dequeue(); + } finally { + lock.unlock(); + } + } + + private E dequeue() { + final Object[] items = this.items; + @SuppressWarnings("unchecked") + // 取取指针位置的元素 + E x = (E) items[takeIndex]; + // 把取指针位置设为null + items[takeIndex] = null; + // 取指针前移,如果数组到头了就返回数组前端循环利用 + if (++takeIndex == items.length) + takeIndex = 0; + // 元素数量减1 + count--; + if (itrs != null) + itrs.elementDequeued(); + // 唤醒notFull条件 + notFull.signal(); + return x; + } +``` + +# 2. ArrayBlockingQueue缺点 + +a)队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量; + +b)如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险; + +c)只使用了一个锁来控制入队出队,效率较低,那是不是可以借助分段的思想把入队出队分裂成两个锁呢?且听下回分解。 + +## 总结 + +(1)ArrayBlockingQueue 不需要扩容,因为是初始化时指定容量,并循环利用数组; + +(2)ArrayBlockingQueue 利用 takeIndex 和 putIndex 循环利用数组; + +(3)入队和出队各定义了四组方法为满足不同的用途; + +(4)利用重入锁和两个条件保证并发安全; + + + +引用: + +[**Java 并发编程 ----ArrayBlockingQueue 源码分析**](https://juejin.im/post/5ac9aba6518825555e5e1bec#comment) + +[**【死磕 Java 并发】—–J.U.C 之阻塞队列:ArrayBlockingQueue**](https://mp.weixin.qq.com/s/j-VHVQUQhRMYhLKYT_4urA) + +[**死磕 java 集合之 ArrayBlockingQueue 源码分析**](https://juejin.im/post/5cbc7fdb6fb9a068b03765b0#heading-0) \ No newline at end of file diff --git a/week_04/60/ConcurrentHashMap-60.md b/week_04/60/ConcurrentHashMap-60.md new file mode 100644 index 0000000..f02f530 --- /dev/null +++ b/week_04/60/ConcurrentHashMap-60.md @@ -0,0 +1,477 @@ +# **为什么要使用 ConcurrentHashMap** + +主要基于两个原因: + +1. 在并发编程中使用 HashMap 可能造成死循环(jdk1.7,jdk1.8 中会造成数据丢失) +2. HashTable 效率非常低下 + +# 1. ConcurrentHashMap 结构 + +jdk 1.7 和 jdk 1.8 中,ConcurrentHashMap 的结构有着很大的变化 + +## 1.1 jdk 1.7 中结构 + +![jdk1.7 ConcurrentHashMap结构](https://user-gold-cdn.xitu.io/2019/12/11/16ef52f1ce824293?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +在 jdk 1.7 中,ConcurrentHashMap 是由 Segment 数据结构和 HashEntry 数组结构构成。采取分段锁来保证安全性。 + +Segment 是 ReentrantLock 重入锁,在 ConcurrentHashMap 中扮演锁的角色;HashEntry 则用于存储键值对数据。 + +一个 ConcurrentHashMap 里包含一个 Segment 数组,一个 Segment 里包含一个 HashEntry 数组,Segment 的结构和 HashMap 类似,是一个数组和链表结构。 + +## 1.2 jdk 1.8 中结构 + +![jdk1.8 ConcurrentHashMap结构](https://user-gold-cdn.xitu.io/2018/4/14/162c260d64e630de?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +JDK1.8 的实现已经摒弃了 Segment 的概念,而是直接用 Node 数组+链表+红黑树的数据结构来实现,并发控制使用 Synchronized 和 CAS 来操作,整个看起来就像是优化过且线程安全的 HashMap,虽然在 JDK1.8 中还能看到 Segment 的数据结构,但是已经简化了属性,只是为了兼容旧版本。 + +![img](https://user-gold-cdn.xitu.io/2018/4/14/162c260db23db861?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + +根据上面注释可以简单总结: + +- JDK1.8 底层是**散列表 + 红黑树** +- ConCurrentHashMap 支持**高并发**的访问和更新,它是**线程安全**的 +- 检索操作不用加锁,get 方法是非阻塞的 +- key 和 value 都不允许为 null + +# 2. ConcurrentHashMap(1.8)的实现 + +## 2.1 基本属性及概念 + +```java + //node数组最大容量:2^30=1073741824 + private static final int MAXIMUM_CAPACITY = 1 << 30; + //默认初始值,必须是2的幂数 + private static final int DEFAULT_CAPACITY = 16; + //数组可能最大值,需要与toArray()相关方法关联 + static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + //并发级别,遗留下来的,为兼容以前的版本 + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + //负载因子 + private static final float LOAD_FACTOR = 0.75f; + //链表转红黑树阀值,> 8 链表转换为红黑树 + static final int TREEIFY_THRESHOLD = 8; + //树转链表阀值,小于等于6(tranfer时,lc、hc=0两个计数器分别++记录原bin、新binTreeNode数量,<=UNTREEIFY_THRESHOLD 则untreeify(lo)) + static final int UNTREEIFY_THRESHOLD = 6; + static final int MIN_TREEIFY_CAPACITY = 64; + private static final int MIN_TRANSFER_STRIDE = 16; + private static int RESIZE_STAMP_BITS = 16; + //2^15-1,help resize的最大线程数 + private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; + //32-16=16,sizeCtl中记录size大小的偏移量 + private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; + //forwarding nodes的hash值 + static final int MOVED = -1; + //树根节点的hash值 + static final int TREEBIN = -2; + //ReservationNode的hash值 + static final int RESERVED = -3; + //可用处理器数量 + static final int NCPU = Runtime.getRuntime().availableProcessors(); + //存放node的数组 + transient volatile Node[] table; + /*控制标识符,用来控制table的初始化和扩容的操作,不同的值有不同的含义 + *当为负数时:-1代表正在初始化,-N代表有N-1个线程正在 进行扩容 + *当为0时:代表当时的table还没有被初始化 + *当为正数时:表示初始化或者下一次进行扩容的大小 + */ + private transient volatile int sizeCtl; +``` + +- **table**: 默认为 null,初始化发生在第一次插入操作,默认大小为 16 的数组,用来存储 Node 节点数据,扩容时大小总是 2 的幂次方。 +- **nextTable**: 默认为 null,扩容时新生成的数组,其大小为原数组的两倍 +- **Node** : 保存 key,value 及 key 的 hash 值的数据结构。 + +```java + class Node implements Map.Entry { + final int hash; + final K key; + volatile V val; + volatile Node next; + //省略部分代码 + } +``` + +其中 value 和 next 都用 `volatile` 修饰,保证并发的可见性。 + +- **ForwardingNode**: 一个特殊的 Node 节点,hash 值为 -1,其中存储 nextTable 的引用。 + +```java + final class ForwardingNode extends Node { + final Node[] nextTable; + ForwardingNode(Node[] tab) { + super(MOVED, null, null, null); + this.nextTable = tab; + } + } +``` + +只有 table 发生扩容的时候,ForwardingNode 才会发挥作用,作为一个占位符放在 table 中表示当前节点为 null 或则已经被移动。 + +- **TreeNode 类和 TreeBin 类**:  TreeNode 类表示的是红黑树上的每个节点。当一个链表上的节点数量超过了指定的值,会将这个链表变为红黑树,当然每个节点就转换为 TreeNode。不像 HashMap,ConcurrentHashMap 在桶里面直接存储的不是 TreeNode,而是一个 TreeBin,在 TreeBin 内部维护一个红黑树,也就是说 TreeNode 在 TreeBin 内部使用的。 + +## 2.2 初始化 + +实例化 ConcurrentHashMap 时带参数时,会根据参数调整 table 的大小,假设参数为 100,最终会调整成 256,确保 table 的大小总是 2 的幂次方. + +**table 初始化** + +```java +private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { + //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片 + if ((sc = sizeCtl) < 0) + Thread.yield(); // lost initialization race; just spin + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + if ((tab = table) == null || tab.length == 0) { + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; + table = tab = nt; + sc = n - (n >>> 2); + } + } finally { + sizeCtl = sc; + } + break; + } + } + return tab; +} +``` + +## 2.3 put 操作 + +假设 table 已经初始化完成,put 操作采用 CAS + synchronized 实现并发插入或更新操作。 + +```java + final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) throw new NullPointerException(); + int hash = spread(key.hashCode()); + int binCount = 0; + for (Node[] tab = table;;) { + Node f; int n, i, fh; + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + if (casTabAt(tab, i, null, new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + ...省略部分代码 + } + addCount(1L, binCount); + return null; + } +``` + +**hash 算法** + +```java + static final int spread(int h) { + return (h ^ (h >>> 16)) & HASH_BITS; + } +``` + +table 中定位索引位置,n 是 table 的大小 + +``` +int index = (n - 1) & hash +``` + +**获取 table 中对应索引的元素 f** + +Unsafe.getObjectVolatile 可以直接获取指定内存的数据,保证了每次拿到数据都是最新的。 + +如果 f 为 null,说明 table 中这个位置第一次插入元素,利用 Unsafe.compareAndSwapObject 方法插入 Node 节点。 + +如果 CAS 成功,说明 Node 节点已经插入,随后 addCount(1L, binCount) 方法会检查当前容量是否需要进行扩容。 + +如果 CAS 失败,说明有其它线程提前插入了节点,自旋重新尝试在这个位置插入节点。 + +如果 f 的 hash 值为 -1,说明当前 f 是 ForwardingNode 节点,意味有其它线程正在扩容,则一起进行扩容操作。 + +其余情况把新的 Node 节点按链表或红黑树的方式插入到合适的位置,这个过程采用同步内置锁实现并发,代码如下: + +```java + synchronized (f) { + if (tabAt(tab, i) == f) { + if (fh >= 0) { + binCount = 1; + for (Node e = f;; ++binCount) { + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + Node pred = e; + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } +``` + +在节点 f 上进行同步,节点插入之前,再次利用`tabAt(tab, i) == f`判断,防止被其它线程修改。 + +如果 f.hash >= 0,说明 f 是链表结构的头结点,遍历链表,如果找到对应的 node 节点,则修改 value,否则在链表尾部加入节点。 如果 f 是 TreeBin 类型节点,说明 f 是红黑树根节点,则在树结构上遍历元素,更新或增加节点。 如果链表中节点数 binCount >= TREEIFY_THRESHOLD(默认是 8),则把链表转化为红黑树结构。 + +**table 扩容** + +当 table 容量不足的时候,即 table 的元素数量达到容量阈值 `sizeCtl`,需要对 table 进行扩容。 + +整个扩容分为两部分: + +构建一个 nextTable,大小为 table 的两倍。 把 table 的数据复制到 nextTable 中。 + +这两个过程在单线程下实现很简单,但是 ConcurrentHashMap 是支持并发插入的,扩容操作自然也会有并发的出现,这种情况下,第二步可以支持节点的并发复制,这样性能自然提升不少,但实现的复杂度也上升了一个台阶。 + +先看第一步,**构建 nextTable**,毫无疑问,这个过程只能只有单个线程进行 nextTable 的初始化,具体实现如下: + +```java +private final void addCount(long x, int check) { + ... 省略部分代码 + if (check >= 0) { + Node[] tab, nt; int n, sc; + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { + int rs = resizeStamp(n); + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } +} +``` + +通过 Unsafe.compareAndSwapInt 修改 sizeCtl 值,保证只有一个线程能够初始化 nextTable,扩容后的数组长度为原来的两倍,但是容量是原来的 1.5。 + +节点从 table 移动到 nextTable,大体思想是遍历、复制的过程。 + +首先根据运算得到需要遍历的次数 i,然后利用 tabAt 方法获得 i 位置的元素 f,初始化一个 forwardNode 实例 fwd。 + +如果 f == null,则在 table 中的 i 位置放入 fwd,这个过程是采用 Unsafe.compareAndSwapObjectf 方法实现的,很巧妙的实现了节点的并发移动。 + +如果 f 是链表的头节点,就构造一个反序链表,把他们分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。 如果 f 是 TreeBin 节点,也做一个反序处理,并判断是否需要 untreeify,把处理的结果分别放在 nextTable 的 i 和 i+n 的位置上,移动完成,同样采用 Unsafe.putObjectVolatile 方法给 table 原位置赋值 fwd。 遍历过所有的节点以后就完成了复制工作,把 table 指向 nextTable,并更新 sizeCtl 为新数组大小的 0.75 倍 ,扩容完成。 + +**红黑树构造** + +注意:如果链表结构中元素超过 TREEIFY_THRESHOLD 阈值,默认为 `8` 个,则把链表转化为红黑树,提高遍历查询效率。 + +```java + if (binCount != 0) { + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + if (oldVal != null) + return oldVal; + break; + } +``` + +接下来我们看看如何构造树结构,代码如下: + +```java + private final void treeifyBin(Node[] tab, int index) { + Node b; int n, sc; + if (tab != null) { + if ((n = tab.length) < MIN_TREEIFY_CAPACITY) + tryPresize(n << 1); + else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { + synchronized (b) { + if (tabAt(tab, index) == b) { + TreeNode hd = null, tl = null; + for (Node e = b; e != null; e = e.next) { + TreeNode p = + new TreeNode(e.hash, e.key, e.val, + null, null); + if ((p.prev = tl) == null) + hd = p; + else + tl.next = p; + tl = p; + } + setTabAt(tab, index, new TreeBin(hd)); + } + } + } + } + } +``` + +可以看出,生成树节点的代码块是同步的,进入同步代码块之后,再次验证 table 中 index 位置元素是否被修改过。 + +- 根据 table 中 index 位置 Node 链表,重新生成一个 hd 为头结点的 TreeNode 链表。 +- 根据 hd 头结点,生成 TreeBin 树结构,并把树结构的 root 节点写到 table 的 index 位置的内存中,具体实现如下: + +```java + TreeBin(TreeNode b) { + super(TREEBIN, null, null, null); + this.first = b; + TreeNode r = null; + for (TreeNode x = b, next; x != null; x = next) { + next = (TreeNode)x.next; + x.left = x.right = null; + if (r == null) { + x.parent = null; + x.red = false; + r = x; + } + else { + K k = x.key; + int h = x.hash; + Class kc = null; + for (TreeNode p = r;;) { + int dir, ph; + K pk = p.key; + if ((ph = p.hash) > h) + dir = -1; + else if (ph < h) + dir = 1; + else if ((kc == null && + (kc = comparableClassFor(k)) == null) || + (dir = compareComparables(kc, k, pk)) == 0) + dir = tieBreakOrder(k, pk); + TreeNode xp = p; + if ((p = (dir <= 0) ? p.left : p.right) == null) { + x.parent = xp; + if (dir <= 0) + xp.left = x; + else + xp.right = x; + r = balanceInsertion(r, x); + break; + } + } + } + } + this.root = r; + assert checkInvariants(root); + } +``` + +主要根据 Node 节点的 hash 值大小构建二叉树。 + +## 2.4 get 操作 + +get 操作和 put 操作相比,显得简单了许多。 + +```java + public V get(Object key) { + Node[] tab; + Node e, p; + int n, eh; + K ek; + int h = spread(key.hashCode()); + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { + if ((eh = e.hash) == h) { + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + else if (eh < 0) + return (p = e.find(h, key)) != null ? p.val : null; + while ((e = e.next) != null) { + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; + } +``` + +- 判断 table 是否为空,如果为空,直接返回 null。 +- 计算 key 的 hash 值,并获取指定 table 中指定位置的 Node 节点,通过遍历链表或则树结构找到对应的节点,返回 value 值。 + +## 2.5 size 操作 + +JDK1.8 size 是通过对 baseCount 和 counterCell 进行 CAS 计算,最终通过 baseCount 和 遍历 CounterCell 数组得出 size。 + +```java +public int size() { + long n = sumCount(); + return ((n < 0L) ? 0 : + (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); +} +``` + +最大值是 Integer 类型的最大值,但是 Map 的 size 可能超过 MAX_VALUE, 所以还有一个方法 `mappingCount()`,JDK 的建议使用 `mappingCount()` 而不是`size()`。`mappingCount()` 的代码如下: + +```java +public long mappingCount() { + long n = sumCount(); + return (n < 0L) ? 0L : n; // ignore transient negative values +} +``` + +以上可以看出,无论是 `size()` 还是 `mappingCount()`, 计算大小的核心方法都是 `sumCount()`。`sumCount()` 的代码如下: + +```java +final long sumCount() { + CounterCell[] as = counterCells; CounterCell a; + long sum = baseCount; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; +} +``` + +分析一下 `sumCount()` 代码。ConcurrentHashMap 提供了 baseCount、counterCells 两个辅助变量和一个 CounterCell 辅助内部类。`sumCount()` 就是迭代 counterCells 来统计 sum 的过程。 put 操作时,肯定会影响 `size()`,在 `put()` 方法最后会调用 `addCount()` 方法。 + +# 3. JDK 1.8 中为什么要摒弃分段锁 + +使用重级锁 synchronized,性能反而更高,原因如下: + +1. **jdk1.8 中锁的粒度更细了**。jdk1.7 中 ConcurrentHashMap 的 concurrentLevel(并发数)基本上是固定的。jdk1.8 中的 concurrentLevel 是和数组大小保持一致的,每次扩容,并发度扩大一倍. +2. **红黑树的引入**,对链表的优化使得 hash 冲突时的 put 和 get 效率更高 +3. **获得 JVM 的支持** ,ReentrantLock 毕竟是 API 这个级别的,后续的性能优化空间很小。 synchronized 则是 JVM 直接支持的, JVM 能够在运行时作出相应的优化措施:锁粗化、锁消除、锁自旋等等。这就使得 synchronized 能够随着 JDK 版本的升级而不改动代码的前提下获得性能上的提升。 + + + + + +引用: + +[**ConcurrentHashMap 基于 JDK1.8 源码剖析**](https://juejin.im/post/5ad1814d6fb9a028ce7c0835#comment) + +[ConcurrentHashMap 原理浅析](https://juejin.im/post/5df0f172e51d45583a66b756#heading-1) + +[ConcurrentHashMap 的size方法原理分析](https://zhuanlan.zhihu.com/p/40627259) \ No newline at end of file diff --git a/week_04/60/ConcurrentLinkedQueue-60.md b/week_04/60/ConcurrentLinkedQueue-60.md new file mode 100644 index 0000000..b785109 --- /dev/null +++ b/week_04/60/ConcurrentLinkedQueue-60.md @@ -0,0 +1,531 @@ +在并发编程中,我们可能经常需要用到线程安全的队列,JDK提供了两种模式的队列:阻塞队列和非阻塞队列。阻塞队列使用锁实现,非阻塞队列使用CAS实现。ConcurrentLinkedQueue是一个基于链表实现的无界线程安全队列。 + +# 1. ConcurrentLinkedQueue 原理分析 + +## 1.1 成员属性,构造方法 + + ConcurrentLinkedQueue 由 head 和 tail 节点组成,节点与节点之间通过 next 连接,从而来组成一个链表结构的队列。 + +```java +private transient volatile Node head; +private transient volatile Node tail; +``` + +当我们调用无参构造器时,其源码为: + +```java + public ConcurrentLinkedQueue() { + head = tail = new Node(null); + } +``` + +head 和 tail 指针会指向一个 item 域为 null 的节点, 此时 ConcurrentLinkedQueue 状态如下图所示: + +如图,head 和 tail 指向同一个节点 Node0,该节点 item 域为 null,next 域为 null。 + +![img](https://user-gold-cdn.xitu.io/2018/5/6/1633459982863c26?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + +## 1.2 Node类 + +```java + private static class Node { + volatile E item; + volatile Node next; + + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + //更改Node中的数据域item + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + //更改Node中的指针域next + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + //更改Node中的指针域next + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + + private static final sun.misc.Unsafe UNSAFE; + private static final long itemOffset; + private static final long nextOffset; + + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = Node.class; + itemOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("item")); + nextOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("next")); + } catch (Exception e) { + throw new Error(e); + } + } + } +``` + +Node 节点主要包含了两个域:一个是数据域 item,另一个是 next 指针,用于指向下一个节点从而构成链式队列。并且都是用 volatile 进行修饰的,以保证内存可见性。 + +在队列进行出队入队的时候免不了对节点需要进行操作,在多线程就很容易出现线程安全的问题。可以看出在处理器指令集能够支持 **CMPXCHG** 指令后,在 java 源码中涉及到并发处理都会使用 CAS 操作 ,那么在 ConcurrentLinkedQueue 对 Node 的 CAS 操作有这样几个: + +```java + //更改Node中的数据域item + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + //更改Node中的指针域next + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + //更改Node中的指针域next + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } +``` + +## 1.3 offer 方法 + +```java + private static void checkNotNull(Object v) { + if (v == null) + throw new NullPointerException(); + } + public boolean offer(E e) { + //(1)如果e为null会抛出空指针异常 + checkNotNull(e); + //(2)创建一个新的Node结点,Node的构造函数中会调用Unsafe类的putObject方法 + final Node newNode = new Node(e); + //(3)从尾节点插入新的结点 + for (Node t = tail, p = t;;) { + //q为尾节点的next结点,但是在多线程中,如果有别的线程修改了tail结点那么在本线程中可以看到p!=null(后 + //面的CAS就是这样做的) + Node q = p.next; + //(4)如果q为null,说明现在p是尾节点,那么可以执行添加 + if (q == null) { + //(5)这里使用cas设置p结点的next结点为newNode + //(传入null,比较p的next是否为null,为null则将next设置为newNode) + if (p.casNext(null, newNode)) { + //(6)下面是更新tail结点的代码 + //在CAS执行成功之后,p(原链表的tail)结点的next已经是newNode,这里就设置tail结点为newNode + if (p != t) // hop two nodes at a time + // 如果p不等于t,说明有其它线程先一步更新tail + // 也就不会走到q==null这个分支了 + // p取到的可能是t后面的值 + // 把tail原子更新为新节点 + casTail(t, newNode); // Failure is OK. + return true; + } + } + //如果被移除了 + else if (p == q) + //(7)多线程操作的时候,可能会有别的线程使用poll方法移除元素后可能会把head的next变成head,所以这里需要找到新的head:这里请参考后面的poll方法的讲解图示进行理解 + p = (t != (t = tail)) ? t : head; + else + // (8)查询尾节点 + p = (p != t && t != (t = tail)) ? t : q; + } + } +``` + +### 单线程执行 + + 在单线程环境下执行,那么就直接按照方法实现一步步执行判断即可,下面通过适当的图示来说明这个过程 + +- 首先当一个线程调用 offer 方法的时候,在代码(1)处进行非空检查,为 null 抛出异常,不为 null 执行`(2)` +- 代码(2)`Node newNode = new Node(e)`使用 item 作为构造函数的参数,创建一个新的结点 +- 代码(3)`for (Node t = tail, p = t;;)`从队列尾部开始自旋循环,保证从队列尾部添加新的结点 +- 获得`tail`的`next`结点 (`q`),此时的队列情况如下图所示(`默认构造方法中将head和tail都指向的是一个item为null的结点`)。此时的`q`指向的是 null + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c5740db0f25a60?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 代码(4)`if (q == null)`处执行判断`q==null`为 true +- 代码(5)`if (p.casNext(null, newNode))`处执行的是将`p`的 next 结以 CAS 方式更新为我们创建的`newNode`。(其中 CAS 会判断 p 的 next 是否为 null,为 null 才更新为`newNode`) +- 此时的`p==t`, 所以不会执行更新 tail 的代码块(6)`casTail(t, newNode)`,而是从 offer 方法退出。这时候队列情况如下所示 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c5741110d68814?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 那么这一个线程执行完,可是 tail 还没有改变呢:实际上第二次进行 offer 的时候,会发现`p=tail,p.next!=null`,就会执行代码(8)`p = (p != t && t != (t = tail)) ? t : q`,简单分析一下: + - `p != t`:p 为 tail,t 为 tail,所以为`false` + - `t != (t = tail)`:显然也是 false + +- 所以结果就是 p=q,然后进行下一次循环,之后判断的`p.next`就是 null,所以可以 CAS 成功,也因为`p!=t`,所以会更新 tail 结点。 + +`tail并不总是指向队列的尾节点`,那么实际上也可以换种方式让 tail 指向尾节点,即如下这样实现 + +```java + if (e == null) + throw new NullPointerException(); + Node n = new Node(e); + for (;;) { + Node t = tail; + if (t.casNext(null, n) && casTail(t, n)) { + return true; + } + } +``` + +但是如果大量的入队操作,那么每次都需要以 CAS 方式更新 tail 指向的结点,当数据量很大的时候对性能的影响是很大的。所以最终实现上,是以减少 CAS 操作来提高大数量的入队操作的性能:每间隔 1 次(tail 指向和真正的尾节点之间差 1)进行 CAS 操作更新 tail 指向尾节点(但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要`多循环一次来定位出尾节点(将指向真正的尾节点,然后添加newNode)`)。其实在前面分析成员属性时候也知道了,tail 是被 volatile 修饰的,而 CAS 方式本质上还是对于 volatile 变量的读写操作,而 volatile 的写操作开销大于读操作的,所以`Concurrent Linked Queue的是线上是通过增加对于volatile变量的读操作次数从而相对的减少对其写操作`。下面是单线程执行 offer 方法的时候 tail 指向的变化简图示意 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c57416445c12e1?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + +### 多线程执行 + + 上面演示的单个线程的执行,那么当在多线程环境下执行的话会发生什么情况,这里假设两个线程并发的执行. + +#### 情况 1 + + `这里分析的其实就是假设多个线程都会执行到CAS更新p.next结点的代码`, 我们下面看一下,假设 threadA 调用 offer(item1),threadB 调用 offer(item2)都执行到`p.casNext(null, newNode)`位置处 + +- CAS 操作的原子性,假设 threadA 先执行了上面那行代码,并成功更新了`p.next为newNode` +- 这时候 threadB 自然在进行 CAS 比较的时候就会失败了(`p.next!=null`),所以会进行下一次循环重新获取 tail 结点然后尝试更新 + +这时候的队列情况如下 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c5741986d52c20?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- threadB 获得 tail 结点之后,发现其`q!=null`(`q=p.next,p=tail`) +- 继续判断`p==q`也是`false`,所以执行代码(8) +- 分析一下`p = (p != t && t != (t = tail)) ? t : q`这个代码 + 1. `p != t`:p 为 tail,t 为 tail,所以为`false` + 2. `t != (t = tail)`:显然也是 false + 3. 所以上面三目运算的结果就是`p=q`,如下图所示结果 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c5741ea58efe81?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 然后再次执行循环,这时候`p.next`就是 null 了,所以可以执行代码(5)`p.casNext(null,newNode)`。这个时候 CAS 判断得到`p.next == null`,所以可以设置`p.next=Node(item2)` +- CAS 成功后,判断`p!=t`(如上图所示),所以就可以设置 tail 为 Node(item2) 了。然后从 offer 退出,这个时候队列情况为 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c57420a2e617b3?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + + 可以看出,`情况1`中假设两个线程初始时候都拿到的是`p=tail,p.next=null`,那么都会执行 CAS 尝试添加`newNode`,但是只有一个线程能够在第一次循环的时候添加成功然后返回 true(`但是这时候的tail还没有变化,类似单线程总结那块的tail和真正的尾节点差1或0`),所以另一个线程会在第二次循环中重新尝试,这个时候就会改变 p 的指向,即`p = (p != t && t != (t = tail)) ? t : q`代码处。然后再第三次循环中才能真正 CAS 添加成功 (当然我们这里分析的是假想的两个线程情况,实际多线程环境肯定更复杂,但是逻辑还是差不多的) + +#### 情况 2 + + 这里分析的是主要是代码`p = (p != t && t != (t = tail)) ? t : q`的另一种情况,即`p=t`的情况, 还是先分析一下这行,假设现在 + +- `p != t`为 true, +- t != (t = tail) : 也为 true(左边的 t 是再循环开始的时候获得的指向 tail 的信息,括号中重新获得 tail 并赋值给 t,这个时候有可能别的线程已经更改了 `volatile`修饰的 tail 了) + + 那么结果就是 p 重新指向队列的尾节点 tail 了,下面假想一种这样的情况 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c57422728ecc3f?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + + 实际上这种是利用`volatile的可见性`,`快速将一个要添加元素的线程找到当前队列的尾节点`,避免多余的循环。 如图,假设 threadA 此时读取了变量 tail,threadB 刚好在这个时候添加若干 Node 后,此时会修改 tail 指针, 那么这个时候线程 A 再次执行 t=tail 时 t 会指向另外一个节点,所以 threadA 前后两次读取的变量 t 指向的节点不相同,即`t != (t = tail)`为 true, 并且由于 t 指向节点的变化`p != t`也为 true,此时该行代码的执行结果为 p 和 t 最新的 t 指针指向了同一个节点,并且此时 t 也是队列真正的尾节点。那么,现在已经定位到队列真正的队尾节点,就可以执行 offer 操作了。 + +#### 情况 3 + + 上面我们讨论的都是多线程去添加元素的操作,那么当既有线程 offer 也有线程调用 poll 方法的时候呢,这里就要调用 offer 方法中的代码块(7)了。因为还没有说到 poll 方法,所以这里的代码就先不做解释,下面讲 poll 方法在多线程中的执行的时候,会拿 offer-poll-offer 这种情况进行说明,那么 offer 方法就可能执行这几行代码了。 + +```java +else if (p == q) + //(7)多线程操作的时候,可能会有别的线程使用poll方法移除元素后可能会把head的next变成head,所以这里需要找到新的head + p = (t != (t = tail)) ? t : head; +``` + +## 1.4 add 方法 + +```java + public boolean add(E e) { + return offer(e);//这里还是调用的offer方法,上面说到了,这里就不说明了 + } +``` + +## 1.5 poll 方法 + + poll 方法是在队列头部获取并移除一个元素,如果队列为空就返回 null,下面先看下 poll 方法的源码,然后还是分别分析单线程和多线程下的执行 + +```java + public E poll() { + //标记 + restartFromHead: + for (;;) {//自旋循环 + for (Node h = head, p = h, q;;) { + //(1)保存当前结点的item + E item = p.item; + //(2)如果当前结点的值不为null,那就将其变为null + if (item != null && p.casItem(item, null)) { + //(3)CAS成功之后会标记当前结点,并从链表中移除 + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + //(4)如果队列为空会返回null + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + //(5)如果当前结点被自引用了,重新找寻新的队列头节点 + else if (p == q) + continue restartFromHead; + else + p = q; //进行下一次循环,改变p的指向位置 + } + } + } + final void updateHead(Node h, Node p) { + if (h != p && casHead(h, p)) + h.lazySetNext(h); + } +``` + +### 单线程执行 + + poll 操作是从队头获取元素,所以: + +- 从 head 结点开始循环,首先`for (Node h = head, p = h, q;;)`获得当前队列的头节点,当然如果队列一开始就为空的时候,就如下所示 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c5742482f76203?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + + 由于 head 结点是作为哨兵结点存在的,所以会执行到代码(4)`else if ((q = p.next) == null)`,因为队列为空,所以直接执行`updateHead(h, p)`,而`updateHead`方法中判断的`h=p`,所以直接返回 null。 + +- 上面是队列为空的情况 ,那么当队列不为空的时候呢,假设现在队列情况如下所示 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c574273d22e8f9?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 所以在代码 (4)`else if ((q = p.next) == null)`处的判断结果是 false, +- 所以执行下一个判断`else if (p == q)`,判断结果还是 false +- 最后执行`p=q`,完了之后下一次循环队列状态为 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c57429664b1512?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 在新的一次循环中,可以判断得到 item!=null,所以使用 CAS 方式将 item 设置为 null,(这是单线程情况下的测试)所以继续执行`if(p!=h)`,判断结果为 true。所以执行 if 中的内容:`updateHead(h, ((q = p.next) != null) ? q : p)`,什么意思呢?如下所示,所以我们这里的结果就是 q=null,所以传入的参数为 p(p 指向的位置如上图所示) + + ``` + //updateHead方法的参数(Node h,Node p) + q = p.next; + if(null != q) { + //第二个参数就是q + } else { + //第二个参数就是p + } + 复制代码 + ``` + + 然后执行 updateHead 方法,这里我们需要再看一下该方法的细节 + + ``` + final void updateHead(Node h, Node p) { + //如果h!=p,就以CAS的方式将head结点设置为p + if (h != p && casHead(h, p)) + //这里是将h结点的next结点设置为自己(h) + h.lazySetNext(h); + } + //Node类中的方法 + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + 复制代码 + ``` + + 那么执行完这些之后,队列中状态是什么样呢,如下图所示。执行完毕就返回被移除的元素怒 item1 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c5742b33da5f91?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +### 多线程执行 offer、poll + + 上面分析了单线程下,调用 poll 方法的执行流程。其实刚刚再将 offer 方法的时候还有一个坑没有解决。如下描述的情况 + +- 假设原有队列中有一个元素 item1 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c5742d0a260b43?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 假设在 thread1 调用 offer 方法的时候,别的线程刚好调用 poll 方法将 head 结点移除了,按照上面的分析,poll 方法调用后队列的情况如下 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c5742fcdf58d72?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- (这里回忆一下 offer 的执行流程) 所以在 thread1 继续执行的时候,执行的`for (Node t = tail, p = t;;)`之后获得 tail 指向的位置如上图所示,但是这个 tail 指向的结点的 next 指针指向的位置还是自己。所以`Node q = p.next`执行之后 q=tail=p。所以在 offer 方法中就会执行以下判断 + + ``` + else if (p == q) + //(7)多线程操作的时候,可能会有别的线程使用poll方法移除元素后可能会把head的next变成head,所以这里需要找到新的head + p = (t != (t = tail)) ? t : head; + 复制代码 + ``` + + 还是简单分析一下`p = (t != (t = tail)) ? t : head`这句,如下所示。简单分析之后就能得出,p 指向了 poll 方法调用完毕后的新的 head 结点(如上图所示的 head 结点),然后调用 offer 的线程就能正常的添加结点了,具体流程还是和上面讲到的一样。(那这个 tail 又在什么时候被指向队尾结点呢,实际上在调用 offer 方法添加完元素之后`p.casNext(null, newNode)`,就会判断得出`p != t`,那完了之后就会更新 tail 指向的位置了) + + ``` + //在最开始时候获得的t=tail + t=tail; //for循环中赋值t + //...offer的其他代码 + if(t != (t = tail)) { //这里还是一样:tail为volatile修饰,所以重新读取tail变量 + p = t; //这里表示tail结点不变(按照上图poll执行完后的情况,tail指向位置没有变化,所以p不会被赋值为t) + } else { + p = head; //注意这时候的head已经指向的新的首结点 + } + 复制代码 + ``` + +### 多线程执行 poll、poll + + 分析这么多,我们发现跟 offer 方法留坑一样,poll 还有一处代码还没有分析,所以下面还是通过图示进行分析,先看下这个代码框架。 + +``` +//标记 +restartFromHead: +for (;;) {//自旋循环 + for (Node h = head, p = h, q;;) { + //...other code + //这是自旋循环体中的一个判断 + else if (p == q) + continue restartFromHead; + } +} +复制代码 +``` + + 还是假设现在两个线程去执行 poll 方法, + +- 初始情况下的队列状态为 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c57431d3d52bd9?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 假设 threadA 执行 poll 方法,并成功的执行`if (item != null && p.casItem(item, null))`这块,将 item1 设置为了 null,如下图所示。 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c57433e91d9b48?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 但是 threadA 还没有执行 updateHead 方法,这个时候 threadB 执行 poll 之后,p 指向了上图中的 head,如下所示 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c57435bf2de80f?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +- 之后 threadA 执行 updateHead 方法更新了 head 的指向,并将原 head 的 next 结点指向自己. 那么线程 B 执行`q=p.next`,自然得到的就是`p==q`的结果了,所以这个时候就需要跳到外层循环重新获取最新的 head 结点,然后继续执行 + +![img](https://user-gold-cdn.xitu.io/2019/8/3/16c574383ac3b397?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + + + +### poll 方法总结 + + poll 方法在移除头部元素的时候,使用 CAS 操作将头节点的 item 设置为了 null,然后通过冲洗设置头节点 head 的指向位置来达到删除队列元素的效果。这个时候原来的头部哨兵结点就是一个孤立的结点了,会被回收掉。当然,如果线程执行 poll 方法的时候发现 head 结点被修改(上面说的这种情况),就需要跳转到最外层循环重新获取新的结点。 + +## 1.6 peek 方法 + + 获取队列头部的第一个元素但不删除,如果队列为空则返回 null。下面是该方法的实现 + +```java + public E peek() { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + E item = p.item; + if (item != null || (q = p.next) == null) { + updateHead(h, p); + return item; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } + } +``` + + 需要注意的是,第一次调用 peek 方法的时候会删除哨兵结点,并让队列中的 head 结点指向队列中的第一个元素或者 null. + +## 1.7 size 方法 + + 计算当前队列元素个数,但是因为使用的是 CAS 的方式在并发环境下可能因为别的线程删除或者增加元素导致计算结果不准确。 + +```java + public int size() { + int count = 0; + for (Node p = first(); p != null; p = succ(p)) + if (p.item != null) + // Collection.size() spec says to max out + if (++count == Integer.MAX_VALUE) + break; + return count; + } + //找到队列中的第一个元素(head指向的item为null的结点不算(就是哨兵结点)), + //没有则返回null + Node first() { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + boolean hasItem = (p.item != null); + if (hasItem || (q = p.next) == null) { + updateHead(h, p); + return hasItem ? p : null; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } + } +``` + +## 1.8 remove 方法 + + 传入的参数为要删除的元素,如果队列中存在该元素就删除找到的第一个,然后返回 true,否则返回 false + +```java + public boolean remove(Object o) { + if (o != null) { //如果传入参数为null,直接返回false + Node next, pred = null; + for (Node p = first(); p != null; pred = p, p = next) { + boolean removed = false; + E item = p.item; + //找到相等的就使用cas设置为null,只有一个线程操作成功 + //别的循环查找是否又别的匹配的obj + if (item != null) { + if (!o.equals(item)) { + //获取next元素 + next = succ(p); + continue; + } + removed = p.casItem(item, null); + } + + next = succ(p); + if (pred != null && next != null) // unlink + pred.casNext(p, next); + if (removed) + return true; + } + } + return false; + } +``` + +引用: + +[ConcurrentLinkedQueue源码分析](https://juejin.im/post/5d4789ca51882519ac307a6f) + +[**并发容器之 ConcurrentLinkedQueue**](https://juejin.im/post/5aeeae756fb9a07ab11112af#heading-0) + diff --git a/week_04/60/CopyOnWriteArrayList-60.md b/week_04/60/CopyOnWriteArrayList-60.md new file mode 100644 index 0000000..d27d801 --- /dev/null +++ b/week_04/60/CopyOnWriteArrayList-60.md @@ -0,0 +1,212 @@ +# 1. CopyOnWriteArrayList简介 + +CopyOnWriteArrayList 是同步 List 的替代品,CopyOnWriteArraySet 是同步 Set 的替代品。 + +无论是 Hashtable-->ConcurrentHashMap,还是说 Vector-->CopyOnWriteArrayList。JUC 下支持并发的容器与老一代的线程安全类相比,总结起来就是加锁**粒度**的问题 + +- Hashtable、Vector 加锁的粒度大 (直接在方法声明处使用 synchronized) +- ConcurrentHashMap、CopyOnWriteArrayList 加锁粒度小 (用各种的方式来实现线程安全,比如 ConcurrentHashMap 用了 cas 锁、volatile 等方式来实现线程安全..) +- JUC 下的线程安全容器在遍历的时候**不会**抛出 ConcurrentModificationException 异常 + +所以一般来说都会**使用 JUC 包下给我们提供的线程安全容器**,而不是使用老一代的线程安全容器。 + +# 2. CopyOnWriteArrayList 实现原理 + +## 2.1 COW 的设计思想 + +COW 通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行 Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。对 CopyOnWrite 容器进行并发的读的时候,不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器也是一种读写分离的思想,延时更新的策略是通过在写的时候针对的是不同的数据容器来实现的,放弃数据实时性达到数据的最终一致性。 + +## 2.2 CopyOnWriteArrayList 的实现原理 + +概括一下 CopyOnWriteArrayList 源码注释介绍了什么: + +- CopyOnWriteArrayList 是线程安全容器 (相对于 ArrayList),底层通过**复制数组**的方式来实现; +- CopyOnWriteArrayList 在遍历的使用不会抛出 ConcurrentModificationException 异常,并且遍历的时候就不用额外加锁; +- 元素可以为 null; + +看一下 CopyOnWriteArrayList 基本的结构: + +```java + /** 可重入锁对象 */ + final transient ReentrantLock lock = new ReentrantLock(); + + /** CopyOnWriteArrayList底层由数组实现,volatile修饰 */ + private transient volatile Object[] array; + + /** + * 得到数组 + */ + final Object[] getArray() { + return array; + } + + /** + * 设置数组 + */ + final void setArray(Object[] a) { + array = a; + } + + /** + * 初始化CopyOnWriteArrayList相当于初始化数组 + */ + public CopyOnWriteArrayList() { + setArray(new Object[0]); + } +``` + +实际上 CopyOnWriteArrayList 内部维护的就是一个数组,加锁就交由 ReentrantLock 来完成。 + +## 2.3 常见方法的实现 + +根据上面的分析我们知道如果遍历`Vector/SynchronizedList`是需要自己手动加锁的。 + +CopyOnWriteArrayList 使用迭代器遍历时不需要显示加锁,看看`add()、clear()、remove()`与`get()`方法的实现可能就有点眉目了。 + +首先我们可以看看`add()`方法: + +```java + public boolean add(E e) { + + // 加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + + // 得到原数组的长度和元素 + Object[] elements = getArray(); + int len = elements.length; + + // 复制出一个新数组 + Object[] newElements = Arrays.copyOf(elements, len + 1); + + // 添加时,将新元素添加到新数组中 + newElements[len] = e; + + // 将volatile Object[] array 的指向替换成新数组 + setArray(newElements); + return true; + } finally { + lock.unlock(); + } + } +``` + +通过代码我们可以知道:在添加的时候就上锁,并**复制一个新数组,增加操作在新数组上完成,将 array 指向到新数组中**,最后解锁。 + +- 采用 ReentrantLock,保证同一时刻只有一个写线程正在进行数组的复制,否则的话内存中会有多份被复制的数据; + +- 前面说过数组引用是 volatile 修饰的,因此将旧的数组引用指向新的数组,根据 volatile 的 happens-before 规则,写线程对数组引用的修改对读线程是可见的。 + +- 由于在写数据的时候,是在新的数组中插入数据的,从而保证读写实在两个不同的数据容器中进行操作。 + +再来看看`size()`方法: + +```java + public int size() { + + // 直接得到array数组的长度 + return getArray().length; + } +``` + +再来看看`get()`方法: + +```java + public E get(int index) { + return get(getArray(), index); + } + + final Object[] getArray() { + return array; + } +``` + +可以看出来 get 方法实现非常简单,几乎就是一个 “单线程” 程序,没有对多线程添加任何的线程安全控制,也没有加锁也没有 CAS 操作等等,原因是,所有的读线程只是会读取数据容器中的数据,并不会进行修改。 + +那再来看看`set()`方法 + +```java +public E set(int index, E element) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + + // 得到原数组的旧值 + Object[] elements = getArray(); + E oldValue = get(elements, index); + + // 判断新值和旧值是否相等 + if (oldValue != element) { + + // 复制新数组,新值在新数组中完成 + int len = elements.length; + Object[] newElements = Arrays.copyOf(elements, len); + newElements[index] = element; + + // 将array引用指向新数组 + setArray(newElements); + } else { + // Not quite a no-op; enssures volatile write semantics + setArray(elements); + } + return oldValue; + } finally { + lock.unlock(); + } +} +``` + +对于`remove()、clear()`跟`set()和add()`是类似的,这里就不再贴出代码了。 + +总结: + +- **在修改时,复制出一个新数组,修改的操作在新数组中完成,最后将新数组交由 array 变量指向**。 +- **写加锁,读不加锁** + +## 2.4 为什么遍历时不用调用者显式加锁 + +为啥能够在容器遍历的时候对其进行修改而不抛出异常。所以,来看一下他的迭代器吧: + +```java + // 1. 返回的迭代器是COWIterator + public Iterator iterator() { + return new COWIterator(getArray(), 0); + } + + + // 2. 迭代器的成员属性 + private final Object[] snapshot; + private int cursor; + + // 3. 迭代器的构造方法 + private COWIterator(Object[] elements, int initialCursor) { + cursor = initialCursor; + snapshot = elements; + } + + // 4. 迭代器的方法... + public E next() { + if (! hasNext()) + throw new NoSuchElementException(); + return (E) snapshot[cursor++]; + } + //.... 可以发现的是,迭代器所有的操作都基于snapshot数组,而snapshot是传递进来的array数组 +``` + +由上可知,CopyOnWriteArrayList 在使用迭代器遍历的时候,操作的都是**原数组**! + +![img](https://user-gold-cdn.xitu.io/2018/11/7/166ebc3cd2472bcf?imageView2/0/w/1280/h/960/format/webp/ignore-error/1) + +## 2.5 CopyOnWriteArrayList 缺点 + +- 内存占用:如果 CopyOnWriteArrayList 经常要增删改里面的数据,经常要执行`add()、set()、remove()`的话,那是比较耗费内存的。 + - 因为我们知道每次`add()、set()、remove()`这些增删改操作都要**复制一个数组**出来。 +- 数据一致性:CopyOnWrite 容器只能保证数据的最终一致性,不能保证数据的实时一致性。 + - 从上面的例子也可以看出来,比如线程 A 在迭代 CopyOnWriteArrayList 容器的数据。线程 B 在线程 A 迭代的间隙中将 CopyOnWriteArrayList 部分的数据修改了 (已经调用`setArray()`了)。但是线程 A 迭代出来的是原有的数据。 + +引用: + +[**CopyOnWriteArrayList 你都不知道,怎么拿 offer?**](https://juejin.im/post/5be23e6ef265da6135720d61) + +[并发容器之CopyOnWriteArrayList](https://juejin.im/post/5aeeb55f5188256715478c21#heading-0) \ No newline at end of file diff --git a/week_04/60/DelayQueue-60.md b/week_04/60/DelayQueue-60.md new file mode 100644 index 0000000..66c655e --- /dev/null +++ b/week_04/60/DelayQueue-60.md @@ -0,0 +1,137 @@ +DelayQueue是一个支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素,如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期到时才能够从队列中取元素。 + +DelayQueue主要用于两个方面: + +- 缓存:清掉缓存中超时的缓存数据 + +- 任务超时处理 + +DelayQueue实现的关键主要有如下几个: + +1. 可重入锁ReentrantLock +2. 用于阻塞和通知的Condition对象 +3. 根据Delay时间排序的优先级队列:PriorityQueue +4. 用于优化阻塞通知的线程元素leader + +# 1. DelayQueue原理分析 + +```java + public class DelayQueue extends AbstractQueue + implements BlockingQueue { + /** + * 可重入锁 + * ReentrantLock 是一个可重入的互斥锁,将由最近成功获得锁, + * 并且还没有释放该锁的线程所拥有,当锁被其他线程获得时, + * 调用 lock 的线程将无法获得锁。 在 DelayQueue 中,只有一个互斥锁 lock。 + **/ + private final transient ReentrantLock lock = new ReentrantLock(); + /** + * 支持优先级的BlockingQueue + * PriorityQueue 是一个优先级队列,每次从队列中取出的是具有最高优先权的元素。 + * 在 DelayQueue 中,因为 E 继承于 Delayed,所以 q 表示一个按照 delayTime 排序的优先级队列, + * 用于存放需要延迟执行的元素。 + **/ + private final PriorityQueue q = new PriorityQueue(); + /** + * 用于优化阻塞 + * 这里的 leader 设计出来是为了 minimize unnecessary timed waiting(减少不必要的等待时间), + * 如何实现的方案会在详细解读中解释。 + * 在 DelayQueue 中 leader 表示一个等待从队列中获取消息的线程。 + **/ + private Thread leader = null; + /** + * Condition + * Condition 是 lock 对象的条件变量,只能和锁 lock 配合使用,用于控制并发程序访问竞争资源的安全。 + * 一个锁 lock 可以有多个条件变量 condition,每个条件上可以有多个线程等待,通过调用 await() 方法,可以让线程在该条件下等待。 + * 当调用 signalAll() 方法,又可以唤醒该条件下的等待的线程。 在 DelayQueue 中 lock 对象只有一个条件变量 available。 + **/ + private final Condition available = lock.newCondition(); + } +``` + +DelayQueue 的元素都必须继承 Delayed 接口。同时也可以从这里初步理清楚 DelayQueue 内部实现的机制了:以支持优先级无界队列的 PriorityQueue 作为一个容器,容器里面的元素都应该实现 Delayed 接口,在每次往优先级队列中添加元素时以元素的过期时间作为排序条件,最先过期的元素放在优先级最高。 + +## 1.1 offer()方法 + +```java + public boolean offer(E e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 向 PriorityQueue中插入元素 + q.offer(e); + // 如果当前元素的对首元素(优先级最高),leader设置为空,唤醒所有等待线程 + if (q.peek() == e) { + leader = null; + available.signal(); + } + // 无界队列,永远返回true + return true; + } finally { + lock.unlock(); + } + } +``` + +offer(E e) 就是往 PriorityQueue 中添加元素。在判断当前元素是否为对首元素时,如果是的话则设置 leader=null,这是非常关键的一个步骤。 + +## 1.2 take() + +```java + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + // 对首元素 + E first = q.peek(); + // 对首为空,阻塞,等待off()操作唤醒 + if (first == null) + available.await(); + else { + // 获取对首元素的超时时间 + long delay = first.getDelay(NANOSECONDS); + // <=0 表示已过期,出对,return + if (delay <= 0) + return q.poll(); + first = null; // don't retain ref while waiting + // leader != null 证明有其他线程在操作,阻塞 + if (leader != null) + available.await(); + else { + // 否则将leader 设置为当前线程,独占 + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + // 超时阻塞 + available.awaitNanos(delay); + } finally { + // 释放leader + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + // 唤醒阻塞线程 + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } + } +``` + +首先是获取对首元素,如果对首元素的延时时间 delay <= 0 ,则可以出对了,直接 return 即可。否则设置 first = null,这里设置为 null 的主要目的是为了避免内存泄漏。如果 leader != null 则表示当前有线程占用,则阻塞,否则设置 leader 为当前线程,然后调用 awaitNanos() 方法超时等待。 + +**first = null** + +这里为什么如果不设置 first = null,则会引起内存泄漏呢?线程 A 到达,列首元素没有到期,设置 leader = 线程 A,这是线程 B 来了因为 leader != null,则会阻塞,线程 C 一样。假如线程阻塞完毕了,获取列首元素成功,出列。这个时候列首元素应该会被回收掉,但是问题是它还被线程 B、线程 C 持有着,所以不会回收,这里只有两个线程,如果有线程 D、线程 E… 呢?这样会无限期的不能回收,就会造成内存泄漏。 + +这个入队、出对过程和其他的阻塞队列没有很大区别,无非是在出对的时候增加了一个到期时间的判断。同时通过 leader 来减少不必要阻塞。 + +引用: + +[【死磕Java并发】—–J.U.C之阻塞队列:DelayQueue](https://mp.weixin.qq.com/s/0GOVOA0U3f92C5YfOLnGrg) + +[DelayQueue系列(一):源码分析](https://juejin.im/post/5c1f49046fb9a049ad770ad6) \ No newline at end of file -- Gitee