From ab709ff5b535f15e54be68723fc2d0f15eaa9937 Mon Sep 17 00:00:00 2001 From: qjwxpz Date: Sat, 4 Jan 2020 17:53:04 +0800 Subject: [PATCH] 032-week-04 --- week_04/32/ArrayBlockingQueue.md | 210 +++++++++ week_04/32/ConcurrentHashMap.md | 668 ++++++++++++++++++++++++++++ week_04/32/ConcurrentLinkedQueue.md | 217 +++++++++ week_04/32/CopyOnWriteArrayList.md | 199 +++++++++ week_04/32/DelayQueue.md | 161 +++++++ 5 files changed, 1455 insertions(+) create mode 100644 week_04/32/ArrayBlockingQueue.md create mode 100644 week_04/32/ConcurrentHashMap.md create mode 100644 week_04/32/ConcurrentLinkedQueue.md create mode 100644 week_04/32/CopyOnWriteArrayList.md create mode 100644 week_04/32/DelayQueue.md diff --git a/week_04/32/ArrayBlockingQueue.md b/week_04/32/ArrayBlockingQueue.md new file mode 100644 index 0000000..6d4dff5 --- /dev/null +++ b/week_04/32/ArrayBlockingQueue.md @@ -0,0 +1,210 @@ +## ArrayBlockingQueue源码阅读 + +### 1.1 ArrayBlockingQueue属性: + +```java + + final Object[] items; //储存元素的数组 + + int takeIndex; //下一次读取或移除的位置 + + int putIndex; //下一次存放元素的位置 + + int count; //队列中元素的总数 + + final ReentrantLock lock; //所有访问的保护锁 + + private final Condition notEmpty; //获取元素的条件 + + private final Condition notFull; //存放元素的条件 + +``` + +### 1.2 ArrayBlockingQueue构造器: + +```java + + public ArrayBlockingQueue(int capacity) { //初始化一个长度为capacity的队列 + this(capacity, false); + } + + public ArrayBlockingQueue(int capacity, boolean fair) { //初始化一个长度为capacity的fair(公平,非公平锁)的队列 + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + lock = new ReentrantLock(fair); //重入锁,出队和入队持有这一把锁 + notEmpty = lock.newCondition(); //初始化非空等待队列 + notFull = lock.newCondition(); //初始化非满等待队列 + } + + public ArrayBlockingQueue(int capacity, boolean fair, + Collection c) { //初始化一个长度为capacity的fair(公平,非公平锁)的队列并且添加c数据到集合中 + this(capacity, fair); + + final ReentrantLock lock = this.lock; + lock.lock(); // Lock only for visibility, not mutual exclusion + try { + int i = 0; + try { + for (E e : c) { + checkNotNull(e); + items[i++] = e; + } + } catch (ArrayIndexOutOfBoundsException ex) { + throw new IllegalArgumentException(); + } + count = i; + putIndex = (i == capacity) ? 0 : i; + } finally { + lock.unlock(); + } + } +``` + +### 1.3 ArrayBlockingQueue方法: +```java + + //添加时如果队列满了抛异常 + public boolean add(E e) { //调用父类AbstractQueue的add(e)方法,但是最后是调用自己实现父类的offer方法 + return super.add(e); + } + + public boolean offer(E e) { + checkNotNull(e); //非空验证 + final ReentrantLock lock = this.lock; + lock.lock(); //上锁 + try { + if (count == items.length) // 如果数组满了就返回false + return false; + else { // 如果数组没满就调用入队方法并返回true + enqueue(e); + return true; + } + } finally { + lock.unlock(); //释放锁 + } + } + + public void put(E e) throws InterruptedException { //添加元素,如果队列满了会等待 + checkNotNull(e); //非空验证 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); //上锁,支持中断 + try { + while (count == items.length) //队列满了 + notFull.await(); //等待 + enqueue(e); //加入队列 + } finally { + lock.unlock(); //释放锁 + } + } + + public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { //如果队列满了会等待一段时间timeout,然后队列还满才返回false + + checkNotNull(e); + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); //上锁 + try { + while (count == items.length) { //如果队列满了 + if (nanos <= 0) //等待时间小于或等于0直接返回false + return false; + nanos = notFull.awaitNanos(nanos); //等待nanos时间 + } + enqueue(e); + return true; + } finally { + lock.unlock(); + } + } + + private void enqueue(E x) { //将元素添加到队列中 + final Object[] items = this.items; + items[putIndex] = x; //通过 putIndex 对数据赋值 + if (++putIndex == items.length) // 当putIndex 等于数组长度时,将 putIndex 重置为 0 + putIndex = 0; + count++; //记录队列元素的个数 + notEmpty.signal(); //唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素 + } + + public boolean remove(Object o) { //删除元素o + if (o == null) return false; //判断o是否为空 + final Object[] items = this.items; //获取数组元素 + final ReentrantLock lock = this.lock; + lock.lock(); //上锁 + try { + if (count > 0) { //如果队列不为空 + final int putIndex = this.putIndex; //获取下一个要添加元素时的索引 + int i = takeIndex; //获取当前要被移除的元素的索引 + do { + if (o.equals(items[i])) { //从takeIndex 下标开始,找到要被删除的元素 + removeAt(i); //移除指定元素 + return true; //返回执行结果 + } + if (++i == items.length) //当前删除索引执行加 1 后判断是否与数组长度相等,若为 true,说明索引已到数组尽头,将 i 设置为 0 + i = 0; + } while (i != putIndex);//继续查找,直到找到最后一个元素 + } + return false; + } finally { + lock.unlock(); //释放锁 + } + } + + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock();//上锁 + try { + return (count == 0) ? null : dequeue();// 如果队列没有元素则返回null,否则出队 + } finally { + lock.unlock(); //释放锁 + } + } + + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly();// 上锁 + try { + while (count == 0) // 如果队列无元素,则阻塞等待在条件notEmpty上 + notEmpty.await(); + return dequeue(); //当队列有元素后,调用出队方法 + } finally { + lock.unlock(); //释放锁 + } + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { //带了等待时间和中断的出队 + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); //上锁 + try { + while (count == 0) { // 如果队列无元素,则阻塞等待nanos纳秒 + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + return dequeue(); + } finally { + lock.unlock(); //释放锁 + } + } + + private E dequeue() { //出队 + final Object[] items = this.items; + @SuppressWarnings("unchecked") + E x = (E) items[takeIndex]; //默认获取 0 位置的元素 + items[takeIndex] = null; /将该位置的元素设置为空 + if (++takeIndex == items.length) //这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据 + takeIndex = 0; + count--; //记录 元素个数递减 + if (itrs != null) + itrs.elementDequeued(); //同时更新迭代器中的元素数据 + notFull.signal(); /触发 因为队列满了以后导致的被阻塞的线程 + return x; + } +``` + +### 1.4 ArrayBlockingQueue总结: + 1.4.1:ArrayBlockingQueue是有界的,它不会被扩容。 + 1.4.2:所有的增删改查数组公用了一把锁ReentrantLock,入队和出队数组下标和count变更都是靠这把锁来维护安全的 + 1.4.3:底层数据接口是数组,下标putIndex/takeIndex,构成一个环形FIFO队列 diff --git a/week_04/32/ConcurrentHashMap.md b/week_04/32/ConcurrentHashMap.md new file mode 100644 index 0000000..472bbe2 --- /dev/null +++ b/week_04/32/ConcurrentHashMap.md @@ -0,0 +1,668 @@ +## ConcurrentHashMap源码学习 + +### 1.1 ConcurrentHashMap: + 1.1.1: ConcurrentHashMap继承了抽象类AbstractMap, + 1.1.2: ConcurrentHashMap实现了ConcurrentMap接口 + 1.1.3: ConcurrentHashMap实现了Serializable表示可以被序列化 + +### 1.2 ConcurrentHashMap属性: + +```java + + private static final int MAXIMUM_CAPACITY = 1 << 30; //表的最大容量 + + private static final int DEFAULT_CAPACITY = 16; // 默认表的大小 + + static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大数组大小 + + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //默认并发数 + + private static final float LOAD_FACTOR = 0.75f; //装载因子 + + static final int TREEIFY_THRESHOLD = 8; //转化为红黑树的阈值 + + static final int UNTREEIFY_THRESHOLD = 6; // 由红黑树转化为链表的阈值 + + static final int MIN_TREEIFY_CAPACITY = 64; //转化为红黑树的表的最小容量 + + private static final int MIN_TRANSFER_STRIDE = 16; //每次进行转移的最小值 + + private static int RESIZE_STAMP_BITS = 16; //生成sizeCtl所使用的bit位数 + + private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //进行扩容所允许的最大线程数 + + private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //记录sizeCtl中的大小所需要进行的偏移位数 + + static final int MOVED = -1; //表示正在转移 + static final int TREEBIN = -2; //表示已经转换成树 + static final int RESERVED = -3; //临时的hash + static final int HASH_BITS = 0x7fffffff; //能用的hash位数 + + static final int NCPU = Runtime.getRuntime().availableProcessors(); //获取可用的CPU个数 +``` + +### 1.3 ConcurrentHashMap内部类: +```java + + static class Node implements Map.Entry { //保存key,value及key的hash值的数据结构. + final int hash; //key的hash值 + final K key; + volatile V val; //使用volatile修饰,保证并发的可见性 + volatile Node next; //表示链表中的下一个节点,使用volatile修饰,保证并发的可见性 + + Node(int hash, K key, V val, Node next) { + this.hash = hash; + this.key = key; + this.val = val; + this.next = next; + } + + public final K getKey() { return key; } + public final V getValue() { return val; } + public final int hashCode() { return key.hashCode() ^ val.hashCode(); } + public final String toString(){ return key + "=" + val; } + public final V setValue(V value) { + throw new UnsupportedOperationException(); + } + + public final boolean equals(Object o) { + Object k, v, u; Map.Entry e; + return ((o instanceof Map.Entry) && + (k = (e = (Map.Entry)o).getKey()) != null && + (v = e.getValue()) != null && + (k == key || k.equals(key)) && + (v == (u = val) || v.equals(u))); + } + + /** + *支持map的get方法,通过hashcode和Key来获取一个node结点 + */ + Node find(int h, Object k) { + Node e = this; + if (k != null) { + do { + K ek; + if (e.hash == h && + ((ek = e.key) == k || (ek != null && k.equals(ek)))) + return e; + } while ((e = e.next) != null); + } + return null; + } + } + + + static final class TreeNode extends Node { //红黑树节点 + TreeNode parent; // + TreeNode left; + TreeNode right; + TreeNode prev; // + boolean red; + + TreeNode(int hash, K key, V val, Node next, + TreeNode parent) { + super(hash, key, val, next); + this.parent = parent; + } + + Node find(int h, Object k) { + return findTreeNode(h, k, null); + } + + final TreeNode findTreeNode(int h, Object k, Class kc) { + if (k != null) { + TreeNode p = this; + do { + int ph, dir; K pk; TreeNode q; + TreeNode pl = p.left, pr = p.right; + if ((ph = p.hash) > h) + p = pl; + else if (ph < h) + p = pr; + else if ((pk = p.key) == k || (pk != null && k.equals(pk))) + return p; + else if (pl == null) + p = pr; + else if (pr == null) + p = pl; + else if ((kc != null || + (kc = comparableClassFor(k)) != null) && + (dir = compareComparables(kc, k, pk)) != 0) + p = (dir < 0) ? pl : pr; + else if ((q = pr.findTreeNode(h, k, kc)) != null) + return q; + else + p = pl; + } while (p != null); + } + return null; + } + } +``` + +### 1.4 ConcurrentHashMap构造器: + +```java + public ConcurrentHashMap() {//创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 + } + + public ConcurrentHashMap(int initialCapacity) { //创建一个带有指定初始容量initialCapacity、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 + if (initialCapacity < 0) // 初始容量小于0,抛出异常 + throw new IllegalArgumentException(); + int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? + MAXIMUM_CAPACITY : + tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));// 找到最接近该容量的2的幂次方数 + this.sizeCtl = cap; // 初始化 + } + + public ConcurrentHashMap(Map m) { //构造一个与给定映射具有相同映射关系的新映射 + this.sizeCtl = DEFAULT_CAPACITY; + putAll(m); // // 将集合m的元素全部放入 + } + + public ConcurrentHashMap(int initialCapacity, float loadFactor) { //创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (1) 的新的空映射 + this(initialCapacity, loadFactor, 1); + } + + public ConcurrentHashMap(int initialCapacity, + float loadFactor, int concurrencyLevel) { //创建一个带有指定初始容量、加载因子和并发级别的新的空映射 + if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) //合法性判断 + throw new IllegalArgumentException(); + if (initialCapacity < concurrencyLevel) // Use at least as many bins + initialCapacity = concurrencyLevel; // as estimated threads + long size = (long)(1.0 + (long)initialCapacity / loadFactor); + int cap = (size >= (long)MAXIMUM_CAPACITY) ? + MAXIMUM_CAPACITY : tableSizeFor((int)size); + this.sizeCtl = cap; + } +``` + +### 1.5 ConcurrentHashMap重要方法: + +```java + + final V putVal(K key, V value, boolean onlyIfAbsent) { //添加元素 + if (key == null || value == null) throw new NullPointerException(); //key和value都不能为空 + int hash = spread(key.hashCode()); //计算key的hash值 + int binCount = 0; //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树 + for (Node[] tab = table;;) { // 无限循环 + Node f; int n, i, fh; + if (tab == null || (n = tab.length) == 0) + tab = initTable(); //第一次put的时候表没有初始化,则初始化表 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 表不为空并且表的长度大于0,并且该桶不为空 + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) //比较并且交换值,如tab的第i项为空则用新生成的node替换 + break; + } + else if ((fh = f.hash) == MOVED) // 该结点的hash值为MOVED + tab = helpTransfer(tab, f); //进行结点的转移(在扩容的过程中) + else { + /* + * 如果在这个位置有元素的话,就采用synchronized的方式加锁, + * 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历, + * 如果找到了key和key的hash值都一样的节点,则把它的值替换到 + * 如果没找到的话,则添加在链表的最后面 + * 否则,是树的话,则调用putTreeVal方法添加到树中去 + * + * 在添加完之后,会对该节点上关联的的数目进行判断, + * 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容 + */ + V oldVal = null; + synchronized (f) { // 加锁同步 + if (tabAt(tab, i) == f) { // 找到table表下标为i的节点,如果没有变化 + if (fh >= 0) { // 该table表中该结点的hash值大于0 + binCount = 1; // binCount赋值为1 + for (Node e = f;; ++binCount) { // 无限循环 + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { // 结点的hash值相等并且key也相等 + oldVal = e.val; // 保存该结点的val值 + if (!onlyIfAbsent) // 进行判断 + e.val = value; // 将指定的value保存至结点,即进行了结点值的更新 + break; + } + Node pred = e; // 保存当前结点 + if ((e = e.next) == null) { // 当前结点的下一个结点为空,即为最后一个结点 + pred.next = new Node(hash, key, + value, null); // 新生一个结点并且赋值给next域 + break; + } + } + } + else if (f instanceof TreeBin) { // 结点为红黑树结点类型 + Node p; + binCount = 2; // binCount赋值为2 + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { // 将hash、key、value放入红黑树 + oldVal = p.val; // 保存结点的val + if (!onlyIfAbsent) + p.val = value; / 赋值结点value值 + } + } + } + } + if (binCount != 0) { // binCount不为0 + if (binCount >= TREEIFY_THRESHOLD) // 如果binCount大于等于转化为红黑树的阈值 + treeifyBin(tab, i); // 进行转化 + if (oldVal != null) // 旧值不为空 + return oldVal; // 返回旧值 + break; + } + } + } + addCount(1L, binCount); // 增加binCount的数量 + return null; + } + + /** + * 整体流程大概为: + + * ① 判断存储的key、value是否为空,若为空,则抛出异常,否则,进入步骤② + +   * ② 计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③ + +   * ③ 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤④ + +   * ④ 若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤⑤ + +   * ⑤ 对桶中的第一个结点(即table表中的结点)进行加锁,对该桶进行遍历,桶中的结点的hash值与key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值 + + * 替换该结点的value值),若遍历完桶仍没有找到hash值与key值和指定的hash值与key值相等的结点,则直接新生一个结点并赋值为之前最后一个结点的下一个结点。进入步骤⑥ + +   * ⑥ 若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。 + **/ + + + + private final Node[] initTable() { //初始化表(桶数组) + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { // 无限循环 + if ((sc = sizeCtl) < 0) // sizeCtl小于0,则进行线程让步等待(说明正在初始化或者扩容) + Thread.yield(); // lost initialization race; just spin + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 比较sizeCtl的值与sc是否相等,相等则用-1替换(cas操作,只有一个线程初始化表(桶数组)) + try { + if ((tab = table) == null || tab.length == 0) { // table表为空或者大小为0 + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // sc的值是否大于0,若是,则n为sc,否则,n为默认初始容量 + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; // 新生结点数组 + table = tab = nt; // 赋值给table + sc = n - (n >>> 2); // sc为n * 3/4 + } + } finally { + sizeCtl = sc; // 设置sizeCtl的值 + } + break; + } + } + return tab; + } + + final Node[] helpTransfer(Node[] tab, Node f) { //在扩容时将table表中的结点转移到nextTable中 + Node[] nextTab; int sc; + if (tab != null && (f instanceof ForwardingNode) && + (nextTab = ((ForwardingNode)f).nextTable) != null) { // table表不为空并且结点类型使ForwardingNode类型,并且结点的nextTable不为空 + int rs = resizeStamp(tab.length); + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { //说明正在扩容 + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 扩容线程数加1 + transfer(tab, nextTab); // 当前线程帮忙迁移元素 + break; + } + } + return nextTab; + } + return table; + } + + /** + * 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置 + * 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用, + * 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操作 + * 扩容的时候会一直遍历,知道复制完所有节点,没处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他, + * 复制后在新数组中的链表不是绝对的反序的 + */ + private final void transfer(Node[] tab, Node[] nextTab) { + int n = tab.length, stride; + if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) + stride = MIN_TRANSFER_STRIDE; + + /* + * 如果复制的目标nextTab为null的话,则初始化一个table两倍长的nextTab + * 此时nextTable被设置值了(在初始情况下是为null的) + * 因为如果有一个线程开始了表的扩张的时候,其他线程也会进来帮忙扩张, + * 而只是第一个开始扩张的线程需要初始化下目标数组 + */ + if (nextTab == null) { + try { + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n << 1]; + nextTab = nt; + } catch (Throwable ex) { // try to cope with OOME + sizeCtl = Integer.MAX_VALUE; + return; + } + nextTable = nextTab; + transferIndex = n; + } + int nextn = nextTab.length; + /* + * 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点 + * 这是一个空的标志节点 + */ + ForwardingNode fwd = new ForwardingNode(nextTab); + boolean advance = true; //是否继续向前查找的标志位 + boolean finishing = false; //在完成之前重新在扫描一遍数组,看看有没完成的没 + for (int i = 0, bound = 0;;) { + Node f; int fh; + while (advance) { + int nextIndex, nextBound; + if (--i >= bound || finishing) + advance = false; + else if ((nextIndex = transferIndex) <= 0) { + i = -1; + advance = false; + } + else if (U.compareAndSwapInt + (this, TRANSFERINDEX, nextIndex, + nextBound = (nextIndex > stride ? + nextIndex - stride : 0))) { + bound = nextBound; + i = nextIndex - 1; + advance = false; + } + } + if (i < 0 || i >= n || i + n >= nextn) { + int sc; + if (finishing) { //已经完成转移 + nextTable = null; + table = nextTab; + sizeCtl = (n << 1) - (n >>> 1); //设置sizeCtl为扩容后的0.75 + return; + } + if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { + if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) + return; + finishing = advance = true; + i = n; // recheck before commit + } + } + else if ((f = tabAt(tab, i)) == null) //数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1]) + advance = casTabAt(tab, i, null, fwd); + else if ((fh = f.hash) == MOVED) + advance = true; // already processed + else { + synchronized (f) { //加锁操作 + if (tabAt(tab, i) == f) { + Node ln, hn; + if (fh >= 0) { //该节点的hash值大于等于0,说明是一个Node节点 + /* + * 因为n的值为数组的长度,且是power(2,x)的,所以,在&操作的结果只可能是0或者n + * 根据这个规则 + * 0--> 放在新表的相同位置 + * n--> 放在新表的(n+原来位置) + */ + int runBit = fh & n; + Node lastRun = f; + /* + * lastRun 表示的是需要复制的最后一个节点 + * 每当新节点的hash&n -> b 发生变化的时候,就把runBit设置为这个结果b + * 这样for循环之后,runBit的值就是最后不变的hash&n的值 + * 而lastRun的值就是最后一次导致hash&n 发生变化的节点(假设为p节点) + * 为什么要这么做呢?因为p节点后面的节点的hash&n 值跟p节点是一样的, + * 所以在复制到新的table的时候,它肯定还是跟p节点在同一个位置 + * 在复制完p节点之后,p节点的next节点还是指向它原来的节点,就不需要进行复制了,自己就被带过去了 + * 这也就导致了一个问题就是复制后的链表的顺序并不一定是原来的倒序 + */ + for (Node p = f.next; p != null; p = p.next) { + int b = p.hash & n; //n的值为扩张前的数组的长度 + if (b != runBit) { + runBit = b; + lastRun = p; + } + } + if (runBit == 0) { + ln = lastRun; + hn = null; + } + else { + hn = lastRun; + ln = null; + } + /* + * 构造两个链表,顺序大部分和原来是反的 + * 分别放到原来的位置和新增加的长度的相同位置(i/n+i) + */ + for (Node p = f; p != lastRun; p = p.next) { + int ph = p.hash; K pk = p.key; V pv = p.val; + if ((ph & n) == 0) + /* + * 假设runBit的值为0, + * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点) + * 设置到旧的table的第一个hash计算后为0的节点下一个节点 + * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点 + */ + ln = new Node(ph, pk, pv, ln); + else + /* + * 假设runBit的值不为0, + * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点) + * 设置到旧的table的第一个hash计算后不为0的节点下一个节点 + * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点 + */ + hn = new Node(ph, pk, pv, hn); + } + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + else if (f instanceof TreeBin) { //否则的话是一个树节点 + TreeBin t = (TreeBin)f; + TreeNode lo = null, loTail = null; + TreeNode hi = null, hiTail = null; + int lc = 0, hc = 0; + for (Node e = t.first; e != null; e = e.next) { + int h = e.hash; + TreeNode p = new TreeNode + (h, e.key, e.val, null, null); + if ((h & n) == 0) { + if ((p.prev = loTail) == null) + lo = p; + else + loTail.next = p; + loTail = p; + ++lc; + } + else { + if ((p.prev = hiTail) == null) + hi = p; + else + hiTail.next = p; + hiTail = p; + ++hc; + } + } + /* + * 在复制完树节点之后,判断该节点处构成的树还有几个节点, + * 如果≤6个的话,就转回为一个链表 + */ + ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : + (hc != 0) ? new TreeBin(lo) : t; + hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : + (lc != 0) ? new TreeBin(hi) : t; + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + } + } + } + } + } + + + private final void addCount(long x, int check) { //判断是否需要扩容 + CounterCell[] as; long b, s; + /** + *把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想) + *并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段 + *这样可以保证尽量小的减少冲突 + */ + if ((as = counterCells) != null || + !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上 + CounterCell a; long v; int m; + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = + U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //如果as为空,或者长度为0,或者当前线程所在的段为null,或者在当前线程的段上加数量失败 + /** + *强制增加数量(无论如何数量是一定要加上的,并不是简单地自旋)不同线程对应不同的段都更新失败了 + *说明已经发生冲突了,那么就对counterCells进行扩容 + *以减少多个线程hash到同一个段的概率 + */ + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + s = sumCount(); // 计算元素个数 + } + if (check >= 0) { + Node[] tab, nt; int n, sc; + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { //如果元素个数达到了扩容门槛,则进行扩容 + int rs = resizeStamp(n); + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) // sc<0说明正在扩容中 + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 扩容未完成,则当前线程加入迁移元素中,并把扩容线程数加1 + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } + } + + public V get(Object key) { //获取元素 + Node[] tab; Node e, p; int n, eh; K ek; + int h = spread(key.hashCode()); //// 计算key的hash值 + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空 + if ((eh = e.hash) == h) { // 表中的元素的hash值与key的hash值相等 + if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 键相等 + return e.val; // 返回值 + } + else if (eh < 0) // 结点hash值小于0 + return (p = e.find(h, key)) != null ? p.val : null; // 在桶(链表/红黑树)中查找 + while ((e = e.next) != null) { // 遍历整个链表寻找元素 + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; + } + + final V replaceNode(Object key, V value, Object cv) { //删除元素 + int hash = spread(key.hashCode()); // 计算key的hash值 + for (Node[] tab = table;;) { // 无限循环 + Node f; int n, i, fh; + if (tab == null || (n = tab.length) == 0 || + (f = tabAt(tab, i = (n - 1) & hash)) == null) // table表为空或者表长度为0或者key所对应的桶为空 + break; //跳出循环 + else if ((fh = f.hash) == MOVED) // 桶中第一个结点的hash值为MOVED + tab = helpTransfer(tab, f); // 如果正在扩容中,协助扩容 + else { + V oldVal = null; + boolean validated = false; + synchronized (f) { // 加锁同步 + if (tabAt(tab, i) == f) { // 桶中的第一个结点没有发生变化 + if (fh >= 0) { // 结点hash值大于0 + validated = true; + for (Node e = f, pred = null;;) { // 无限循环 + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { // 结点的hash值与指定的hash值相等,并且key也相等 + V ev = e.val; + if (cv == null || cv == ev || + (ev != null && cv.equals(ev))) { // cv为空或者与结点value相等或者不为空并且相等 + oldVal = ev; // 保存该结点的val值 + if (value != null) // value为null + e.val = value; // 设置结点value值 + else if (pred != null) // 前驱不为空 + pred.next = e.next; // 前驱的后继为e的后继,即删除了e结点 + else + setTabAt(tab, i, e.next); // 设置table表中下标为index的值为e.next + } + break; + } + pred = e; + if ((e = e.next) == null) + break; + } + } + else if (f instanceof TreeBin) { // 为红黑树结点类型 + validated = true; + TreeBin t = (TreeBin)f; // 类型转化 + TreeNode r, p; + if ((r = t.root) != null && + (p = r.findTreeNode(hash, key, null)) != null) { // 根节点不为空并且存在与指定hash和key相等的结点 + V pv = p.val; // 保存p结点的value + if (cv == null || cv == pv || + (pv != null && cv.equals(pv))) { // cv为空或者与结点value相等或者不为空并且相等 + oldVal = pv; + if (value != null) + p.val = value; + else if (t.removeTreeNode(p)) // 移除p结点 + setTabAt(tab, i, untreeify(t.first)); + } + } + } + } + } + if (validated) { + if (oldVal != null) { //如果找到了元素,返回其旧值 + if (value == null) // 如果要替换的值为空,元素个数减1 + addCount(-1L, -1); + return oldVal; + } + break; + } + } + } + return null; // 没找到元素返回空 + } +``` + +### 1.6 ConcurrentHashMap总结: + 1.6.1:ConcurrentHashMap底层数据结构为数组+链表+红黑树,默认容量为16,不允许[key,value]为null + 1.6.2:ConcurrentHashMap是线程安全的,HashMap线程不安全 + 1.6.3:ConcurrentHashMap内部采用的锁有synchronized、CAS、自旋锁、分段锁、volatile + 1.6.4:通过sizeCtl变量来控制扩容、初始化等操作 + 1.6.5:ConcurrentHashMap线程安全是因为更新操作时会锁当前桶的第一个元素,由此实现分段锁 + 1.6.6:查询操作是不会加锁的,所以ConcurrentHashMap不是强一致性的 + + + + + + + + + + + + + diff --git a/week_04/32/ConcurrentLinkedQueue.md b/week_04/32/ConcurrentLinkedQueue.md new file mode 100644 index 0000000..43bf3b9 --- /dev/null +++ b/week_04/32/ConcurrentLinkedQueue.md @@ -0,0 +1,217 @@ +## ConcurrentLinkedQueue源码阅读 + +### 1.1 ConcurrentLinkedQueue内部类: +```java + + private static class Node { //节点类 + volatile E item; // 存储的数据 + volatile Node next; // 下一个节点引用 + + //构造一个node节点 + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + + boolean casItem(E cmp, E val) { //原子修改节点的item + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + + void lazySetNext(Node val) { //原子懒修改节点的next + UNSAFE.putOrderedObject(this, nextOffset, val); + } + + boolean casNext(Node cmp, Node val) { // cas修改节点的next节点 + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + + // Unsafe mechanics + + private static final sun.misc.Unsafe UNSAFE; + private static final long itemOffset; + private static final long nextOffset; + + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = Node.class; + itemOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("item")); + nextOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("next")); + } catch (Exception e) { + throw new Error(e); + } + } + } + +``` + +### 1.2 ConcurrentLinkedQueue属性: +```java + + private transient volatile Node head; //链表头节点 + + private transient volatile Node tail; //链表尾节点 + +``` + +### 1.3 ConcurrentLinkedQueue构造器: +```java + + public ConcurrentLinkedQueue() { //构造一个node节点的item为null的节点,然后将链表的头节点和尾节点都指向它 + head = tail = new Node(null); + } + + public ConcurrentLinkedQueue(Collection c) { //将集合c中的元素转换为node节点添加到链表中 + Node h = null, t = null; + for (E e : c) { //遍历集合c + checkNotNull(e); //非空验证 + Node newNode = new Node(e); //将遍历的元素转换为node节点 + if (h == null) //如果为空,第一次向链表中添加元素,赋值给头节点 + h = t = newNode; + else { //因为第一次添加元素的时候头节点也是尾节点, + t.lazySetNext(newNode); + t = newNode; + } + } + if (h == null) //如果h为空,表示集合c为null,就创建一个null链表 + h = t = new Node(null); + head = h; + tail = t; + } + +``` + +### 1.4 ConcurrentLinkedQueue主要方法: +```java + + public boolean add(E e) { //添加元素到链表中 + return offer(e); + } + + final void updateHead(Node h, Node p) { //更换链表头节点 + if (h != p && casHead(h, p)) //cas更换头节点 + h.lazySetNext(h); // 将旧的头结点h的next域指向为h + } + + final Node succ(Node p) { //获取节点p的下一个节点 + Node next = p.next; + return (p == next) ? head : next; //如果当前节点和下一个节点相同,则返回真正头节点 + } + + public boolean offer(E e) { //添加元素到链表尾 + checkNotNull(e); //非空验证 + final Node newNode = new Node(e); //创建一个node的item为e的节点 + + for (Node t = tail, p = t;;) { + Node q = p.next; //获取尾节点的下一个节点 + if (q == null) { //如果尾节点的下一个节点为null,说明p为尾节点了 + // p is last node + if (p.casNext(null, newNode)) { //cas操作将p的next节点设置为newNode + //如果p不等于t,说明有其它线程先一步更新tail + //也就不会走到q==null这个分支了 + //p取到的可能是t后面的值,把tail原子更新为新节点 + if (p != t) // hop two nodes at a time + casTail(t, newNode); // Failure is OK. + return true; // 返回入队成功 + } + // Lost CAS race to another thread; re-read next + } + else if (p == q) // 如果p的next等于p,说明p已经被删除了(已经出队了) + // We have fallen off list. If tail is unchanged, it + // will also be off-list, in which case we need to + // jump to head, from which all live nodes are always + // reachable. Else the new tail is a better bet. + p = (t != (t = tail)) ? t : head; //重新设置p的值 + else + // t后面还有值,重新设置p的值 + p = (p != t && t != (t = tail)) ? t : q; + } + } + + public E poll() { //删除并返回第一个元素,如果队列为空,则返回 null + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { // 获取头结点 + E item = p.item; / 获取节点的内容 + + if (item != null && p.casItem(item, null)) { // item不为null ,使用cas设置item为空 + // 更新头结点,和尾节点一样,不是每次都更新 + // 头结点item为null是,下个节点就必须更新头结点 + // 头结点item不为null时,规则和更新尾节点一样 + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + else if ((q = p.next) == null) { // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了 + updateHead(h, p); + return null; + } + // p == q,说明别的线程调用了updateHead, + // 自己的next 指向了自己,重新循环,获取最新的头结点 + else if (p == q) + continue restartFromHead; + else // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点 + p = q; + } + } + } + + public E peek() { //获取第一个元素,如果队列为空,则返回null + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + E item = p.item; + if (item != null || (q = p.next) == null) { + updateHead(h, p); // 更新头结点 + return item; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } + } + + Node first() { //获取第一个队列元素,没有则为null + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + boolean hasItem = (p.item != null); + if (hasItem || (q = p.next) == null) { + updateHead(h, p); + return hasItem ? p : null; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } + } + + public boolean remove(Object o) { //删除队列中从头开始第一个o元素 + if (o == null) return false; //如果删除元素为null直接返回false + Node pred = null; + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; + if (item != null && + o.equals(item) && + p.casItem(item, null)) { //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其它元素是否有匹配的。 + Node next = succ(p);//获取next元素 + if (pred != null && next != null) /如果有前驱节点,并且next不为空则链接前驱节点到next, + pred.casNext(p, next); + return true; + } + pred = p; + } + return false; + } +``` + + +### 1.5 ConcurrentLinkedQueue总结: + 1.5.1:head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态 + 1.5.2:使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础 + 1.5.3:为了有利于垃圾收集,队列使用特有的 head 更新机制;为了确保从已删除节点向后遍历,可到达所有的非删除节点,队列使用了特有的向后推进策略 \ No newline at end of file diff --git a/week_04/32/CopyOnWriteArrayList.md b/week_04/32/CopyOnWriteArrayList.md new file mode 100644 index 0000000..1543147 --- /dev/null +++ b/week_04/32/CopyOnWriteArrayList.md @@ -0,0 +1,199 @@ +## CopyOnWriteArrayList源码学习 + +### 1.1 CopyOnWriteArrayList: + 1.1.1:CopyOnWriteArrayList实现了List接口,表示该容器具有列表的功能。 + 1.1.2:CopyOnWriteArrayList实现了RandomAccess接口,表示该容器提供随机访问的特性 + 1.1.3:CopyOnWriteArrayList实现了Cloneable接口,表示可以被克隆。 + 1.1.4:CopyOnWriteArrayList实现了java.io.Serializable,表示可以序列化。 + +### 1.2 CopyOnWriteArrayList属性: + +```java + + final transient ReentrantLock lock = new ReentrantLock(); //重入锁 + + private transient volatile Object[] array; //对象数组,用于存放数据,用volatile修饰表示一个线程修改,其他线程马上就知道修改后的内容 +``` + +### 1.3:CopyOnWriteArrayList构造器: + +```java + + public CopyOnWriteArrayList() { //创建一个空的列表 + setArray(new Object[0]); + } + + public CopyOnWriteArrayList(Collection c) { //创建一个包含集合c的列表 + Object[] elements; + if (c.getClass() == CopyOnWriteArrayList.class) //判断c的类型是否是CopyOnWriteArrayList,如果是则获取集合的数组 + elements = ((CopyOnWriteArrayList)c).getArray(); + else { //c的类型不是CopyOnWriteArrayList + elements = c.toArray(); //将c转换为数组 + if (elements.getClass() != Object[].class) //判断数组elements的类型是否为Object[]类型,如果不是则转换为Object[] + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements); //设置数组 + } + + + public CopyOnWriteArrayList(E[] toCopyIn) { //将toCopyIn转换为Object[]类型,然后设置数组 + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); + } +``` + +### 1.4:CopyOnWriteArrayList重要方法: + +```java + + final Object[] getArray() { //获取数组,没有上锁 + return array; + } + + final void setArray(Object[] a) { //设置数组 + array = a; + } + + public int size() { //获取数组长度 + return getArray().length; + } + + public boolean isEmpty() { //判断数组是否为空 + return size() == 0; + } + + public E set(int index, E element) { //替换index位置的元素为element + final ReentrantLock lock = this.lock; //获取锁 + lock.lock(); //上锁 + try { + Object[] elements = getArray(); //获取数组 + E oldValue = get(elements, index); //获取旧数组index位置的元素oldValue + + if (oldValue != element) { + //如果旧值和新值不一样 + int len = elements.length; //获取数组长度 + Object[] newElements = Arrays.copyOf(elements, len); //赋值原来的数组对象 + newElements[index] = element; //将index位置上的值设置为element + setArray(newElements); //将新数组对象写会CopyOnWriteArrayList中 + } else { + //旧值和新值一样不需要修改数组 + setArray(elements); //将旧数组对象写会CopyOnWriteArrayList中 + } + return oldValue; //返回旧值 + } finally { + lock.unlock(); //释放锁 + } + } + + public boolean add(E e) { //在数组末尾添加添加元素e + final ReentrantLock lock = this.lock; //获取锁 + lock.lock(); //上锁 + try { + Object[] elements = getArray(); //获取数组 + int len = elements.length; //获取数组长度 + Object[] newElements = Arrays.copyOf(elements, len + 1); //创建一个原数组长度+1的新数组,并将原数组的元素复制到新数组 + newElements[len] = e; //在新数组的末尾添加元素e + setArray(newElements); //将新数组对象写会CopyOnWriteArrayList中 + return true; + } finally { + lock.unlock(); //释放锁 + } + } + + public void add(int index, E element) { //在index位置添加元素element + final ReentrantLock lock = this.lock; //获取锁 + lock.lock(); //上锁 + try { + Object[] elements = getArray(); //获取数组 + int len = elements.length; //获取数组长度 + if (index > len || index < 0) //验证添加的位置index是否合法 + throw new IndexOutOfBoundsException("Index: "+index+ + ", Size: "+len); + Object[] newElements; + int numMoved = len - index; + if (numMoved == 0) //表示在末尾添加元素 + newElements = Arrays.copyOf(elements, len + 1); + else { //表示在数组中间插入元素element + newElements = new Object[len + 1]; //创建一个原数组长度+1长度的新数组 + System.arraycopy(elements, 0, newElements, 0, index);//从原数组的0到index-1位置上的元素复制到新数组中 + System.arraycopy(elements, index, newElements, index + 1, + numMoved);//将原数组的index位置到末尾上的元素复制到新数组中 + } + newElements[index] = element; //在index位置上设置元素为element + setArray(newElements);//将新数组对象写会CopyOnWriteArrayList中 + } finally { + lock.unlock(); //释放锁 + } + } + + public E remove(int index) { //删除index位置上的元素 + final ReentrantLock lock = this.lock; //获取锁 + lock.lock(); //上锁 + try { + Object[] elements = getArray(); //获取数组 + int len = elements.length; //获取数组长度 + E oldValue = get(elements, index); //获取index位置的元素 + int numMoved = len - index - 1; //计算要移动的位置 + if (numMoved == 0) //表示删除的是数组的最后一个元素 + setArray(Arrays.copyOf(elements, len - 1)); //直接将0~len-1上的元素拷贝到新数组中,并将新数组对象写会CopyOnWriteArrayList中 + else { //表示删除的是数组的头或者中间一个元素 + Object[] newElements = new Object[len - 1]; //创建一个长度为len-1的数组对象 + System.arraycopy(elements, 0, newElements, 0, index); //将原数组中0~index-1的元素复制到新数组中 + System.arraycopy(elements, index + 1, newElements, index, + numMoved); //将原数组中index+1到末尾的元素复制到新数组中 + setArray(newElements); //将新数组对象写会CopyOnWriteArrayList中 + } + return oldValue; //返回删除元素 + } finally { + lock.unlock(); //释放锁 + } + } + + public boolean remove(Object o) { //删除元素o + Object[] snapshot = getArray(); //获取数组 + int index = indexOf(o, snapshot, 0, snapshot.length); //获取元素o在数组的位置 + return (index < 0) ? false : remove(o, snapshot, index); //如果index小于0表示元素o没有在数组中,否则删除元素 + } + + private boolean remove(Object o, Object[] snapshot, int index) { //删除元素 + final ReentrantLock lock = this.lock; //获取锁 + lock.lock(); //上锁 + try { + Object[] current = getArray(); //获取原数组 + int len = current.length; //获取原数组长度 + if (snapshot != current) findIndex: { //如果当前的数组对象和传入的snapshot数组对象不一样,说明被其他线程修改了 + int prefix = Math.min(index, len); //获取需要遍历的最小值 + for (int i = 0; i < prefix; i++) { + if (current[i] != snapshot[i] && eq(o, current[i])) { //判断当前数组和传入数组i位置的 元素是否一样 + index = i; //获取需要删除元素的位置i + break findIndex; + } + } + if (index >= len) //判断删除位置index是否越界 + return false; + if (current[index] == o) //判断index位置元素是否为o,如果不是调到方法findIndex重新找 + break findIndex; + index = indexOf(o, current, index, len); //前面查找失败,就继续从index往后查找 + if (index < 0) + return false; + } + Object[] newElements = new Object[len - 1]; //创建一个长度为len-1的新数组对象 + //将原数组中除了index位置上的元素,全部搬迁到新数组中 + System.arraycopy(current, 0, newElements, 0, index); + System.arraycopy(current, index + 1, + newElements, index, + len - index - 1); + setArray(newElements);//将新数组对象写会CopyOnWriteArrayList中 + return true; + } finally { + lock.unlock(); //释放锁 + } + } + +``` + +### 1.5:CopyOnWriteArrayList总结: + + 1.5.1: CopyOnWriteArrayList主要使用于读多写少的应用场景 + 1.5.2: CopyOnWriteArrayList每次添加,修改,删除都会上锁,并且会创建一个新的数组 + 1.5.3: CopyOnWriteArrayList是通过ReentrantLock来上锁 + 1.5.4: CopyOnWriteArrayList能保证最终一致性,但是不能保证实时一致性 \ No newline at end of file diff --git a/week_04/32/DelayQueue.md b/week_04/32/DelayQueue.md new file mode 100644 index 0000000..2164d60 --- /dev/null +++ b/week_04/32/DelayQueue.md @@ -0,0 +1,161 @@ +## DelayQueue源码阅读 + +### 1.1 DelayQueue属性: +```java + + private final transient ReentrantLock lock = new ReentrantLock(); //可重入锁 + + private final PriorityQueue q = new PriorityQueue(); //存储元素的优先级队列 + + private Thread leader = null; //获取数据 等待线程标识 + + private final Condition available = lock.newCondition();//条件控制,表示是否可以从队列中取数据 + +``` + +### 1.2 DelayQueue构造器: +```java + + public DelayQueue() {} //默认构造器 + + public DelayQueue(Collection c) {//通过集合初始化 + this.addAll(c); + } + +``` + +### 1.3 DelayQueue构造器: +```java + + public boolean offer(E e) { //将元素e插入到队列中 + final ReentrantLock lock = this.lock; + lock.lock();//上锁 + try { + q.offer(e); ////通过PriorityQueue 来将元素入队 + if (q.peek() == e) { //peek 是获取的队头元素,唤醒阻塞在available 条件上的一个线程,表示可以从队列中取数据了 + leader = null; + available.signal(); + } + return true; + } finally { + lock.unlock(); //释放锁 + } + } + +``` + +### 1.4 DelayQueue方法: + public boolean offer(E e, long timeout, TimeUnit unit) { //将元素e插入到队列中,如果队列满了就等待timeout时间 + return offer(e); //调用将元素插入队列 + } + + public E poll() { //出队 + final ReentrantLock lock = this.lock; + lock.lock(); //上锁 + try { + E first = q.peek(); //获取队列头 + if (first == null || first.getDelay(NANOSECONDS) > 0) //如果队头为null 或者 延时还没有到,则返回null + return null; + else + return q.poll(); //元素出队 + } finally { + lock.unlock(); //释放锁 + } + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { //获取并移除此队列的头部,在指定的等待时间前等待 + long nanos = unit.toNanos(timeout); //超时等待时间 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly();//可中断的获取锁 + try { + for (;;) { + E first = q.peek(); //获取队头元素 + if (first == null) { //队头为空,也就是队列为空 + if (nanos <= 0) + return null; + else + nanos = available.awaitNanos(nanos); // 如果还没有超时,那么再available条件上进行等待nanos时间 + } else { + long delay = first.getDelay(NANOSECONDS);//获取元素延迟时间 + if (delay <= 0) /延时到期 + return q.poll();//返回出队元素 + if (nanos <= 0)//延时未到期,超时到期,返回null + return null; + first = null; // don't retain ref while waiting + if (nanos < delay || leader != null) // 超时等待时间 < 延迟时间 或者有其它线程再取数据 + nanos = available.awaitNanos(nanos); //在available 条件上进行等待nanos 时间 + else { + //超时等待时间 > 延迟时间 并且没有其它线程在等待,那么当前元素成为leader,表示leader 线程最早 正在等待获取元素 + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + //等待 延迟时间 超时 + long timeLeft = available.awaitNanos(delay); + nanos -= delay - timeLeft; //还需要继续等待 nanos + } finally { + if (leader == thisThread) + leader = null;//清除 leader + } + } + } + } + } finally { + if (leader == null && q.peek() != null) //唤醒阻塞在available 的一个线程,表示可以取数据了 + available.signal(); + lock.unlock(); //释放锁 + } + } + + public E take() throws InterruptedException { //获取并移除此队列的头部,在元素变得可用之前一直等待 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) //延迟到期 + return q.poll(); + first = null; // don't retain ref while waiting + if (leader != null) //如果有其它线程在等待获取元素,则当前线程不用去竞争,直接等待 + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay);//等待延迟时间到期 + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } + } + + + public E peek() { //返回队头元素,但是元素并不出队 + final ReentrantLock lock = this.lock; + lock.lock(); //上锁 + try { + return q.peek();//返回队列头部元素,元素不出队 + } finally { + lock.unlock(); //释放锁 + } + } +``` + +### 1.5 DelayQueue总结: + 1.5.1:DelayQueue存储元素必须实现Delayed接口,通过实现Delayed接口 + 1.5.2:DelayQueue内部通过组合PriorityQueue来实现存储和维护元素顺序的 + 1.5.3:DelayQueue通过一个可重入锁来控制元素的入队出队行为 + 1.5.4:DelayQueue中leader标识用于减少线程的竞争,表示当前有其它线程正在获取队头元素 + 1.5.5:PriorityQueue只是负责存储数据以及维护元素的顺序,对于延迟时间取数据则是在DelayQueue中进行判断控制的 + \ No newline at end of file -- Gitee