diff --git a/week_04/55/ArrayBlockingQueue-55.md b/week_04/55/ArrayBlockingQueue-55.md new file mode 100644 index 0000000000000000000000000000000000000000..b642097d2676c0f5ca2a13c6270591f35231ac8d --- /dev/null +++ b/week_04/55/ArrayBlockingQueue-55.md @@ -0,0 +1,187 @@ +ArrayBlockingQueue + +一,简介 +ArrayBlockingQueue 是一个用数组实现的有界队列;此队列按照先进先出(FIFO)的规则对元素进行排序;默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序的访问队列,即先阻塞的线程先访问队列;非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问;为了保证公平性,通常会降低吞吐量。 + +二,基本成员 + /** The queued items */ + // 记录数据的数组 + final Object[] items; + + /** items index for next take, poll, peek or remove */ + // 索引用于 take,poll,peek,remove 等方法 + int takeIndex; + + /** items index for next put, offer, or add */ + // 索引用于 put,offer,or add 等方法 + int putIndex; + + /** Number of elements in the queue */ + // 总数 + int count; + + /* + * Concurrency control uses the classic two-condition algorithm + * found in any textbook. + */ + + /** Main lock guarding all access */ + // 队列的锁 + final ReentrantLock lock; + + /** Condition for waiting takes */ + // 用于让线程等待,消费时队列为空 + private final Condition notEmpty; + + /** Condition for waiting puts */ + // 用于让线程等待,生产时队列满 + private final Condition notFull; +三,常用方法 +构造方法 +我们看下两个构造,其实也就是一个,注意没有无参构造,初始化时必须要给出容量。 + + public ArrayBlockingQueue(int capacity) { + this(capacity, false); + } + + // 初始化一个ArrayBlockingQueue + public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + // 初始化一个数组 + this.items = new Object[capacity]; + // 初始化一个锁 + lock = new ReentrantLock(fair); + // 用来存放消费者的阻塞线程 + notEmpty = lock.newCondition(); + // 用来存放生产者的线程 + notFull = lock.newCondition(); + } +add 方法 +可以看出add调用的是offer方法,详情请看offer方法。 + + public boolean add(E e) { + // 调用父类的方法 + return super.add(e); + } + + // 父类 AbstractQueue 的add方法 + public boolean add(E e) { + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); + } +注意:add 插入失败会抛异常。 + +offer 方法 + // offer加入元素 + public boolean offer(E e) { + // 不能为null + checkNotNull(e); + // 获取锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 如果数组满了,返回false + if (count == items.length) + return false; + else { + enqueue(e); + return true; + } + } finally { + lock.unlock(); + } + } + // enqueue + + private void enqueue(E x) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + // 获取数组 + final Object[] items = this.items; + items[putIndex] = x; + if (++putIndex == items.length) + putIndex = 0; + count++; + // 唤醒消费阻塞的队列 + notEmpty.signal(); + } +注意:offer还有一个重载方法,带有超时时间的插入,支持中断offer(E e, long timeout, TimeUnit unit)。 + +put 方法 +public void put(E e) throws InterruptedException { + // 不能为null + checkNotNull(e); + // 获取锁 + final ReentrantLock lock = this.lock; + // 支持中断 + lock.lockInterruptibly(); + try { + // 等于数组的容量 + while (count == items.length) + // 等待 + notFull.await(); + enqueue(e); + } finally { + lock.unlock(); + } + } +注意:put和前面的offer要区别,offer方法队列满是返回false,put方法是让线程等待,根据自己的场景用合适的方法。 + +poll 方法 + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return (count == 0) ? null : dequeue(); + } finally { + lock.unlock(); + } + } +注意:poll也有一个重载方法,带有超时和中断poll(long timeout, TimeUnit unit)。 + +take 方法 + // 消费 + public E take() throws InterruptedException { + // 获取锁 + final ReentrantLock lock = this.lock; + // 支持中断 + lock.lockInterruptibly(); + try { + // 队列为空 + while (count == 0) + // 阻塞 + notEmpty.await(); + return dequeue(); + } finally { + lock.unlock(); + } + } +注意:take和poll也是一对方法,poll队列为空返回null,take是让线程等待,直到唤醒。 + +peek 方法 +// 获取队尾的元素 不删除 + public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return itemAt(takeIndex); // null when queue is empty + } finally { + lock.unlock(); + } + } +size 方法 + // 统计个数 size是准确值 + public int size() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return count; + } finally { + lock.unlock(); + } + } +四,总结 +ArrayBlockingQueue 是有界的,所以我们在初始化是容量要设计好,因为它是不可以扩容的,还有我觉得这个队列适合一些稳定并发量的系统,如果并发量突然变大,导致队列满,会造成大量的线程等待,影响系统的响应;我们通过阅读源码也发现队列的源码是很轻量的,使用起来也很简单,让人很好理解;使用这个队列一定要注意put,offer,take,poll这两组方法,根据自己的业务场景选择是直接返回(响应速度快)还是阻塞线程。 diff --git a/week_04/55/ConcurrentHashMap-55.md b/week_04/55/ConcurrentHashMap-55.md new file mode 100644 index 0000000000000000000000000000000000000000..14acfd0a73421195a461c2b87938f308ab16e210 --- /dev/null +++ b/week_04/55/ConcurrentHashMap-55.md @@ -0,0 +1,273 @@ +ConcurrentHashMap +1、概述 +  ConcurrentHashMap这个类在java.lang.current包中,这个包中的类都是线程安全的。ConcurrentHashMap底层存储数据的结构与1.8的HashMap是一样的,都是数组+链表(或红黑树)的结构。在日常的开发中,我们最长用到的键值对存储结构的是HashMap,但是我们知道,这个类是非线程安全的,在高并发的场景下,在进行put操作的时候有可能进入死循环从而使服务器的cpu使用率达到100%;sun公司因此也给出了与之对应的线程安全的类。在jdk1.5以前,使用的是HashTable,这个类为了保证线程安全,在每个类中都添加了synchronized关键字,而想而知在高并发的情景下相率是非常低下的。为了解决HashTable效率低下的问题,官网在jdk1.5后推出了ConcurrentHashMap来替代饱受诟病的HashTable。jdk1.5后ConcurrentHashMap使用了分段锁的技术。在整个数组中被分为多个segment,每次get,put,remove操作时就锁住目标元素所在的segment中,因此segment与segment之前是可以并发操作的,上述就是jdk1.5后实现线程安全的大致思想。但是,从描述中可以看出一个问题,就是如果出现比较机端的情况,所有的数据都集中在一个segment中的话,在并发的情况下相当于锁住了全表,这种情况下其实是和HashTable的效率出不多的,但总体来说相较于HashTable,效率还是有了很大的提升。jdk1.8后,ConcurrentHashMap摒弃了segment的思想,转而使用cas+synchronized组合的方式来实现并发下的线程安全的,这种实现方式比1.5的效率又有了比较大的提升。那么,它是如何整体提升效率的呢?见下文分析吧! + + 2、重要成员变量 +  1、ziseCtr:在多个方法中出现过这个变量,该变量主要是用来控制数组的初始化和扩容的,默认值为0,可以概括一下4种状态: + +    a、sizeCtr=0:默认值; + +    b、sizeCtr=-1:表示Map正在初始化中; + +    c、sizeCtr=-N:表示正在有N-1个线程进行扩容操作; + +    d、sizeCtr>0: 未初始化则表示初始化Map的大小,已初始化则表示下次进行扩容操作的阈值; + +  2、table:用于存储链表或红黑数的数组,初始值为null,在第一次进行put操作的时候进行初始化,默认值为16; + +  3、nextTable:在扩容时新生成的数组,其大小为当前table的2倍,用于存放table转移过来的值; + +  4、Node:该类存储数据的核心,以key-value形式来存储; + +  5、ForwardingNode:这是一个特殊Node节点,仅在进行扩容时用作占位符,表示当前位置已被移动或者为null,该node节点的hash值为-1; + +  4、put操作 +   +/** Implementation for put and putIfAbsent */ + final V putVal(K key, V value, boolean onlyIfAbsent) { + //key和value不能为空 + if (key == null || value == null) throw new NullPointerException(); + //通过key来计算获得hash值 + int hash = spread(key.hashCode()); + //用于计算数组位置上存放的node的节点数量 + //在put完成后会对这个参数判断是否需要转换成红黑树或链表 + int binCount = 0; + //使用自旋的方式放入数据 + //这个过程是非阻塞的,放入失败会一直循环尝试,直至成功 + for (Node[] tab = table;;) { + Node f; int n, i, fh; + //第一次put操作,对数组进行初始化,实现懒加载 + if (tab == null || (n = tab.length) == 0) + //初始化 + tab = initTable(); + //数组已初始化完成后 + //使用cas来获取插入元素所在的数组的下标的位置,该位置为空的话就直接放进去 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + //hash=-1,表明该位置正在进行扩容操作,让当前线程也帮助该位置上的扩容,并发扩容提高扩容的速度 + else if ((fh = f.hash) == MOVED) + //帮助扩容 + tab = helpTransfer(tab, f); + //插入到该位置已有数据的节点上,即用hash冲突 + //在这里为保证线程安全,会对当前数组位置上的第一个节点进行加锁,因此其他位置上 + //仍然可以进行插入,这里就是jdk1.8相较于之前版本使用segment作为锁性能要高效的地方 + else { + V oldVal = null; + synchronized (f) { + //再一次判断f节点是否为第一个节点,防止其他线程已修改f节点 + if (tabAt(tab, i) == f) { + //为链表 + if (fh >= 0) { + binCount = 1; + //将节点放入链表中 + for (Node e = f;; ++binCount) { + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + Node pred = e; + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + //为红黑树 + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + //将节点插入红黑树中 + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + //插入成功后判断插入数据所在位置上的节点数量, + //如果数量达到了转化红黑树的阈值,则进行转换 + if (binCount != 0) { + if (binCount >= TREEIFY_THRESHOLD) + //由链表转换成红黑树 + treeifyBin(tab, i); + if (oldVal != null) + return oldVal; + break; + } + } + } + //使用cas统计数量增加1,同时判断是否满足扩容需求,进行扩容 + addCount(1L, binCount); + return null; + } +  在代码上写注释可能看得不是很清晰,那么我就使用文字再来描述一下插入数据的整个流程: + +    1、判断传进来的key和value是否为空,在ConcurrentHashMap中key和value都不允许为空,然而在HashMap中是可以为key和val都可以为空,这一点值得注意一下; + +    2、对key进行重hash计算,获得hash值; + +    3、如果当前的数组为空,说明这是第一插入数据,则会对table进行初始化; + +    4、插入数据,这里分为3中情况: + +      1)、插入位置为空,直接将数据放入table的第一个位置中; + +      2)、插入位置不为空,并且改为是一个ForwardingNode节点,说明该位置上的链表或红黑树正在进行扩容,然后让当前线程加进去并发扩容,提高效率; + +      3)、插入位置不为空,也不是ForwardingNode节点,若为链表则从第一节点开始组个往下遍历,如果有key的hashCode相等并且值也相等,那么就将该节点的数据替换掉, + +        否则将数据加入  到链表末段;若为红黑树,则按红黑树的规则放进相应的位置; + +    5、数据插入成功后,判断当前位置上的节点的数量,如果节点数据大于转换红黑树阈值(默认为8),则将链表转换成红黑树,提高get操作的速度; + +    6、数据量+1,并判断当前table是否需要扩容; + +  所以,put操作流程可以简单的概括为上面的六个步骤,其中一些具体的操作会在下面进行详细的说明,不过,值得注意的是: + +    1、ConcurrentHashMap不可以存储key或value为null的数据,有别于HashMap; + +    2、ConcurrentHashMap使用了懒加载的方式初始化数据,把table的初始化放在第一次put数据的时候,而不是在new的时候; + +    3、扩容时是支持并发扩容,这将有助于减少扩容的时间,因为每次扩容都需要对每个节点进行重hash,从一个table转移到新的table中,这个过程会耗费大量的时间和cpu资源。 + +    4、插入数据操作锁住的是表头,这是并发效率高于jdk1.7的地方; + +  4.1、hash计算的spread方法 +1* + * Spreads (XORs) higher bits of hash to lower and also forces top + * bit to 0. Because the table uses power-of-two masking, sets of + * hashes that vary only in bits above the current mask will + * always collide. (Among known examples are sets of Float keys + * holding consecutive whole numbers in small tables.) So we + * apply a transform that spreads the impact of higher bits + * downward. There is a tradeoff between speed, utility, and + * quality of bit-spreading. Because many common sets of hashes + * are already reasonably distributed (so don't benefit from + * spreading), and because we use trees to handle large sets of + * collisions in bins, we just XOR some shifted bits in the + * cheapest possible way to reduce systematic lossage, as well as + * to incorporate impact of the highest bits that would otherwise + * never be used in index calculations because of table bounds. + */ + static final int spread(int h) { + return (h ^ (h >>> 16)) & HASH_BITS; + } +  从源码中可以看到,jdk1.8计算hash的方法是先获取到key的hashCode,然后对hashCode进行高16位和低16位异或运算,然后再与 0x7fffffff 进行与运算。高低位异或运算可以保证haahCode的每一位都可以参与运算,从而使运算的结果更加均匀的分布在不同的区域,在计算table位置时可以减少冲突,提高效率,我们知道Map在put操作时大部分性能都耗费在解决hash冲突上面。得出运算结果后再和 0x7fffffff 与运算,其目的是保证每次运算结果都是一个正数。对于java位运算不了解的同学,建议百度自行了解相关内容。 + +  4.2、java内存模型和cas操作 +  这里我只是简单的说一下java的内存模型和cas,因为这篇文章的主角的ConcurrentHashMap。 + +  java内存模型:在java中线程之间的通讯是通过共享内存(即我们在变成时声明的成员变量或叫全局变量)的来实现的。Java内存模型中规定了所有的变量都存储在主内存中,每条线程还有自己的工作内存(可以与前面将的处理器的高速缓存类比),线程的工作内存中保存了该线程使用到的变量到主内存副本拷贝,线程对变量的所有操作(读取、赋值)都必须在工作内存中进行,而不能直接读写主内存中的变量。不同线程之间无法直接访问对方工作内存中的变量,线程间变量值的传递均需要在主内存来完成,线程、主内存和工作内存的交互关系如下图所示,和上图很类似。 + +  举一个非常简单的例子,就是我们常用的i++的操作,这个操作看起来只有一行,然而在编译器中这一行代码会被编译成3条指令,分别是读取、更新和写入,所以i++并不是一个原子操作,在多线程环境中是有问题了。其原因在于(我们假设当前 i 的值为1)当一条线程向主内存中读取数据时,还没来得及把更新后的值刷新到主内存中,另一个线程就已经开始向主内存中读取了数据,而此时内存中的值仍然为1,两个线程执行+1操作后得到的结果都为2,然后将结果刷新到主内存中,整个i++操作结果,最终得到的结果为2,但是我们预想的结果应该是3,这就出现了线程安全的问题了。 + +  cas: cas的全名称是Compare And Swap 即比较交换。cas算法在不需要加锁的情况也可以保证多线程安全。核心思想是: cas中有三个变量,要更新的变量V,预期值E和新值N,首先先读取V的值,然后进行相关的操作,操作完成后再向主存中读取一次取值为E,当且仅当V == E时才将N赋值给V,否则再走一遍上诉的流程,直至更新成功为止。就拿上面的i++的操作来做说明,假设当前i=1,两个线程同时对i进行+1的操作,线程A中V = 1,E = 1,N = 2;线程B中 V = 1,E = 1,N = 2;假设线程A先执行完整个操作,此时线程A发现 V = E = 1,所以线程A将N的值赋值给V,那么此时i的值就变成了 2 ;线程B随后也完成了操作,向主存中读取i的值,此时E = 2,V = 1,V != E,发现两个并不相等,说明i已经被其他线程修改了,因此不执行更新操作,而是从新读取V的值V = 2 ,执行+1后N = 3,完成后再读取主存中i的值,因为此时没有其他线程修改i的值了,所以E = 2,V = E = 2,两个值相等,因此执行赋值操作,将N的值赋值给i,最终得到的结果为3。在整过过程中始终没有使用到锁,却实现的线程的安全性。 + +  从上面的过程知道,cas会面临着两个问题,一个是当线程一直更新不成功的话,那么这个线程就一直处于死循环中,这样会非常耗费cpu的资源;另一种是ABA的问题,即对i =1进行+1操作后,再-1,那么此时i的值仍为1,而另外一个线程获取的E的值也是1,认为其他线程没有修改过i,然后进行的更新操作,事实上已经有其他线程修改过了这个值了,这个就是 A ---> B ---> A 的问题; + +  4.3、获取table对应的索引元素的位置 +  通过(n-1)& hash 的算法来获得对应的table的下标的位置,如果对于这条公式不是很理解的同学可以到: jdk1.8源码分析-hashMap 博客中了解。 + +  tabAt(Node[] tab, int i): 这个方法使用了java提供的原子操作的类来操作的,sun.misc.Unsafe.getObjectVolatile 的方法来保证每次线程都能获取到最新的值; + +  casTabAt(Node[] tab, int i,Node c, Node v): 这个方法是通过cas的方式来获取i位置的元素; + +  4.4、扩容 + +  - 如果新增节点之后,所在的链表的元素个数大于等于8,则会调用treeifyBin把链表转换为红黑树。在转换结构时,若tab的长度小于MIN_TREEIFY_CAPACITY,默认值为64, + +  则会将数组长度扩大到原来的两倍,并触发transfer,重新调整节点位置。(只有当tab.length >= 64, ConcurrentHashMap才会使用红黑树。) +  - 新增节点后,addCount统计tab中的节点个数大于阈值(sizeCtl),会触发transfer,重新调整节点位置。 + + +/** + * Adds to count, and if table is too small and not already + * resizing, initiates transfer. If already resizing, helps + * perform transfer if work is available. Rechecks occupancy + * after a transfer to see if another resize is already needed + * because resizings are lagging additions. + * + * @param x the count to add + * @param check if <0, don't check resize, if <= 1 only check if uncontended + */ + private final void addCount(long x, int check) { + CounterCell[] as; long b, s; + if ((as = counterCells) != null || + !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { + CounterCell a; long v; int m; + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = + U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + s = sumCount(); + } + if (check >= 0) { + Node[] tab, nt; int n, sc; + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { + int rs = resizeStamp(n); + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } + } +  5、get操作 +  get操作中没有使用到同步的操作,所以相对来说比较简单一点。通过key的hashCode计算获得相应的位置,然后在遍历该位置上的元素,找到需要的元素,然后返回,如果没有则返回null: + +/** + * Returns the value to which the specified key is mapped, + * or {@code null} if this map contains no mapping for the key. + * + *

More formally, if this map contains a mapping from a key + * {@code k} to a value {@code v} such that {@code key.equals(k)}, + * then this method returns {@code v}; otherwise it returns + * {@code null}. (There can be at most one such mapping.) + * + * @throws NullPointerException if the specified key is null + */ + public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + int h = spread(key.hashCode()); + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { + if ((eh = e.hash) == h) { + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + else if (eh < 0) + return (p = e.find(h, key)) != null ? p.val : null; + while ((e = e.next) != null) { + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; + } diff --git a/week_04/55/ConcurrentLinkedQueue-55.md b/week_04/55/ConcurrentLinkedQueue-55.md new file mode 100644 index 0000000000000000000000000000000000000000..b6e7edc3f60b7eec74de04eb32eb8b3ce5317e47 --- /dev/null +++ b/week_04/55/ConcurrentLinkedQueue-55.md @@ -0,0 +1,332 @@ +ConcurrentLinkedQueue +ConcurrentLinkedQueue +ConcurrentLinkedQueue是一个无锁化、非阻塞、线程安全的单向队列,JDK1.5提供,由大名鼎鼎的Doug Lea编写。不过要是你认为这是大神灵感爆发的杰作,那就错了,仔细读读java源文件的注释就知道,这个类的实现原理来自Michael & Scott设计的算法(请参考论文),而Michael & Scott也是在很多前人研究的基础上加以综合完善得到该算法的,成为很多具体平台并发FIFO队列实现的蓝本。如果不读读这篇论文,直接阅读java源码,光凭那点注释,会有点摸不着头脑。 + +我们要研究的ConcurrentLinkedQueue可以说是这个算法的java版本,当然针对java语言做了很多修改。到这里,你要是认为Doug Lea就干了点将原始算法翻译成java的活,那又错了,从一个理论算法到具体某个语言的实现还有相当长的距离;而且java语言本来就比较慢,所以为了挖掘性能,作者做了大量精细而巧妙的设计。 + +ConcurrentLinkedQueue实现的依赖cas操作和java的valotile语义,所以不理解这两点是无法看懂代码的。关于cas操作和valotile关键字,不是本文要讲的内容,有很多的资料可以查询,比如: +JAVA中的CAS +volatile关键字解析 + +说明: + +java里面并没有指针的概念,但是算法原理结构理论讲队列,有“头指针”、“尾指针”的说法,为了方便,下面在讲解ConcurrentLinkedQueue内部结构时也会使用这两个术语; +采用的源码是Java 1.8版本; +强烈建议先阅读cas和java内存模型相关资料。 +一、队列的内部链表结构 +下面是摘录自源码,经过简化的ConcurrentLinkedQueue定义片段。 + +public class ConcurrentLinkedQueue extends AbstractQueue + implements Queue, java.io.Serializable { + + private transient volatile Node head; + private transient volatile Node tail; + + public ConcurrentLinkedQueue() { + head = tail = new Node(null); + } + } +一个空的ConcurrentLinkedQueue包含一个空节点(数据字段=null),队列的HEAD和TAIL指针都指向这个节点。当经过一些出入队列的操作以后,队列的结构可能类似下面: + +【N1】->【N2】->【N3】->【N4】->【N5】->【N6】 +HEAD                                                                    TAIL + +但是,也有可能类似这样: +【N1】->【N2】->【N3】->【N4】->【N5】->【N6】 +                  HEAD                                  TAIL + +上边这个结构,表示N1节点已经出了队列,此时的HEAD是N2节点,但是N1节点的next指针仍然保持着。而TAIL节点指向的N5却不是真正的队尾,队尾是N6。这是因为在入队列的过程中,由于并发,导致TAIL更新不及时。 + +同样还是上面的结构,如果我把里面的数据字段标识出来,有可能是这样的: +【N1(null)】->【N2(null)】->【N3(data)】->【N4(data)】->【N5(data)】->【N6(data)】 +                          HEAD                                                                 TAIL +HEAD节点的数据字段是NULL,说明这是一个空节点,这个队列语义上第一个节点应该是N3才对。 + +再来看一个状态: +【N1】<-| 【N2】->【N3】->【N4】->【N5】->【N6】 +                  HEAD                                  TAIL +N1节点的next字段指向自身,这表示N1节点彻底断开和队列的关系,为什么不直接将N1的next置为null呢,因为这样就不能直接断定N1到底是不是尾节点了。 + +上面这些情形是ConcurrentLinkedQueue在执行一些操作之后可能处于的状态,之所于允许这些看起来不够严谨的状态,是为了在并发过程中提高效率。但是不管如何,在整个生命周期内,算法保持以下属性: + +链表里面至少会有一个节点,数据字段为null的节点是空节点; +顺着HEAD指针肯定能找到的真正的头节点,并能访问到所有节点; +TAIL指针不一定指向有效节点,更不能保证指向真正的尾节点,但是它大部分情况下指向尾节点或接近尾节点,因此可以提高效率; +和队列断开的节点,next字段指向自身; +对于入了队列的节点(哪怕又出了队列),只有尾节点的next字段才等于null。 +在ConcurrentLinkedQueue内部链表上,可能有一个或多个数据字段为null的空节点,空节点虽然没有数据,但是对高效地保持链表的连接状态至关重要。另一方面,节点的数据字段和next字段值的变化有很强的规律性:数据字段在节点入队列是不为null,出队列时变为null;next字段入队列时是null,出队列时先保持不变,再指向自身。这种规律性可以有效规避掉cas操作的ABA问题。 + +上面这些也可以认为是ConcurrentLinkedQueue实现算法的不变式,有些在源码注释里面就有说明,有些是我总结的。带着这些原则才能理解每个方法的实现为什么是这样的。 + +二、内部节点类 +直接上源码吧,由于Node提供一个比较简单的节点数据结构,逻辑不多,但是我在读的过程中还是有一些疑惑,直接将解释写在下面的源码块里面了。 + +private static class Node { + volatile E item; + volatile Node next; + + /** + * Constructs a new node. Uses relaxed write because item can + * only be seen after publication via casNext. + */ + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + 为什么要使用UNSAFE.putObject而不直接赋值呢?刚看到“relaxed write”这个注释时一脸懵逼, + 后来才搞明白,因为item是volatile修饰的,如果直接赋值,会触发volatile的内存同步语义, + 在初始化阶段不需要如此,所以使用UNSAFE.putObject能提高一些性能。 + + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + cas操作设置数据字段,这个操作在出队列操作时,保证并发安全。 + + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + 这个操作相比UNSAFE.putObject,会一定程度禁止指令重排, + 相比volatile赋值,不保证全局可见性,性能也稍好一些,用于节点出队列后断开链接, + + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + 给节点设置next字段,入队列的关键操作,cas操作保证并发安全 + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long itemOffset; + private static final long nextOffset; + } + +三、出队列:poll方法 +poll的目标是安全地取出第一个有效节点的数据,如果没有返回null。 +源码如下,为了方便分析,对关键行,我加上了L1~Ln这样的行标记 。 + + public E poll() { + restartFromHead: + for (;;) { + L1 for (Node h = head, p = h, q;;) { + E item = p.item; + + L2 if (item != null && p.casItem(item, null)) { + // Successful CAS is the linearization point + // for item to be removed from this queue. + L3 if (p != h) // hop two nodes at a time + L4 updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + L5 else if ((q = p.next) == null) { + L6 updateHead(h, p); + return null; + } + L7 else if (p == q) + continue restartFromHead; + else + L8 p = q; + } + } + } +L1行:开始一个类似遍历的for循环,初始化h,p为head节点,p是当前检查的节点,q是下一个节点。 +L2行:如果p节点的数据不为空,那么p肯定就是真正的第一个节点,只要能将节点的数据字段(item)置为null,出队列就算成功(参考第一节总结的节点属性);p.casItem(item, null)操作保证了并发poll的安全,如果有其他线程抢先一步执行成功,那么casItem操作会失败。 +L2行如果执行成功,那么出队列可以算是完成了,此时的head指向一个空节点,这种情况是允许的,但是队列里面也不能总是留很多空节点,所以L3L4行,就是把head指针后移。 + +L3行:p==h,说明刚出队列的p就是遍历开始的第一个节点,有2种情况,1)没有并发poll,head指向了空节点,队列里面只有且仅有一个空节点,此时没有必要做head移动;2)有并发poll,p==h说明本线程在竞争中领先,那么交由那个落后的线程来做head移动。 +L4行:如果p.next==null,说明p是尾节点,链表空了,head也指向p;否则head指向p.next。两种情况都是正确的。 +L5行:无论p的item字段为null,还是在并发中竞争失败,我们都要往后遍历;于是q=p.next,如果q==null,说明链表空了(参考第一节总结的节点属性),此时将head置为尾节点;当然,如果h==p是不需要执行的,updateHead里面有这个判断。 +L7行:p==q实际上就是p==p.next,也即p的next指向自身,这说明p节点已经和链表断开了(参考第一节总结的节点属性),这种情况下,只能重新开始poll。 +L8行:正常迭代下一个节点。 +上面L2~L4行,每个线程在成功poll一个节点之后,就会尝试把头结点移到p的下一个节点,这样保证了head后面最多只有一个空节点。L3行的那个判断如果没有,也不会影响程序的正确性,但是性能会差一些。 + +再介绍一下更新头指针的updateHead方法: + + final void updateHead(Node h, Node p) { + if (h != p && casHead(h, p)) + h.lazySetNext(h); + } +将头指针从h转移到p,就是将节点h出队列,一旦出队列成功,就将h的next字段指向自身,防止那些出队列的节点仍然互相链接,妨碍垃圾收集,这个操作也维护了第一节总结的节点属性。值得注意的是h.lazySetNext(h)操作没有volatile的语义,有可能对其他线程暂时不可见。 假设poll方法执行到L5行时,p节点已经被另外一个线程出队列了,但是本线程不知道,那么L7行的判断也可能失败,最终执行到L8行,这种情况下程序仍然是正确的,不过会多遍历一些节点。(只能认为,作者经过测量,认为这种情况造成的性能损失低于volatile造成的性能损失)。 + +还有一个有意思的现象是,在poll的过程中,并不会去修正TAIL指针,所以在链表的TAIL指针是有可能落在HEAD之后的,甚至暂时指向一个已经出队列的节点。 + +四、入队列:offer方法 +offer的目标是将新节点插入到尾节点的后面,即使在并发情况下也不丢失数据。 + +public boolean offer(E e) { + checkNotNull(e); + final Node newNode = new Node(e); + + L1 for (Node t = tail, p = t;;) { + L2 Node q = p.next; + L3 if (q == null) { + // p is last node + L4 if (p.casNext(null, newNode)) { + // Successful CAS is the linearization point + // for e to become an element of this queue, + // and for newNode to become "live". + L5 if (p != t) // hop two nodes at a time + L6 casTail(t, newNode); // Failure is OK. + return true; + } + // Lost CAS race to another thread; re-read next + } + L7 else if (p == q) + // We have fallen off list. If tail is unchanged, it + // will also be off-list, in which case we need to + // jump to head, from which all live nodes are always + // reachable. Else the new tail is a better bet. + L8 p = (t != (t = tail)) ? t : head; + else + // Check for tail updates after two hops. + L9 p = (p != t && t != (t = tail)) ? t : q; + } + } +L1行:开启一个从tail指针开始遍历的循环,p指向当前疑似尾节点; +L2行:初始化q=p的下一个节点; +L3行:如果q==null,说明此刻p确实是尾节点,新节点应该插到后面; +L4行:casNext执行安全的插入操作; +如果成功插入,那么应该考虑更新tail指针,L5L6行执行这个操作; +L5行:和poll方法一样,如果插入的地方不是在tail处,才往后移,这个行为可以保证,tail后面最多还有一个节点; +L6行:cas操作安全移动tail指针。 +后面的两个if分支说明p不是尾节点。 +L7行:p==q,也即p的next字段指向自身,说明p是一个脱离了链表的节点(并发poll操作造成的,参考第一节总结的节点属性)),需要找一个节点重新开始遍历。 +L8行:从head重新开始肯定正确,但是如果tail指针有更新过,那么从tail开始大概率可能效率更高。 +L9行:按道理直接p=q跳到下一个节点就Ok了,但是代码里面做了这个判断(p != t && t != (t = tail)),如果p已经正常往后移动了一次,且tail发生了变化,那么从新的tail重新开始。为什么要加个(p!=t)的前置判断呢?我认为是为了提高效率,因为tail是valotile变量,读取有一定代价,当p==t的时候,作者认为再往后跳一下成功的概率挺高。(作者应该经过测量,可见对性能的压榨已经丧心病狂了)。 +移除操作:remove +再来看移除操作,通过数据字段的比较,来移除数据相等的节点。 + +public boolean remove(Object o) { + if (o != null) { + Node next, pred = null; + for (Node p = first(); p != null; pred = p, p = next) { + boolean removed = false; + E item = p.item; + L1 if (item != null) { + L2 if (!o.equals(item)) { + L3 next = succ(p); + L4 continue; + } + L5 removed = p.casItem(item, null); + } + + L6 next = succ(p); + L7 if (pred != null && next != null) // unlink + pred.casNext(p, next); + if (removed) + return true; + } + } + return false; +} +L1行:判断当前是不是空节点。 +L2行:如果节点数据与输入参数相等,说明这是一个需要移除的节点,否则应该跳到下一个节点; +L3/4行:节点数据不相等,取出当前节点p的下一个节点,然后重新开始循环; +L5行:节点数据相等,尝试清空数据;如果别的线程并发执行remove或poll,这一步操作可能失败; +L6行:执行到这一行说明,p是空节点,(本来就是空,或在L5被清空);取出next节点; +L7行:由于p是空节点,尝试将p的前继和后继相连; +上面再取p的下一个节点的时候,用了调用了succ这个方法,这里也看一下: + + final Node succ(Node p) { + Node next = p.next; + return (p == next) ? head : next; + } +很简单,如果p的next指向自己,说明p已经脱离链表,此时返回head指向的节点。 + +源码可以看出,如果队列里面有多个相同数据的节点,一次remove调用最多删除一个。 + +迭代器 +private class Itr implements Iterator { + L1 private Node nextNode; + L2 private E nextItem; + L3 private Node lastRet; + + Itr() { + L4 advance(); + } + private E advance() { + L5 lastRet = nextNode; + L6 E x = nextItem; + + Node pred, p; + L7 if (nextNode == null) { + L8 p = first(); + pred = null; + } else { + pred = nextNode; + L9 p = succ(nextNode); + } + + for (;;) { + L10 if (p == null) { + nextNode = null; + nextItem = null; + return x; + } + E item = p.item; + L11 if (item != null) { + nextNode = p; + nextItem = item; + return x; + } else { + // skip over nulls + L12 Node next = succ(p); + L13 if (pred != null && next != null) + L14 pred.casNext(p, next); + L15 p = next; + } + } + } + + public boolean hasNext() { + L16 return nextNode != null; + } + + public E next() { + if (nextNode == null) throw new NoSuchElementException(); + L17 return advance(); + } + + public void remove() { + L18 Node l = lastRet; + if (l == null) throw new IllegalStateException(); + // rely on a future traversal to relink. + L19 l.item = null; + L20 lastRet = null; + } + } +源代码稍微长点,我删掉了注释。 + +L1行:成员变量nextNode指向下一个节点,用来支持hasNext,next等操作; +L2行:成员变量nextItem是下一个节点的值,即nextNode的item值;为什么单独需要这个变量呢,因为由于并发,nextNode的item字段有可能被置空,而迭代器在hasNext的时候是不允许返回空值的,所以在迭代到nextNode的时候,立即取出item字段保存起来。 +L3行:成员变量lastRet指向上一个节点,用来支持删除操作; +L4行:构造函数仅调用advance方法,从名字上推测,应该是将迭代器往后推一步; +L5行:核心方法advance的第一行,先保存lastRet,因为这个方法执行完了以后,nextNode就指向下一个节点了; +L6行:临时变量x保存nextItem,因为advance执行完以后,nextItem就变成下一个节点的值了; +L7行:如果nextNode==null,说明这是初始化操作; +L8行:直接调用p=first(),将p赋值成第一个有效节点或null(队列空的); +L9行:不是初始化调用,那么p赋值成nextNode的后继节点; +至此p保存了nextNode的备选值 +L10行:如果p==null,说明已经到了链表尾部了,迭代结束; +L11行:如果p.item!=null,说明p是有效节点,ok,就是它了; +L12~L15行:如果p.item==null,说明p是无效节点,我们应该跳过它,继续寻找; +L13~L14行:如果p是空节点,我们尝试将它的前继与后继节点相连; +L16行:hasNext()方法的实现,就看nextNode是否空; +L17行:next()方法,很简单就是调用advance; +L18~L20行:remove()方法,将lastRet指向节点的数据清空;因为当我们调用next方法时,返回的值对应的是lastRet节点。 +从上面的实现我们可以知道,在迭代过程中,如果有数据并发入队列,这些数据是可以被迭代到的;如果有值被并发删除或出队列,那么这些数据有可能也被迭代到。尤其是迭代器的remove方法,删除的有可能是一个不存在的数据。 + +队列长度 +ConcurrentLinkedQueue的size方法是很没效率的的,实际是把队列遍历了一遍来计算长度。所以推荐大家使用isEmpty来判断队列是否为空,而不要使用size()==0。 + +public boolean isEmpty() { + return first() == null; +} + + public int size() { + int count = 0; + for (Node p = first(); p != null; p = succ(p)) + if (p.item != null) + // Collection.size() spec says to max out + if (++count == Integer.MAX_VALUE) + break; + return count; +} +为什么ConcurrentLinkedQueue内部不维护一个size变量来跟踪队列的长度呢?不是不想,是做不到,由于ConcurrentLinkedQueue使用无锁化设计,通过cas操作来保证并发安全。而cas操作只能保证单个变量的并发安全性,无法在出入队列操作的同时,维护size变量。 diff --git a/week_04/55/CopyOnWriteArrayList-55.md b/week_04/55/CopyOnWriteArrayList-55.md new file mode 100644 index 0000000000000000000000000000000000000000..bc357d70f6e6d213afa675025dd564d7f46517e0 --- /dev/null +++ b/week_04/55/CopyOnWriteArrayList-55.md @@ -0,0 +1,126 @@ +CopyOnWriteArrayList +1.1前言 + CopyOnWriteArrayList是JAVA中的并发容器类,同时也是符合写时复制思想的CopyOnWrite容器。写时复制思想即是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。 + +1.2源码分析 +1.2.1实现接口 +public class CopyOnWriteArrayList + implements List, RandomAccess, Cloneable, java.io.Serializable + CopyOnWriteArrayList实现了List和RandomAccess接口,使得该容器可以具有列表的基本功能和随机访问的特性,并且实现了Cloneable接口和Serializable接口,表示可被克隆和序列化。 + +1.2.2成员变量 +//重入锁 +final transient ReentrantLock lock = new ReentrantLock(); + +//对象数组,用于存放数据,用volatile修饰 +private transient volatile Object[] array; +1.2.3构造函数 +//设置数组 +final void setArray(Object[] a) { + array = a; +} + +//调用setArray,创建一个空的列表 +public CopyOnWriteArrayList() { + setArray(new Object[0]); +} + +//创建一个包含collection的列表 +public CopyOnWriteArrayList(Collection c) { + Object[] elements; + //判断集合C的类型是否是CopyOnWriteArrayList,如果是则获取集合的数组,否则进入else + if (c.getClass() == CopyOnWriteArrayList.class) + elements = ((CopyOnWriteArrayList)c).getArray(); + else { + elements = c.toArray();//将集合转为数组 + //判断elements的类型是否为Object[]类型,如果不是则转为Object[]类型 + if (elements.getClass() != Object[].class) + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements);//设置数组 +} +//将toCopyIn转为Object[]类型,然后设置数组 +public CopyOnWriteArrayList(E[] toCopyIn) { + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); +} +1.2.4get方法 +private E get(Object[] a, int index) { + return (E) a[index]; +} + +public E get(int index) { + return get(getArray(), index); +} +final Object[] getArray() { + return array; +} + get方法没有加锁也没有cas操作,因此代码非常简单。 + +1.2.5add方法 +//将指定元素添加到列表尾部 +public boolean add(E e) { + final ReentrantLock lock = this.lock; + lock.lock();//加锁 + try { + Object[] elements = getArray();//获取旧数组引用 + int len = elements.length;//旧数组长度 + Object[] newElements = Arrays.copyOf(elements, len + 1);//创建新数组,并将旧数组的数组复制到新数组中 + newElements[len] = e;//添加元素e + setArray(newElements);//设置数组 + return true; + } finally { + lock.unlock();//释放锁 + } +} +1.2.6set方法 +//替换列表指定位置的元素 +public E set(int index, E element) { + final ReentrantLock lock = this.lock; + lock.lock();//加锁 + try { + Object[] elements = getArray(); + E oldValue = get(elements, index);//获取指定位置的旧值 + + if (oldValue != element) { + int len = elements.length;//旧数组长度 + Object[] newElements = Arrays.copyOf(elements, len);//创建新数组,并将旧数组的数组复制到新数组中 + newElements[index] = element;//替换指定位置的元素 + setArray(newElements);//设置数组 + } else { + // Not quite a no-op; ensures volatile write semantics + setArray(elements); + } + return oldValue;//返回旧值 + } finally { + lock.unlock();//释放锁 + } +} +1.2.7remove方法 +//删除指定位置的元素 +public E remove(int index) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + E oldValue = get(elements, index);//获取指定位置的元素 + int numMoved = len - index - 1;//需要移动的元素个数 + if (numMoved == 0) + setArray(Arrays.copyOf(elements, len - 1));//移动个数为0则表示移除的是数组的最后一个元素,复制elements数组,复制长度为length-1,然后设置数组 + else {//移动个数不为0 + Object[] newElements = new Object[len - 1];//创建一个新数组 + System.arraycopy(elements, 0, newElements, 0, index);//复制index之前的元素 + System.arraycopy(elements, index + 1, newElements, index, + numMoved);//复制index之后的元素 + setArray(newElements);//设置数组 + } + return oldValue; + } finally { + lock.unlock(); + } +} +1.3缺点 +内存占用问题,有可能造成频繁的垃圾回收。 +数据一致性问题,CopyOnWriteArrayList只能保证数据的最终一致性,不能保证数据的实时一致性。 +1.4总结 + 对于CopyOnWriteArrayList容器来说,只适合读多写少的并发场景下使用。 diff --git a/week_04/55/DelayQueue-55.md b/week_04/55/DelayQueue-55.md new file mode 100644 index 0000000000000000000000000000000000000000..d483cd7b7dbb9b8b5349d8d1ecc4a4e66cb62899 --- /dev/null +++ b/week_04/55/DelayQueue-55.md @@ -0,0 +1,163 @@ +DelayQueue +DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。 + +主要属性 +public class DelayQueue extends AbstractQueue + implements BlockingQueue { + // 持有内部重入锁。 + private final transient ReentrantLock lock = new ReentrantLock(); + // 优先级队列,存放工作任务。 + private final PriorityQueue q = new PriorityQueue(); + private Thread leader = null; + // 依赖于重入锁的 condition(出队列的线程使用) + private final Condition available = lock.newCondition(); +} +1.Delayed接口 +DelayQueue队列与其它的队列最大的不同就是这个队列里的元素必须实现Delayed接口才能入队,我们来看一下这个接口: + +public interface Delayed extends Comparable { + + /** + * Returns the remaining delay associated with this object, in the + * given time unit. + * + * @param unit the time unit + * @return the remaining delay; zero or negative values indicate + * that the delay has already elapsed + */ + long getDelay(TimeUnit unit); +} +该接口继承自Comparable,也就意味着实现了Delayed接口的类必须有两个方法getDelay和compareTo,示例类: + +static class Task implements Delayed{ + long time = System.currentTimeMillis(); + public Task(long time) { + this.time = time; + } + @Override + public int compareTo(Delayed o) { + if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) + return -1; + else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) + return 1; + else + return 0; + } + + @Override + public long getDelay(TimeUnit unit) { + return unit.convert(time - System.currentTimeMillis(),TimeUnit.MILLISECONDS); + } + @Override + public String toString() { + return "" + time; + } + } +2.内部队列PriorityQueue +DelayQueue内部使用优先级队列PriorityQueue来存放元素,PriorityQueue队列里的元素会根据某些属性排列先后的顺序,这里正好可以利用Delayed接口里的getDelay的返回值来进行排序,delayQueue其实就是在每次往优先级队列中添加元素,然后以元素的delay/过期值作为排序的因素,以此来达到先过期的元素会拍在队首,每次从队列里取出来都是最先要过期的元素。 + +3.offer()方法 + /** + * Inserts the specified element into this delay queue. + * + * @param e the element to add + * @return {@code true} + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E e) { + // 获取锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + q.offer(e); + // 判断是否添加成功 + if (q.peek() == e) { + leader = null; + available.signal(); + } + return true; + } finally { + lock.unlock(); + } + } +1.获取锁来执行后续操作 +2.元素添加到优先级队列中 +3.查看元素是否为队首,如果是队首的话,设置leader为空,唤醒一个消费线程。 + +这里有一个leader元素它的作用我们后面说,先看一下取元素过程 + +4.take()方法 +/** + * Retrieves and removes the head of this queue, waiting if necessary + * until an element with an expired delay is available on this queue. + * + * @return the head of this queue + * @throws InterruptedException {@inheritDoc} + */ + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + // 获取可中断锁。 + lock.lockInterruptibly(); + try { + for (;;) { + // 从优先级队列中获取队列头元素 + E first = q.peek(); + if (first == null) + // 无元素,当前线程加入等待队列,并阻塞 + available.await(); + else { + // 通过getDelay 方法获取延迟时间 + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) + // 延迟时间到期,获取并删除头部元素。 + return q.poll(); + first = null; // don't retain ref while waiting + if (leader != null) + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + // 线程节点进入等待队列 x 纳秒。 + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + // leader == null且还存在元素的话,唤醒一个消费线程。 + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } + } +1.获取锁 +2.取出优先级队列q的首元素 +3.如果元素q的队首/队列为空,阻塞 +3.如果元素q的队首(first)不为空,获得这个元素的delay时间值,如果first的延迟delay时间值为0的话,说明该元素已经到了可以使用的时间,调用poll方法弹出该元素,跳出方法 +4.如果first的延迟delay时间值不为0的话,释放元素first的引用,避免内存泄露 +5.循环以上操作,直到return + +5.leader元素的使用 +leader是一个Thread元素,它在offer和take中都有使用,它代表当前获取到锁的消费者线程, +我们从take里的逻辑片段来分析 + +if (leader != null) + available.await(); +else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } +} +如果leader不为null,说明已经有消费者线程拿到锁,直接阻塞当前线程,如果leader为null,把当前线程赋值给leader,并等待剩余的到期时间,最后释放leader,这里我们想象着我们有个多个消费者线程用take方法去取,如果没有leader!=null的判断,这些线程都会无限循环,直到返回第一个元素,很显然很浪费资源。所以leader的作用是设置一个标记,来避免消费者的无脑竞争。 + +first = null; // don't retain ref while waiting +这里是释放first,是因为first是队列第一个元素的引用,同时可以有很多线程执行,意味着有很多线程持有第一个元素的引用,很有可能导致内存溢出,所以手动释放。