diff --git a/week_04/08/ArrayBlockingQueue-008.md b/week_04/08/ArrayBlockingQueue-008.md new file mode 100644 index 0000000000000000000000000000000000000000..cebce100089cbf9fe0f623859a5a9406105f2909 --- /dev/null +++ b/week_04/08/ArrayBlockingQueue-008.md @@ -0,0 +1,187 @@ +# 读源码-ArrayBlockingQueue + +## 继承体系 + +- 实现接口BlockingQueue,提供了3个增加方法add,offer,put和3个删方法poll,remove,take +- 实现Serializable接口 +- 继承AbstractQueue类,队列的抽象类,提供了基本的增删功能 + +![ArrayBlockingQueue继承图](E:\Project\JavaStudy\week_04\08\attachment\ArrayBlockingQueue继承图.png) + +## 主要属性 + +```java + // 队列数组 + final Object[] items; + + // 用于take,poll,peek和remove的索引 + int takeIndex; + + // 用于put,offer,add的索引 + int putIndex; + + /** Number of elements in the queue */ + int count; + + // 可重入锁 + final ReentrantLock lock; +``` + +## 主要方法 + +### 添加元素 + +add:添加成功返回true,否则抛出状态非法异常IllegalStateException + +```java + public boolean add(E e) { + return super.add(e); // 调用父类的add + } + // 父类的add,父类又调用Queue的offer + public boolean add(E e) { + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); + } +``` + +offer:元素不能为null,满了则返回false,否则加入队列,返回true + +```java + public boolean offer(E e) { + // 不能为null + checkNotNull(e); + final ReentrantLock lock = this.lock; + // 加锁 + lock.lock(); + try { + // 满了则返回false + if (count == items.length) + return false; + // 没满则加入队列,并返回true + else { + enqueue(e); + return true; + } + } finally { + lock.unlock(); + } + } + + private void enqueue(E x) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + final Object[] items = this.items; + items[putIndex] = x; + if (++putIndex == items.length) + putIndex = 0; + count++; + notEmpty.signal(); + } +``` + +put:满了则阻塞,直至有线程消费了队列中的数据才被唤醒 + +```java + public void put(E e) throws InterruptedException { + checkNotNull(e); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == items.length) + notFull.await(); // 阻塞 + enqueue(e); + } finally { + lock.unlock(); + } + } +``` + +总结: + +- add和offer方法不会阻塞线程,put在队列满的情况下,会阻塞 +- 三个方法内部都会使用重入锁保证原子性,从而保证线程安全 + +### 删除元素 + +poll:队列长度为0,返回null;否则返回删除的元素 + +```java + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return (count == 0) ? null : dequeue(); + } finally { + lock.unlock(); + } + } + + private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + final Object[] items = this.items; + @SuppressWarnings("unchecked") + // 获取删除的元素,以便返回 + E x = (E) items[takeIndex]; + items[takeIndex] = null; + if (++takeIndex == items.length) + takeIndex = 0; + count--; + if (itrs != null) + itrs.elementDequeued(); + notFull.signal(); // 使用notFull对象进行通知,例如put方法时队列已满,此时消费了,队列没满,就需要调用signal进行通知 + return x; + } +``` + +take:若队列为空,阻塞,直至队列不为空 + +```java + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == 0) + notEmpty.await(); //队列为空,阻塞当前线程,并加入到条件对象notEmpty的等待队列中,同时释放锁 + return dequeue(); + } finally { + lock.unlock(); + } + } +``` + +remove:若队列为空,则返回false;否则正常删除返回true + +```java + public boolean remove(Object o) { + if (o == null) return false; + final Object[] items = this.items; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (count > 0) { + final int putIndex = this.putIndex; + int i = takeIndex; + do { + if (o.equals(items[i])) { + removeAt(i); + return true; + } + if (++i == items.length) + i = 0; + } while (i != putIndex); + } + return false; + } finally { + lock.unlock(); + } + } +``` + +总结: + +- poll和remove方法不会阻塞线程,take在队列为空的情况下,会阻塞 +- 三个方法都会调用notFull.signal方法通知正在等待队列满情况的阻塞线程,比如put的方法 + diff --git a/week_04/08/ArrayBlockingQueue-008.xmind b/week_04/08/ArrayBlockingQueue-008.xmind new file mode 100644 index 0000000000000000000000000000000000000000..5a589f81a3ffe43a2cf0963073c3e9e1f14820e8 Binary files /dev/null and b/week_04/08/ArrayBlockingQueue-008.xmind differ diff --git a/week_04/08/ConCurrentLinkedQueue-008.md b/week_04/08/ConCurrentLinkedQueue-008.md new file mode 100644 index 0000000000000000000000000000000000000000..119669c5b83d12d0b6e9773ad39368949b64ee5b --- /dev/null +++ b/week_04/08/ConCurrentLinkedQueue-008.md @@ -0,0 +1,110 @@ +# 读源码-ConCurrentLinkedQueue + +## 继承体系 + +​ 与ArrayBlockingQueue相比,没有阻塞 + +- 实现Serializable接口 +- 继承AbstractQueue类,队列的抽象类,提供了基本的增删功能 + +![ConcurrentLinkedQueue继承图](E:\Project\JavaStudy\week_04\08\attachment\ConcurrentLinkedQueue继承图.png) + +## 属性 + +内部类node前面都有,就不说了 + +```java + // 头节点 + private transient volatile Node head; + // 尾节点 + private transient volatile Node tail; +``` + +## 构造器 + +```java + public ConcurrentLinkedQueue() { + head = tail = new Node(null); + } +``` + +## 方法 + +offer + +```java + public boolean offer(E e) { + checkNotNull(e); + final Node newNode = new Node(e); + // 初始化p=t=tail,p被认为时真正的尾节点,tail会延迟更新 + for (Node t = tail, p = t;;) { + // q为p的下一个节点 + Node q = p.next; + // q为空,说明p是尾节点 + if (q == null) { + // 更新p.next尾新节点 + if (p.casNext(null, newNode)) { + // 其他线程对tail进行了更新,p取到的是t后面的值,如下面else中p=q + if (p != t) // hop two nodes at a time + // 说明t或tail后面还有节点,更新tail + casTail(t, newNode); + return true; + } + // Lost CAS race to another thread; re-read next + } + // p为p的next,说明p已经被删除,重新设置p的值 + else if (p == q) + p = (t != (t = tail)) ? t : head; + else + // 多线程下,尾节点有更新,p!=t,重新获取尾部 + p = (p != t && t != (t = tail)) ? t : q; + } + } +``` + +总结 + +- 定位到链表尾部,尝试添加节点 +- 尾节点有变化,则重新获取尾节点,重试 + +poll + +```java + public E poll() { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + E item = p.item; + // 如果数据项不为空,则cas更新其为null + if (item != null && p.casItem(item, null)) { + // 说明头节点有变化 + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + // 如果p.next为null,说明队列为空,更新h为p + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + // 出队 + else if (p == q) + continue restartFromHead; + else + p = q; + } + } + } + + // 更新头节点 + final void updateHead(Node h, Node p) { + if (h != p && casHead(h, p)) + h.lazySetNext(h); + } +``` + +总结 + +- 定位到链表头部,尝试更新其为null,成功则出队 +- 失败或头节点有变化,则重新尝试 + diff --git a/week_04/08/ConCurrentQueue-008.xmind b/week_04/08/ConCurrentQueue-008.xmind new file mode 100644 index 0000000000000000000000000000000000000000..1c2345aca3e2656463eba91cbae80b182d94e2ab Binary files /dev/null and b/week_04/08/ConCurrentQueue-008.xmind differ diff --git a/week_04/08/ConcurrentHashMap-008.md b/week_04/08/ConcurrentHashMap-008.md new file mode 100644 index 0000000000000000000000000000000000000000..b97b38d09e731a7d4910a4784ca2c8aa3dec0166 --- /dev/null +++ b/week_04/08/ConcurrentHashMap-008.md @@ -0,0 +1,454 @@ +# 读源码-ConcurrentHashMap + +## 锁 + +## synchronized + +优化后现有三种方式,分别是偏向锁,轻量级锁,重量级锁。 + +1)偏向锁,同一段代码一致被一个线程访问,让这个线程自动获取锁,降低代价 + +2)轻量级锁,当锁为偏向锁时,被另一个线程访问,锁升级为轻量级锁。这个线程通过自旋方式访问 + +3)重量级锁,当锁为轻量级锁时,自旋一定次数还没有获取到锁,则阻塞,转为重量级锁 + +### CAS乐观锁 + +对于同一数据的并发操作,不断尝试 + +### volatile(关键字) + +1)可见性 + +2)有序性 + +### 分段锁 + +分段锁,细化了锁的粒度。分段加锁,最终将所有分段数据汇总 + +### Reenactment + +可重入锁,可多次加锁。避免死锁 + +## 源码分析 + +### 插入元素 + +putVal(K key, V value, boolean onlyIfAbsent)方法 + +```java + final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) throw new NullPointerException(); + int hash = spread(key.hashCode()); + int binCount = 0; + for (Node[] tab = table;;) { + Node f; int n, i, fh; + // 如果桶未初始化或桶个数为0,初始化桶 + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + // 如果要插入元素所在桶中没有元素,则将该元素插入 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + // cas插入,成功则break;否则继续自旋(未成功可能是其他线程抢在之前插入了) + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + // 若fh为moved,当前线程则帮助迁移元素 + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + // 桶既不为空,也没迁移,则锁住这个桶(只锁住这个桶,分段锁) + else { + V oldVal = null; + synchronized (f) { + if (tabAt(tab, i) == f) { + // 若第一个元素的hash值>=0,说明不是树,也没迁移 + if (fh >= 0) { + binCount = 1; + for (Node e = f;; ++binCount) { + K ek; + // 若找到相同key,则更新(onlyIfAbsent为false情况下) + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + Node pred = e; + // 若没有找到相同key,则插入链表尾部 + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + // 若为树节点,则putTreeVal + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + // 若找到该元素,则更新值并返回旧值 + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + // bincount不为0,表示成功插入或找到了元素 + if (binCount != 0) { + // 若binCount超过8,则树化 + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + // 若找到了元素,则返回该元素的旧值 + if (oldVal != null) + return oldVal; + break; + } + } + } + // 插入成功,元素个数+1 + addCount(1L, binCount); + return null; + } +``` + +过程总结 + +1)如果桶未初始化,则初始化桶 + +2)如果桶已初始化,待插入元素所在桶的元素个数为0,则尝试插入元素到第一个位置(cas更新) + +3)如果正在扩容,当前线程帮忙迁移元素 + +4)如果待插入元素所在桶既不为空,也没迁移,则锁住这个桶(只锁住这个桶,分段锁synchronized ) + +5)若为链表,则在链表方式插入或更新(自旋锁) + +6)若为树,则在树中插入或更新(自旋锁) + +7)如果元素存在,则返回旧值 + +8)如果元素不存在,则map元素个数+1,并检查扩容 + +### 初始化数组 + +```java + private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { + // 其他线程正在进行初始化,让出线程 + if ((sc = sizeCtl) < 0) + Thread.yield(); // lost initialization race; just spin + // 若成功将sizectl变为-1,表示该线程初始化 + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + // 防止ABA问题? + if ((tab = table) == null || tab.length == 0) { + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; + table = tab = nt; + sc = n - (n >>> 2); + } + } finally { + // 初始化成功,将下次扩容阈值改为0.75倍容量 + // sizectl存储的是扩容门槛,为啥存它 + sizeCtl = sc; + } + break; + } + } + return tab; + } +``` + +1)cas保证只有一个线程能初始化 + +2)sizectl存储的是下一次的扩容门槛,为啥存它 + +3)扩容阈值为容量的0.75倍 + +### 判断是否需要扩容 + +```java + private final void addCount(long x, int check) { + CounterCell[] as; long b, s; + // 先更新baseCount,失败则分段更新 + // 1.计数器不为空 + // 2.更新baseCount失败 + if ((as = counterCells) != null || + !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { + CounterCell a; long v; int m; + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = + U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { + // 强制增加数量 + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + s = sumCount(); + } + if (check >= 0) { + Node[] tab, nt; int n, sc; + // s达到扩容阈值,进行扩容 + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { + // 扩容邮戳 + int rs = resizeStamp(n); + // sc小于0,说明正在扩容 + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + // 扩容完成,推出循环 + break; + // 扩容未完成,当前元素加入迁移元素 + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } + } +``` + +### 协助迁移 + +线程添加元素时发现正在扩容且所在桶的元素已经迁移完毕后,协助迁移 + +```java + + final Node[] helpTransfer(Node[] tab, Node f) { + Node[] nextTab; int sc; + // 如果桶数组不为空,且首元素为ForwardingNode,nextTab不为空 + // 说明当前桶已迁移完毕 + if (tab != null && (f instanceof ForwardingNode) && + (nextTab = ((ForwardingNode)f).nextTable) != null) { + int rs = resizeStamp(tab.length); + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { + // sc小于0,说明正在迁移 + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { + transfer(tab, nextTab); + break; + } + } + return nextTab; + } + return table; +``` + +### 迁移元素 + +扩容时容量变为两倍,并将部分元素迁移至其他桶 + +```java + private final void transfer(Node[] tab, Node[] nextTab) { + int n = tab.length, stride; + if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) + stride = MIN_TRANSFER_STRIDE; // subdivide range + if (nextTab == null) { // initiating + try { + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n << 1];// n<<1,两倍 + nextTab = nt; + } catch (Throwable ex) { // try to cope with OOME + sizeCtl = Integer.MAX_VALUE; + return; + } + nextTable = nextTab; + transferIndex = n; + } + int nextn = nextTab.length; + // 新建ForwardingNode节点,并将心痛nextTab放入其中 + ForwardingNode fwd = new ForwardingNode(nextTab); + boolean advance = true; + boolean finishing = false; // to ensure sweep before committing nextTab + for (int i = 0, bound = 0;;) { + Node f; int fh; + while (advance) { + int nextIndex, nextBound; + if (--i >= bound || finishing) + advance = false; + else if ((nextIndex = transferIndex) <= 0) { + i = -1; + advance = false; + } + else if (U.compareAndSwapInt + (this, TRANSFERINDEX, nextIndex, + nextBound = (nextIndex > stride ? + nextIndex - stride : 0))) { + bound = nextBound; + i = nextIndex - 1; + advance = false; + } + } + if (i < 0 || i >= n || i + n >= nextn) { + int sc; + if (finishing) { + nextTable = null; + table = nextTab; + sizeCtl = (n << 1) - (n >>> 1); + return; + } + if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { + if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) + return; + finishing = advance = true; + i = n; // recheck before commit + } + } + else if ((f = tabAt(tab, i)) == null) + advance = casTabAt(tab, i, null, fwd); + else if ((fh = f.hash) == MOVED) + advance = true; // already processed + else { + synchronized (f) { + if (tabAt(tab, i) == f) { + Node ln, hn; + if (fh >= 0) { + int runBit = fh & n; + Node lastRun = f; + for (Node p = f.next; p != null; p = p.next) { + int b = p.hash & n; + if (b != runBit) { + runBit = b; + lastRun = p; + } + } + if (runBit == 0) { + ln = lastRun; + hn = null; + } + else { + hn = lastRun; + ln = null; + } + for (Node p = f; p != lastRun; p = p.next) { + int ph = p.hash; K pk = p.key; V pv = p.val; + if ((ph & n) == 0) + ln = new Node(ph, pk, pv, ln); + else + hn = new Node(ph, pk, pv, hn); + } + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + else if (f instanceof TreeBin) { + TreeBin t = (TreeBin)f; + TreeNode lo = null, loTail = null; + TreeNode hi = null, hiTail = null; + int lc = 0, hc = 0; + for (Node e = t.first; e != null; e = e.next) { + int h = e.hash; + TreeNode p = new TreeNode + (h, e.key, e.val, null, null); + if ((h & n) == 0) { + if ((p.prev = loTail) == null) + lo = p; + else + loTail.next = p; + loTail = p; + ++lc; + } + else { + if ((p.prev = hiTail) == null) + hi = p; + else + hiTail.next = p; + hiTail = p; + ++hc; + } + } + ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : + (hc != 0) ? new TreeBin(lo) : t; + hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : + (lc != 0) ? new TreeBin(hi) : t; + setTabAt(nextTab, i, ln); + setTabAt(nextTab, i + n, hn); + setTabAt(tab, i, fwd); + advance = true; + } + } + } + } + } + } +``` + +1)新桶数据时旧桶的2倍 + +2)迁移元素先从靠后的桶开始 + +3)迁移完成的桶里放一个ForwardingNode类型元素,标志该桶迁移完毕 + +4)迁移时根据hash&n是否等于0分为两个链表或树 + +5)低位链表存在原来的位置 + +6)高位链表存在原来位置+n的位置 + +7)迁移元素会锁住当前桶,分段锁 + +### 删除元素 + +与put类似 + +### 获取元素 + +```java + public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + // 获取hash + int h = spread(key.hashCode()); + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { + // 第一个就是,则返回 + if ((eh = e.hash) == h) { + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + // eh小于0,说明正在扩容 + // 使用find + else if (eh < 0) + return (p = e.find(h, key)) != null ? p.val : null; + // 遍历链表寻找 + while ((e = e.next) != null) { + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; + } +``` + +1)hash到元素所在的桶 + +2)如果桶中第一个元素就是该找的元素,直接返回; + +3)如果是树或者正在迁移元素,则调用各自Node子类的find()方法寻找元素; + +4)如果是链表,遍历整个链表寻找元素; + +5)获取元素没有加锁; + + + +ConcurrentHashMap设计知识点繁多,过程复杂,须理清流程,反复调试,消化吸收。 \ No newline at end of file diff --git a/week_04/08/ConcurrentHashMap-008.xmind b/week_04/08/ConcurrentHashMap-008.xmind new file mode 100644 index 0000000000000000000000000000000000000000..ea9cbc8225a5e9e81221e82d5e3d258a09b68ead Binary files /dev/null and b/week_04/08/ConcurrentHashMap-008.xmind differ diff --git a/week_04/08/CopyOnWriteArrayList-008.md b/week_04/08/CopyOnWriteArrayList-008.md new file mode 100644 index 0000000000000000000000000000000000000000..20f3a7e3312dd38c71d0acaad00bf8ac03938cc9 --- /dev/null +++ b/week_04/08/CopyOnWriteArrayList-008.md @@ -0,0 +1,197 @@ +# 读源码-CopyOnWriteArrayList + +## 继承体系 + +![CopyOnWriteArrayList类图](E:\Project\JavaStudy\week_04\08\attachment\CopyOnWriteArrayList类图.png) + +- 实现RandomAccess:可随机访问 +- 实现Serializable:可序列化 +- 实现Cloneable:可克隆 +- 实现List:基本的增删改查功能 + +## 源码分析 + +### 属性 + +```java + /** The lock protecting all mutators */ + // ReentrantLock来保证线程安全 + final transient ReentrantLock lock = new ReentrantLock(); + + /** The array, accessed only via getArray/setArray. */ + // 存储数组,只能通过getArray/setArray来访问 + // transient 不自动序列化 + // volatile保证多线程可见性 + private transient volatile Object[] array; +``` + +### 构造器 + +```java + public CopyOnWriteArrayList() { + setArray(new Object[0]); + } + + /** + * Creates a list containing the elements of the specified + * collection, in the order they are returned by the collection's + * iterator. + * + * @param c the collection of initially held elements + * @throws NullPointerException if the specified collection is null + */ + public CopyOnWriteArrayList(Collection c) { + Object[] elements; + // 若c为CopyOnWriteArrayList,强制转换后赋给array + if (c.getClass() == CopyOnWriteArrayList.class) + elements = ((CopyOnWriteArrayList)c).getArray(); + // 若c不是CopyOnWriteArrayList,则转为数组并赋给array + else { + elements = c.toArray(); + // 若c为null,可能报空指针异常 + // c.toArray might (incorrectly) not return Object[] (see 6260652) + if (elements.getClass() != Object[].class) + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements); + } + + // 参数为数组 + public CopyOnWriteArrayList(E[] toCopyIn) { + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); + } + + +``` + +### 重点方法 + +add(E e):添加元素,并与ArrayList中的add做个对比 + +```java + // CopyOnWriteArrayList的add方法 + public boolean add(E e) { + final ReentrantLock lock = this.lock; + // 加锁 + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + // 将旧数组的值复制到新数组 + Object[] newElements = Arrays.copyOf(elements, len + 1); + // 将新元素放到末尾 + newElements[len] = e; + setArray(newElements); + return true; + } finally { + // 解锁 + lock.unlock(); + } + } + + // ArrayList中的add方法 + public boolean add(E e) { + ensureCapacityInternal(size + 1); // Increments modCount!! + // 此行代码可分解为 + // 1、elementData[size] = e; + // 2、elementData[size++] = e; + // 非原子操作,没有加锁,会出现线程安全问题 + elementData[size++] = e; + return true; + } + + +``` + +add(int index, E element):指定索引处添加 + +```java + //在指定索引处添加元素,与add基本相同 + public void add(int index, E element) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + // 检查越界 + if (index > len || index < 0) + throw new IndexOutOfBoundsException("Index: "+index+ + ", Size: "+len); + Object[] newElements; + int numMoved = len - index; + // 在末尾添加 + if (numMoved == 0) + newElements = Arrays.copyOf(elements, len + 1); + else { + newElements = new Object[len + 1]; + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index, newElements, index + 1, + numMoved); + } + newElements[index] = element; + setArray(newElements); + } finally { + lock.unlock(); + } + } + + // 熟悉回顾arraycopy方法 + public static native void arraycopy(Object src, int srcPos, + Object dest, int destPos, + int length); +``` + +addIfAbsent(E e, Object[] snapshot):若不存在就添加 + +```java + private boolean addIfAbsent(E e, Object[] snapshot) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] current = getArray(); + int len = current.length; + if (snapshot != current) { + // Optimize for lost race to another addXXX operation + int common = Math.min(snapshot.length, len); + // 先判断共有长度下,是否存在 + for (int i = 0; i < common; i++) + if (current[i] != snapshot[i] && eq(e, current[i])) + return false; + // 判断非共有长度下是否存在 + if (indexOf(e, current, common, len) >= 0) + return false; + } + Object[] newElements = Arrays.copyOf(current, len + 1); + newElements[len] = e; + setArray(newElements); + return true; + } finally { + lock.unlock(); + } + } +``` + +其余的常用方法与ArrayList中相似,其实现与上述中的add方法类似,主要用了cow的方式 + +### COW + +cow即copyOnWrite,当我们向一个容器添加或删除要素时,可以先复制一个容器,然后再新容器中进行操作。最后将旧容器的引用指向新容器。 + +#### 优点: + +- 读效率高 +- 线程安全 + +#### 缺点 + +- 写效率低。因为要复制替换 +- 内存占用问题。每次修改的时候都进行复制,如果原数组比较大呢 + +- 数据延时问题。 + +### 适用场景 + +由以上优缺点,可以得到CopyOnWriteArrayList的适用场景 + +- 读多写少。比如黑白名单 +- 对数据实时性要求不高的场景 \ No newline at end of file diff --git a/week_04/08/CopyOnWriteArrayList-008.xmind b/week_04/08/CopyOnWriteArrayList-008.xmind new file mode 100644 index 0000000000000000000000000000000000000000..84fb2a85915c362457235861cd0585d635c39617 Binary files /dev/null and b/week_04/08/CopyOnWriteArrayList-008.xmind differ diff --git a/week_04/08/DelayQueue-008.md b/week_04/08/DelayQueue-008.md new file mode 100644 index 0000000000000000000000000000000000000000..c91397ca9a1697f1025117e9c6cdb58f71fcf77e --- /dev/null +++ b/week_04/08/DelayQueue-008.md @@ -0,0 +1,204 @@ +# 读源码-DelayQueue + +## 继承体系 + +- 实现接口BlockingQueue +- 实现Delayed接口,其中的getDelay方法可以得到到期时间 +- 继承AbstractQueue类,队列的抽象类,提供了基本的增删功能 + +![DelayQueue继承图](E:\Project\JavaStudy\week_04\08\attachment\DelayQueue继承图.png) + +## 属性 + +```java + // 重入锁 + private final transient ReentrantLock lock = new ReentrantLock(); + // 优先级队列 + private final PriorityQueue q = new PriorityQueue(); + // leader,标记当前是否有线程在排队 + private Thread leader = null; + // 条件,用于表示是否有可取的元素 + private final Condition available = lock.newCondition(); +``` + +## 主要方法 + +add,put,都调用的是offer方法。put无返回值 + +offer:添加元素 + +```java + public boolean offer(E e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + q.offer(e);//添加到PriorityQueue + // 如果e是堆顶元素,则嫁给你leader置空,并唤醒等待线程 + if (q.peek() == e) { + leader = null; + available.signal(); + } + return true; + } finally { + lock.unlock(); + } + } + // PriorityQueue中的offer + public boolean offer(E e) { + if (e == null) + throw new NullPointerException(); + modCount++; + int i = size; + if (i >= queue.length) + grow(i + 1); + size = i + 1; + if (i == 0) + queue[0] = e; + else + siftUp(i, e); + return true; + } + + // 获取堆顶元素 + public E peek() { + return (size == 0) ? null : (E) queue[0]; + } + +``` + +poll:无阻塞队列 + +```java + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + E first = q.peek(); + // 若堆顶元素为空,或等待时间大于0,返回空 + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + else + // 优先级队列弹出 + return q.poll(); + } finally { + lock.unlock(); + } + } +``` + +take:加入了阻塞 + +```java + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + // 如栈顶元素为空,则加入等待 + if (first == null) + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + // 若延时为0,说明可以出队了 + if (delay <= 0) + return q.poll(); + // 若延时大于0,则阻塞 + first = null; // don't retain ref while waiting 置空,便于GC + // 如果有线程在执行,等待 + if (leader != null) + available.await(); + // 若没有,则将leader赋予当前线程 + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + // 加入等待 + available.awaitNanos(delay); + } finally { + // 如果leader还是当前线程则将其置空,让其他线程获取 + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + // 如果leader为空且栈顶还有元素,唤醒下一个线程 + if (leader == null && q.peek() != null) + // sigal只是把线程放入AQS,并非真正唤醒 + available.signal(); + // 解锁,真正唤醒 + lock.unlock(); + } + } +``` + +## 使用方法 + +举个栗子 + +```java +public class DelayQueueTest { + public static void main(String[] args) { + DelayQueue queue = new DelayQueue(); + + long now = System.currentTimeMillis(); + + new Thread(() -> { + while (true) { + try { + System.out.println(queue.take().deadLine-now); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }).start(); + + queue.add(new Message(now + 5000)); + queue.add(new Message(now + 8000)); + queue.add(new Message(now + 9000)); + queue.add(new Message(now + 18000)); + queue.add(new Message(now + 2000)); + } +} +// 实现Delayed接口 +public class Message implements Delayed { + long deadLine; + public Message( long deadLine) { + this.deadLine = deadLine; + } + @Override + public String toString() { + return String.valueOf(deadLine); + } + @Override + // 获取等待时间 + public long getDelay(TimeUnit unit) { + return deadLine-System.currentTimeMillis(); + } + @Override + public int compareTo(Delayed o) { + return (int) (getDelay(TimeUnit.MILLISECONDS)-o.getDelay(TimeUnit.MILLISECONDS)); + } +} +``` + +执行结果 + +```java +2000 +5000 +8000 +9000 +18000 +``` + +总结:到期时间越短,越快出队 + + + + + + + diff --git a/week_04/08/DelayQueue-008.xmind b/week_04/08/DelayQueue-008.xmind new file mode 100644 index 0000000000000000000000000000000000000000..9208965f6586f06754e52c594353d1a625085fc1 Binary files /dev/null and b/week_04/08/DelayQueue-008.xmind differ diff --git "a/week_04/08/attachment/ArrayBlockingQueue\347\273\247\346\211\277\345\233\276.png" "b/week_04/08/attachment/ArrayBlockingQueue\347\273\247\346\211\277\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..d741ab1419c447e52dae65e5bc9a2914a17cff92 Binary files /dev/null and "b/week_04/08/attachment/ArrayBlockingQueue\347\273\247\346\211\277\345\233\276.png" differ diff --git "a/week_04/08/attachment/ConcurrentLinkedQueue\347\273\247\346\211\277\345\233\276.png" "b/week_04/08/attachment/ConcurrentLinkedQueue\347\273\247\346\211\277\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..7313dbc67bf60abfc8955c21534a84428bf26f3e Binary files /dev/null and "b/week_04/08/attachment/ConcurrentLinkedQueue\347\273\247\346\211\277\345\233\276.png" differ diff --git "a/week_04/08/attachment/CopyOnWriteArrayList\347\261\273\345\233\276.png" "b/week_04/08/attachment/CopyOnWriteArrayList\347\261\273\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..83cd3bd0081434fd6fa9ef14635be64e51b3bb18 Binary files /dev/null and "b/week_04/08/attachment/CopyOnWriteArrayList\347\261\273\345\233\276.png" differ diff --git "a/week_04/08/attachment/DelayQueue\347\273\247\346\211\277\345\233\276.png" "b/week_04/08/attachment/DelayQueue\347\273\247\346\211\277\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..33591015d25e78b6b3722a0a0f5485281f4873c7 Binary files /dev/null and "b/week_04/08/attachment/DelayQueue\347\273\247\346\211\277\345\233\276.png" differ