diff --git "a/week_04/62/ConcurrentHashMap\346\272\220\347\240\201\345\210\206\346\236\220.md" "b/week_04/62/ConcurrentHashMap\346\272\220\347\240\201\345\210\206\346\236\220.md" new file mode 100644 index 0000000000000000000000000000000000000000..36e4f54e2416c775618da8346356be840a5d16f7 --- /dev/null +++ "b/week_04/62/ConcurrentHashMap\346\272\220\347\240\201\345\210\206\346\236\220.md" @@ -0,0 +1,235 @@ +# ConcurrentHashMap 源码分析 + +在1.8中使用 CAS + synchronized 来保证并发安全性 + +put() +```java +final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) throw new NullPointerException(); + //1. 计算key的hash值 + int hash = spread(key.hashCode()); + int binCount = 0; + //循环直到put成功 + for (Node[] tab = table;;) { + Node f; int n, i, fh; + //2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化 + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + //3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + //4. 当前正在扩容 + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + else { + V oldVal = null; + synchronized (f) { + if (tabAt(tab, i) == f) { + //5. 当前为链表,在链表中插入新的键值对 + if (fh >= 0) { + binCount = 1; + for (Node e = f;; ++binCount) { + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + Node pred = e; + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + // 6.当前为红黑树,将新的键值对插入到红黑树中 + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + // 7.插入完键值对后再根据实际大小看是否需要转换成红黑树 + if (binCount != 0) { + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + if (oldVal != null) + return oldVal; + break; + } + } + } + //8.对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 + addCount(1L, binCount); + return null; +} +``` +扩容helpTransfer() +```java +private final void transfer(Node[] tab, Node[] nextTab) { + int n = tab.length, stride; + // 每核处理的量小于16,则强制赋值16 + if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) + stride = MIN_TRANSFER_STRIDE; // subdivide range + if (nextTab == null) { // initiating + try { + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n << 1]; //构建一个nextTable对象,其容量为原来容量的两倍 + nextTab = nt; + } catch (Throwable ex) { // try to cope with OOME + sizeCtl = Integer.MAX_VALUE; + return; + } + nextTable = nextTab; + transferIndex = n; + } + int nextn = nextTab.length; + // 连接点指针,用于标志位(fwd的hash值为-1,fwd.nextTable=nextTab) + ForwardingNode fwd = new ForwardingNode(nextTab); + // 当advance == true时,表明该节点已经处理过了 + boolean advance = true; + boolean finishing = false; // to ensure sweep before committing nextTab + for (int i = 0, bound = 0;;) { + Node f; int fh; + // 控制 --i ,遍历原hash表中的节点 + while (advance) { + int nextIndex, nextBound; + if (--i >= bound || finishing) + advance = false; + else if ((nextIndex = transferIndex) <= 0) { + i = -1; + advance = false; + } + // 用CAS计算得到的transferIndex + else if (U.compareAndSwapInt + (this, TRANSFERINDEX, nextIndex, + nextBound = (nextIndex > stride ? + nextIndex - stride : 0))) { + bound = nextBound; + i = nextIndex - 1; + advance = false; + } + } + if (i < 0 || i >= n || i + n >= nextn) { + int sc; + // 已经完成所有节点复制了 + if (finishing) { + nextTable = null; + table = nextTab; // table 指向nextTable + sizeCtl = (n << 1) - (n >>> 1); // sizeCtl阈值为原来的1.5倍 + return; // 跳出死循环, + } + // CAS 更扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作 + if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { + if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) + return; + finishing = advance = true; + i = n; // recheck before commit + } + } + // 遍历的节点为null,则放入到ForwardingNode 指针节点 + else if ((f = tabAt(tab, i)) == null) + advance = casTabAt(tab, i, null, fwd); + // f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了 + // 这里是控制并发扩容的核心 + else if ((fh = f.hash) == MOVED) + advance = true; // already processed + else { + // 节点加锁 + synchronized (f) { + // 节点复制工作 + if (tabAt(tab, i) == f) { + Node ln, hn; + // fh >= 0 ,表示为链表节点 + if (fh >= 0) { + // 构造两个链表 一个是原链表 另一个是原链表的反序排列 + int runBit = fh & n; + Node lastRun = f; + for (Node p = f.next; p != null; p = p.next) { + int b = p.hash & n; + if (b != runBit) { + runBit = b; + lastRun = p; + } + } + if (runBit == 0) { + ln = lastRun; + hn = null; + } + else { + hn = lastRun; + ln = null; + } + for (Node p = f; p != lastRun; p = p.next) { + int ph = p.hash; K pk = p.key; V pv = p.val; + if ((ph & n) == 0) + ln = new Node(ph, pk, pv, ln); + else + hn = new Node(ph, pk, pv, hn); + } + // 在nextTable i 位置处插上链表 + setTabAt(nextTab, i, ln); + // 在nextTable i + n 位置处插上链表 + setTabAt(nextTab, i + n, hn); + // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了 + setTabAt(tab, i, fwd); + // advance = true 可以执行--i动作,遍历节点 + advance = true; + } + // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致 + else if (f instanceof TreeBin) { + TreeBin t = (TreeBin)f; + TreeNode lo = null, loTail = null; + TreeNode hi = null, hiTail = null; + int lc = 0, hc = 0; + for (Node e = t.first; e != null; e = e.next) { + int h = e.hash; + TreeNode p = new TreeNode + (h, e.key, e.val, null, null); + if ((h & n) == 0) { + if ((p.prev = loTail) == null) + lo = p; + else + loTail.next = p; + loTail = p; + ++lc; + } + else { + if ((p.prev = hiTail) == null) + hi = p; + else + hiTail.next = p; + hiTail = p; + ++hc; + } + } + // 扩容后树节点个数若<=6,将树转链表 + ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : + (hc != 0) ? new TreeBin(lo) : t; + hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : + (lc != 0) ? new TreeBin(hi) : t; + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + } + } + } + } + } +``` +总结:ConccurentHashMap线程安全,CAS+同步锁,数组+链表+红黑树 \ No newline at end of file diff --git "a/week_04/62/ConcurrentLinkedQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" "b/week_04/62/ConcurrentLinkedQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" new file mode 100644 index 0000000000000000000000000000000000000000..c20a91cff77ae3783e0eb3327d13d6645ee11eb6 --- /dev/null +++ "b/week_04/62/ConcurrentLinkedQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" @@ -0,0 +1,89 @@ +# ConcurrentLinkedQueue 源码分析 + +offer() +```java +public boolean offer(E e) { + // 如果e为null,则直接抛出NullPointerException异常 + checkNotNull(e); + // 创建入队节点 + final Node newNode = new Node(e); + + // 循环CAS直到入队成功 + // 1、根据tail节点定位出尾节点(last node);2、将新节点置为尾节点的下一个节点;3、casTail更新尾节点 + for (Node t = tail, p = t;;) { + // p用来表示队列的尾节点,初始情况下等于tail节点 + // q是p的next节点 + Node q = p.next; + // 判断p是不是尾节点,tail节点不一定是尾节点,判断是不是尾节点的依据是该节点的next是不是null + // 如果p是尾节点 + if (q == null) { + // p is last node + // 设置p节点的下一个节点为新节点,设置成功则casNext返回true;否则返回false,说明有其他线程更新过尾节点 + if (p.casNext(null, newNode)) { + // Successful CAS is the linearization point + // for e to become an element of this queue, + // and for newNode to become "live". + // 如果p != t,则将入队节点设置成tail节点,更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点 + if (p != t) // hop two nodes at a time + casTail(t, newNode); // Failure is OK. + return true; + } + // Lost CAS race to another thread; re-read next + } + // 多线程操作时候,由于poll时候会把旧的head变为自引用,然后将head的next设置为新的head + // 所以这里需要重新找新的head,因为新的head后面的节点才是激活的节点 + else if (p == q) + // We have fallen off list. If tail is unchanged, it + // will also be off-list, in which case we need to + // jump to head, from which all live nodes are always + // reachable. Else the new tail is a better bet. + p = (t != (t = tail)) ? t : head; + // 寻找尾节点 + else + // Check for tail updates after two hops. + p = (p != t && t != (t = tail)) ? t : q; + } +} +``` + +poll() +```java +public E poll() { + restartFromHead: + for (;;) { + // p节点表示首节点,即需要出队的节点 + for (Node h = head, p = h, q;;) { + E item = p.item; + + // 如果p节点的元素不为null,则通过CAS来设置p节点引用的元素为null,如果成功则返回p节点的元素 + if (item != null && p.casItem(item, null)) { + // Successful CAS is the linearization point + // for item to be removed from this queue. + // 如果p != h,则更新head + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。 + // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了 + else if ((q = p.next) == null) { + // 更新头结点 + updateHead(h, p); + return null; + } + // p == q,则使用新的head重新开始 + else if (p == q) + continue restartFromHead; + // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点 + else + p = q; + } + } +} +``` +总结: +使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础。 +head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键。 +由于队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式来维护非阻塞算法的正确性。 +以批处理方式来更新 head/tail,从整体上减少入队 / 出队操作的开销。 +为了有利于垃圾收集,队列使用特有的 head 更新机制;为了确保从已删除节点向后遍历,可到达所有的非删除节点,队列使用了特有的向后推进策略。 diff --git "a/week_04/62/CopyOnWriteArrayList\346\272\220\347\240\201\350\247\243\346\236\220.md" "b/week_04/62/CopyOnWriteArrayList\346\272\220\347\240\201\350\247\243\346\236\220.md" new file mode 100644 index 0000000000000000000000000000000000000000..5b8ddbfb05a8eda51f32031d8200670228469375 --- /dev/null +++ "b/week_04/62/CopyOnWriteArrayList\346\272\220\347\240\201\350\247\243\346\236\220.md" @@ -0,0 +1,64 @@ +# CopyOnWriteArrayList 源码分析 + + CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。 +  内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存, + 旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用, + 所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M, + 那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC, + 应用响应时间也随之变长。 +  针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。 +  数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。【当执行add或remove操作没完成时,get获取的仍然是旧数组的元素】 + 引用 https://blog.csdn.net/u010002184/article/details/90452918 +```java +public boolean add(E e) { + //创建可重入锁,内部是非公平锁 + final ReentrantLock lock = this.lock; + //进入锁,使用AQS完成加锁(主要完成对status赋值,需要保证原子性) + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + //对数组进行复制,扩充数组大小,再新数组中添加 + Object[] newElements = Arrays.copyOf(elements, len + 1); + newElements[len] = e; + //再赋值引用 + setArray(newElements); + return true; + } finally { + //调用 + lock.unlock(); + } + } +``` +```java +//删除元素 +public E remove(int index) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + E oldValue = get(elements, index); + int numMoved = len - index - 1; + //当为最一个 + if (numMoved == 0) + setArray(Arrays.copyOf(elements, len - 1)); + else { + Object[] newElements = new Object[len - 1]; + //分两段复制数组 + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index + 1, newElements, index, + numMoved); + setArray(newElements); + } + return oldValue; + } finally { + lock.unlock(); + } + } + +``` + +##总结 +CopyOnWriteArrayList 不能保证数据的一致性,再强一致性要求下不能达到要求,另外就是占用内存。在遍历的时候不能进行修改操作,会直接报错 +UnsupportedOperationException diff --git "a/week_04/62/DelayQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" "b/week_04/62/DelayQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" new file mode 100644 index 0000000000000000000000000000000000000000..141e7c83ab6bafc8276dcac2bbb49462e2bb95a0 --- /dev/null +++ "b/week_04/62/DelayQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" @@ -0,0 +1,101 @@ +# DelayQueue 源码分析 + +public class DelayQueue extends AbstractQueue implements BlockingQueue +继承了 AbstractQueue 实现 BlockingQueue是阻塞队列 +元素必须继承Delayed实现Comparable接口,在compareTo方法中判断当前元素是否到期 +内部由非公平锁实现加锁 +PriorityQueue 维持一个无界优先级队列 +```java +//添加元素 +public boolean add(E e) { + return offer(e); + } + + /** + * Inserts the specified element into this delay queue. + * + * @param e the element to add + * @return {@code true} + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E e) { + final ReentrantLock lock = this.lock; + //加索 + lock.lock(); + try { + q.offer(e); + if (q.peek() == e) { + leader = null; + //如果队列中只有一个元素,即刚刚加入的则调用 signal方法,激活阻塞的线程继续遍历获取 + available.signal(); + } + return true; + } finally { + lock.unlock(); + } + } +``` + +```java +//检索并删除一个队列头,有过期数据就删除,没有返回null,需要代码中进行循环 +public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + E first = q.peek(); + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + else + return q.poll(); + } finally { + lock.unlock(); + } + } +``` + +```java +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) + //没有元素,进行等到,由上面的signal唤醒线程 + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + //有到期元素则直接取出 + if (delay <= 0) + return q.poll(); + // 释放first的引用,避免内存泄漏 + first = null; // don't retain ref while waiting + if (leader != null) + //不是leader线程会在这里阻塞 + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + // 等待剩余时间后,再尝试获取元素,他在等待期间,由于leader是当前线程,所以其它线程会等待。 + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + // leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列 + if (leader == null && q.peek() != null) + available.signal(); + // 释放全局独占锁 + lock.unlock(); + } + } +``` +这里为什么如果不设置first = null,则会引起内存泄漏呢?线程A到达,列首元素没有到期,设置leader = 线程A, +这是线程B来了因为leader != null,则会阻塞,线程C一样。假如线程阻塞完毕了,获取列首元素成功,出列。 +这个时候列首元素应该会被回收掉,但是问题是它还被线程B、线程C持有着,所以不会回收,这里只有两个线程,如果有线程D、线程E…呢? +这样会无限期的不能回收,就会造成内存泄漏