From 43f2bbd156ce0d0a8fa13205247e390961a40da7 Mon Sep 17 00:00:00 2001 From: wxm <602260235@qq.com> Date: Sun, 29 Mar 2020 15:47:07 +0800 Subject: [PATCH] 4 --- second/week_04/.DS_Store | Bin 0 -> 6148 bytes second/week_04/73/ArrayBlockingQueue.md | 210 +++++++ second/week_04/73/ConcurrentHashMap.md | 668 +++++++++++++++++++++ second/week_04/73/ConcurrentLinkedQueue.md | 208 +++++++ second/week_04/73/CopyOnWriteArrayList.md | 72 +++ 5 files changed, 1158 insertions(+) create mode 100644 second/week_04/.DS_Store create mode 100644 second/week_04/73/ArrayBlockingQueue.md create mode 100644 second/week_04/73/ConcurrentHashMap.md create mode 100644 second/week_04/73/ConcurrentLinkedQueue.md create mode 100644 second/week_04/73/CopyOnWriteArrayList.md diff --git a/second/week_04/.DS_Store b/second/week_04/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..6f156d6a78388b85c06803ee8baa771d569ebfca GIT binary patch literal 6148 zcmeH~u?oUK42F~HAh>jNyu}9)2iK$M6Z8cf1Q$^dbiPOTPcDn6vlRIQ$#=;$wEc%( zBO*FHP8*S4L>6$PtSvOA$UE7|_Br0Km)rG>+-hcBh3BaC((K1JK?SG)6`%rCfC@}V zfjGw(%LzRbAB74~foUjU--iS@=3uGxPX~gJ0N@;DH>`b@0FxEK94wWnz%-@@jaKzB z#OmG-rnoK#OJ!{rjp0M%&T3N(Ol!MnLITt5V4wn2piy8}^WM(?1N__k-?T8L0#xA7 z6wvv8+z)uEI9oqn&+5mh+Pc6&zZ~K1Cjf~Z#VfcQ_LD8Z94wWn!1yEJGB8kqpDOSI D+0+lx literal 0 HcmV?d00001 diff --git a/second/week_04/73/ArrayBlockingQueue.md b/second/week_04/73/ArrayBlockingQueue.md new file mode 100644 index 0000000..3dc7af5 --- /dev/null +++ b/second/week_04/73/ArrayBlockingQueue.md @@ -0,0 +1,210 @@ +## ArrayBlockingQueue源码阅读 + +### 1.1 ArrayBlockingQueue属性: + +```java + + final Object[] items; //储存元素的数组 + + int takeIndex; //下一次读取或移除的位置 + + int putIndex; //下一次存放元素的位置 + + int count; //队列中元素的总数 + + final ReentrantLock lock; //所有访问的保护锁 + + private final Condition notEmpty; //获取元素的条件 + + private final Condition notFull; //存放元素的条件 + +``` + +### 1.2 ArrayBlockingQueue构造器: + +```java + + public ArrayBlockingQueue(int capacity) { //初始化一个长度为capacity的队列 + this(capacity, false); + } + + public ArrayBlockingQueue(int capacity, boolean fair) { //初始化一个长度为capacity的fair(公平,非公平锁)的队列 + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + lock = new ReentrantLock(fair); //重入锁,出队和入队持有这一把锁 + notEmpty = lock.newCondition(); //初始化非空等待队列 + notFull = lock.newCondition(); //初始化非满等待队列 + } + + public ArrayBlockingQueue(int capacity, boolean fair, + Collection c) { //初始化一个长度为capacity的fair(公平,非公平锁)的队列并且添加c数据到集合中 + this(capacity, fair); + + final ReentrantLock lock = this.lock; + lock.lock(); // Lock only for visibility, not mutual exclusion + try { + int i = 0; + try { + for (E e : c) { + checkNotNull(e); + items[i++] = e; + } + } catch (ArrayIndexOutOfBoundsException ex) { + throw new IllegalArgumentException(); + } + count = i; + putIndex = (i == capacity) ? 0 : i; + } finally { + lock.unlock(); + } + } +``` + +### 1.3 ArrayBlockingQueue方法: +```java + + //添加时如果队列满了抛异常 + public boolean add(E e) { //调用父类AbstractQueue的add(e)方法,但是最后是调用自己实现父类的offer方法 + return super.add(e); + } + + public boolean offer(E e) { + checkNotNull(e); //非空验证 + final ReentrantLock lock = this.lock; + lock.lock(); //上锁 + try { + if (count == items.length) // 如果数组满了就返回false + return false; + else { // 如果数组没满就调用入队方法并返回true + enqueue(e); + return true; + } + } finally { + lock.unlock(); //释放锁 + } + } + + public void put(E e) throws InterruptedException { //添加元素,如果队列满了会等待 + checkNotNull(e); //非空验证 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); //上锁,支持中断 + try { + while (count == items.length) //队列满了 + notFull.await(); //等待 + enqueue(e); //加入队列 + } finally { + lock.unlock(); //释放锁 + } + } + + public boolean offer(E e, long timeout, TimeUnit unit) + throws InterruptedException { //如果队列满了会等待一段时间timeout,然后队列还满才返回false + + checkNotNull(e); + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); //上锁 + try { + while (count == items.length) { //如果队列满了 + if (nanos <= 0) //等待时间小于或等于0直接返回false + return false; + nanos = notFull.awaitNanos(nanos); //等待nanos时间 + } + enqueue(e); + return true; + } finally { + lock.unlock(); + } + } + + private void enqueue(E x) { //将元素添加到队列中 + final Object[] items = this.items; + items[putIndex] = x; //通过 putIndex 对数据赋值 + if (++putIndex == items.length) // 当putIndex 等于数组长度时,将 putIndex 重置为 0 + putIndex = 0; + count++; //记录队列元素的个数 + notEmpty.signal(); //唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素 + } + + public boolean remove(Object o) { //删除元素o + if (o == null) return false; //判断o是否为空 + final Object[] items = this.items; //获取数组元素 + final ReentrantLock lock = this.lock; + lock.lock(); //上锁 + try { + if (count > 0) { //如果队列不为空 + final int putIndex = this.putIndex; //获取下一个要添加元素时的索引 + int i = takeIndex; //获取当前要被移除的元素的索引 + do { + if (o.equals(items[i])) { //从takeIndex 下标开始,找到要被删除的元素 + removeAt(i); //移除指定元素 + return true; //返回执行结果 + } + if (++i == items.length) //当前删除索引执行加 1 后判断是否与数组长度相等,若为 true,说明索引已到数组尽头,将 i 设置为 0 + i = 0; + } while (i != putIndex);//继续查找,直到找到最后一个元素 + } + return false; + } finally { + lock.unlock(); //释放锁 + } + } + + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock();//上锁 + try { + return (count == 0) ? null : dequeue();// 如果队列没有元素则返回null,否则出队 + } finally { + lock.unlock(); //释放锁 + } + } + + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly();// 上锁 + try { + while (count == 0) // 如果队列无元素,则阻塞等待在条件notEmpty上 + notEmpty.await(); + return dequeue(); //当队列有元素后,调用出队方法 + } finally { + lock.unlock(); //释放锁 + } + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { //带了等待时间和中断的出队 + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); //上锁 + try { + while (count == 0) { // 如果队列无元素,则阻塞等待nanos纳秒 + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + return dequeue(); + } finally { + lock.unlock(); //释放锁 + } + } + + private E dequeue() { //出队 + final Object[] items = this.items; + @SuppressWarnings("unchecked") + E x = (E) items[takeIndex]; //默认获取 0 位置的元素 + items[takeIndex] = null; /将该位置的元素设置为空 + if (++takeIndex == items.length) //这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据 + takeIndex = 0; + count--; //记录 元素个数递减 + if (itrs != null) + itrs.elementDequeued(); //同时更新迭代器中的元素数据 + notFull.signal(); /触发 因为队列满了以后导致的被阻塞的线程 + return x; + } +``` + +### 1.4 ArrayBlockingQueue总结: + 1.4.1:ArrayBlockingQueue是有界的,它不会被扩容。 + 1.4.2:所有的增删改查数组公用了一把锁ReentrantLock,入队和出队数组下标和count变更都是靠这把锁来维护安全的 + 1.4.3:底层数据接口是数组,下标putIndex/takeIndex,构成一个环形FIFO队列 diff --git a/second/week_04/73/ConcurrentHashMap.md b/second/week_04/73/ConcurrentHashMap.md new file mode 100644 index 0000000..5f77141 --- /dev/null +++ b/second/week_04/73/ConcurrentHashMap.md @@ -0,0 +1,668 @@ +## ConcurrentHashMap源码学习 + +### 1.1 ConcurrentHashMap: + 1.1.1: ConcurrentHashMap继承了抽象类AbstractMap, + 1.1.2: ConcurrentHashMap实现了ConcurrentMap接口 + 1.1.3: ConcurrentHashMap实现了Serializable表示可以被序列化 + +### 1.2 ConcurrentHashMap属性: + +```java + + private static final int MAXIMUM_CAPACITY = 1 << 30; //表的最大容量 + + private static final int DEFAULT_CAPACITY = 16; // 默认表的大小 + + static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; //最大数组大小 + + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //默认并发数 + + private static final float LOAD_FACTOR = 0.75f; //装载因子 + + static final int TREEIFY_THRESHOLD = 8; //转化为红黑树的阈值 + + static final int UNTREEIFY_THRESHOLD = 6; // 由红黑树转化为链表的阈值 + + static final int MIN_TREEIFY_CAPACITY = 64; //转化为红黑树的表的最小容量 + + private static final int MIN_TRANSFER_STRIDE = 16; //每次进行转移的最小值 + + private static int RESIZE_STAMP_BITS = 16; //生成sizeCtl所使用的bit位数 + + private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; //进行扩容所允许的最大线程数 + + private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; //记录sizeCtl中的大小所需要进行的偏移位数 + + static final int MOVED = -1; //表示正在转移 + static final int TREEBIN = -2; //表示已经转换成树 + static final int RESERVED = -3; //临时的hash + static final int HASH_BITS = 0x7fffffff; //能用的hash位数 + + static final int NCPU = Runtime.getRuntime().availableProcessors(); //获取可用的CPU个数 +``` + +### 1.3 ConcurrentHashMap内部类: +```java + + static class Node implements Map.Entry { //保存key,value及key的hash值的数据结构. + final int hash; //key的hash值 + final K key; + volatile V val; //使用volatile修饰,保证并发的可见性 + volatile Node next; //表示链表中的下一个节点,使用volatile修饰,保证并发的可见性 + + Node(int hash, K key, V val, Node next) { + this.hash = hash; + this.key = key; + this.val = val; + this.next = next; + } + + public final K getKey() { return key; } + public final V getValue() { return val; } + public final int hashCode() { return key.hashCode() ^ val.hashCode(); } + public final String toString(){ return key + "=" + val; } + public final V setValue(V value) { + throw new UnsupportedOperationException(); + } + + public final boolean equals(Object o) { + Object k, v, u; Map.Entry e; + return ((o instanceof Map.Entry) && + (k = (e = (Map.Entry)o).getKey()) != null && + (v = e.getValue()) != null && + (k == key || k.equals(key)) && + (v == (u = val) || v.equals(u))); + } + + /** + *支持map的get方法,通过hashcode和Key来获取一个node结点 + */ + Node find(int h, Object k) { + Node e = this; + if (k != null) { + do { + K ek; + if (e.hash == h && + ((ek = e.key) == k || (ek != null && k.equals(ek)))) + return e; + } while ((e = e.next) != null); + } + return null; + } + } + + + static final class TreeNode extends Node { //红黑树节点 + TreeNode parent; // + TreeNode left; + TreeNode right; + TreeNode prev; // + boolean red; + + TreeNode(int hash, K key, V val, Node next, + TreeNode parent) { + super(hash, key, val, next); + this.parent = parent; + } + + Node find(int h, Object k) { + return findTreeNode(h, k, null); + } + + final TreeNode findTreeNode(int h, Object k, Class kc) { + if (k != null) { + TreeNode p = this; + do { + int ph, dir; K pk; TreeNode q; + TreeNode pl = p.left, pr = p.right; + if ((ph = p.hash) > h) + p = pl; + else if (ph < h) + p = pr; + else if ((pk = p.key) == k || (pk != null && k.equals(pk))) + return p; + else if (pl == null) + p = pr; + else if (pr == null) + p = pl; + else if ((kc != null || + (kc = comparableClassFor(k)) != null) && + (dir = compareComparables(kc, k, pk)) != 0) + p = (dir < 0) ? pl : pr; + else if ((q = pr.findTreeNode(h, k, kc)) != null) + return q; + else + p = pl; + } while (p != null); + } + return null; + } + } +``` + +### 1.4 ConcurrentHashMap构造器: + +```java + public ConcurrentHashMap() {//创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 + } + + public ConcurrentHashMap(int initialCapacity) { //创建一个带有指定初始容量initialCapacity、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 + if (initialCapacity < 0) // 初始容量小于0,抛出异常 + throw new IllegalArgumentException(); + int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? + MAXIMUM_CAPACITY : + tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));// 找到最接近该容量的2的幂次方数 + this.sizeCtl = cap; // 初始化 + } + + public ConcurrentHashMap(Map m) { //构造一个与给定映射具有相同映射关系的新映射 + this.sizeCtl = DEFAULT_CAPACITY; + putAll(m); // // 将集合m的元素全部放入 + } + + public ConcurrentHashMap(int initialCapacity, float loadFactor) { //创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (1) 的新的空映射 + this(initialCapacity, loadFactor, 1); + } + + public ConcurrentHashMap(int initialCapacity, + float loadFactor, int concurrencyLevel) { //创建一个带有指定初始容量、加载因子和并发级别的新的空映射 + if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) //合法性判断 + throw new IllegalArgumentException(); + if (initialCapacity < concurrencyLevel) // Use at least as many bins + initialCapacity = concurrencyLevel; // as estimated threads + long size = (long)(1.0 + (long)initialCapacity / loadFactor); + int cap = (size >= (long)MAXIMUM_CAPACITY) ? + MAXIMUM_CAPACITY : tableSizeFor((int)size); + this.sizeCtl = cap; + } +``` + +### 1.5 ConcurrentHashMap重要方法: + +```java + + final V putVal(K key, V value, boolean onlyIfAbsent) { //添加元素 + if (key == null || value == null) throw new NullPointerException(); //key和value都不能为空 + int hash = spread(key.hashCode()); //计算key的hash值 + int binCount = 0; //用来计算在这个节点总共有多少个元素,用来控制扩容或者转移为树 + for (Node[] tab = table;;) { // 无限循环 + Node f; int n, i, fh; + if (tab == null || (n = tab.length) == 0) + tab = initTable(); //第一次put的时候表没有初始化,则初始化表 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 表不为空并且表的长度大于0,并且该桶不为空 + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) //比较并且交换值,如tab的第i项为空则用新生成的node替换 + break; + } + else if ((fh = f.hash) == MOVED) // 该结点的hash值为MOVED + tab = helpTransfer(tab, f); //进行结点的转移(在扩容的过程中) + else { + /* + * 如果在这个位置有元素的话,就采用synchronized的方式加锁, + * 如果是链表的话(hash大于0),就对这个链表的所有元素进行遍历, + * 如果找到了key和key的hash值都一样的节点,则把它的值替换到 + * 如果没找到的话,则添加在链表的最后面 + * 否则,是树的话,则调用putTreeVal方法添加到树中去 + * + * 在添加完之后,会对该节点上关联的的数目进行判断, + * 如果在8个以上的话,则会调用treeifyBin方法,来尝试转化为树,或者是扩容 + */ + V oldVal = null; + synchronized (f) { // 加锁同步 + if (tabAt(tab, i) == f) { // 找到table表下标为i的节点,如果没有变化 + if (fh >= 0) { // 该table表中该结点的hash值大于0 + binCount = 1; // binCount赋值为1 + for (Node e = f;; ++binCount) { // 无限循环 + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { // 结点的hash值相等并且key也相等 + oldVal = e.val; // 保存该结点的val值 + if (!onlyIfAbsent) // 进行判断 + e.val = value; // 将指定的value保存至结点,即进行了结点值的更新 + break; + } + Node pred = e; // 保存当前结点 + if ((e = e.next) == null) { // 当前结点的下一个结点为空,即为最后一个结点 + pred.next = new Node(hash, key, + value, null); // 新生一个结点并且赋值给next域 + break; + } + } + } + else if (f instanceof TreeBin) { // 结点为红黑树结点类型 + Node p; + binCount = 2; // binCount赋值为2 + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { // 将hash、key、value放入红黑树 + oldVal = p.val; // 保存结点的val + if (!onlyIfAbsent) + p.val = value; / 赋值结点value值 + } + } + } + } + if (binCount != 0) { // binCount不为0 + if (binCount >= TREEIFY_THRESHOLD) // 如果binCount大于等于转化为红黑树的阈值 + treeifyBin(tab, i); // 进行转化 + if (oldVal != null) // 旧值不为空 + return oldVal; // 返回旧值 + break; + } + } + } + addCount(1L, binCount); // 增加binCount的数量 + return null; + } + + /** + * 整体流程大概为: + + * ① 判断存储的key、value是否为空,若为空,则抛出异常,否则,进入步骤② + +   * ② 计算key的hash值,随后进入无限循环,该无限循环可以确保成功插入数据,若table表为空或者长度为0,则初始化table表,否则,进入步骤③ + +   * ③ 根据key的hash值取出table表中的结点元素,若取出的结点为空(该桶为空),则使用CAS将key、value、hash值生成的结点放入桶中。否则,进入步骤④ + +   * ④ 若该结点的的hash值为MOVED,则对该桶中的结点进行转移,否则,进入步骤⑤ + +   * ⑤ 对桶中的第一个结点(即table表中的结点)进行加锁,对该桶进行遍历,桶中的结点的hash值与key值与给定的hash值和key值相等,则根据标识选择是否进行更新操作(用给定的value值 + + * 替换该结点的value值),若遍历完桶仍没有找到hash值与key值和指定的hash值与key值相等的结点,则直接新生一个结点并赋值为之前最后一个结点的下一个结点。进入步骤⑥ + +   * ⑥ 若binCount值达到红黑树转化的阈值,则将桶中的结构转化为红黑树存储,最后,增加binCount的值。 + **/ + + + + private final Node[] initTable() { //初始化表(桶数组) + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { // 无限循环 + if ((sc = sizeCtl) < 0) // sizeCtl小于0,则进行线程让步等待(说明正在初始化或者扩容) + Thread.yield(); // lost initialization race; just spin + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 比较sizeCtl的值与sc是否相等,相等则用-1替换(cas操作,只有一个线程初始化表(桶数组)) + try { + if ((tab = table) == null || tab.length == 0) { // table表为空或者大小为0 + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // sc的值是否大于0,若是,则n为sc,否则,n为默认初始容量 + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; // 新生结点数组 + table = tab = nt; // 赋值给table + sc = n - (n >>> 2); // sc为n * 3/4 + } + } finally { + sizeCtl = sc; // 设置sizeCtl的值 + } + break; + } + } + return tab; + } + + final Node[] helpTransfer(Node[] tab, Node f) { //在扩容时将table表中的结点转移到nextTable中 + Node[] nextTab; int sc; + if (tab != null && (f instanceof ForwardingNode) && + (nextTab = ((ForwardingNode)f).nextTable) != null) { // table表不为空并且结点类型使ForwardingNode类型,并且结点的nextTable不为空 + int rs = resizeStamp(tab.length); + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { //说明正在扩容 + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 扩容线程数加1 + transfer(tab, nextTab); // 当前线程帮忙迁移元素 + break; + } + } + return nextTab; + } + return table; + } + + /** + * 把数组中的节点复制到新的数组的相同位置,或者移动到扩张部分的相同位置 + * 在这里首先会计算一个步长,表示一个线程处理的数组长度,用来控制对CPU的使用, + * 每个CPU最少处理16个长度的数组元素,也就是说,如果一个数组的长度只有16,那只有一个线程会对其进行扩容的复制移动操作 + * 扩容的时候会一直遍历,知道复制完所有节点,没处理一个节点的时候会在链表的头部设置一个fwd节点,这样其他线程就会跳过他, + * 复制后在新数组中的链表不是绝对的反序的 + */ + private final void transfer(Node[] tab, Node[] nextTab) { + int n = tab.length, stride; + if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) + stride = MIN_TRANSFER_STRIDE; + + /* + * 如果复制的目标nextTab为null的话,则初始化一个table两倍长的nextTab + * 此时nextTable被设置值了(在初始情况下是为null的) + * 因为如果有一个线程开始了表的扩张的时候,其他线程也会进来帮忙扩张, + * 而只是第一个开始扩张的线程需要初始化下目标数组 + */ + if (nextTab == null) { + try { + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n << 1]; + nextTab = nt; + } catch (Throwable ex) { // try to cope with OOME + sizeCtl = Integer.MAX_VALUE; + return; + } + nextTable = nextTab; + transferIndex = n; + } + int nextn = nextTab.length; + /* + * 创建一个fwd节点,这个是用来控制并发的,当一个节点为空或已经被转移之后,就设置为fwd节点 + * 这是一个空的标志节点 + */ + ForwardingNode fwd = new ForwardingNode(nextTab); + boolean advance = true; //是否继续向前查找的标志位 + boolean finishing = false; //在完成之前重新在扫描一遍数组,看看有没完成的没 + for (int i = 0, bound = 0;;) { + Node f; int fh; + while (advance) { + int nextIndex, nextBound; + if (--i >= bound || finishing) + advance = false; + else if ((nextIndex = transferIndex) <= 0) { + i = -1; + advance = false; + } + else if (U.compareAndSwapInt + (this, TRANSFERINDEX, nextIndex, + nextBound = (nextIndex > stride ? + nextIndex - stride : 0))) { + bound = nextBound; + i = nextIndex - 1; + advance = false; + } + } + if (i < 0 || i >= n || i + n >= nextn) { + int sc; + if (finishing) { //已经完成转移 + nextTable = null; + table = nextTab; + sizeCtl = (n << 1) - (n >>> 1); //设置sizeCtl为扩容后的0.75 + return; + } + if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { + if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) + return; + finishing = advance = true; + i = n; // recheck before commit + } + } + else if ((f = tabAt(tab, i)) == null) //数组中把null的元素设置为ForwardingNode节点(hash值为MOVED[-1]) + advance = casTabAt(tab, i, null, fwd); + else if ((fh = f.hash) == MOVED) + advance = true; // already processed + else { + synchronized (f) { //加锁操作 + if (tabAt(tab, i) == f) { + Node ln, hn; + if (fh >= 0) { //该节点的hash值大于等于0,说明是一个Node节点 + /* + * 因为n的值为数组的长度,且是power(2,x)的,所以,在&操作的结果只可能是0或者n + * 根据这个规则 + * 0--> 放在新表的相同位置 + * n--> 放在新表的(n+原来位置) + */ + int runBit = fh & n; + Node lastRun = f; + /* + * lastRun 表示的是需要复制的最后一个节点 + * 每当新节点的hash&n -> b 发生变化的时候,就把runBit设置为这个结果b + * 这样for循环之后,runBit的值就是最后不变的hash&n的值 + * 而lastRun的值就是最后一次导致hash&n 发生变化的节点(假设为p节点) + * 为什么要这么做呢?因为p节点后面的节点的hash&n 值跟p节点是一样的, + * 所以在复制到新的table的时候,它肯定还是跟p节点在同一个位置 + * 在复制完p节点之后,p节点的next节点还是指向它原来的节点,就不需要进行复制了,自己就被带过去了 + * 这也就导致了一个问题就是复制后的链表的顺序并不一定是原来的倒序 + */ + for (Node p = f.next; p != null; p = p.next) { + int b = p.hash & n; //n的值为扩张前的数组的长度 + if (b != runBit) { + runBit = b; + lastRun = p; + } + } + if (runBit == 0) { + ln = lastRun; + hn = null; + } + else { + hn = lastRun; + ln = null; + } + /* + * 构造两个链表,顺序大部分和原来是反的 + * 分别放到原来的位置和新增加的长度的相同位置(i/n+i) + */ + for (Node p = f; p != lastRun; p = p.next) { + int ph = p.hash; K pk = p.key; V pv = p.val; + if ((ph & n) == 0) + /* + * 假设runBit的值为0, + * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同为0的节点) + * 设置到旧的table的第一个hash计算后为0的节点下一个节点 + * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点 + */ + ln = new Node(ph, pk, pv, ln); + else + /* + * 假设runBit的值不为0, + * 则第一次进入这个设置的时候相当于把旧的序列的最后一次发生hash变化的节点(该节点后面可能还有hash计算后同不为0的节点) + * 设置到旧的table的第一个hash计算后不为0的节点下一个节点 + * 并且把自己返回,然后在下次进来的时候把它自己设置为后面节点的下一个节点 + */ + hn = new Node(ph, pk, pv, hn); + } + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + else if (f instanceof TreeBin) { //否则的话是一个树节点 + TreeBin t = (TreeBin)f; + TreeNode lo = null, loTail = null; + TreeNode hi = null, hiTail = null; + int lc = 0, hc = 0; + for (Node e = t.first; e != null; e = e.next) { + int h = e.hash; + TreeNode p = new TreeNode + (h, e.key, e.val, null, null); + if ((h & n) == 0) { + if ((p.prev = loTail) == null) + lo = p; + else + loTail.next = p; + loTail = p; + ++lc; + } + else { + if ((p.prev = hiTail) == null) + hi = p; + else + hiTail.next = p; + hiTail = p; + ++hc; + } + } + /* + * 在复制完树节点之后,判断该节点处构成的树还有几个节点, + * 如果≤6个的话,就转回为一个链表 + */ + ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : + (hc != 0) ? new TreeBin(lo) : t; + hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : + (lc != 0) ? new TreeBin(hi) : t; + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + } + } + } + } + } + + + private final void addCount(long x, int check) { //判断是否需要扩容 + CounterCell[] as; long b, s; + /** + *把数组的大小存储根据不同的线程存储到不同的段上(也是分段锁的思想) + *并且有一个baseCount,优先更新baseCount,如果失败了再更新不同线程对应的段 + *这样可以保证尽量小的减少冲突 + */ + if ((as = counterCells) != null || + !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // 先尝试把数量加到baseCount上,如果失败再加到分段的CounterCell上 + CounterCell a; long v; int m; + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = + U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //如果as为空,或者长度为0,或者当前线程所在的段为null,或者在当前线程的段上加数量失败 + /** + *强制增加数量(无论如何数量是一定要加上的,并不是简单地自旋)不同线程对应不同的段都更新失败了 + *说明已经发生冲突了,那么就对counterCells进行扩容 + *以减少多个线程hash到同一个段的概率 + */ + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + s = sumCount(); // 计算元素个数 + } + if (check >= 0) { + Node[] tab, nt; int n, sc; + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { //如果元素个数达到了扩容门槛,则进行扩容 + int rs = resizeStamp(n); + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) // sc<0说明正在扩容中 + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) // 扩容未完成,则当前线程加入迁移元素中,并把扩容线程数加1 + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } + } + + public V get(Object key) { //获取元素 + Node[] tab; Node e, p; int n, eh; K ek; + int h = spread(key.hashCode()); //// 计算key的hash值 + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空 + if ((eh = e.hash) == h) { // 表中的元素的hash值与key的hash值相等 + if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 键相等 + return e.val; // 返回值 + } + else if (eh < 0) // 结点hash值小于0 + return (p = e.find(h, key)) != null ? p.val : null; // 在桶(链表/红黑树)中查找 + while ((e = e.next) != null) { // 遍历整个链表寻找元素 + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; + } + + final V replaceNode(Object key, V value, Object cv) { //删除元素 + int hash = spread(key.hashCode()); // 计算key的hash值 + for (Node[] tab = table;;) { // 无限循环 + Node f; int n, i, fh; + if (tab == null || (n = tab.length) == 0 || + (f = tabAt(tab, i = (n - 1) & hash)) == null) // table表为空或者表长度为0或者key所对应的桶为空 + break; //跳出循环 + else if ((fh = f.hash) == MOVED) // 桶中第一个结点的hash值为MOVED + tab = helpTransfer(tab, f); // 如果正在扩容中,协助扩容 + else { + V oldVal = null; + boolean validated = false; + synchronized (f) { // 加锁同步 + if (tabAt(tab, i) == f) { // 桶中的第一个结点没有发生变化 + if (fh >= 0) { // 结点hash值大于0 + validated = true; + for (Node e = f, pred = null;;) { // 无限循环 + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { // 结点的hash值与指定的hash值相等,并且key也相等 + V ev = e.val; + if (cv == null || cv == ev || + (ev != null && cv.equals(ev))) { // cv为空或者与结点value相等或者不为空并且相等 + oldVal = ev; // 保存该结点的val值 + if (value != null) // value为null + e.val = value; // 设置结点value值 + else if (pred != null) // 前驱不为空 + pred.next = e.next; // 前驱的后继为e的后继,即删除了e结点 + else + setTabAt(tab, i, e.next); // 设置table表中下标为index的值为e.next + } + break; + } + pred = e; + if ((e = e.next) == null) + break; + } + } + else if (f instanceof TreeBin) { // 为红黑树结点类型 + validated = true; + TreeBin t = (TreeBin)f; // 类型转化 + TreeNode r, p; + if ((r = t.root) != null && + (p = r.findTreeNode(hash, key, null)) != null) { // 根节点不为空并且存在与指定hash和key相等的结点 + V pv = p.val; // 保存p结点的value + if (cv == null || cv == pv || + (pv != null && cv.equals(pv))) { // cv为空或者与结点value相等或者不为空并且相等 + oldVal = pv; + if (value != null) + p.val = value; + else if (t.removeTreeNode(p)) // 移除p结点 + setTabAt(tab, i, untreeify(t.first)); + } + } + } + } + } + if (validated) { + if (oldVal != null) { //如果找到了元素,返回其旧值 + if (value == null) // 如果要替换的值为空,元素个数减1 + addCount(-1L, -1); + return oldVal; + } + break; + } + } + } + return null; // 没找到元素返回空 + } +``` + +### 1.6 ConcurrentHashMap总结: + 1.6.1:ConcurrentHashMap底层数据结构为数组+链表+红黑树,默认容量为16,不允许[key,value]为null + 1.6.2:ConcurrentHashMap是线程安全的,HashMap线程不安全 + 1.6.3:ConcurrentHashMap内部采用的锁有synchronized、CAS、自旋锁、分段锁、volatile + 1.6.4:通过sizeCtl变量来控制扩容、初始化等操作 + 1.6.5:ConcurrentHashMap线程安全是因为更新操作时会锁当前桶的第一个元素,由此实现分段锁 + 1.6.6:查询操作是不会加锁的,所以ConcurrentHashMap不是强一致性的 + + + + + + + + + + + + + diff --git a/second/week_04/73/ConcurrentLinkedQueue.md b/second/week_04/73/ConcurrentLinkedQueue.md new file mode 100644 index 0000000..8d8c50c --- /dev/null +++ b/second/week_04/73/ConcurrentLinkedQueue.md @@ -0,0 +1,208 @@ +## ConcurrentLinkedQueue源码阅读 + + +## 简介 + +​ 在并发编程中,有时候需要使用线程安全的队列。如果要实现一个线程安全的队列有两种方式:一种是使用阻塞算法,另一种是使用非阻塞算法。使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现。非阻塞的实现方式则可以使用循环CAS的方式来实现。 + +​ ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现,该算法在Michael&Scott算法上进行了一些修改。 + +​ ConcurrentLinkedQueue只实现了Queue接口,并没有实现BlockingQueue接口,所以它不是阻塞队列,也不能用于线程池中,但是它是线程安全的,可用于多线程环境中。 + +## 源码分析 + +### 属性 + +```java +// 链表头节点 +private transient volatile Node head; +// 链表尾节点 +private transient volatile Node tail; +``` + +### 主要内部类 + +```java +private static class Node { + volatile E item; + volatile Node next; +} +``` + +典型的单链表结构,非常纯粹。 + +### 主要构造方式 + +```java +public ConcurrentLinkedQueue() { + // 初始化头尾节点 + head = tail = new Node(null); +} + +public ConcurrentLinkedQueue(Collection c) { + Node h = null, t = null; + // 遍历c,并把它元素全部添加到单链表中 + for (E e : c) { + checkNotNull(e); + Node newNode = new Node(e); + if (h == null) + h = t = newNode; + else { + t.lazySetNext(newNode); + t = newNode; + } + } + if (h == null) + h = t = new Node(null); + head = h; + tail = t; +} +``` + +这两个构造方法也很简单,可以看到这是一个无界的单链表实现的队列。 + +### 入队 + +因为它不是阻塞队列,所以只有两个入队的方法,add(e)和offer(e)。 + +因为是无界队列,所以add(e)方法也不用抛出异常了。 + +```java +public boolean add(E e) { + return offer(e); +} + +public boolean offer(E e) { + // 不能添加空元素 + checkNotNull(e); + // 新节点 + final Node newNode = new Node(e); + + // 入队到链表尾 + for (Node t = tail, p = t;;) { + Node q = p.next; + // 如果没有next,说明到链表尾部了,就入队 + if (q == null) { + // CAS更新p的next为新节点 + // 如果成功了,就返回true + // 如果不成功就重新取next重新尝试 + if (p.casNext(null, newNode)) { + // 如果p不等于t,说明有其它线程先一步更新tail + // 也就不会走到q==null这个分支了 + // p取到的可能是t后面的值 + // 把tail原子更新为新节点 + if (p != t) // hop two nodes at a time + casTail(t, newNode); // Failure is OK. + // 返回入队成功 + return true; + } + } + else if (p == q) + // 如果p的next等于p,说明p已经被删除了(已经出队了) + // 重新设置p的值 + p = (t != (t = tail)) ? t : head; + else + // t后面还有值,重新设置p的值 + p = (p != t && t != (t = tail)) ? t : q; + } +} +``` + +入队整个流程还是比较清晰的,这里有个前提是出队时会把出队的那个节点的next设置为节点本身。 + +(1)定位到链表尾部,尝试把新节点放到后面; + +(2)如果尾部变化了,则重新获取尾部,再重试; + +### 出队 + +因为它不是阻塞队列,所以只有两个出队的方法,remove()和poll()。 + +```java +public E remove() { + E x = poll(); + if (x != null) + return x; + else + throw new NoSuchElementException(); +} + +public E poll() { + restartFromHead: + for (;;) { + // 尝试弹出链表的头节点 + for (Node h = head, p = h, q;;) { + E item = p.item; + // 如果节点的值不为空,并且将其更新为null成功了 + if (item != null && p.casItem(item, null)) { + // 如果头节点变了,则不会走到这个分支 + // 会先走下面的分支拿到新的头节点 + // 这时候p就不等于h了,就更新头节点 + // 在updateHead()中会把head更新为新节点 + // 并让head的next指向其自己 + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + // 上面的casItem()成功,就可以返回出队的元素了 + return item; + } + // 下面三个分支说明头节点变了 + // 且p的item肯定为null + else if ((q = p.next) == null) { + // 如果p的next为空,说明队列中没有元素了 + // 更新h为p,也就是空元素的节点 + updateHead(h, p); + // 返回null + return null; + } + else if (p == q) + // 如果p等于p的next,说明p已经出队了,重试 + continue restartFromHead; + else + // 将p设置为p的next + p = q; + } + } +} +// 更新头节点的方法 +final void updateHead(Node h, Node p) { + // 原子更新h为p成功后,延迟更新h的next为它自己 + // 这里用延迟更新是安全的,因为head节点已经变了 + // 只要入队出队的时候检查head有没有变化就行了,跟它的next关系不大 + if (h != p && casHead(h, p)) + h.lazySetNext(h); +} +``` + +出队的整个逻辑也是比较清晰的: + +(1)定位到头节点,尝试更新其值为null; + +(2)如果成功了,就成功出队; + +(3)如果失败或者头节点变化了,就重新寻找头节点,并重试; + +(4)整个出队过程没有一点阻塞相关的代码,所以出队的时候不会阻塞线程,没找到元素就返回null; + +## 总结 + +(1)ConcurrentLinkedQueue不是阻塞队列; + +(2)ConcurrentLinkedQueue不能用在线程池中; + +(3)ConcurrentLinkedQueue使用(CAS+自旋)更新头尾节点控制出队入队操作; + +ConcurrentLinkedQueue与LinkedBlockingQueue对比? + +(1)两者都是线程安全的队列; + +(2)两者都可以实现取元素时队列为空直接返回null,后者的poll()方法可以实现此功能; + +(3)前者全程无锁,后者全部都是使用重入锁控制的; + +(4)前者效率较高,后者效率较低; + +(5)前者无法实现如果队列为空等待元素到来的操作; + +(6)前者是非阻塞队列,后者是阻塞队列; + +(7)前者无法用在线程池中,后者可以。 \ No newline at end of file diff --git a/second/week_04/73/CopyOnWriteArrayList.md b/second/week_04/73/CopyOnWriteArrayList.md new file mode 100644 index 0000000..a07cfa5 --- /dev/null +++ b/second/week_04/73/CopyOnWriteArrayList.md @@ -0,0 +1,72 @@ +# CopyOnWriteArrayList 源码分析 + +## TOP 带着问题看源码 + +1. CopyOnWriteArrayList 是怎么保证线程安全的 +2. 适用的场景 + +## 1. 基本介绍 + +顾名思义,这是一个每次写入都采用先复制再写入的方式来实现的线程安全的 List。这样的好处是可以读写并行,而且实现简单。 + +## 2. 成员变量分析 + +```java +// lock 锁 +final transient ReentrantLock lock = new ReentrantLock(); +// 存放数据的数组 +private transient volatile Object[] array; + +// getter +final Object[] getArray() { + return array; +} +// setter +final void setArray(Object[] a) { + array = a; +} +``` + +## 3. 核心方法分析 + +### 3.1 add(E e) + +核心逻辑就是使用数组copy把旧的数据复制到新的数组(比旧的数组长度大1),然后把存放数据的数组指向新的数组。加锁是解决线程不安全问题,可以明白问题 **TOP 1** 的线程安全问题是由 COW+Lock 来保证的。 + +```java +public boolean add(E e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + Object[] newElements = Arrays.copyOf(elements, len + 1); + newElements[len] = e; + setArray(newElements); + return true; + } finally { + lock.unlock(); + } +} +``` + +### 3.2 get(int index) + +获取指定 index 的数据比较简单,可以看到查询是无锁的 + +```java +public E get(int index) { + return get(getArray(), index); +} + +private E get(Object[] a, int index) { + return (E) a[index]; +} +``` + +## 总结 + CopyOnWriteArrayList主要使用于读多写少的应用场景 + CopyOnWriteArrayList每次添加,修改,删除都会上锁,并且会创建一个新的数组 + CopyOnWriteArrayList是通过ReentrantLock来上锁 + CopyOnWriteArrayList能保证最终一致性,但是不能保证实时一致性 + -- Gitee