From 6defa3f599177c3512dfed5d9bbcd99f88ca89c9 Mon Sep 17 00:00:00 2001 From: zhongtao9527 <765426290@qq.com> Date: Sun, 5 Jan 2020 23:58:57 +0800 Subject: [PATCH] =?UTF-8?q?026=E5=8F=B7=E5=AD=A6=E5=91=98=E7=AC=AC?= =?UTF-8?q?=E5=9B=9B=E5=91=A8=E4=BD=9C=E4=B8=9A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week_04/26/ArrayBlockingQueue.md | 210 ++++++++++++++++++ week_04/26/ConcurrentHashMap.md | 328 ++++++++++++++++++++++++++++ week_04/26/ConcurrentLinkedQueue.md | 219 +++++++++++++++++++ week_04/26/CopyOnWriteArrayList.md | 281 ++++++++++++++++++++++++ week_04/26/DelayQueue.md | 267 ++++++++++++++++++++++ 5 files changed, 1305 insertions(+) create mode 100644 week_04/26/ArrayBlockingQueue.md create mode 100644 week_04/26/ConcurrentHashMap.md create mode 100644 week_04/26/ConcurrentLinkedQueue.md create mode 100644 week_04/26/CopyOnWriteArrayList.md create mode 100644 week_04/26/DelayQueue.md diff --git a/week_04/26/ArrayBlockingQueue.md b/week_04/26/ArrayBlockingQueue.md new file mode 100644 index 0000000..2f747f8 --- /dev/null +++ b/week_04/26/ArrayBlockingQueue.md @@ -0,0 +1,210 @@ +# ArrayBlockingQueue +## 简单说明 +ArrayBlockingQueue 是 GUC(java.util.concurrent) 包下的一个线程安全的阻塞队列,底层使用数组实现。 + +### 特点: + +当队列已满时,会阻塞后面添加元素 [put(E e)] 的线程,直到调用了移除元素的方法队列不满的情况下会唤醒前面添加元素的线程 + +当队列已空时,会阻塞后面移除元素 [take()] 的线程,直到调用了添加元素的方法队列不为空的情况下会唤醒前面移除元素的线程 + +新添加的元素并不一定在数组的 0 下标位置,因为其内部维护了一个 putIndex 属性 + +数组大小确定,通过构造函数初始化阻塞队列大小,没有扩容机制,因为线程阻塞,不存在数组下标越界异常 + +元素都是紧凑的,比如阻塞队列中有两个元素,那这两个元素在数组中下标之差一定是 1 + +插入的元素不允许为 null,所有的队列都有这个特点 + +先进先出(FIFO (first-in-first-out)) +### 阻塞队列 +BlockingQueue 不接受 null 元素 + +BlockingQueue 可以是限定容量的 没有的话就是Integer.MAX_VALUE + +BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口 + +BlockingQueue 实现是线程安全的 + +生产者不断往队列放数据,放满了,就会自动被阻塞 + +消费者不断从队列中取数据,取没了,就自动会被阻塞 +## 内部属性 +```Java + // 底层存储元素的数组 + final Object[] items; + // 出队序号,如果有一个元素出队,那么后面的元素不会向前移动, + // 而是将 takeIndex + 1 表示后面要出队的元素的角标 + int takeIndex; + // 入队序号,表示后续插入的元素的角标,putIndex 不一定大于 takeIndex + int putIndex; + // 元素个数 + int count; + // 内部锁 + final ReentrantLock lock; + // notEmpty 条件 + private final Condition notEmpty; + // notFull 条件 + private final Condition notFull; +``` +## 构造函数 +```Java + public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + // 初始化底层数组 + this.items = new Object[capacity]; + // 默认为非公平锁 + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); + } +``` +## put +```Java + public void put(E e) throws InterruptedException { + // 插入的元素不允许为 null + checkNotNull(e); + // 获取锁 + final ReentrantLock lock = this.lock; + /** + * lock:调用后一直阻塞到获得锁 + * tryLock:尝试是否能获得锁 如果不能获得立即返回 + * lockInterruptibly:调用后一直阻塞到获得锁 但是接受中断信号(比如:Thread、sleep) + */ + lock.lockInterruptibly(); + try { + // 如果队列数组已满,则 notFull 阻塞,当有元素被移除后才唤醒 notFull + while (count == items.length) + notFull.await(); + // 元素入队 + enqueue(e); + } finally { + // 添加完元素后释放锁 + lock.unlock(); + } + } + + + private void enqueue(E x) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + final Object[] items = this.items; + // 添加元素 + items[putIndex] = x; + // 如果插入元素的位置是数组尾部则重置 putIndex 为 0 + if (++putIndex == items.length) + putIndex = 0; + count++; + // 队列中一定有元素,因此唤醒 notEmpty + notEmpty.signal(); + } +``` +先加锁,如果队列没有满的情况下直接在 putIndex 的位置插入新元素,如果队列已满则阻塞当前获得锁的添加元素的线程,直到有元素从队列中被移除了,会唤醒 notFull,添加元素的线程才会被唤醒继续执行。 +## take +```Java +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + // 如果队列中没有元素,则让 notEmpty 阻塞,添加元素后会唤醒 notEmpty + while (count == 0) + notEmpty.await(); + return dequeue(); + } finally { + lock.unlock(); + } + } + + + private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + final Object[] items = this.items; + @SuppressWarnings("unchecked") + E x = (E) items[takeIndex]; + // 元素置 null + items[takeIndex] = null; + // 如果出队的是数组中的最后一个元素,则重置 takeIndex 为 0 + if (++takeIndex == items.length) + takeIndex = 0; + count--; + if (itrs != null) + itrs.elementDequeued(); + // 唤醒 notFull + notFull.signal(); + } +``` +出队与入队的原理都是类似的,同样是先加锁,如果队列中没有任何元素,则获得锁的出队的线程阻塞 notEmpty.await(),直到有元素被添加到队列中,会唤醒 notEmpty,移除元素的线程才会被唤醒继续执行,如果队列中有元素,则直接把 takeIndex 位置上的元素出队。 +## remove +```Java +public E remove() { +// 首先调用poll方法获取元素,如果不为空则直接返回,否则抛出NoSuchElementException异常。 + E x = poll(); + if (x != null) + return x; + else + throw new NoSuchElementException(); + } +// 首先判断队列是否已满,如果已满再循环判断超时时间是否超时,超时则直接返回false,否则阻塞该生产线程nanos时间,如果nanos时间之内唤醒则调用enqueue方法插入元素。 +// 如果队列不满则直接调用enqueue方法插入元素,并返回true。 +public E poll(long timeout, TimeUnit unit) throws InterruptedException { + //得到超时的时间 + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + //获得可响应中断的锁 + lock.lockInterruptibly(); + try { + while (count == 0) { + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + return dequeue(); + } finally { + lock.unlock(); + } + } +``` +## add +```Java +public boolean add(E e) { + return super.add(e); +} +public boolean add(E e) { + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); +} +``` +调用父类AbstractQueue 最终调用自身的offer(e) offer一次不成功就抛出异常,否则成功返回 +## offer +```Java +public boolean offer(E e) { + checkNotNull(e); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (count == items.length) + return false; + else { + enqueue(e); + return true; + } + } finally { + lock.unlock(); + } +} +private void enqueue(E x) { + final Object[] items = this.items; + items[putIndex] = x; + if (++putIndex == items.length) + putIndex = 0; + count++; + notEmpty.signal(); +} +``` +负责的逻辑已经被ReentrantLock控制住了,所以看起就是操作数组。 +队列满了直接返回false +队列没满items[putIndex] = data;达到数组长度重置putIndex,达到环形队列目的。 \ No newline at end of file diff --git a/week_04/26/ConcurrentHashMap.md b/week_04/26/ConcurrentHashMap.md new file mode 100644 index 0000000..2a056c6 --- /dev/null +++ b/week_04/26/ConcurrentHashMap.md @@ -0,0 +1,328 @@ +# ConcurrentHashMap +## 简单说明 +#### 1、线程不安全的HashMap + +因为多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap。 +#### 2、效率低下的HashTable +HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下。 + +因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态。 + +#### 3、锁分段技术 +ConcurrentHashMap所使用的锁分段技术。 +首先将数据分成一段一段的存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。 +有些方法需要跨段,比如size()和containsValue(),它们可能需要锁定整个表而而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。 +这里“按顺序”是很重要的,否则极有可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的, +但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证。这可以确保不会出现死锁,因为获得锁的顺序是固定的。 +## 类说明 +```Java +public class ConcurrentHashMap extends AbstractMap + implements ConcurrentMap, Serializable { + +     /** + * 散列映射表的默认初始容量为 16,即初始默认为 16 个桶 + * 在构造函数中没有指定这个参数时,使用本参数 + */ + static final int DEFAULT_INITIAL_CAPACITY= 16; + + /** + * 散列映射表的默认装载因子为 0.75,该值是 table 中包含的 HashEntry 元素的个数与 + * table 数组长度的比值 + * 当 table 中包含的 HashEntry 元素的个数超过了 table 数组的长度与装载因子的乘积时, + * 将触发 再散列 + * 在构造函数中没有指定这个参数时,使用本参数 + */ + static final float DEFAULT_LOAD_FACTOR= 0.75f; + + /** + * 散列表的默认并发级别为 16。该值表示当前更新线程的估计数 + * 在构造函数中没有指定这个参数时,使用本参数 + */ + static final int DEFAULT_CONCURRENCY_LEVEL= 16; + + /** + * segments 的掩码值 + * key 的散列码的高位用来选择具体的 segment + */ + final int segmentMask; + + /** + * 偏移量 + */ + final int segmentShift; + + /** + * 由 Segment 对象组成的数组 + */ + final Segment[] segments; + + /** + * 创建一个带有指定初始容量、加载因子和并发级别的新的空映射。 + */ + public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { + if(!(loadFactor > 0) || initialCapacity < 0 || + concurrencyLevel <= 0) + throw new IllegalArgumentException(); + + if(concurrencyLevel > MAX_SEGMENTS) + concurrencyLevel = MAX_SEGMENTS; + + // 寻找最佳匹配参数(不小于给定参数的最接近的 2 次幂) + int sshift = 0; + int ssize = 1; + while(ssize < concurrencyLevel) { + ++sshift; + ssize <<= 1; + } + segmentShift = 32 - sshift; // 偏移量值 + segmentMask = ssize - 1; // 掩码值 + this.segments = Segment.newArray(ssize); // 创建数组 + + if (initialCapacity > MAXIMUM_CAPACITY) + initialCapacity = MAXIMUM_CAPACITY; + int c = initialCapacity / ssize; + if(c * ssize < initialCapacity) + ++c; + int cap = 1; + while(cap < c) + cap <<= 1; + + // 依次遍历每个数组元素 + for(int i = 0; i < this.segments.length; ++i) + // 初始化每个数组元素引用的 Segment 对象 + this.segments[i] = new Segment(cap, loadFactor); + } + + /** + * 创建一个带有默认初始容量 (16)、默认加载因子 (0.75) 和 默认并发级别 (16) + * 的空散列映射表。 + */ + public ConcurrentHashMap() { + // 使用三个默认参数,调用上面重载的构造函数来创建空散列映射表 + this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); + } +} +``` +### Put +```Java + 首先,根据 key 计算出对应的 hash 值: +        public V put(K key, V value) { + if (value == null) //ConcurrentHashMap 中不允许用 null 作为映射值 + throw new NullPointerException(); + int hash = hash(key.hashCode()); // 计算键对应的散列码 + // 根据散列码找到对应的 Segment + return segmentFor(hash).put(key, hash, value, false); + } + 根据 hash 值找到对应的 Segment: + /** + * 使用 key 的散列码来得到 segments 数组中对应的 Segment + */ + final Segment segmentFor(int hash) { + // 将散列值右移 segmentShift 个位,并在高位填充 0 + // 然后把得到的值与 segmentMask 相“与” + // 从而得到 hash 值对应的 segments 数组的下标值 + // 最后根据下标值返回散列码对应的 Segment 对象 + return segments[(hash >>> segmentShift) & segmentMask]; + } + 在这个 Segment 中执行具体的 put 操作: + V put(K key, int hash, V value, boolean onlyIfAbsent) { + lock(); // 加锁,这里是锁定某个 Segment 对象而非整个 ConcurrentHashMap + try { + int c = count; + + if (c++ > threshold) // 如果超过再散列的阈值 + rehash(); // 执行再散列,table 数组的长度将扩充一倍 + + HashEntry[] tab = table; + // 把散列码值与 table 数组的长度减 1 的值相“与” + // 得到该散列码对应的 table 数组的下标值 + int index = hash & (tab.length - 1); + // 找到散列码对应的具体的那个桶 + HashEntry first = tab[index]; + + HashEntry e = first; + while (e != null && (e.hash != hash || !key.equals(e.key))) + e = e.next; + + V oldValue; + if (e != null) { // 如果键 / 值对以经存在 + oldValue = e.value; + if (!onlyIfAbsent) + e.value = value; // 设置 value 值 + } + else { // 键 / 值对不存在 + oldValue = null; + ++modCount; // 要添加新节点到链表中,所以 modCont 要加 1 + // 创建新节点,并添加到链表的头部 + tab[index] = new HashEntry(key, hash, first, value); + count = c; // 写 count 变量 + } + return oldValue; + } finally { + unlock(); // 解锁 + } + } +``` +这里的加锁操作是针对(键的 hash 值对应的)某个具体的 Segment,锁定的是该 Segment 而不是整个 ConcurrentHashMap。 + +因为插入键 / 值对操作只是在这个 Segment 包含的某个桶中完成,不需要锁定整个ConcurrentHashMap。 + +此时,其他写线程对另外 15 个Segment 的加锁并不会因为当前线程对这个 Segment 的加锁而阻塞。 + +同时,所有读线程几乎不会因本线程的加锁而阻塞(除非读线程刚好读到这个 Segment 中某个 HashEntry 的 value 域的值为 null,此时需要加锁后重新读取该值)。 +### get +```Java +V get(Object key, int hash) { + if(count != 0) { // 首先读 count 变量 + HashEntry e = getFirst(hash); + while(e != null) { + if(e.hash == hash && key.equals(e.key)) { + V v = e.value; + if(v != null) + return v; + // 如果读到 value 域为 null,说明发生了重排序,加锁后重新读取 + return readValueUnderLock(e); + } + e = e.next; + } + } + return null; + } + V readValueUnderLock(HashEntry e) { + lock(); + try { + return e.value; + } finally { + unlock(); + } + } +``` +ConcurrentHashMap中的读方法不需要加锁,所有的修改操作在进行结构修改时都会在最后一步写count 变量,通过这种机制保证get操作能够得到几乎最新的结构更新。 +### remove +```Java +V remove(Object key, int hash, Object value) { + lock(); //加锁 + try{ + int c = count - 1; + HashEntry[] tab = table; + //根据散列码找到 table 的下标值 + int index = hash & (tab.length - 1); + //找到散列码对应的那个桶 + HashEntry first = tab[index]; + HashEntry e = first; + while(e != null&& (e.hash != hash || !key.equals(e.key))) + e = e.next; + + + V oldValue = null; + if(e != null) { + V v = e.value; + if(value == null|| value.equals(v)) { //找到要删除的节点 + oldValue = v; + ++modCount; + //所有处于待删除节点之后的节点原样保留在链表中 + //所有处于待删除节点之前的节点被克隆到新链表中 + HashEntry newFirst = e.next;// 待删节点的后继结点 + for(HashEntry p = first; p != e; p = p.next) + newFirst = new HashEntry(p.key, p.hash, + newFirst, p.value); + //把桶链接到新的头结点 + //新的头结点是原链表中,删除节点之前的那个节点 + tab[index] = newFirst; + count = c; //写 count 变量 + } + } + return oldValue; + } finally{ + unlock(); //解锁 + } + } +``` +整个操作是在持有段锁的情况下执行的,空白行之前的行主要是定位到要删除的节点e。 +如果不存在这个节点就直接返回null,否则就要将e前面的结点复制一遍,尾结点指向e的下一个结点。 +e后面的结点不需要复制,它们可以重用。 +### size +```Java +private transient volatile long baseCount; + + private transient volatile CounterCell[] counterCells; +@sun.misc.Contended static final class CounterCell { + volatile long value; + CounterCell(long x) { value = x; } + } + /** + *(返回容器的大小。这个方法应该被用来代替size()方法,因为 + * ConcurrentHashMap的容量大小可能会大于int的最大值。 + * 返回的值是一个估计值;如果有并发插入或者删除操作,则实际的数量可能有所不同。 + */ + +final long sumCount() { + CounterCell[] as = counterCells; CounterCell a; + long sum = baseCount; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; + } + +public int size() { + long n = sumCount(); + return ((n < 0L) ? 0 : + (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : + (int)n); + } + + + +public long mappingCount() { + long n = sumCount(); + return (n < 0L) ? 0L : n; // ignore transient negative values + } +``` +我们要统计整个ConcurrentHashMap里元素的大小,就必须统计所有Segment里元素的大小后求和。 +Segment里的全局变量count是一个volatile变量,那么在多线程场景下最安全的做法,是在统计size的时候把所有Segment的put,remove和clean方法全部锁住,但是这种做法显然非常低效。 + +因为在累加count操作过程中,之前累加过的count发生变化的几率非常小,所以ConcurrentHashMap的做法是先尝试2次通过不锁住Segment的方式来统计各个Segment大小,如果统计的过程中,容器的count发生了变化,则再采用加锁的方式来统计所有Segment的大小。 + +使用modCount变量,在put , remove和clean方法里操作元素前都会将变量modCount进行加1,那么在统计size前后比较modCount是否发生变化,从而得知容器的大小是否发生变化。 +## segment类 +```Java +static final class Segment extends ReentrantLock implements Serializable { + /** + * 在本 segment 范围内,包含的 HashEntry 元素的个数 + * 该变量被声明为 volatile 型,保证每次读取到最新的数据 + */ + transient volatile int count; + + + /** + *table 被更新的次数 + */ + transient int modCount; + + + /** + * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列 + */ + transient int threshold; + + + /** + * table 是由 HashEntry 对象组成的数组 + * 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表 + * table 数组的数组成员代表散列映射表的一个桶 + * 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分 + * 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16 + */ + transient volatile HashEntry[] table; + + + /** + * 装载因子 + */ + final float loadFactor; + } +``` diff --git a/week_04/26/ConcurrentLinkedQueue.md b/week_04/26/ConcurrentLinkedQueue.md new file mode 100644 index 0000000..0e3b15f --- /dev/null +++ b/week_04/26/ConcurrentLinkedQueue.md @@ -0,0 +1,219 @@ +# ConcurrentLinkedQueue +## 简单说明 +ConcurrentLinkedQueue是一种线程安全的队列。他是使用非阻塞算法(CAS)来实现线程安全的。ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。 + +ConcurrentLinkedQueue内部的队列是使用单向链表方式实现,类中两个volatile 类型的Node 节点分别用来存放队列的首位节点。 + +ConcurrentLinkedQueue的构造函数通过无参构造函数可知默认头尾节点都是指向 item 为 null 的哨兵节点。 + +Node节点内部则维护一个volatile 修饰的变量item 用来存放节点的值,next用来存放链表的下一个节点,从而链接为一个单向无界链表,这就是单向无界的根本原因。 +## offer +```Java + +public boolean offer(E e) { + if (e == null) + throw new NullPointerException(); + // 入队前创建一个入队新节点 + Node n = new Node(e); // 创建新节点 + // 死循环,入队不成功反复入队。 + retry: for (;;) { + // 创建一个指向tail节点的引用 + Node t = tail; + // p用来表示队列的尾节点,默认情况下等于tail节点 + Node p = t; + for (int hops = 0;; hops++) { + // 找到 tail 的下一个节点 next + Node next = succ(p); + // next节点不为空,说明p不是尾节点,需要更新p后在将它指向next节点 + if (next != null) { + // 如果已经至少越过了两个节点,且 tail 被修改 (tail 被修改,说明其他线程向队列添加了新的节点,且更新 tail 成功 ), + // 并且当前节点还是不等于尾节点 + if (hops > HOPS && t != tail) + // 跳出内外两层循环,重新开始迭代(因为 tail 刚刚被其他线程更新了) + continue retry; // B2 + // 向后推进到下一个节点 + p = next; // B3 + // 如果p是尾节点,则设置p节点的next节点为入队节点 + } else if (p.casNext(null, n)) { // C + // 如果tail节点有大于等于1个next节点,则将入队节点设置成tail节点, + // 更新失败了也没关系,因为失败了表示有其他线程成功更新了tail节点 + if (hops >= HOPS) // C1 + // 使用 CAS 原子指令更新 tail 指向这个新插入的节点,允许失败 + casTail(t, n); // C2 + return true; // C3 + } else {// p有next节点,表示p的next节点是尾节点,则重新设置p节点 + p = succ(p); // D1 + } + } + } + } +``` +主要做两件事情:第一是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试 + +第一步定位尾节点。tail节点并不总是尾节点,所以每次入队都必须先通过tail节点来找到尾节点,尾节点可能就是tail节点,也可能是tail节点的next节点。代码中循环体中的第一个if就是判断tail是否有next节点,有则表示next节点可能是尾节点。获取tail节点的next节点需要注意的是p节点等于q节点的情况,出现这种情况的原因我们后续再来介绍。 + +第二步设置入队节点为尾节点。p.casNext(null,newNode)方法用于将入队节点设置为当前队列尾节点的next节点,q如果是null表示p是当前队列的尾节点,如果不为null表示有其他线程更新了尾节点,则需要重新获取当前队列的尾节点。 +## tail + +让tail节点永远作为队列的尾节点,这样实现代码量非常少,而且逻辑非常清楚和易懂。 + +但是这么做有个缺点就是每次都需要使用循环CAS更新tail节点。如果能减少CAS更新tail节点的次数,就能提高入队的效率。 + +在JDK 1.7的实现中,doug lea使用hops变量来控制并减少tail节点的更新频率,并不是每次节点入队后都将tail节点更新成尾节点,而是当tail节点和尾节点的距离大于等于常量HOPS的值(默认等于1)时才更新tail节点,tail和尾节点的距离越长使用CAS更新tail节点的次数就会越少,但是距离越长带来的负面效果就是每次入队时定位尾节点的时间就越长,因为循环体需要多循环一次来定位出尾节点,但是这样仍然能提高入队的效率,因为从本质上来看它通过增加对volatile变量的读操作来减少了对volatile变量的写操作,而对volatile变量的写操作开销要远远大于读操作,所以入队效率会有所提升。 + +在JDK 1.8的实现中,tail的更新时机是通过p和t是否相等来判断的,其实现结果和JDK 1.7相同,即当tail节点和尾节点的距离大于等于1时,更新tail。 +## poll +```Java +public E poll() { + restartFromHead: + for (;;) { + // p节点表示首节点,即需要出队的节点 + for (Node h = head, p = h, q;;) { + E item = p.item; + + // 如果p节点的元素不为null,则通过CAS来设置p节点引用的元素为null,如果成功则返回p节点的元素 + if (item != null && p.casItem(item, null)) { + // Successful CAS is the linearization point + // for item to be removed from this queue. + // 如果p != h,则更新head + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + // 如果头节点的元素为空或头节点发生了变化,这说明头节点已经被另外一个线程修改了。 + // 那么获取p节点的下一个节点,如果p节点的下一节点为null,则表明队列已经空了 + else if ((q = p.next) == null) { + // 更新头结点 + updateHead(h, p); + return null; + } + // p == q,则使用新的head重新开始 + else if (p == q) + continue restartFromHead; + // 如果下一个元素不为空,则将头节点的下一个节点设置成头节点 + else + p = q; + } + } +} +``` +主要逻辑就是首先获取头节点的元素,然后判断头节点元素是否为空, + +如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走, + +如果不为空,则使用CAS的方式将头节点的引用设置成null, + +如果CAS成功,则直接返回头节点的元素, + +如果CAS不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。 +```Java +final void updateHead(Node h, Node p) { + if (h != p && casHead(h, p)) + // 将旧的头结点h的next域指向为h + h.lazySetNext(h); +} +``` +在更新完head之后,会将旧的头结点h的next域指向为h, + +如果出现出现了p==q的情况,则会触发执行head的更新,将p节点重新指向为head,所有“活着”的节点(指未删除节点),都能从head通过遍历可达,这样就能通过head成功获取到尾节点,然后添加元素了 +## peek +```Java +// 获取链表的首部元素(只读取而不移除) +public E peek() { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + E item = p.item; + if (item != null || (q = p.next) == null) { + updateHead(h, p); + return item; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } +} +``` +peek操作代码与poll操作类似,只是前者只获取队列头元素,但是并不从队列里面删除,而后者获取后需要从队列里面删除。 + +另外,在第一次调用peek操作的时候,会删除哨兵节点,并让队列的head节点指向队列里面第一个元素或者null。 +## size +获取当前队列元素个数,在并发环境下不是很有用,因为 CAS 没有加锁所以从调用 size 函数到返回结果期间有可能增删元素,导致统计的元素个数不精确 +```Java +public int size() { + int count = 0; + for (Node p = first(); p != null; p = succ(p)) + if (p.item != null) + // 最大返回Integer.MAX_VALUE + if (++count == Integer.MAX_VALUE) + break; + return count; +} + +//获取第一个队列元素(哨兵元素不算),没有则为null +Node first() { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + boolean hasItem = (p.item != null); + if (hasItem || (q = p.next) == null) { + updateHead(h, p); + return hasItem ? p : null; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } +} + +//获取当前节点的next元素,如果是自引入节点则返回真正头节点 +final Node succ(Node p) { + Node next = p.next; + return (p == next) ? head : next; +} +``` +## remove +```Java +public boolean remove(Object o) { + + //查找元素为空,直接返回false + if (o == null) return false; + Node pred = null; + for (Node p = first(); p != null; p = succ(p)) { + E item = p.item; + + //相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其它元素是否有匹配的。 + if (item != null && + o.equals(item) && + p.casItem(item, null)) { + + //获取next元素 + Node next = succ(p); + + //如果有前驱节点,并且next不为空则链接前驱节点到next, + if (pred != null && next != null) + pred.casNext(p, next); + return true; + } + pred = p; + } + return false; +} +``` +使用 CAS 原子指令来处理对数据的并发访问,这是非阻塞算法得以实现的基础。 + +head/tail 并非总是指向队列的头 / 尾节点,也就是说允许队列处于不一致状态。 这个特性把入队 / 出队时,原本需要一起原子化执行的两个步骤分离开来,从而缩小了入队 / 出队时需要原子化更新值的范围到唯一变量。这是非阻塞算法得以实现的关键。 + +由于队列有时会处于不一致状态。为此,ConcurrentLinkedQueue 使用三个不变式(基本不变式、head 的不变式和可变式、tail 的不变式和可变式)来维护非阻塞算法的正确性。 + +以批处理方式来更新 head/tail,从整体上减少入队 / 出队操作的开销。 + +为了有利于垃圾收集,队列使用特有的 head 更新机制;为了确保从已删除节点向后遍历,可到达所有的非删除节点,队列使用了特有的向后推进策略。 + + + + diff --git a/week_04/26/CopyOnWriteArrayList.md b/week_04/26/CopyOnWriteArrayList.md new file mode 100644 index 0000000..6775d5a --- /dev/null +++ b/week_04/26/CopyOnWriteArrayList.md @@ -0,0 +1,281 @@ +# CopyOnWriteArrayList +## 简单说明 +并发包中并发List只有CopyOnWriteArrayList这一个,CopyOnWriteArrayList是一个线程安全的ArrayList,对其进行修改操作和元素迭代操作都是在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。 + +每个CopyOnWriteArrayList对象里面有一个array数组对象用来存放具体元素,ReentrantLock独占锁对象用来保证同时只有一个线程对array进行修改 +## 特征 +它最适合于具有以下特征的应用程序:List 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。 + +它是线程安全的。 + +因为通常需要复制整个基础数组,所以可变操作(add()、set() 和 remove() 等等)的开销很大。 + +迭代器支持hasNext(), next()等不可变操作,但不支持可变 remove()等操作。 + +使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。 + +CopyOnWriteArrayList使用了一种叫写时复制的方法,当有新元素添加到CopyOnWriteArrayList时,先将原有数组的元素拷贝到新数组中,然后在新的数组中做写操作,写完之后,再将原来的数组引用(volatile 修饰的数组引用)指向新数组。CopyOnWriteArrayList的整个add操作都是在锁的保护下进行的。 + +### 初始化 +```Java + //创建空的CopyOnWriteArrayList对象 + public CopyOnWriteArrayList() { + setArray(new Object[0]); + } +``` +内部创建了一个大小为0的Object数据作为array的初始值 +```Java + //创建含有指定Collection的元素的CopyOnWriteArrayList;入参为集合,拷贝集合里面元素到本list + public CopyOnWriteArrayList(Collection c) { + Object[] elements; + if (c.getClass() == CopyOnWriteArrayList.class) + elements = ((CopyOnWriteArrayList)c).getArray(); + else { + elements = c.toArray(); + // c.toArray might (incorrectly) not return Object[] (see 6260652) + if (elements.getClass() != Object[].class) + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements); + } + + + //创建含有指定数组的元素的CopyOnWriteArrayList,创建一个list,其内部元素是入参toCopyIn的拷贝 + public CopyOnWriteArrayList(E[] toCopyIn) { + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); + } + + final void setArray(Object[] a) { + array = a; + } +``` +### 添加 +```Java + //添加元素 + public boolean add(E e) { + //获取该对象的锁 + final ReentrantLock lock = this.lock; + // 获取“锁”,每次只有一个线程可进入临界区 + lock.lock(); + try { + // 获取原始”volatile数组“中的数据和数据长度。 + Object[] elements = getArray(); + int len = elements.length; + // 新建一个数组newElements,并将原始数据拷贝到newElements中; + // newElements数组的长度=“原始数组的长度”+1 + Object[] newElements = Arrays.copyOf(elements, len + 1); + // 将“新增加的元素”保存到newElements中。 + newElements[len] = e; + // 将”volatile数组“引用指向newElements数组,这样旧数组就被GC回收了 + setArray(newElements); + return true; + } finally { + // 释放“锁” + lock.unlock(); + } + } +``` + +调用add方法的线程会首先执行去获取独占锁,如果多个线程都调用add则只有一个线程会获取该锁,其他线程会被阻塞挂起知道锁被释放。所以一个线程获取到锁后,就保证了在该线程添加元素过程中,其他线程不会对array进行修改。 + +线程获取锁后执行获取array,然后执行拷贝array到一个新数组(从这里可以知道新数组的大小是原来数组大小加增加1,所以CopyOnWriteArrayList是无界List),并把要新增的元素添加到新数组。 + +然后执行到把新数组替换原数组,然后返回前要释放锁,由于加了锁,所以整个add过程是个原子操作,需要注意的是添加元素时候首先是拷贝了一个快照,然后在快照上进行的添加,而不是直接在源来的数组上进行。 +### 获取 +获取指定位置元素,E get(int index)获取下标为index的元素,如果元素不存在会抛出IndexOutOfBoundsException 异常 +```Java +public E get(int index) { + return get(getArray(), index); + } + + final Object[] getArray() { + return array; + } + + private E get(Object[] a, int index) { + return (E) a[index]; + } +} +``` +获取指定位置的元素分为两步,首先获取到当前list里面的array数组,这里称为步骤1,然后通过随机访问的下标方式访问指定位置的元素,这里称为步骤2。 + +整个过程并没有加锁,这就可能会导致当执行完步骤1后执行步骤2前,另外一个线程C进行了修改操作,比如remove操作,就会进行写时拷贝删除当前get方法要访问的元素,并且修改当前list的array为新数组。 + +而这之后步骤2可能才开始执行,步骤2操作的是线程C删除元素前的一个快照数组(因为步骤1让array指向的是原来的数组),所以虽然线程C已经删除了index处的元素,但是步骤2还是返回index处的元素,这其实就是写时拷贝策略带来弱一致性。 +### 修改 +修改 list 中指定元素的值,如果指定位置的元素不存在则抛出 IndexOutOfBoundsException 异常 +```Java +public E set(int index, E element) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + E oldValue = get(elements, index); + + if (oldValue != element) { + int len = elements.length; + Object[] newElements = Arrays.copyOf(elements, len); + newElements[index] = element; + setArray(newElements); + } else { + // Not quite a no-op; ensures volatile write semantics + setArray(elements); + } + return oldValue; + } finally { + lock.unlock(); + } + } +``` +首先获取了独占锁控制了其他线程对array数组的修改,然后获取当前数组,并调用get方法获取指定位置元素。 + +如果指定的位置元素与新值不一致则创建新数组并拷贝元素,在新数组上修改指定位置元素值并设置新数组到array。 + +如果指定位置元素与新值一样,则为了保障volatile语义,还是需要重新设置下array,虽然array内容并没有改变(为了保证 volatile 语义是考虑到 set 方法本身应该提供 volatile 的语义) +### 删除 +```Java + //删除索引index处的元素 + public E remove(int index) { + final ReentrantLock lock = l.lock; + //获得“锁” + lock.lock(); + try { + //检测是否“数组越界” + rangeCheck(index); + checkForComodification(); + //删除元素 + E result = l.remove(index+offset); + expectedArray = l.getArray(); + size--; + return result; + } finally { + //释放“锁” + lock.unlock(); + } + } + + private void rangeCheck(int index) { + if (index < 0 || index >= size) + throw new IndexOutOfBoundsException("Index: "+index+",Size: "+size); + } + + private void checkForComodification() { + if (l.getArray() != expectedArray) + throw new ConcurrentModificationException(); + } + + //删除元素的真正实现 + public E remove(int index) { + final ReentrantLock lock = this.lock; + //获得“锁” + lock.lock(); + try { + // 获取原始”volatile数组“中的数据和数据长度。 + Object[] elements = getArray(); + int len = elements.length; + //// 获取elements数组中的第index个数据。 + E oldValue = get(elements, index); + int numMoved = len - index - 1; + // 如果被删除的是最后一个元素,则直接通过Arrays.copyOf()进行处理,而不需要新建数组。 + if (numMoved == 0) + setArray(Arrays.copyOf(elements, len - 1)); + else { + // 否则,新建数组,然后将”volatile数组中被删除元素之外的其它元素“拷贝到新数组中; + // 最后,将”volatile数组“引用指向newElements数组,这样旧数组就被GC回收了。 + Object[] newElements = new Object[len - 1]; + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index + 1, newElements, index, numMoved); + setArray(newElements); + } + return oldValue; + } finally { + //释放“锁” + lock.unlock(); + } + } +``` +和新增元素时候是类似的,首先是获取独占锁保证删除数组期间,其他线程不能对array进行修改,然后获取数组中要给删除的元素,并把剩余的原始拷贝到新数组后,把新数组替换原来的数组,最后在返回前释放锁。 +### 遍历 +```Java +public Iterator iterator() { + return new COWIterator(getArray(), 0); + } + + static final class COWIterator implements ListIterator { + /** array的快照版本*/ + private final Object[] snapshot; + /** 游标 */ + private int cursor; + + //构造函数 + private COWIterator(Object[] elements, int initialCursor) { + cursor = initialCursor; + snapshot = elements; + } + + //是否遍历结束 + public boolean hasNext() { + return cursor < snapshot.length; + } + + // 获取下一个元素 + @SuppressWarnings("unchecked") + public E next() { + if (! hasNext()) + throw new NoSuchElementException(); + return (E) snapshot[cursor++]; + } + + // 获取上一个元素 + @SuppressWarnings("unchecked") + public E previous() { + if (! hasPrevious()) + throw new NoSuchElementException(); + return (E) snapshot[--cursor]; + } + + public int nextIndex() { + return cursor; + } + + public int previousIndex() { + return cursor-1; + } + + + //不支持remove + public void remove() { + throw new UnsupportedOperationException(); + } + + //不支持set + public void set(E e) { + throw new UnsupportedOperationException(); + } + + + //不支持add + public void add(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public void forEachRemaining(Consumer action) { + Objects.requireNonNull(action); + Object[] elements = snapshot; + final int size = elements.length; + for (int i = cursor; i < size; i++) { + @SuppressWarnings("unchecked") E e = (E) elements[i]; + action.accept(e); + } + cursor = size; + } + } +``` +当调用iterator()方法获取迭代器时候实际是返回一个COWIterator对象,COWIterator的snapshot变量保存了当前list的内容,cursor是遍历list数据的下标。 + +如果在该线程使用返回的迭代器遍历元素的过程中,其他线程没有对list进行增删改,那么snapshot本身就是list的array,因为它们是引用关系。 + +但是如果遍历期间,有其他线程对该list进行了增删改,那么snapshot就是快照了,因为增删改后list里面的数组被新数组替换了,这时候老数组只有被snapshot索引用,所以这也就说明获取迭代器后,使用改迭代器进行变量元素时候,其它线程对该list进行的增删改是不可见的, + +因为它们操作的是两个不同的数组,这也就是弱一致性的达成。 diff --git a/week_04/26/DelayQueue.md b/week_04/26/DelayQueue.md new file mode 100644 index 0000000..ce2757a --- /dev/null +++ b/week_04/26/DelayQueue.md @@ -0,0 +1,267 @@ +# DelayQueue +## 简单说明 +DelayedQueue是一个用来延时处理的队列,delayQueue其实就是在每次往优先级队列中添加元素 + +然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,当非阻塞从队列中获取元素时,返回最早达到延迟时间的元素,或空(没有元素达到延迟时间) +## 类定义 +```Java +public class DelayQueue extends AbstractQueue + implements BlockingQueue { + /** 重入锁,实现线程安全 */ + private final transient ReentrantLock lock = new ReentrantLock(); + /** 使用优先队列实现 */ + private final PriorityQueue q = new PriorityQueue(); + + /** Leader/Followers模式 */ + private Thread leader = null; + + /** 条件对象,当新元素到达,或新线程可能需要成为leader时被通知 */ + private final Condition available = lock.newCondition(); + + + /** + * 默认构造,得到空的延迟队列 + */ + public DelayQueue() {} + + /** + * 构造延迟队列,初始包含c中的元素 + * + * @param c 初始包含的元素集合 + * @throws NullPointerException 当集合或集合任一元素为空时抛出空指针错误 + */ + public DelayQueue(Collection c) { + this.addAll(c); + } +} +``` +## add +```Java +/** + * 向延迟队列插入元素 + * + * @param e 要插入的元素 + * @return true + * @throws NullPointerException 元素为空,抛出空指针错误 + */ + public boolean add(E e) { + // 直接调用offer并返回 + return offer(e); + } +``` +## put +```Java +/** + * 向延迟队列插入元素. 因为队列是无界的,所以不会阻塞。 + * + * @param e 要插入的元素 + * @throws NullPointerException 元素为空,抛出空指针错误 + */ + public void put(E e) { + offer(e); + } +``` +## offer +```Java + /** + * 向延迟队列插入元素 + * + * @param e 要插入的元素 + * @return true + * @throws NullPointerException 元素为空,抛出空指针错误 + */ + public boolean offer(E e) { + final ReentrantLock lock = this.lock; + // 获得锁 + lock.lock(); + try { + // 向优先队列插入元素 + q.offer(e); + // 若在此之前队列为空,则置空leader,并通知条件对象 + if (q.peek() == e) { + leader = null; + available.signal(); + } + return true; + } finally { + // 释放锁 + lock.unlock(); + } + } +``` +## poll +```Java +/** + * 获取并移除队首的元素, 或者返回null(如果队列不包含到达延迟时间的元素) + * + * @return 队首的元素, 或者null(如果队列不包含到达延迟时间的元素) + */ + public E poll() { + final ReentrantLock lock = this.lock; + // 获得锁 + lock.lock(); + try { + // 获取优先队列队首元素 + E first = q.peek(); + // 若优先队列队首元素为空,或者还没达到延迟时间,返回null + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + // 否则,返回并移除队首元素 + else + return q.poll(); + } finally { + // 释放锁 + lock.unlock(); + } + } +``` +## 带超时的poll +```Java +public E poll(long timeout, TimeUnit unit) throws InterruptedException { + //超时等待时间 + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + //可中断的获取锁 + lock.lockInterruptibly(); + try { + //无限循环 + for (;;) { + //获取队头元素 + E first = q.peek(); + //队头为空,也就是队列为空 + if (first == null) { + //达到超时指定时间,返回null + if (nanos <= 0) + return null; + else + // 如果还没有超时,那么再available条件上进行等待nanos时间 + nanos = available.awaitNanos(nanos); + } else { + //获取元素延迟时间 + long delay = first.getDelay(NANOSECONDS); + //延时到期 + if (delay <= 0) + return q.poll(); //返回出队元素 + //延时未到期,超时到期,返回null + if (nanos <= 0) + return null; + first = null; // don't retain ref while waiting + // 超时等待时间 < 延迟时间 或者有其它线程再取数据 + if (nanos < delay || leader != null) + //在available 条件上进行等待nanos 时间 + nanos = available.awaitNanos(nanos); + else { + //超时等待时间 > 延迟时间 并且没有其它线程在等待,那么当前元素成为leader,表示leader 线程最早 正在等待获取元素 + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + //等待 延迟时间 超时 + long timeLeft = available.awaitNanos(delay); + //还需要继续等待 nanos + nanos -= delay - timeLeft; + } finally { + //清除 leader + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + //唤醒阻塞在available 的一个线程,表示可以取数据了 + if (leader == null && q.peek() != null) + available.signal(); + //释放锁 + lock.unlock(); + } + } +``` +如果队列为空,如果超时时间未到,则进行等待,否则返回null + +队列不空,取出队头元素,如果延迟时间到,则返回元素,否则 如果超时 时间到 返回null + +超时时间未到,并且超时时间< 延迟时间或者有线程正在获取元素,那么进行等待 + +超时时间> 延迟时间,那么肯定可以取到元素,设置leader为当前线程,等待延迟时间到期。 + +## take +```Java + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + // 堆顶元素 + E first = q.peek(); + // 如果堆顶元素为空,说明队列中还没有元素,直接阻塞等待 + if (first == null) + available.await(); + else { + // 堆顶元素的到期时间 + long delay = first.getDelay(NANOSECONDS); + // 如果小于0说明已到期,直接调用poll()方法弹出堆顶元素 + if (delay <= 0) + return q.poll(); + + // 如果delay大于0 ,则下面要阻塞了 + + // 将first置为空方便gc,因为有可能其它元素弹出了这个元素 + // 这里还持有着引用不会被清理 + first = null; // don't retain ref while waiting + // 如果前面有其它线程在等待,直接进入等待 + if (leader != null) + available.await(); + else { + // 如果leader为null,把当前线程赋值给它 + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + // 等待delay时间后自动醒过来 + // 醒过来后把leader置空并重新进入循环判断堆顶元素是否到期 + // 这里即使醒过来后也不一定能获取到元素 + // 因为有可能其它线程先一步获取了锁并弹出了堆顶元素 + // 条件锁的唤醒分成两步,先从Condition的队列里出队 + // 再入队到AQS的队列中,当其它线程调用LockSupport.unpark(t)的时候才会真正唤醒 + // 关于AQS我们后面会讲的^^ + available.awaitNanos(delay); + } finally { + // 如果leader还是当前线程就把它置为空,让其它线程有机会获取元素 + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + // 成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程 + if (leader == null && q.peek() != null) + // signal()只是把等待的线程放到AQS的队列里面,并不是真正的唤醒 + available.signal(); + // 解锁,这才是真正的唤醒 + lock.unlock(); + } +} +``` +## peek +```Java +/** + * 获取但不移除队首元素,或返回null(如果队列为空)。和poll方法不同, + * 若队列不为空,该方法换回队首元素,不论是否达到延迟时间 + * + * @return 队首元素,或null(如果队列为空) + */ + public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.peek(); + } finally { + lock.unlock(); + } + } +``` + + + + + -- Gitee