From be0e67d3ab60471532c3bba294c6ae4dfd26c0f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BD=95=E9=A1=BA=E5=B9=B3?= <1078094401@qq.com> Date: Mon, 6 Jan 2020 11:29:40 +0800 Subject: [PATCH] 059_week04 --- week_04/59/ArrayBlockingQueue.md | 269 ++++++++++++++ week_04/59/ConcurrentHashMap.md | 519 ++++++++++++++++++++++++++++ week_04/59/ConcurrentLinkedQueue.md | 230 ++++++++++++ week_04/59/CopyOnWriteArrayList.md | 224 ++++++++++++ week_04/59/DelayQueue.md | 168 +++++++++ 5 files changed, 1410 insertions(+) create mode 100644 week_04/59/ArrayBlockingQueue.md create mode 100644 week_04/59/ConcurrentHashMap.md create mode 100644 week_04/59/ConcurrentLinkedQueue.md create mode 100644 week_04/59/CopyOnWriteArrayList.md create mode 100644 week_04/59/DelayQueue.md diff --git a/week_04/59/ArrayBlockingQueue.md b/week_04/59/ArrayBlockingQueue.md new file mode 100644 index 0000000..a5037da --- /dev/null +++ b/week_04/59/ArrayBlockingQueue.md @@ -0,0 +1,269 @@ +### ArrayBlockingQueue + +本文参考自博客:https://www.cnblogs.com/kexianting/p/8550598.html + +#### 介绍 + +ArrayBlockingQueue是数组实现的线程安全的有界的阻塞队列。 +线程安全是指,ArrayBlockingQueue内部通过“互斥锁”保护竞争资源,实现了多线程对竞争资源的互斥访问。而有界,则是指ArrayBlockingQueue对应的数组是有界限的。 阻塞队列,是指多线程访问竞争资源时,当竞争资源已被某线程获取时,其它要获取该资源的线程需要阻塞等待;而且,ArrayBlockingQueue是按 FIFO(先进先出)原则对元素进行排序,元素都是从尾部插入到队列,从头部开始返回。 + +注意:ArrayBlockingQueue不同于ConcurrentLinkedQueue,ArrayBlockingQueue是数组实现的,并且是有界限的;而ConcurrentLinkedQueue是链表实现的,是无界限的。 + +**说明**: + + 1. ArrayBlockingQueue继承于AbstractQueue,并且它实现了BlockingQueue接口 + 2. ArrayBlockingQueue内部是通过Object[]数组保存数据的,也就是说ArrayBlockingQueue本质上是通过数组实现的。ArrayBlockingQueue的大小,即数组的容量是创建ArrayBlockingQueue时指定的。 + 3. ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象(lock)。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。 + 4. ArrayBlockingQueue与Condition是组合关系,ArrayBlockingQueue中包含两个Condition对象(notEmpty和notFull)。而且,Condition又依赖于ArrayBlockingQueue而存在,通过Condition可以实现对ArrayBlockingQueue的更精确的访问 -- (01)若某线程(线程A)要取数据时,数组正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向数组中插入了数据之后,会调用notEmpty.signal()唤醒“notEmpty上的等待线程”。此时,线程A会被唤醒从而得以继续运行。(02)若某线程(线程H)要插入数据时,数组已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒“notFull上的等待线程”。此时,线程H就会被唤醒从而得以继续运行。 + +#### ArrayBlockingQueue函数列表 + +```java +// 创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue。 +ArrayBlockingQueue(int capacity) +// 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue。 +ArrayBlockingQueue(int capacity, boolean fair) +// 创建一个具有给定的(固定)容量和指定访问策略的 ArrayBlockingQueue,它最初包含给定 collection 的元素,并以 collection 迭代器的遍历顺序添加元素。 +ArrayBlockingQueue(int capacity, boolean fair, Collection c) + +// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则抛出 IllegalStateException。 +boolean add(E e) +// 自动移除此队列中的所有元素。 +void clear() +// 如果此队列包含指定的元素,则返回 true。 +boolean contains(Object o) +// 移除此队列中所有可用的元素,并将它们添加到给定 collection 中。 +int drainTo(Collection c) +// 最多从此队列中移除给定数量的可用元素,并将这些元素添加到给定 collection 中。 +int drainTo(Collection c, int maxElements) +// 返回在此队列中的元素上按适当顺序进行迭代的迭代器。 +Iterator iterator() +// 将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量),在成功时返回 true,如果此队列已满,则返回 false。 +boolean offer(E e) +// 将指定的元素插入此队列的尾部,如果该队列已满,则在到达指定的等待时间之前等待可用的空间。 +boolean offer(E e, long timeout, TimeUnit unit) +// 获取但不移除此队列的头;如果此队列为空,则返回 null。 +E peek() +// 获取并移除此队列的头,如果此队列为空,则返回 null。 +E poll() +// 获取并移除此队列的头部,在指定的等待时间前等待可用的元素(如果有必要)。 +E poll(long timeout, TimeUnit unit) +// 将指定的元素插入此队列的尾部,如果该队列已满,则等待可用的空间。 +void put(E e) +// 返回在无阻塞的理想情况下(不存在内存或资源约束)此队列能接受的其他元素数量。 +int remainingCapacity() +// 从此队列中移除指定元素的单个实例(如果存在)。 +boolean remove(Object o) +// 返回此队列中元素的数量。 +int size() +// 获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。 +E take() +// 返回一个按适当顺序包含此队列中所有元素的数组。 +Object[] toArray() +// 返回一个按适当顺序包含此队列中所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。 + T[] toArray(T[] a) +// 返回此 collection 的字符串表示形式。 +String toString() +``` + +#### ArrayBlockingQueue源码分析 + +**1. 创建** + +```java +public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); +} +``` + +**说明**: +(01) items是保存“阻塞队列”数据的数组。它的定义如下: + +``` +final Object[] items; +``` + +(02) fair是“可重入的独占锁(ReentrantLock)”的类型。fair为true,表示是公平锁;fair为false,表示是非公平锁。 +notEmpty和notFull是锁的两个Condition条件。它们的定义如下: + +``` +final ReentrantLock lock; +private final Condition notEmpty; +private final Condition notFull; +``` + +简单对Condition和Lock的用法进行说明, +Lock的作用是提供独占锁机制,来保护竞争资源;而Condition是为了更加精细的对锁进行控制,它依赖于Lock,通过某个条件对多线程进行控制。 +notEmpty表示“锁的非空条件”。当某线程想从队列中取数据时,而此时又没有数据,则该线程通过notEmpty.await()进行等待;当其它线程向队列中插入了元素之后,就调用notEmpty.signal()唤醒“之前通过notEmpty.await()进入等待状态的线程”。 +同理,notFull表示“锁的满条件”。当某线程想向队列中插入元素,而此时队列已满时,该线程等待;当其它线程从队列中取出元素之后,就唤醒该等待的线程。 + +**2. 添加** + +下面以offer(E e)为例,对ArrayBlockingQueue的添加方法进行说明。 + +```java +public boolean offer(E e) { + // 创建插入的元素是否为null,是的话抛出NullPointerException异常 + checkNotNull(e); + // 获取“该阻塞队列的独占锁” + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 如果队列已满,则返回false。 + if (count == items.length) + return false; + else { + // 如果队列未满,则插入e,并返回true。 + insert(e); + return true; + } + } finally { + // 释放锁 + lock.unlock(); + } +} +``` + +**说明**:offer(E e)的作用是将e插入阻塞队列的尾部。如果队列已满,则返回false,表示插入失败;否则,插入元素,并返回true。 + +**3. 取出** + +下面以take()为例,对ArrayBlockingQueue的取出方法进行说明。 + +```java +public E take() throws InterruptedException { + // 获取“队列的独占锁” + final ReentrantLock lock = this.lock; + // 获取“锁”,若当前线程是中断状态,则抛出InterruptedException异常 + lock.lockInterruptibly(); + try { + // 若“队列为空”,则一直等待。 + while (count == 0) + notEmpty.await(); + // 取出元素 + return extract(); + } finally { + // 释放“锁” + lock.unlock(); + } +} +``` + +**说明**:take()的作用是取出并返回队列的头。若队列为空,则一直等待。 + +```java +private E extract() { + final Object[] items = this.items; + // 强制将元素转换为“泛型E” + E x = this.cast(items[takeIndex]); + // 将第takeIndex元素设为null,即删除。同时,帮助GC回收。 + items[takeIndex] = null; + // 设置“下一个被取出元素的索引” + takeIndex = inc(takeIndex); + // 将“队列中元素数量”-1 + --count; + // 唤醒notFull上的等待线程。 + notFull.signal(); + return x; +} +``` + +**4. 遍历** + +下面对ArrayBlockingQueue的遍历方法进行说明。 + +```java +public Iterator iterator() { + return new Itr(); +} + +``` + +Itr是实现了Iterator接口的类,它的源码如下: + +```java +private class Itr implements Iterator { + // 队列中剩余元素的个数 + private int remaining; // Number of elements yet to be returned + // 下一次调用next()返回的元素的索引 + private int nextIndex; // Index of element to be returned by next + // 下一次调用next()返回的元素 + private E nextItem; // Element to be returned by next call to next + // 上一次调用next()返回的元素 + private E lastItem; // Element returned by last call to next + // 上一次调用next()返回的元素的索引 + private int lastRet; // Index of last element returned, or -1 if none + + Itr() { + // 获取“阻塞队列”的锁 + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + lastRet = -1; + if ((remaining = count) > 0) + nextItem = itemAt(nextIndex = takeIndex); + } finally { + // 释放“锁” + lock.unlock(); + } + } + + public boolean hasNext() { + return remaining > 0; + } + + public E next() { + // 获取“阻塞队列”的锁 + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + // 若“剩余元素<=0”,则抛出异常。 + if (remaining <= 0) + throw new NoSuchElementException(); + lastRet = nextIndex; + // 获取第nextIndex位置的元素 + E x = itemAt(nextIndex); // check for fresher value + if (x == null) { + x = nextItem; // we are forced to report old value + lastItem = null; // but ensure remove fails + } + else + lastItem = x; + while (--remaining > 0 && // skip over nulls + (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) + ; + return x; + } finally { + lock.unlock(); + } + } + + public void remove() { + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + int i = lastRet; + if (i == -1) + throw new IllegalStateException(); + lastRet = -1; + E x = lastItem; + lastItem = null; + // only remove if item still at index + if (x != null && x == items[i]) { + boolean removingHead = (i == takeIndex); + removeAt(i); + if (!removingHead) + nextIndex = dec(nextIndex); + } + } finally { + lock.unlock(); + } + } +} +``` + diff --git a/week_04/59/ConcurrentHashMap.md b/week_04/59/ConcurrentHashMap.md new file mode 100644 index 0000000..49631ef --- /dev/null +++ b/week_04/59/ConcurrentHashMap.md @@ -0,0 +1,519 @@ +### ConcurrentHashMap + +本文参考自博客:https://www.jianshu.com/p/5dbaa6707017 + +#### 简要概述 + +ConcurrentHashMap是Java中的一个**线程安全且高效的HashMap实现**。平时涉及高并发如果要用map结构,那第一时间想到的就是它。 + +从以下几个方面学习 + +``` +1)ConcurrentHashMap在JDK8里结构 +2)ConcurrentHashMap的put方法、szie方法等 +3)ConcurrentHashMap的扩容 +4)HashMap、Hashtable、ConccurentHashMap三者的区别 +5)ConcurrentHashMap在JDK7和JDK8的区别 +``` + +其实和 1.8 HashMap 结构类似,当**链表节点数超过指定阈值**的话,也是会转换成**红黑树**的,大体结构也是一样的。 + +**那么它到底是如何实现线程安全的?** + 答案:其中抛弃了原有的**Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性**。至于如何实现,继续看一下put方法逻辑 + +### put方法的逻辑 + +ConcurrentHashMap最常用的方法也就是put方法和get方法,那么下面主要看代码注释,便于理解。 + +```java +/** Implementation for put and putIfAbsent */ +final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) throw new NullPointerException(); + //1. 计算key的hash值 + int hash = spread(key.hashCode()); + int binCount = 0; + for (Node[] tab = table;;) { + Node f; int n, i, fh; + //2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化 + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + //3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + //4. 当前正在扩容 + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + else { + V oldVal = null; + synchronized (f) { + if (tabAt(tab, i) == f) { + //5. 当前为链表,在链表中插入新的键值对 + if (fh >= 0) { + binCount = 1; + for (Node e = f;; ++binCount) { + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + Node pred = e; + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + // 6.当前为红黑树,将新的键值对插入到红黑树中 + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + // 7.插入完键值对后再根据实际大小看是否需要转换成红黑树 + if (binCount != 0) { + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + if (oldVal != null) + return oldVal; + break; + } + } + } + //8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 + addCount(1L, binCount); + return null; +} + +``` + +这个put的过程很清晰,对当前的table进行**无条件自循环直到put成功**,可以分成以下六步流程来概述:1、判断Node[]数组是否初始化,没有则**进行初始化操作** +2、通过**hash定位数组的索引坐标**,是否有Node节点,如果没有则使用CAS进行添加(链表的头节点),添加失败则进入下次循环。 +3、检查到内部正在扩容,就帮助它一块扩容。 +4、如果f!=null,则**使用synchronized锁住**f元素(链表/红黑树的头元素)。如果是Node(链表结构)则执行链表的添加操作;如果是TreeNode(树型结构)则执行树添加操作。 +5、判断链表长度已经达到临界值8(默认值),当节点超过这个值就需要**把链表转换为树结构**。 +6、如果添加成功就调用addCount()方法统计size,并且检查是否需要扩容 + +##### 1.spread(key,hashCode()) + +```java +static final int spread(int h) { + return (h ^ (h >>> 16)) & HASH_BITS; +} +``` + +该方法主要是**将key的hashCode的低16位于高16位进行异或运算**,这样不仅能够使得hash值能够分散能够均匀减小hash冲突的概率,另外只用到了异或运算,在性能开销上也能兼顾。 + +##### 2.initTable方法 + +主要作用将tab进行初始化 + +```kotlin +private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { + if ((sc = sizeCtl) < 0) + // 1. 保证只有一个线程正在进行初始化操作 + Thread.yield(); // lost initialization race; just spin + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + if ((tab = table) == null || tab.length == 0) { + // 2. 得出数组的大小 + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + @SuppressWarnings("unchecked") + // 3. 这里才真正的初始化数组 + Node[] nt = (Node[])new Node[n]; + table = tab = nt; + // 4. 计算数组中可用的大小:实际大小n*0.75(加载因子) + sc = n - (n >>> 2); + } + } finally { + sizeCtl = sc; + } + break; + } + } + return tab; +} +``` + +为了保证能够正确初始化,在第1步中会先通过if进行判断,**若当前已经有一个线程正在初始化即sizeCtl值变为-1**,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。另外还需要注意的事情是,在第四步中会进一步计算数组中可用的大小即为数组实际大小n乘以加载因子0.75.可以看看这里乘以0.75是怎么算的,0.75为四分之三,**这里n - (n >>> 2)是不是刚好是n-(1/4)n=(3/4)n**,挺有意思的吧:)。如果选择是无参的构造器的话,这里在new Node数组的时候会使用默认大小为DEFAULT_CAPACITY(16),然后乘以加载因子0.75为12,也就是说数组的可用大小为12。 + +##### 3.CAS关键操作 + +tabAt()该方法用来**获取table数组中索引为i的Node元素**。 +casTabAt()利用**CAS操作设置table数组中索引为i的元素** +setTabAt()该方法用来设置table数组中索引为i的元素 + +##### 4.ConcurrentHashMap的扩容 + +通过判断该节点的hash值是不是等于-1(MOVED),**代码为(fh = f.hash) == MOVED,说明 Map 正在扩容**。那么就帮助 Map 进行扩容。以加快速度。 +如何帮助扩容呢?那要看看 helpTransfer 方法的实现。 + +```kotlin +/** + * Helps transfer if a resize is in progress. + */ +final Node[] helpTransfer(Node[] tab, Node f) { + Node[] nextTab; int sc; + // 如果 table 不是空 且 node 节点是转移类型,数据检验 + // 且 node 节点的 nextTable(新 table) 不是空,同样也是数据校验 + // 尝试帮助扩容 + if (tab != null && (f instanceof ForwardingNode) && + (nextTab = ((ForwardingNode)f).nextTable) != null) { + // 根据 length 得到一个标识符号 + int rs = resizeStamp(tab.length); + // 如果 nextTab 没有被并发修改 且 tab 也没有被并发修改 + // 且 sizeCtl < 0 (说明还在扩容) + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { + // 如果 sizeCtl 无符号右移 16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了) + // 或者 sizeCtl == rs + 1 (扩容结束了,不再有线程进行扩容)(默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1) + // 或者 sizeCtl == rs + 65535 (如果达到最大帮助线程的数量,即 65535) + // 或者转移下标正在调整 (扩容结束) + // 结束循环,返回 table + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) + break; + // 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容) + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { + // 进行转移 + transfer(tab, nextTab); + // 结束循环 + break; + } + } + return nextTab; + } + return table; +} +``` + +基本逻辑已在代码注释中,这里关键transfer(),那么我们继续深入了解一下 + +```java +private final void transfer(Node[] tab, Node[] nextTab) { + int n = tab.length, stride; + // 每核处理的量小于16,则强制赋值16 + if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) + stride = MIN_TRANSFER_STRIDE; // subdivide range + if (nextTab == null) { // initiating + try { + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n << 1]; //构建一个nextTable对象,其容量为原来容量的两倍 + nextTab = nt; + } catch (Throwable ex) { // try to cope with OOME + sizeCtl = Integer.MAX_VALUE; + return; + } + nextTable = nextTab; + transferIndex = n; + } + int nextn = nextTab.length; + // 连接点指针,用于标志位(fwd的hash值为-1,fwd.nextTable=nextTab) + ForwardingNode fwd = new ForwardingNode(nextTab); + // 当advance == true时,表明该节点已经处理过了 + boolean advance = true; + boolean finishing = false; // to ensure sweep before committing nextTab + for (int i = 0, bound = 0;;) { + Node f; int fh; + // 控制 --i ,遍历原hash表中的节点 + while (advance) { + int nextIndex, nextBound; + if (--i >= bound || finishing) + advance = false; + else if ((nextIndex = transferIndex) <= 0) { + i = -1; + advance = false; + } + // 用CAS计算得到的transferIndex + else if (U.compareAndSwapInt + (this, TRANSFERINDEX, nextIndex, + nextBound = (nextIndex > stride ? + nextIndex - stride : 0))) { + bound = nextBound; + i = nextIndex - 1; + advance = false; + } + } + if (i < 0 || i >= n || i + n >= nextn) { + int sc; + // 已经完成所有节点复制了 + if (finishing) { + nextTable = null; + table = nextTab; // table 指向nextTable + sizeCtl = (n << 1) - (n >>> 1); // sizeCtl阈值为原来的1.5倍 + return; // 跳出死循环, + } + // CAS 更扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作 + if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { + if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) + return; + finishing = advance = true; + i = n; // recheck before commit + } + } + // 遍历的节点为null,则放入到ForwardingNode 指针节点 + else if ((f = tabAt(tab, i)) == null) + advance = casTabAt(tab, i, null, fwd); + // f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了 + // 这里是控制并发扩容的核心 + else if ((fh = f.hash) == MOVED) + advance = true; // already processed + else { + // 节点加锁 + synchronized (f) { + // 节点复制工作 + if (tabAt(tab, i) == f) { + Node ln, hn; + // fh >= 0 ,表示为链表节点 + if (fh >= 0) { + // 构造两个链表 一个是原链表 另一个是原链表的反序排列 + int runBit = fh & n; + Node lastRun = f; + for (Node p = f.next; p != null; p = p.next) { + int b = p.hash & n; + if (b != runBit) { + runBit = b; + lastRun = p; + } + } + if (runBit == 0) { + ln = lastRun; + hn = null; + } + else { + hn = lastRun; + ln = null; + } + for (Node p = f; p != lastRun; p = p.next) { + int ph = p.hash; K pk = p.key; V pv = p.val; + if ((ph & n) == 0) + ln = new Node(ph, pk, pv, ln); + else + hn = new Node(ph, pk, pv, hn); + } + // 在nextTable i 位置处插上链表 + setTabAt(nextTab, i, ln); + // 在nextTable i + n 位置处插上链表 + setTabAt(nextTab, i + n, hn); + // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了 + setTabAt(tab, i, fwd); + // advance = true 可以执行--i动作,遍历节点 + advance = true; + } + // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致 + else if (f instanceof TreeBin) { + TreeBin t = (TreeBin)f; + TreeNode lo = null, loTail = null; + TreeNode hi = null, hiTail = null; + int lc = 0, hc = 0; + for (Node e = t.first; e != null; e = e.next) { + int h = e.hash; + TreeNode p = new TreeNode + (h, e.key, e.val, null, null); + if ((h & n) == 0) { + if ((p.prev = loTail) == null) + lo = p; + else + loTail.next = p; + loTail = p; + ++lc; + } + else { + if ((p.prev = hiTail) == null) + hi = p; + else + hiTail.next = p; + hiTail = p; + ++hc; + } + } + // 扩容后树节点个数若<=6,将树转链表 + ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : + (hc != 0) ? new TreeBin(lo) : t; + hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : + (lc != 0) ? new TreeBin(hi) : t; + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + } + } + } + } + } +``` + +扩容过程有点复杂,可以查看上面注释。这里主要涉及到多线程并发扩容,ForwardingNode的作用就是支持扩容操作,将已处理的节点和空节点置为ForwardingNode,并发处理时多个线程经过ForwardingNode就表示已经遍历了,就往后遍历。 + +##### 5.treeifyBin()链表转红黑树的过程 + +```java +private final void treeifyBin(Node[] tab, int index) { + Node b; int n, sc; + if (tab != null) { + //如果整个table的数量小于64,就扩容至原来的一倍,不转红黑树了 + //因为这个阈值扩容可以减少hash冲突,不必要去转红黑树 + if ((n = tab.length) < MIN_TREEIFY_CAPACITY) + tryPresize(n << 1); + else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { + synchronized (b) { + if (tabAt(tab, index) == b) { + TreeNode hd = null, tl = null; + for (Node e = b; e != null; e = e.next) { + //封装成TreeNode + TreeNode p = + new TreeNode(e.hash, e.key, e.val, + null, null); + if ((p.prev = tl) == null) + hd = p; + else + tl.next = p; + tl = p; + } + //通过TreeBin对象对TreeNode转换成红黑树 + setTabAt(tab, index, new TreeBin(hd)); + } + } + } + } +} +``` + +#### addCount()方法计算ConcurrentHashMap的size + +```csharp +private final void addCount(long x, int check) { + CounterCell[] as; long b, s; + //更新baseCount,table的数量,counterCells表示元素个数的变化 + if ((as = counterCells) != null || + !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { + CounterCell a; long v; int m; + boolean uncontended = true; + //如果多个线程都在执行,则CAS失败,执行fullAddCount,全部加入count + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = + U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + s = sumCount(); + } + //check>=0表示需要进行扩容操作 + if (check >= 0) { + Node[] tab, nt; int n, sc; + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { + int rs = resizeStamp(n); + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + //当前线程发起库哦哦让操作,nextTable=null + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } +} +``` + +put的流程现在已经分析完了,你可以从中发现,他在并发处理中使用的是乐观锁,当有冲突的时候才进行并发处理,而且流程步骤很清晰,但是细节设计的很复杂,毕竟多线程的场景也复杂. + +##### get方法 + +1.计算hash值,定位到该table索引位置,如果是首节点符合就返回。 +2.如果遇到扩容的时候,会调用标志正在扩容节点ForwardingNode的find方法,查找该节点,匹配就返回。 +3.以上都不符合的话,就往下遍历节点,匹配就返回,否则最后就返回null + +```kotlin +public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + int h = spread(key.hashCode()); //计算两次hash + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) {//读取首节点的Node元素 + if ((eh = e.hash) == h) { //如果该节点就是首节点就返回 + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + //hash值为负值表示正在扩容,这个时候查的是ForwardingNode的find方法来定位到nextTable来 + //查找,查找到就返回 + else if (eh < 0) + return (p = e.find(h, key)) != null ? p.val : null; + while ((e = e.next) != null) {//既不是首节点也不是ForwardingNode,那就往下遍历 + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; +} +``` + +##### szie方法的逻辑 + +对于size的计算,在扩容和addCount()方法就已经有处理了,**可以注意一下Put函数,里面就有addCount()函数**,早就计算好的,然后你size的时候直接给你. + +```csharp +public int size() { + long n = sumCount(); + return ((n < 0L) ? 0 : + (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : + (int)n); +} +final long sumCount() { + CounterCell[] as = counterCells; CounterCell a; //变化的数量 + long sum = baseCount; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; +} +``` + +#### 总结 + +##### HashMap、Hashtable、ConccurentHashMap三者的区别 + +HashMap线程不安全,数组+链表+红黑树 +Hashtable线程安全,锁住整个对象,数组+链表 +ConccurentHashMap线程安全,CAS+同步锁,数组+链表+红黑树 +HashMap的key,value均可为null,其他两个不行。 + +##### 在JDK1.7和JDK1.8中的区别 + +在JDK1.8主要设计上的改进有以下几点: + +1、**不采用segment而采用node,锁住node来实现减小锁粒度**。 + 2、设计了MOVED状态 当resize的中过程中 线程2还在put数据,线程2会帮助resize。 + 3、使用3个**CAS操作来确保node的一些操作的原子性**,这种方式代替了锁。 + 4、sizeCtl的不同值来代表不同含义,起到了控制的作用。 + **采用synchronized而不是ReentrantLock** + diff --git a/week_04/59/ConcurrentLinkedQueue.md b/week_04/59/ConcurrentLinkedQueue.md new file mode 100644 index 0000000..af2e2cd --- /dev/null +++ b/week_04/59/ConcurrentLinkedQueue.md @@ -0,0 +1,230 @@ +### ConcurrentLinkedQueue + +本文参考自博客:https://blog.csdn.net/qq_38293564/article/details/80798310 + +#### 简介 + +在并发编程中我们有时候需要使用线程安全的队列。如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,而非阻塞的实现方式则可以使用循环CAS的方式来实现,下面我们一起来研究下Doug Lea是如何使用非阻塞的方式来实现线程安全队列ConcurrentLinkedQueue的。 + +ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法来实现,该算法在Michael & Scott算法上进行了一些修改。 + +ConcurrentLinkedQueue由head节点和tail节点组成,每个节点(Node)由节点元素(item)和指向下一个节点的引用(next)组成,节点与节点之间就是通过这个next关联起来,从而组成一张链表结构的队列。 + +ConcurrrentLinkedQueue只实现了Queue接口,没有实现BlockingQueue接口,所以他不是阻塞队列,但是他是线程安全的,所以他可以用于线程安全环境中。 + +### ConcurrentLinkedQueue源码详解 + +我们前面介绍了,ConcurrentLinkedQueue的节点都是Node类型的: + +```java +private static class Node { + volatile E item; + volatile Node next; + + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + + private static final sun.misc.Unsafe UNSAFE; + private static final long itemOffset; + private static final long nextOffset; + + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = Node.class; + itemOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("item")); + nextOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("next")); + } catch (Exception e) { + throw new Error(e); + } + } +} +``` + +Node类也比较简单,ConcurrentLinkedQueue类有下面两个构造方法: + +```java +// 默认构造方法,head节点存储的元素为空,tail节点等于head节点 +public ConcurrentLinkedQueue() { + head = tail = new Node(null); +} + +// 根据其他集合来创建队列 +public ConcurrentLinkedQueue(Collection c) { + Node h = null, t = null; + // 遍历节点 + for (E e : c) { + // 若节点为null,则直接抛出NullPointerException异常 + checkNotNull(e); + Node newNode = new Node(e); + if (h == null) + h = t = newNode; + else { + t.lazySetNext(newNode); + t = newNode; + } + } + if (h == null) + h = t = new Node(null); + head = h; + tail = t; +} +``` + +默认情况下head节点存储的元素为空,tail节点等于head节点。 + +```java +head = tail = new Node(null); +``` + +#### 入队出队操作 + +入队列就是将入队节点添加到队列的尾部。 + +添加元素1:队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。 +添加元素2:队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。 +添加元素3:设置tail节点的next节点为元素3节点。 +添加元素4:设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。 + +因为它不是阻塞队列,所以只有两个入队方法,add(e)何offe(e). + +因为是无界队列,所以add方法也不用抛出异常了。 + +```java +public boolean add(E e) { + return offer(e); +} + +public boolean offer(E e) { + // 如果e为null,则直接抛出NullPointerException异常 + checkNotNull(e); + // 创建入队节点 + final Node newNode = new Node(e); + + // 循环CAS直到入队成功 + // 1、根据tail节点定位出尾节点(last node);2、将新节点置为尾节点的下一个节点;3、casTail更新尾节点 + for (Node t = tail, p = t;;) { + // p用来表示队列的尾节点,初始情况下等于tail节点 + // q是p的next节点 + Node q = p.next; + // 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null + // 如果p是尾节点 + if (q == null) { + // p is last node + // 设置p节点的下一个节点为新节点,设置成功则casNext返回true;否则返回false,说明有其他线程更新过尾节点 + if (p.casNext(null, newNode)) { + // Successful CAS is the linearization point + // for e to become an element of this queue, + // and for newNode to become "live". + // 如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点 + if (p != t) // hop two nodes at a time + casTail(t, newNode); // Failure is OK. + return true; + } + // Lost CAS race to another thread; re-read next + } + // 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head + // 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点 + else if (p == q) + // We have fallen off list. If tail is unchanged, it + // will also be off-list, in which case we need to + // jump to head, from which all live nodes are always + // reachable. Else the new tail is a better bet. + p = (t != (t = tail)) ? t : head; + // 寻找尾节点 + else + // Check for tail updates after two hops. + p = (p != t && t != (t = tail)) ? t : q; + } +} +``` + +##### 出队 + +因为他不是阻塞队列,所以只有两个出队的方法。remove和poll + +```java +public E poll() { + restartFromHead: + for (;;) { + // p节点表示首节点,即需要出队的节点 + for (Node h = head, p = h, q;;) { + E item = p.item; + + // 如果p节点的元素不为null,则通过CAS来设置p节点引用的元素为null,如果成功则返回p节点的元素 + if (item != null && p.casItem(item, null)) { + // Successful CAS is the linearization point + // for item to be removed from this queue. + // 如果p != h,则更新head + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。 + // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了 + else if ((q = p.next) == null) { + // 更新头结点 + updateHead(h, p); + return null; + } + // p == q,则使用新的head重新开始 + else if (p == q) + continue restartFromHead; + // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点 + else + p = q; + } + } +} +public E remove(){ + E x = poll(); + if(x != null) + return x; + else + throw new NoSuchElementException(); +} +``` + +该方法的主要逻辑就是首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。 + +在弹出一个节点之后,tail节点有一条指向自己的虚线,这是什么意思呢?我们来看poll()方法,在该方法中,移除元素之后,会调用updateHead方法: + +```java +final void updateHead(Node h, Node p) { + if (h != p && casHead(h, p)) + // 将旧的头结点h的next域指向为h + h.lazySetNext(h); +} +``` + +如果这时,再有一个线程来添加元素,通过tail获取的next节点则仍然是它本身,这就出现了p == q的情况,出现该种情况之后,则会触发执行head的更新,将p节点重新指向为head,所有“活着”的节点(指未删除节点),都能从head通过遍历可达,这样就能通过head成功获取到尾节点,然后添加元素了。 +出队的逻辑: + +1.定位头节点,尝试将其更新为Null; + +2.如果成功,则成功出队,否则重新查找头节点,并重试。 + +4.出队过程没有一点阻塞相关代码,所以出队不会阻塞线程,没找到元素就返回Null; + +#### 总结 + +1.ConcurrentLinkedQueue不是阻塞队列。 + +2.ConcurrentLinkedQueue不能用在线程池中; + +3.ConcurrentLinkedQueue使用CAS+自旋更新头尾节点和控制出队入队操作。 \ No newline at end of file diff --git a/week_04/59/CopyOnWriteArrayList.md b/week_04/59/CopyOnWriteArrayList.md new file mode 100644 index 0000000..8e627af --- /dev/null +++ b/week_04/59/CopyOnWriteArrayList.md @@ -0,0 +1,224 @@ +### CopyOnWriteArrayList + +#### 简要概述 + +- 实现了List接口 +- 内部持有一个ReentrantLock lock = new ReentrantLock(); +- 底层是用volatile transient声明的数组 array +- 读写分离,写时复制出一个新的数组,完成插入、修改或者移除操作后将新数组赋值给array + +#### 主要属性 + +``` +//可重入锁 +final transient ReentrantLock lock = new ReentrantLock(); +//一个被volatile修饰的变量数组 +private transient volatile Object[] array; +``` + +#### get、set和构造方法 + +``` +final Object[] getArray() { return array;} +final void setArray(Object[] a) { + array = a; + } +//空的构造方法 +public CopyOnWriteArrayList() { + setArray(new Object[0]); + } +//根据传入的集合的构造方法, +public CopyOnWriteArrayList(Collection c) { + Object[] elements; + if (c.getClass() == CopyOnWriteArrayList.class) + elements = ((CopyOnWriteArrayList)c).getArray(); + else { + elements = c.toArray(); + // c.toArray might (incorrectly) not return Object[] (see 6260652) + if (elements.getClass() != Object[].class) + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements); + } +//根据传入的集合构造方法 +public CopyOnWriteArrayList(E[] toCopyIn) { + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); + } +//获取list的长度 +public int size() { + return getArray().length; + } + //判空 +public boolean isEmpty() { + return size() == 0; + } + //判断两个对象是否相等 +private static boolean eq(Object o1, Object o2) { + return (o1 == null) ? o2 == null : o1.equals(o2); + } +//在索引index和fence中间查找元素等于o的下表 +private static int indexOf(Object o, Object[] elements, + int index, int fence) { + if (o == null) { + for (int i = index; i < fence; i++) + if (elements[i] == null) + return i; + } else { + for (int i = index; i < fence; i++) + if (o.equals(elements[i])) + return i; + } + return -1; + } +//在索引index处设置元素element +public E set(int index, E element) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + E oldValue = get(elements, index); + + if (oldValue != element) { + int len = elements.length; + Object[] newElements = Arrays.copyOf(elements, len); + newElements[index] = element; + setArray(newElements); + } else { + // Not quite a no-op; ensures volatile write semantics + setArray(elements); + } + return oldValue; + } finally { + lock.unlock(); + } + } +``` + +#### 操作元素的方法 + +1.增加一个元素 + +```java +public boolean add(E e) { + final ReentrantLock lock = this.lock; + //获得锁 + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + //复制一个新的数组 + Object[] newElements = Arrays.copyOf(elements, len + 1); + //插入新值 + newElements[len] = e; + //将新的数组指向原来的引用 + setArray(newElements); + return true; + } finally { + //释放锁 + lock.unlock(); + } +} + + +public void add(int index, E element) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + if (index > len || index < 0) + throw new IndexOutOfBoundsException("Index: "+index+ + ", Size: "+len); + Object[] newElements; + int numMoved = len - index; + if (numMoved == 0) + newElements = Arrays.copyOf(elements, len + 1); + else { + newElements = new Object[len + 1]; + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index, newElements, index + 1, + numMoved); + } + newElements[index] = element; + setArray(newElements); + } finally { + lock.unlock(); + } +} +``` + +2.删除 + +```java +public E remove(int index) { + final ReentrantLock lock = this.lock; + //获得锁 + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + E oldValue = get(elements, index); + int numMoved = len - index - 1; + if (numMoved == 0) + //如果删除的元素是最后一个,直接复制该元素前的所有元素到新的数组 + setArray(Arrays.copyOf(elements, len - 1)); + else { + //创建新的数组 + Object[] newElements = new Object[len - 1]; + //将index+1至最后一个元素向前移动一格 + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index + 1, newElements, index, + numMoved); + setArray(newElements); + } + return oldValue; + } finally { + lock.unlock(); + } +} +``` + +3.修改 + +```java +public E set(int index, E element) { + final ReentrantLock lock = this.lock; + //获得锁 + lock.lock(); + try { + Object[] elements = getArray(); + E oldValue = get(elements, index); + + if (oldValue != element) { + int len = elements.length; + //创建新数组 + Object[] newElements = Arrays.copyOf(elements, len); + //替换元素 + newElements[index] = element; + //将新数组指向原来的引用 + setArray(newElements); + } else { + // Not quite a no-op; ensures volatile write semantics + setArray(elements); + } + return oldValue; + } finally { + //释放锁 + lock.unlock(); + } +} +``` + +4.获取一个元素 + +```java +//直接获取index对应的元素 +public E get(int index) {return get(getArray(), index);} +private E get(Object[] a, int index) {return (E) a[index];} +``` + +#### 总结 + +​ 从代码可以看到,增删改都需要获得锁,并且锁只有一把,而读操作不需要获得锁,支持并发。为什么增删改中都需要创建一个新的数组,操作完成之后再赋给原来的引用?这是为了保证get的时候都能获取到元素,如果在增删改过程直接修改原来的数组,可能会造成执行读操作获取不到数据。 + +​ Vector是增删改查方法都加了synchronized,保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于Vector,CopyOnWriteArrayList支持读多写少的并发情况。 \ No newline at end of file diff --git a/week_04/59/DelayQueue.md b/week_04/59/DelayQueue.md new file mode 100644 index 0000000..87164bd --- /dev/null +++ b/week_04/59/DelayQueue.md @@ -0,0 +1,168 @@ +### DelayQueue + +#### 介绍 + +DelayQueue是java并发包下的阻塞队列,常用于实现定时任务。 + +DelayQueue实现了BlockingQueue,它是一个阻塞队列。 + +另外,DelayQueue组合了一个Delayed接口,DelayQueue中存储的元素必须实现Delayed接口。 + +#### 源码分析 + +##### Delayed + +```java +public interface Delayed extends Comparable { + + /** + * Returns the remaining delay associated with this object, in the + * given time unit. + * + * @param unit the time unit + * @return the remaining delay; zero or negative values indicate + * that the delay has already elapsed + */ + long getDelay(TimeUnit unit);//表示还有多少时间到期,到期应返回小于等于0的数值 +} + +``` + +##### DelayQueue主要属性 + +```java +//用于控制并发的可重入锁 +private final transient ReentrantLock lock = new ReentrantLock(); +//优先级队列 + private final PriorityQueue q = new PriorityQueue(); +//用于标记当前是否有线程排队 + private Thread leader = null; +//条件,用于表示现在是否有可取元素 + private final Condition available = lock.newCondition(); +``` + +##### DelayQueue主要构造方法 + +```java +public DelayQueue() {}//无参构造 +public DelayQueue(Collection c) {//有参构造集合 + this.addAll(c); + } +``` + +##### 入队 + +因为DelayQueue是阻塞队列,且优先级队列是无界的,所以入队不会阻塞超时,因此它的是个入队方法是一样的。 + +```java +public boolean add(E e) { + return offer(e); + } +public boolean offer(E e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + q.offer(e); + if (q.peek() == e) { + leader = null; + available.signal(); + } + return true; + } finally { + lock.unlock(); + } + } +public void put(E e) { + offer(e); + } +public boolean offer(E e, long timeout, TimeUnit unit) { + return offer(e); + } +``` + +入队步骤: + +1.加锁。 + +2.添加元素到优先级队列中。 + +3.如果添加的元素是堆顶元素,就把leader置为空并唤醒等待在条件available上的线程。 + +4.解锁。 + +##### 出队 + +因为DelayQueue是阻塞队列,所以它的出队有四个不同的方法,抛异常,阻塞,不阻塞,有超时。 + +```java +public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + E first = q.peek(); + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + else + return q.poll(); + } finally { + lock.unlock(); + } + } +``` + +poll方法逻辑 + +1.加锁,检查第一个元素,如果为空或者还没到期,就返回Null + +2.如果第一个元素已到期,则调用优先级队列的poll方法弹出第一个元素。然后解锁。 + +```java +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) //没有元素,让出线程,等待java.lang.Thread.State#WAITING + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) // 已到期,元素出队 + return q.poll(); + first = null; // don't retain ref while waiting + if (leader != null) + available.await();// 其它线程在leader线程TIMED_WAITING期间,会进入等待状态,这样可以只有一个线程去等待到时唤醒,避免大量唤醒操作 + else { + Thread thisThread = Thread.currentThread(); leader = thisThread; + try { + available.awaitNanos(delay); + // 等待剩余时间后,再尝试获取元素,他在等待期间,由于leader是当前线程,所以其它线程会等待。 + } + finally { + if (leader == thisThread) leader = null; } } } } } finally { + if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } +``` + +take()方法比较复杂。 + +1.加锁,判断堆顶元素是否为空,为空的话直接阻塞等待; + +2.判断堆顶元素是否到期,到期则调用优先级队列poll方法弹出元素。 + +3.没到期,再判断前面是否有其他线程等待,有则直接等待 + +4.前面没有其他线程等待,则把自己当作递易个线程,等delay时间后唤醒,再尝试获取元素。 + +5.获取元素之后再唤醒下一个等待线程。 + +6解锁。 + +#### 总结 + +1.DelayQueue是阻塞队列; + +2.DelayQueue内存存储结构使用优先级队列; + +3.DelayQueue使用重入锁和条件来控制并发安全。 + +4.DelayQueue常用于定时任务。 \ No newline at end of file -- Gitee