diff --git a/week_04/22/ArrayBlockingQueue-22.md b/week_04/22/ArrayBlockingQueue-22.md new file mode 100644 index 0000000000000000000000000000000000000000..aece41a4b9a3c6a41dab4a1f65bf03538afb3a9b --- /dev/null +++ b/week_04/22/ArrayBlockingQueue-22.md @@ -0,0 +1,191 @@ +/** + * 2020年1月4日09:00:35 + * 【ArrayBlockingQueue 】一个环形的队列FIFO, + * -items : Object[] + * -putIndex : int,表示当前有多少个数据 + * -takeIndex : int,表示队列拿出来的数量 + * -lock : ReentrantLock + * -notEmpty : Condition + * -notFull : Condition + * + * 【参考资料】 + * https://blog.csdn.net/mayongzhan_csdn/article/details/80888655 + * + * + * ArrayBlockingQueue的几个成员如下: + * final Object[] items;//final保证引用不会被修改 + * int takeIndex; + * int putIndex + * int count; + * + */ + + + /** + * 【简结】 + 1)ArrayBlockingQueue是有界的阻塞队列,不接受null; + 2)底层数据接口是数组,下标putIndex/takeIndex,构成一个环形FIFO队列; + 3)所有的增删改查数组公用了一把锁ReentrantLock,入队和出队数组下标和count变更都是靠这把锁来维护安全的。 + 4)阻塞的场景: + 1获取lock锁, + 2进入和取出还要满足condition 满了或者空了都等待出队和加入唤醒,ArrayBlockingQueue我们主要是put和take真正用到的阻塞方法(条件不满足)。 + 5)成员cout /putIndex、takeIndex是共享的,所以一些查询方法size、peek、toString、方法也是加上锁保证线程安全,但没有了并发损失了性能。 + 6)remove(Object obj) 返回了第一个equals的Object + */ +public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); +} + + +/*** + * add方法 + * 调用父类AbstractQueue 最终调用自身的offer(e) offer一次不成功就抛出异常,否则成功返回 + */ +public boolean add(E e) { + return super.add(e); +} +public boolean add(E e) { + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); +} + +//offer(e)方法 +public boolean offer(E e) { + checkNotNull(e); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (count == items.length) + return false; + else { + enqueue(e); + return true; + } + } finally { + lock.unlock(); + } +} + + +/** + * 队列满了直接返回false + * 队列没满items[putIndex] = data;达到数组长度重置putIndex,达到环形队列目的 + */ +private void enqueue(E x) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + final Object[] items = this.items; + items[putIndex] = x; + if (++putIndex == items.length) + putIndex = 0; + count++; + notEmpty.signal(); +} + +//put(e)方法 +public void put(E e) throws InterruptedException { + checkNotNull(e); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == items.length) + notFull.await(); + enqueue(e); + } finally { + lock.unlock(); + } +} + + + +/*** + * 这里使用的lock.lockInterruptibly() 之前没有说,当前线程如果调用了Thread.interrupt()方法,那么lockInterruptible()判断的Thread.interrupted()聚会成立,就会抛出异常,其实就是线程中断,该方法就抛出异常。 + * 跟offer(e)不同的是当队列满了执行notFull.await() 当前线程进入了条件队列,稍后会挂起,等待被唤醒不熟悉的看我之前的条件锁 + * 一旦被唤醒说明队列不满,将数据添加到队列中 + * 【offer(e,timeout)】 + */ +public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { + + checkNotNull(e); + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == items.length) { + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + enqueue(e); + return true; + } finally { + lock.unlock(); + } +} + +//peek()方法 +public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return itemAt(takeIndex); // null when queue is empty + } finally { + lock.unlock(); + } +} +final E itemAt(int i) { + return (E) items[i]; +} + +//size()方法 +public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return count; + } finally { + lock.unlock(); + } +} + +//remainingCapacity()方法 +public int remainingCapacity() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return items.length - count; + } finally { + lock.unlock(); + } +} + +//contains(Object obj)方法 +public boolean contains(Object o) { + if (o == null) return false; + final Object[] items = this.items; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (count > 0) { + final int putIndex = this.putIndex; + int i = takeIndex; + do { + if (o.equals(items[i])) + return true; + if (++i == items.length) + i = 0; + } while (i != putIndex); + } + return false; + } finally { + lock.unlock(); + } +} diff --git a/week_04/22/ConcurrentHashMap-22.md b/week_04/22/ConcurrentHashMap-22.md new file mode 100644 index 0000000000000000000000000000000000000000..350f1811fb79a2d676554669b6881f0f7eaf16b7 --- /dev/null +++ b/week_04/22/ConcurrentHashMap-22.md @@ -0,0 +1,621 @@ +import java.io.Serializable; +import java.util.AbstractMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; + + + +/** + * 2020年1月2日09:49:58 【参考资料】 + * https://blog.csdn.net/weixin_44460333/article/details/86770169 + * https://blog.csdn.net/sihai12345/article/details/79383766 + * https://mp.weixin.qq.com/s/_Bf6XcH51lssC0mdF_oW9A + * + * + * 【Java8 ConcurrentHashMap】 + * + * ConcurrentHashMap和 HashMap 非常类似,唯一的区别就是其中的核心数据如 value ,以及链表都是 volatile 修饰的,保证了获取时的可见性。 + * 原理上来说:ConcurrentHashMap 采用了分段锁技术,其中 Segment 继承于 ReentrantLock。 + * 不会像 HashTable 那样不管是 put 还是 get 操作都需要做同步处理,理论上 ConcurrentHashMap 支持 CurrencyLevel (Segment 数组数量)的线程并发。 + * 每当一个线程占用锁访问一个 Segment 时,不会影响到其他的 Segment。 + * + * + * 【各种锁简介】 + * (1)synchronized + * java中的关键字,内部实现为监视器锁,主要是通过对象监视器在对象头中的字段来表明的。 +synchronized从旧版本到现在已经做了很多优化了,在运行时会有三种存在方式:偏向锁,轻量级锁,重量级锁。 +偏向锁,是指一段同步代码一直被一个线程访问,那么这个线程会自动获取锁,降低获取锁的代价。 +轻量级锁,是指当锁是偏向锁时,被另一个线程所访问,偏向锁会升级为轻量级锁,这个线程会通过自旋的方式尝试获取锁,不会阻塞,提高性能。 +重量级锁,是指当锁是轻量级锁时,当自旋的线程自旋了一定的次数后,还没有获取到锁,就会进入阻塞状态,该锁升级为重量级锁,重量级锁会使其他线程阻塞,性能降低。 + +(2)CAS +CAS,Compare And Swap,它是一种乐观锁,认为对于同一个数据的并发操作不一定会发生修改,在更新数据的时候,尝试去更新数据,如果失败就不断尝试。 + +(3)volatile(非锁) +java中的关键字,当多个线程访问同一个变量时,一个线程修改了这个变量的值,其他线程能够立即看得到修改的值。(这里牵涉到java内存模型的知识,感兴趣的同学可以自己查查相关资料) +volatile只保证可见性,不保证原子性,比如 volatile修改的变量 i,针对i++操作,不保证每次结果都正确,因为i++操作是两步操作,相当于 i = i +1,先读取,再加1,这种情况volatile是无法保证的。 + +(4)自旋锁 +自旋锁,是指尝试获取锁的线程不会阻塞,而是循环的方式不断尝试,这样的好处是减少线程的上下文切换带来的开锁,提高性能,缺点是循环会消耗CPU。 + +(5)分段锁 +分段锁,是一种锁的设计思路,它细化了锁的粒度,主要运用在ConcurrentHashMap中,实现高效的并发操作,当操作不需要更新整个数组时,就只锁数组中的一项就可以了。 + +(6)ReentrantLock +可重入锁,是指一个线程获取锁之后再尝试获取锁时会自动获取锁,可重入锁的优点是避免死锁。 +其实,synchronized也是可重入锁。 + */ + +public class ConcurrentHashMap extends AbstractMap + implements ConcurrentMap, Serializable { + private static final long serialVersionUID = 7249069246763182397L; + + /** + * 【核心成员变量】 + * 由 Segment 数组、HashEntry 组成,和 HashMap 一样,仍然是数组加链表。 + * Segment 数组,存放数据时首先需要定位到具体的 Segment 中。 + */ + final Segment[] segments; + transient Set keySet; + transient Set> entrySet; + static final class Segment extends ReentrantLock implements Serializable { + //和 HashMap 中的 HashEntry 作用一样,真正存放数据的桶 + private static final long serialVersionUID = 1L; + transient volatile HashEntry[] table; + transient int count; + transient int modCount; + transient int threshold; + final float loadFactor; + static final class HashEntry{ + final int hash; + final K key; + volatile V value; + volatile HashEntry next; + HashEntry(int hash,K key,V value,HashEntry next){ + this.hash = hash; + this.key = key; + this.value = value; + this.next = next; + } + } + } + + + /** + * 【核心方法】 + * 构造方法与HashMap对比可以发现,没有了HashMap中的threshold和loadFactor,而是改用了sizeCtl来控制,而且只存储了容量在里面,那么它是怎么用的呢? + * 官方给出的解释如下: + +(1)-1,表示有线程正在进行初始化操作 +(2)-(1 + nThreads),表示有n个线程正在一起扩容 +(3)0,默认值,后续在真正初始化的时候使用默认容量 +(4)> 0,初始化或扩容完成后下一次的扩容门槛 + + */ + public ConcurrentHashMap() { + } + public ConcurrentHashMap(int initialCapacity) { + if (initialCapacity < 0) + throw new IllegalArgumentException(); + int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); + this.sizeCtl = cap; + } + public ConcurrentHashMap(Map m) { + this.sizeCtl = DEFAULT_CAPACITY; + putAll(m); + } + public ConcurrentHashMap(int initialCapacity, float loadFactor) { + this(initialCapacity, loadFactor, 1); + } + public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) { + if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) + throw new IllegalArgumentException(); + if (initialCapacity < concurrencyLevel) // Use at least as many bins + initialCapacity = concurrencyLevel; // as estimated threads + long size = (long)(1.0 + (long)initialCapacity / loadFactor); + int cap = (size >= (long)MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int)size); + this.sizeCtl = cap; + } + + + + + /** + * 【put方法】 + */ + public V put(K key, V value) { + return putVal(key, value, false); + } + final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) + throw new NullPointerException(); + + // 得到 hash 值 + int hash = spread(key.hashCode()); + + // 用于记录相应链表的长度 + int binCount = 0; + + for (Node[] tab = table;;) { + Node f; int n, i, fh; + // 如果数组"空",进行数组初始化 + if (tab == null || (n = tab.length) == 0) + // 初始化数组 + tab = initTable(); + + // 找该 hash 值对应的数组下标,得到第一个节点 f + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + // 如果数组该位置为空, + // 用一次 CAS 操作将这个新值放入其中即可; 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了 + if (casTabAt(tab, i, null, new Node(hash, key, value, null))) + break; + } + + else if ((fh = f.hash) == MOVED) + // 帮助数据迁移 + tab = helpTransfer(tab, f); + + else { // 到这里就是说,f 是该位置的头结点,而且不为空 + + V oldVal = null; + // 获取数组该位置的头结点的监视器锁 + synchronized (f) { + if (tabAt(tab, i) == f) { + if (fh >= 0) { + // 头结点的 hash 值大于 0,说明是链表;用于累加,记录链表的长度 + binCount = 1; + // 遍历链表 + for (Node e = f;; ++binCount) { + K ek; + // 如果发现了"相等"的 key,判断是否要进行值覆盖 + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + // 到了链表的最末端,将这个新值放到链表的最后面 + Node pred = e; + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + else if (f instanceof TreeBin) { // 红黑树 + Node p; + binCount = 2; + // 调用红黑树的插值方法插入新节点 + if ((p = ((TreeBin)f).putTreeVal(hash, key,value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + // binCount != 0 说明上面在做链表操作 + if (binCount != 0) { + // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8 + if (binCount >= TREEIFY_THRESHOLD) + // 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换, + // 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树 + treeifyBin(tab, i); + if (oldVal != null) + return oldVal; + break; + } + } + } + addCount(1L, binCount); + return null; + } + /** + * 判断是否需要扩容 + * @return + */ + private final void addCount(long x, int check) { + CounterCell[] as; long b, s; + // 把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想),并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段,这样可以保证尽量小的减少冲突 + // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上 + if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { + CounterCell a; long v; int m; + boolean uncontended = true; + // 如果as为空,或者长度为0,或者当前线程所在的段为null,或者在当前线程的段上加数量失败 + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { + // 强制增加数量(无论如何数量是一定要加上的,并不是简单地自旋).不同线程对应不同的段都更新失败了 + // 说明已经发生冲突了,那么就对counterCells进行扩容.以减少多个线程hash到同一个段的概率 + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + // 计算元素个数 + s = sumCount(); + } + if (check >= 0) { + Node[] tab, nt; int n, sc; + // 如果元素个数达到了扩容门槛,则进行扩容;注意,正常情况下sizeCtl存储的是扩容门槛,即容量的0.75倍 + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { + // rs是扩容时的一个邮戳标识 + int rs = resizeStamp(n); + if (sc < 0) { + // sc<0说明正在扩容中 + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + // 扩容已经完成了,退出循环 + // 正常应该只会触发nextTable==null这个条件,其它条件没看出来何时触发 + break; + + // 扩容未完成,则当前线程加入迁移元素中 + // 并把扩容线程数加1 + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + // 这里是触发扩容的那个线程进入的地方 + // sizeCtl的高16位存储着rs这个扩容邮戳 + // sizeCtl的低16位存储着扩容线程数加1,即(1+nThreads) + // 所以官方说的扩容时sizeCtl的值为 -(1+nThreads)是错误的 + + // 进入迁移元素 + transfer(tab, null); + // 重新计算元素个数 + s = sumCount(); + } + } + } + + + + /** + * 【初始化数组:initTable】 + * 主要就是初始化一个合适大小的数组,然后会设置 sizeCtl。 + * 初始化方法中的并发问题是通过对 sizeCtl 进行一个 CAS 操作来控制的。 + */ + private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { + if ((sc = sizeCtl) < 0) + Thread.yield(); + // CAS 一下,将 sizeCtl 设置为 -1,代表抢到了锁 + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + if ((tab = table) == null || tab.length == 0) { + // DEFAULT_CAPACITY 默认初始容量是 16 + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + // 初始化数组,长度为 16 或初始化时提供的长度 + Node[] nt = (Node[])new Node[n]; + // 将这个数组赋值给 table,table 是 volatile 的 + table = tab = nt; + // 如果 n 为 16 的话,那么这里 sc = 12;其实就是 0.75 * n + sc = n - (n >>> 2); + } + } finally { + // 设置 sizeCtl 为 sc + sizeCtl = sc; + } + break; + } + } + return tab; + } + + + /** + * 【链表转红黑树: treeifyBin】 + * + */ + private final void treeifyBin(Node[] tab, int index) { + Node b; int n, sc; + if (tab != null) { + // MIN_TREEIFY_CAPACITY 为 64 + // 所以,如果数组长度小于 64 的时候,其实也就是 32 或者 16 或者更小的时候,会进行数组扩容 + if ((n = tab.length) < MIN_TREEIFY_CAPACITY) + tryPresize(n << 1); + // b 是头结点 + else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { + // 加锁 + synchronized (b) { + if (tabAt(tab, index) == b) { + // 下面就是遍历链表,建立一颗红黑树 + TreeNode hd = null, tl = null; + for (Node e = b; e != null; e = e.next) { + TreeNode p = + new TreeNode(e.hash, e.key, e.val, + null, null); + if ((p.prev = tl) == null) + hd = p; + else + tl.next = p; + tl = p; + } + // 将红黑树设置到数组相应位置中 + setTabAt(tab, index, new TreeBin(hd)); + } + } + } + } + } + + + + /** + * 【扩容:tryPresize】 + * 首先要说明的是,方法参数 size 传进来的时候就已经翻了倍了 + */ + private final void tryPresize(int size) { + // c:size 的 1.5 倍,再加 1,再往上取最近的 2 的 n 次方。 + int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); + int sc; + while ((sc = sizeCtl) >= 0) { + Node[] tab = table; int n; + + if (tab == null || (n = tab.length) == 0) { + n = (sc > c) ? sc : c; + if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + if (table == tab) { + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; + table = nt; + sc = n - (n >>> 2); // 0.75 * n + } + } finally { + sizeCtl = sc; + } + } + } + else if (c <= sc || n >= MAXIMUM_CAPACITY) + break; + else if (tab == table) { + int rs = resizeStamp(n); + + if (sc < 0) { + Node[] nt; + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + break; + // 2. 用 CAS 将 sizeCtl 加 1,然后执行 transfer 方法;此时 nextTab 不为 null + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + // 1. 将 sizeCtl 设置为 (rs << RESIZE_STAMP_SHIFT) + 2);调用 transfer 方法,此时 nextTab 参数为 null + else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + } + } + } + + + /** + * 【数据迁移:transfer】 + */ + private final void transfer(Node[] tab, Node[] nextTab) { + int n = tab.length, stride; + + // stride 在单核下直接等于 n,多核模式下为 (n>>>3)/NCPU,最小值是 16 + // stride 可以理解为”步长“,有 n 个位置是需要进行迁移的, 将这 n 个任务分为多个任务包,每个任务包有 stride 个任务 + if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) + stride = MIN_TRANSFER_STRIDE; // subdivide range + + // 如果 nextTab 为 null,先进行一次初始化;保证第一个发起迁移的线程调用此方法时,参数 nextTab 为 null;之后参与迁移的线程调用此方法时,nextTab 不会为 null + if (nextTab == null) { + try {// 容量翻倍 + Node[] nt = (Node[])new Node[n << 1]; + nextTab = nt; + } catch (Throwable ex) { + sizeCtl = Integer.MAX_VALUE; + return; + } + // nextTable 是 ConcurrentHashMap 中的属性 + nextTable = nextTab; + // transferIndex 也是 ConcurrentHashMap 的属性,用于控制迁移的位置 + transferIndex = n; + } + + int nextn = nextTab.length; + + // ForwardingNode:正在被迁移的 Node + // 这个构造方法会生成一个Node,key、value 和 next 都为 null,关键是 hash 为 MOVED + // 原数组中位置 i 处的节点完成迁移工作后,就会将位置 i 处设置为这个 ForwardingNode,用来告诉其他线程该位置已经处理过了,所以它其实相当于是一个标志。 + ForwardingNode fwd = new ForwardingNode(nextTab); + + // advance 指的是做完了一个位置的迁移工作,可以准备做下一个位置的了 + boolean advance = true; + boolean finishing = false; + + // i 是位置索引,bound 是边界,注意是从后往前 + for (int i = 0, bound = 0;;) { + Node f; int fh; + + // advance 为 true 表示可以进行下一个位置的迁移了 + // i 指向了 transferIndex,bound 指向了 transferIndex-stride + while (advance) { + int nextIndex, nextBound; + if (--i >= bound || finishing) + advance = false; + + // 将 transferIndex 值赋给 nextIndex + // 这里 transferIndex 一旦小于等于 0,说明原数组的所有位置都有相应的线程去处理了 + else if ((nextIndex = transferIndex) <= 0) { + i = -1; + advance = false; + } + else if (U.compareAndSwapInt + (this, TRANSFERINDEX, nextIndex, + nextBound = (nextIndex > stride ? + nextIndex - stride : 0))) { + // 看括号中的代码,nextBound 是这次迁移任务的边界,注意,是从后往前 + bound = nextBound; + i = nextIndex - 1; + advance = false; + } + } + if (i < 0 || i >= n || i + n >= nextn) { + int sc; + if (finishing) { + // 所有的迁移操作已经完成 + nextTable = null; + // 将新的 nextTab 赋值给 table 属性,完成迁移 + table = nextTab; + // 重新计算 sizeCtl:n 是原数组长度,所以 sizeCtl 得出的值将是新数组长度的 0.75 倍 + sizeCtl = (n << 1) - (n >>> 1); + return; + } + + // 之前我们说过,sizeCtl 在迁移前会设置为 (rs << RESIZE_STAMP_SHIFT) + 2;然后,每有一个线程参与迁移就会将 sizeCtl 加 1, + // 这里使用 CAS 操作对 sizeCtl 进行减 1,代表做完了属于自己的任务 + if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { + // 任务结束,方法退出 + if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) + return; + + // 说明 (sc - 2) == resizeStamp(n) << RESIZE_STAMP_SHIFT,所有的迁移任务都做完了,也就会进入到上面的 if(finishing){} 分支了 + finishing = advance = true; + i = n; + } + } + // 如果位置 i 处是空的,没有任何节点,那么放入刚刚初始化的 ForwardingNode ”空节点“ + else if ((f = tabAt(tab, i)) == null) + advance = casTabAt(tab, i, null, fwd); + // 该位置处是一个 ForwardingNode,代表该位置已经迁移过了 + else if ((fh = f.hash) == MOVED) + advance = true; // already processed + else { + // 对数组该位置处的结点加锁,开始处理数组该位置处的迁移工作 + synchronized (f) { + if (tabAt(tab, i) == f) { + Node ln, hn; + // 头结点的 hash 大于 0,说明是链表的 Node 节点 + if (fh >= 0) { + // 需要将链表一分为二,找到原链表中的 lastRun,然后 lastRun 及其之后的节点是一起进行迁移的,lastRun 之前的节点需要进行克隆,然后分到两个链表中 + int runBit = fh & n; + Node lastRun = f; + for (Node p = f.next; p != null; p = p.next) { + int b = p.hash & n; + if (b != runBit) { + runBit = b; + lastRun = p; + } + } + if (runBit == 0) { + ln = lastRun; + hn = null; + } + else { + hn = lastRun; + ln = null; + } + for (Node p = f; p != lastRun; p = p.next) { + int ph = p.hash; K pk = p.key; V pv = p.val; + if ((ph & n) == 0) + ln = new Node(ph, pk, pv, ln); + else + hn = new Node(ph, pk, pv, hn); + } + // 其中的一个链表放在新数组的位置 i + setTabAt(nextTab, i, ln); + // 另一个链表放在新数组的位置 i+n + setTabAt(nextTab, i + n, hn); + // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕,其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了 + setTabAt(tab, i, fwd); + // advance 设置为 true,代表该位置已经迁移完毕 + advance = true; + } + else if (f instanceof TreeBin) { + // 红黑树的迁移 + TreeBin t = (TreeBin)f; + TreeNode lo = null, loTail = null; + TreeNode hi = null, hiTail = null; + int lc = 0, hc = 0; + for (Node e = t.first; e != null; e = e.next) { + int h = e.hash; + TreeNode p = new TreeNode + (h, e.key, e.val, null, null); + if ((h & n) == 0) { + if ((p.prev = loTail) == null) + lo = p; + else + loTail.next = p; + loTail = p; + ++lc; + } + else { + if ((p.prev = hiTail) == null) + hi = p; + else + hiTail.next = p; + hiTail = p; + ++hc; + } + } + // 如果一分为二后,节点数少于 8,那么将红黑树转换回链表 + ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : + (hc != 0) ? new TreeBin(lo) : t; + hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : + (lc != 0) ? new TreeBin(hi) : t; + + // 将 ln 放置在新数组的位置 i + setTabAt(nextTab, i, ln); + // 将 hn 放置在新数组的位置 i+n + setTabAt(nextTab, i + n, hn); + // 将原数组该位置处设置为 fwd,代表该位置已经处理完毕, 其他线程一旦看到该位置的 hash 值为 MOVED,就不会进行迁移了 + setTabAt(tab, i, fwd); + // advance 设置为 true,代表该位置已经迁移完毕 + advance = true; + } + } + } + } + } + } + + + /** + * 【get过程】 + * 1、计算 hash 值 + * 2、根据 hash 值找到数组对应位置: (n - 1) & h + * 3、根据该位置处结点性质进行相应查找 + * + * 如果该位置为 null,那么直接返回 null 就可以了 + * 如果该位置处的节点刚好就是我们需要的,返回该节点的值即可 + * 如果该位置节点的 hash 值小于 0,说明正在扩容,或者是红黑树,后面我们再介绍 find 方法。 + * 如果以上 3 条都不满足,那就是链表,进行遍历比对即可 + */ + public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + int h = spread(key.hashCode()); + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { + // 判断头结点是否就是我们需要的节点 + if ((eh = e.hash) == h) { + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + // 如果头结点的 hash 小于 0,说明 正在扩容,或者该位置是红黑树 + else if (eh < 0) + // 参考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k) + return (p = e.find(h, key)) != null ? p.val : null; + + // 遍历链表 + while ((e = e.next) != null) { + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; + } +} \ No newline at end of file diff --git a/week_04/22/ConcurrentLinkedQueue-22.md b/week_04/22/ConcurrentLinkedQueue-22.md new file mode 100644 index 0000000000000000000000000000000000000000..b7e0c200e8a47059665aa7ea97301c8e7997c803 --- /dev/null +++ b/week_04/22/ConcurrentLinkedQueue-22.md @@ -0,0 +1,236 @@ +/** + * 2020年1月4日09:08:38 + * 【简介】 + * ConcurrentLinkedQueue是一个线程安全的队列,它采用的是 CAS 算法来进行实现,也就是说它是非阻塞的; + * 队列中的元素按照 FIFO(先进先出)的原则对元素进行排列,此外,它是一个无界队列; + * 添加元素的时候,在链表的尾部进行添加,获取元素的时候,从链表的头部获取。 + * 它内部采用的单向链表的形式来表示,链表的节点是定义在ConcurrentLinkedQueue的一个内部类。 + * + * + * ConcurrentLinkedQueue 实现了 Queue 接口和实现了继承了 AbstractQueue 类,而 Itr 和 Node则是它的一个内部类; + */ + + + //Queue 接口只是定义了一些队列的公共方法 +public interface Queue extends Collection { + // 添加元素 + boolean add(E e); + // 添加元素 + boolean offer(E e); + // 删除元素 + E remove(); + // 删除并返回第一个元素,如果队列为空,则返回 null + E poll(); +   // 返回第一个元素,如果不存在,则抛出NoSuchElementException异常 + E element(); + // 返回第一个元素,但不删除,如果队列为空,则返回 null + E peek(); +} + +//AbstractQueue 类也继承了 Queue接口,提供了某些方法的实现 +public abstract class AbstractQueue extends AbstractCollection implements Queue { + public boolean add(E e) { + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); + } + public E remove() { + E x = poll(); + if (x != null) + return x; + else + throw new NoSuchElementException(); + } +//............................... +} + +/** + * 一、队列中链表节点的定义,链表中的节点使用一个 Node 内部类来表示: + * E item 元素和 Node next 节点都使用了 volatile 来修饰,这说明了元素或某个节点被一个线程修改了之后,其他的线程是立马看到修改后的值的。 + */ +private static class Node { + volatile E item;// 节点中的元素 + volatile Node next;// 下一个节点,没有上一个节点,表示它是一个单向链表的形式 + + Node(E item) {// 构造一个节点 + UNSAFE.putObject(this, itemOffset, item); + } + // 使用 CAS 的方式设置节点的元素 + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + // 设置下一个节点 + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + // 采用 CAS 的方式设置下一个节点 + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + // Unsafe 类的一些初始化 +} + + +/** + * 二、ConcurrentLinkedQueue 类中的属性和方法 + * 当使用空的构造其是实例化一个对象的时候,会创建一个节点,节点的值为 null(添加的时候,是不能为null的),并把头节点和尾节点都指向该节点 + * + */ +public class ConcurrentLinkedQueue extends AbstractQueue implements Queue, java.io.Serializable { + + // 头节点, + private transient volatile Node head; + // 尾节点,尾节点不一定是链表的最后一个节点 + private transient volatile Node tail; + // 构造 + public ConcurrentLinkedQueue() { + head = tail = new Node(null); + } + + + + /** + * 【添加元素】 + * (1)向链表中添加元素,添加元素的时候,是在链表的尾部进行添加,添加元素有两个方法 add() 和 offer(),add() 会调用 offer() 进行添加, + * 这两个方法永远都会返回 true,所以不要使用 true | false 来判断是否添加成功; + * (2)入队主要做两件事情: + * 第一是将新添加的节点设置成当前队列尾节点的下一个节点; + * 第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点。 + */ + public boolean add(E e) { + return offer(e); + } + public boolean offer(E e) { + // 判空,为空则抛出空指针异常 + checkNotNull(e); + // 创建要添加的节点 + final Node newNode = new Node(e); + + // 无限循环,入队不成功,则反复入队;t 表示 tail 节点; p 表示链表的尾节点,默认等于 tail 节点 + for (Node t = tail, p = t;;) { + // q 为尾节点的下一个节点 + Node q = p.next; + // 如果尾节点的下一个节点为空,则表示 p 为尾节点 + if (q == null) { + // CAS 设置尾节点的下一个节点为新添加的节点,如果设置失败,在再次尝试 + if (p.casNext(null, newNode)) { + // 如果tail节点有大于等于1个的 next 节点,则更新 tail 节点,将新添加的节点设置为 tail 节点 + if (p != t) // 相当于循环两次更新一次 tail 节点 + casTail(t, newNode); // 新添加的节点设置为tail节点,允许失败,失败了表示有其他线程成功更新了tail节点 + return true; + } + } + else if (p == q) // 只有在尾节点和尾节点的下一个节点为空的情况下成立 + p = (t != (t = tail)) ? t : head; + else + // 把 tail节点设置为为尾节点,再次循环设置下一个节点 + p = (p != t && t != (t = tail)) ? t : q; + } + } + + + + /** + * 【获取元素】 + * (1)ConcurrentLinkedQueue是一个FIFO的队列,所以获取元素的时候,总是获取到队列的第一个元素;获取元素有两个方法,poll() 和 peek(), + * poll()方法获取元素的时候,返回链表的第一个元素,并删除,而 peek() 方法获取元素的时候则不删除 + * (2)head节点不一定就是队列的第一个含有元素的节点,也不是每次获取元素后就更新head节点,只有当head中的元素为空的时候才更新head节点, + * 这和添加 offer() 方法中更新tail节点类似,减少 CAS 更新head节点的次数,出队的效率会更高。 + */ + public E poll() { + // 循环跳转,goto语法 + restartFromHead: + for (;;) { + // p 表示要出队的节点,默认为 head节点 + for (Node h = head, p = h, q;;) { + // 出队的元素 + E item = p.item; + // 如果出队的元素不为空,则把要出队的元素设置null,不更新head节点;如果出队元素为null或者cas设置失败,则表示有其他线程已经进行修改,则需要重写获取 + if (item != null && p.casItem(item, null)) { + if (p != h) // 当head元素为空,才会更新head节点,这里循环两次,更新一次head节点 + updateHead(h, ((q = p.next) != null) ? q : p); // 更新head节点 + return item; + } + // 队列为空,返回null + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + else if (p == q) + continue restartFromHead; + // 把 p 的next节点赋值给p + else + p = q; + } + } + } + + + /** + * 【isEmpty()方法】 + * ConcurrentLinkedQueue 通过 isEmpty来判断队列是否为空 + * isEmpty 方法会判断链表的第一个元素是否为空来进行判断的。 + */ + public boolean isEmpty() { + return first() == null; + } + Node first() { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + boolean hasItem = (p.item != null); + if (hasItem || (q = p.next) == null) { + updateHead(h, p); + return hasItem ? p : null; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } + } + + + /** + * 【size()方法】 + * size()方法会遍历所有的链表来查看有多少个元素。 + * 对于在开发的时候,如果需要判断是否为空,则应该使用 isEmpty 而不应该使用 size() > 0 的方式,因为 size()会变量整个链表,效率较低。 + */ + public int size() { + int count = 0; + // succ() 获取下一个元素 + for (Node p = first(); p != null; p = succ(p)) + if (p.item != null) + if (++count == Integer.MAX_VALUE) + break; + return count; + } +} + + +/** + * + * 调试例程 + * + */ +public class ConcurrentLinkedQueueTest { + public static void main(String[] args) throws InterruptedException { + new ConcurrentLinkedQueueTest().testConcurrentLinkedQueue(); + Thread.sleep(5000L); + } + + int num = 0; + + public ConcurrentLinkedQueue testConcurrentLinkedQueue(){ + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + for(int i = 0; i < 100; i++) { + new Thread(() -> { + num++; + queue.offer(num); + }).start(); + } + return queue; + } +} \ No newline at end of file diff --git a/week_04/22/CopyOnWriteArrayList-22.md b/week_04/22/CopyOnWriteArrayList-22.md new file mode 100644 index 0000000000000000000000000000000000000000..ff71a0026618b0e2bb2d8b6a8b461c3da533e9df --- /dev/null +++ b/week_04/22/CopyOnWriteArrayList-22.md @@ -0,0 +1,289 @@ +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.RandomAccess; +import java.util.concurrent.locks.ReentrantLock; + +/** + * 2020年1月2日08:28:04 + * 【参考资料】 + * https://www.cnblogs.com/simple-focus/p/7439919.html + * https://mp.weixin.qq.com/s/TDmrSxwmgUS8xohWiKc3hQ + * + * CopyOnWriteArrayList,是一个写入时复制的容器。简单来说,就是平时查询的时候,都不需要加锁,随便访问, + * 只有在增删改的时候,才会从原来的数据复制一个副本出来,然后修改这个副本,最后把原数据替换成当前的副本。 + * 修改操作的同时,读操作不会被阻塞,而是继续读取旧的数据。 + * + * (1)实现了List接口; + * (2)内部持有一个ReentrantLock lock = new ReentrantLock(); + * (3)底层是用volatile transient声明的数组 array; + * (4)读写分离,写时复制出一个新的数组,完成插入、修改或者移除操作后将新数组赋值给array + * + * 【优点】:对于一些读多写少的数据,这种做法的确很不错,例如配置、黑名单、物流地址等变化非常少的数据,这是一种无锁的实现。可以帮我们实现程序更高的并发。 + * + * 【缺点】:这种实现只是保证数据的最终一致性,在添加到拷贝数据而还没进行替换的时候,读到的仍然是旧数据。 + * 如果对象比较大,频繁地进行替换会消耗内存,从而引发Java的GC问题,这个时候,我们应该考虑其他的容器,例如ConcurrentHashMap。 + * + * + * 重点掌握:add()、remove()、set()、get() + */ + + +public class CopyOnWriteArrayList implements List, RandomAccess, Cloneable, java.io.Serializable { + //序列号 + private static final long serialVersionUID = 8673264195747942595L; + //可重入锁,对数组增删改时,使用它加锁 + final transient ReentrantLock lock = new ReentrantLock(); + //存放元素的数组,其实就是本体 + private transient volatile Object[] array; + + + /** + * 默认构造方法,构建数组长度为0 + */ + public CopyOnWriteArrayList() { + setArray(new Object[0]); + } + //通过集合类来构造 + public CopyOnWriteArrayList(Collection c) { + Object[] elements; + if (c.getClass() == CopyOnWriteArrayList.class) + elements = ((CopyOnWriteArrayList)c).getArray(); + else { + elements = c.toArray(); + // c.toArray might (incorrectly) not return Object[] (see 6260652) + if (elements.getClass() != Object[].class) + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements); + } + //通过数组来构造 + public CopyOnWriteArrayList(E[] toCopyIn) { + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); + } + //设置数组引用指向的数组对象 + final void setArray(Object[] a) { + array = a; + } + + + /** + * 【查】 + * 这个是真正用于查询的方法。 + * 在对CopyOnWriteArrayList读取元素时,根本没有加锁,这极大的避免了加锁带来的开销。 + * @param a + * @param index + * @return + * (1)获取元素数组; + * (2)返回数组指定索引位置的元素; + */ + @SuppressWarnings("unchecked") + private E get(Object[] a, int index) { + return (E) a[index]; + } + //向外开放的方法 + public E get(int index) { + return get(getArray(), index); + } + final Object[] getArray() { + return array; + } + + + /** + * 【增】 + * CopyOnWriteArrayList刚创建时,默认的大小为0, + * 当向其插入一个元素时,将原数组复制到一个比原数组大1的新数组中,然后直接将插入的元素放置到新数组末尾,之后修改array引用到新数组就可以,原来的数组就会被垃圾收集器回收。 + * 初始化为什么要设置数组大小为0呢? + * 这是因为每次进行添加操作时,都会复制原数组到新的数组中,相当于CopyOnWriteArrayList在进行add操作时,实际占用的空间是原来的两倍, + * 这样的空间开销,导致了CopyOnWriteArrayList不能像ArrayList那样初始化大小为10,不然太浪费空间了,而且CopyOnWriteArrayList主要用于读多写少的地方。 + * 因为CopyOnWriteArrayList在进行增删改操作时,都是在新数组上进行,所以此时对CopyOnWriteArrayList进行读取完全不会导致阻塞或是出错。 + * CopyOnWriteArrayList通过将读写分离实现线程安全。 + * + */ + //直接将元素添加到末尾 + public boolean add(E e) { + final ReentrantLock lock = this.lock; + //加锁 + lock.lock(); + try { + //先获取原先的数组 + Object[] elements = getArray(); + int len = elements.length; + //构建一个新的数组,大小是原数组的大小 1 + Object[] newElements = Arrays.copyOf(elements, len); + //将元素插入到数组末尾 + newElements[len] = e; + //将array引用指向新的数组,原来的数组会被垃圾收集器回收 + setArray(newElements); + return true; + } finally { + //释放锁 + lock.unlock(); + } + } + //在指定位置插入新元素 + public void add(int index, E element) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + //判断是否越界 + if (index > len || index < 0) + throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + len); + Object[] newElements; + //计算插入位置与数组末尾下标的距离 + int numMoved = len - index; + //若为0,则是添加到数组末尾 + if (numMoved == 0) + newElements = Arrays.copyOf(elements, len); + else { + //不为0,则将原数组分开复制 + newElements = new Object[len - 1]; + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index, newElements, index - 1, numMoved); + } + newElements[index] = element; + setArray(newElements); + } finally { + lock.unlock(); + } + } + + + /** + * 【改】 + */ + public E set(int index, E element) { + final ReentrantLock lock = this.lock; + //加锁 + lock.lock(); + try { + Object[] elements = getArray(); + E oldValue = get(elements, index); + //判断原来的元素的值是否等于新值,相等则直接修改array的引用 + //不相等则复制一份到新数组中再进行修改 + if (oldValue != element) { + int len = elements.length; + Object[] newElements = Arrays.copyOf(elements, len); + newElements[index] = element; + setArray(newElements); + } else { + // Not quite a no-op; ensures volatile write semantics + setArray(elements); + } + return oldValue; + } finally { + //释放锁 + lock.unlock(); + } + } + + + /** + * 【删】 + */ + public E remove(int index) { + final ReentrantLock lock = this.lock; + //加锁 + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + E oldValue = get(elements, index); + //这里跟add方法很像,判断删除元素的下标与数组末尾下标的距离 + //如果为0,则是删除末尾元素,直接将前面的元素复制到新数组中 + int numMoved = len - index - 1; + if (numMoved == 0) + setArray(Arrays.copyOf(elements, len - 1)); + else { + //若不为0,则创建一个比原来数组小1的新数组,再将要删除元素下标之外的所有元素全部复制到新数组 + Object[] newElements = new Object[len - 1]; + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index - 1, newElements, index, numMoved); + setArray(newElements); + } + return oldValue; + } finally { + //释放锁 + lock.unlock(); + } + } + //通过元素删除元素 + public boolean remove(Object o) { + Object[] snapshot = getArray(); + //获取元素下标 + int index = indexOf(o, snapshot, 0, snapshot.length); + return (index < 0) ? false : remove(o, snapshot, index); + } + /** + * 删除方法本体 + */ + private boolean remove(Object o, Object[] snapshot, int index) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] current = getArray(); + int len = current.length; + //若在执行romove操作时,数组已经改变了,则需要对要删除的元素重新定位,防止误删(因为这个删除方法在最初进入时没有加锁,在这个时候可能会发生改变) + if (snapshot != current) findIndex: { + //取最小的遍历范围 + int prefix = Math.min(index, len); + for (int i = 0; i < prefix; i ) { + //找到元素对应下标,跳出,重新判断 + if (current[i] != snapshot[i] && eq(o, current[i])) { + index = i; + break findIndex; + } + } + //若没有在指定范围中找到对应元素,则进行下一步判断 + //元素被删除或不存在 + if (index >= len) + return false; + if (current[index] == o) + break findIndex; + index = indexOf(o, current, index, len); + //元素不存在或是被删除 + if (index < 0) + return false; + } + //删除 + Object[] newElements = new Object[len - 1]; + System.arraycopy(current, 0, newElements, 0, index); + System.arraycopy(current, index 1, newElements, index, len - index - 1); + setArray(newElements); + return true; + } finally { + //释放锁 + lock.unlock(); + } + } + + private static boolean eq(Object o1, Object o2) { + return (o1 == null) ? o2 == null : o1.equals(o2); + } + private static int indexOf(Object o, Object[] elements, int index, int fence) { + if (o == null) { + for (int i = index; i < fence; i ) + if (elements[i] == null) + return i; + } else { + for (int i = index; i < fence; i ) + if (o.equals(elements[i])) + return i; + } + return -1; + } + + + /** + * 【总结】 + * (1)CopyOnWriteArrayList使用ReentrantLock重入锁加锁,保证线程安全; + * (2)CopyOnWriteArrayList的写操作都要先拷贝一份新数组,在新数组中做修改,修改完了再用新数组替换老数组,所以空间复杂度是O(n),性能比较低下; + * (3)CopyOnWriteArrayList的读操作支持随机访问,时间复杂度为O(1); + * (4)CopyOnWriteArrayList采用读写分离的思想,读操作不加锁,写操作加锁,且写操作占用较大内存空间,所以适用于读多写少的场合; + * (5)CopyOnWriteArrayList只保证最终一致性,不保证实时一致性; + */ +} + diff --git a/week_04/22/DelayQueue-22.md b/week_04/22/DelayQueue-22.md new file mode 100644 index 0000000000000000000000000000000000000000..dc53f29d784b911ca6ea41b4e62353eb505439ed --- /dev/null +++ b/week_04/22/DelayQueue-22.md @@ -0,0 +1,214 @@ +/** + * 2020年1月4日09:31:54 + * 【DelayQueue】 + * (1)DelayQueue是一个支持延时获取元素的无界阻塞队列。里面的元素全部都是“可延期”的元素,列头的元素是最先“到期”的元素, + * 如果队列里面没有元素到期,是不能从列头获取元素的,哪怕有元素也不行。也就是说只有在延迟期到时才能够从队列中取元素。 + * 所谓延时处理就是说可以为队列中元素设定一个 过期时间。相关操作受到这个设定时间的控制。 + * (2)delayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首, 每次从队列里取出来都是最先要过期的元素. + * 其中,delayed是一个具有过期时间的元素; PriorityQueue是一个根据队列里元素某些属性排列先后的顺序队列 + * + * + * 【内部结构】 + * 可重入锁 + * 用于根据delay时间排序的优先级队列 + * 用于优化阻塞通知的线程元素leader + * 用于实现阻塞和通知的Condition对象 + * + * + * 【用途】DelayQueue主要用于两个方面: + * - 缓存:清掉缓存中超时的缓存数据 + * - 任务超时处理 + * + * 【参考资料】 + * https://www.cnblogs.com/yaowen/p/10705256.html + * https://www.jianshu.com/p/e0bcc9eae0ae + */ + + + + /** + * 【DelayQueue类变量和构造函数】 + (1)PriorityQueue : 表明DelayQueue内部使用 PriorityQueue的最小堆保证有序 + (2)E extends Delayed 标明存入DelayQueue的变量必须实现Delayed接口,实现getDelay和compareTo接口 + */ + public class DelayQueue extends AbstractQueue implements BlockingQueue { + // 相关的锁 + private final transient ReentrantLock lock = new ReentrantLock(); + private final PriorityQueue q = new PriorityQueue(); + private Thread leader = null; + //基于锁的状态通知变量 + private final Condition available = lock.newCondition(); + + public DelayQueue() {} + + public DelayQueue(Collection c) { + this.addAll(c); + } + + public interface Comparable { + public int compareTo(T o); + } + + public interface Delayed extends Comparable { + long getDelay(TimeUnit unit); + } +} + + +/** + * 【DelayQueue的add过程】 + * (1)执行加锁操作 + * (2)把元素添加到优先级队列中 + * (3)查看元素是否为队首 + * (4)如果是队首的话,设置leader为空并唤醒所有等待的队列 + * (5)释放锁 + */ +public boolean add(E e) { + return offer(e); +} + +public boolean offer(E e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + q.offer(e); + if (q.peek() == e) { + leader = null; + available.signal(); + } + return true; + } finally { + lock.unlock(); + } +} + +public void put(E e) { + offer(e); +} + +public boolean offer(E e, long timeout, TimeUnit unit) { + return offer(e); +} + + + +/** + * 【DelayQueue的take过程】 + * 获取元素的 过程如下: + * 执行加锁操作 + * 取出优先级队列元素q的队首 + * 如果元素q的队首/队列为空,阻塞请求 + * 如果元素q的队首(first)不为空,获得这个元素的delay时间值 + * 如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法 + * 如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露 + * 判断leader元素是否为空,不为空的话阻塞当前线程 + * 如果leader元素为空的话,把当前线程赋值给leader元素,然后阻塞delay的时间,即等待队首到达可以出队的时间,在finally块中释放leader元素的引用 + * 循环执行从1~8的步骤 + * 如果leader为空并且优先级队列不为空的情况下(判断还有没有其他后续节点),调用signal通知其他的线程 + * 执行解锁操作 + */ +public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + E first = q.peek(); + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + else + return q.poll(); + } finally { + lock.unlock(); + } +} + +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) + return q.poll(); + first = null; // don't retain ref while waiting + if (leader != null) + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } +} + +public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) { + if (nanos <= 0) + return null; + else + nanos = available.awaitNanos(nanos); + } else { + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) + return q.poll(); + if (nanos <= 0) + return null; + first = null; // don't retain ref while waiting + if (nanos < delay || leader != null) + nanos = available.awaitNanos(nanos); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + long timeLeft = available.awaitNanos(delay); + nanos -= delay - timeLeft; + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } +} + +public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.peek(); + } finally { + lock.unlock(); + } +} + +private E peekExpired() { + // assert lock.isHeldByCurrentThread(); + E first = q.peek(); + return (first == null || first.getDelay(NANOSECONDS) > 0) ? + null : first; +} \ No newline at end of file