diff --git a/week_04/11/ArrayBlockingQueue.md b/week_04/11/ArrayBlockingQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..852f659b1b85416fbfeb8b29ecee05500c294379 --- /dev/null +++ b/week_04/11/ArrayBlockingQueue.md @@ -0,0 +1,39 @@ +ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则 对元素进行排序。 LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序 PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。 DelayQueue 优先级队列实现的无界阻塞队列 SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。 LinkedTransferQueue 链表实现的无界阻塞队列 LinkedBlockingDeque 链表实现的双向阻塞队列 在阻塞队列中,提供了四种处理方式 + +插入操作 add(e) :添加元素到队列中,如果队列满了,继续插入元素会报错,IllegalStateException。 offer(e) : 添加元素到队列,同时会返回元素是否插入成功的状态,如果成功则返回 true put(e) :当阻塞队列满了以后,生产者继续通过 put添加元素,队列会一直阻塞生产者线程,知道队列可用 offer(e,time,unit) :当阻塞队列满了以后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出 + +移除操作 remove():当队列为空时,调用 remove 会返回 false,如果元素移除成功,则返回 true poll(): 当队列中存在元素,则从队列中取出一个元素,如果队列为空,则直接返回 null take():基于阻塞的方式获取队列中的元素,如果队列为空,则 take 方法会一直阻塞,直到队列中有新的数据可以消费 poll(time,unit):带超时机制的获取数据,如果队列为空,则会等待指定的时间再去获取元素返回 + +ArrayBlockingQueue 源码分析 构造方法 ArrayBlockingQueue 提供了三个构造方法,分别如下: capacity: 表示数组的长度,也就是队列的长度 fair:表示是否为公平的阻塞队列,默认情况下构造的是非公平的阻塞队列。 其中第三个构造方法就不解释了,它提供了接收一个几个作为数据初始化的方法 + +public ArrayBlockingQueue(int capacity) { this(capacity, false); } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); //重入锁,出队和入队持有这一把锁 notEmpty = lock.newCondition(); //初始化非空等待队列 notFull = lock.newCondition(); //初始化非满等待队列 } 关于锁的用途,大家在没有看接下来的源码之前,可以先思考一下他的作用。 items 构造以后,大概是一个这样的数组结构 + +image.png Add 方法 以 add 方法作为入口,在 add 方法中会调用父类的 add 方法,也就是 AbstractQueue.如果看源码看得比较多的话,一般这种写法都是调用父类的模版方法来解决通用性问题 + +public boolean add(E e) { return super.add(e); } 从父类的 add 方法可以看到,这里做了一个队列是否满了的判断,如果队列满了直接抛出一个异常 + +public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } offer 方法 add 方法最终还是调用 offer 方法来添加数据,返回一个添加成功或者失败的布尔值反馈 这段代码做了几个事情 + +判断添加的数据是否为空 添加重入锁 判断队列长度,如果队列长度等于数组长度,表示满了直接返回 false 否则,直接调用 enqueue 将元素添加到队列中 public boolean offer(E e) { checkNotNull(e); //对请求数据做判断 final ReentrantLock lock = this.lock; lock.lock(); try { f (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); } } enqueue 这个是最核心的逻辑,方法内部通过 putIndex 索引直接将元素添加到数组 items + +private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; //通过 putIndex 对数据赋值 if (++putIndex == items.length) // 当putIndex 等于数组长度时,将 putIndex 重置为 0 putIndex = 0; count++;//记录队列元素的个数 notEmpty.signal();//唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素 } putIndex 为什么会在等于数组长度的时候重新设置为 0? + +因为 ArrayBlockingQueue 是一个 FIFO 的队列,队列添加元素时,是从队尾获取 putIndex 来存储元素,当 putIndex等于数组长度时,下次就需要从数组头部开始添加了 + +下面这个图模拟了添加到不同长度的元素时,putIndex 的变化,当 putIndex 等于数组长度时,不可能让 putIndex 继续累加,否则会超出数组初始化的容量大小。同时大家还需要思考两个问题: + +当元素满了以后是无法继续添加的,因为会报错 其次,队列中的元素肯定会有一个消费者线程通过 take或者其他方法来获取数据,而获取数据的同时元素也会从队列中移除 image.png put 方法 put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。 + +public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //这个也是获得锁,但是和 lock 的区别是,这个方法优先允许在等待时由其他线程调用等待线程的 interrupt 方法来中断等待直接返回。而 lock方法是尝试获得锁成功后才响应中断 try { while (count == items.length) notFull.await();//队列满了的情况下,当前线程将会被 notFull 条件对象挂起加到等待队列中 enqueue(e); } finally { lock.unlock(); } } image.png take 方法 take 方法是一种阻塞获取队列中元素的方法 它的实现原理很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put 线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作。 + +public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //如果队列为空的情况下,直接通过 await 方法阻塞 return dequeue(); } finally { lock.unlock(); } } image.png + +如果队列中添加了元素,那么这个时候,会在 enqueue 中调用 notempty.signal 唤醒 take 线程来获得元素 + +image.png dequeue 方法 这个是出队列的方法,主要是删除队列头部的元素并发返回给客户端 takeIndex,是用来记录拿数据的索引值 + +private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //默认获取 0 位置的元素 items[takeIndex] = null;//将该位置的元素设置为空 if (++takeIndex == items.length)//这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据 takeIndex = 0; count--;//记录 元素个数递减 if (itrs != null) itrs.elementDequeued();//同时更新迭代器中的元素数据 notFull.signal();//触发 因为队列满了以后导致的被阻塞的线程 return x; } itrs.elementDequeued() ArrayBlockingQueue 中,实现了迭代器的功能,也就是可以通过迭代器来遍历阻塞队列中的元素 所以 itrs.elementDequeued() 是用来更新迭代器中的元素数据的 takeIndex 的索引变化图如下,同时随着数据的移除,会唤醒处于 put 阻塞状态下的线程来继续添加数据 + +image.png remove 方法 remove 方法是移除一个指定元素 + +public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; //获取数组元素 final ReentrantLock lock = this.lock; lock.lock(); //获得锁 try { if (count > 0) { //如果队列不为空 final int putIndex = this.putIndex; //获取下一个要添加元素时的索引 int i = takeIndex;//获取当前要被移除的元素的索引 do { if (o.equals(items[i])) {//从takeIndex 下标开始,找到要被删除的元素 removeAt(i);//移除指定元素 return true;//返回执行结果 } //当前删除索引执行加 1 后判断是否与数组长度相等 //若为 true,说明索引已到数组尽头,将 i 设置为 0 if (++i == items.length) i = 0; } while (i != putIndex);//继续查找,直到找到最后一个元素 } return false; } finally { lock.unlock(); } } diff --git a/week_04/11/ConcurrentHashMap.md b/week_04/11/ConcurrentHashMap.md new file mode 100644 index 0000000000000000000000000000000000000000..5e0824452ac2c256b35da605534b2551b50d5ca2 --- /dev/null +++ b/week_04/11/ConcurrentHashMap.md @@ -0,0 +1,336 @@ +并发编程实践中,ConcurrentHashMap是一个经常被使用的数据结构,相比于Hashtable以及Collections.synchronizedMap(),ConcurrentHashMap在线程安全的基础上提供了更好的写并发能力,但同时降低了对读一致性的要求(这点好像CAP理论啊 O(∩_∩)O)。ConcurrentHashMap的设计与实现非常精巧,大量的利用了volatile,final,CAS等lock-free技术来减少锁竞争对于性能的影响,无论对于Java并发编程的学习还是Java内存模型的理解,ConcurrentHashMap的设计以及源码都值得非常仔细的阅读与揣摩。 + +这篇日志记录了自己对ConcurrentHashMap的一些总结,由于JDK6,7,8中实现都不同,需要分开阐述在不同版本中的ConcurrentHashMap。 + +之前已经在ConcurrentHashMap原理分析中解释了ConcurrentHashMap的原理,主要是从代码的角度来阐述是源码是如何写的,本文仍然从源码出发,挑选个人觉得重要的点(会用红色标注)再次进行回顾,以及阐述ConcurrentHashMap的一些注意点。 + +JDK6与JDK7中的实现 +1 设计思路 ConcurrentHashMap采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计,分段锁大大的提高了高并发环境下的处理能力。但同时,由于不是对整个Map加锁,导致一些需要扫描整个Map的方法(如size(), containsValue())需要使用特殊的实现,另外一些方法(如clear())甚至放弃了对一致性的要求(ConcurrentHashMap是弱一致性的,具体请查看ConcurrentHashMap能完全替代HashTable吗?)。 +ConcurrentHashMap中的分段锁称为Segment,它即类似于HashMap(JDK7与JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性:HashEntry中的value以及next都被volatile修饰,这样在多线程读写过程中能够保持它们的可见性,代码如下: + +static final class HashEntry { final int hash; final K key; volatile V value; volatile HashEntry next; 1.2 并发度(Concurrency Level) 并发度可以理解为程序运行时能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数,实际上就是ConcurrentHashMap中的分段锁个数,即Segment[]的数组长度。ConcurrentHashMap默认的并发度为16,但用户也可以在构造函数中设置并发度。当用户设置并发度时,ConcurrentHashMap会使用大于等于该值的最小2幂指数作为实际并发度(假如用户设置并发度为17,实际并发度则为32)。运行时通过将key的高n位(n = 32 – segmentShift)和并发度减1(segmentMask)做位与运算定位到所在的Segment。segmentShift与segmentMask都是在构造过程中根据concurrency level被相应的计算出来。 + +如果并发度设置的过小,会带来严重的锁竞争问题;如果并发度设置的过大,原本位于同一个Segment内的访问会扩散到不同的Segment中,CPU cache命中率会下降,从而引起程序性能下降。(文档的说法是根据你并发的线程数量决定,太多会导性能降低) + +1.3 创建分段锁 和JDK6不同,JDK7中除了第一个Segment之外,剩余的Segments采用的是延迟初始化的机制:每次put之前都需要检查key对应的Segment是否为null,如果是则调用ensureSegment()以确保对应的Segment被创建。 + +ensureSegment可能在并发环境下被调用,但与想象中不同,ensureSegment并未使用锁来控制竞争,而是使用了Unsafe对象的getObjectVolatile()提供的原子读语义结合CAS来确保Segment创建的原子性。代码段如下: + +if ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck Segment s = new Segment(lf, threshold, tab); while ((seg = (Segment)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } 1.4 put/putIfAbsent/putAll 和JDK6一样,ConcurrentHashMap的put方法被代理到了对应的Segment(定位Segment的原理之前已经描述过)中。与JDK6不同的是,JDK7版本的ConcurrentHashMap在获得Segment锁的过程中,做了一定的优化 - 在真正申请锁之前,put方法会通过tryLock()方法尝试获得锁,在尝试获得锁的过程中会对对应hashcode的链表进行遍历,如果遍历完毕仍然找不到与key相同的HashEntry节点,则为后续的put操作提前创建一个HashEntry。当tryLock一定次数后仍无法获得锁,则通过lock申请锁。 + +需要注意的是,由于在并发环境下,其他线程的put,rehash或者remove操作可能会导致链表头结点的变化,因此在过程中需要进行检查,如果头结点发生变化则重新对表进行遍历。而如果其他线程引起了链表中的某个节点被删除,即使该变化因为是非原子写操作(删除节点后链接后续节点调用的是Unsafe.putOrderedObject(),该方法不提供原子写语义)可能导致当前线程无法观察到,但因为不影响遍历的正确性所以忽略不计。 + +之所以在获取锁的过程中对整个链表进行遍历,主要目的是希望遍历的链表被CPU cache所缓存,为后续实际put过程中的链表遍历操作提升性能。 + +在获得锁之后,Segment对链表进行遍历,如果某个HashEntry节点具有相同的key,则更新该HashEntry的value值,否则新建一个HashEntry节点,将它设置为链表的新head节点并将原头节点设为新head的下一个节点。新建过程中如果节点总数(含新建的HashEntry)超过threshold,则调用rehash()方法对Segment进行扩容,最后将新建HashEntry写入到数组中。 + +put方法中,链接新节点的下一个节点(HashEntry.setNext())以及将链表写入到数组中(setEntryAt())都是通过Unsafe的putOrderedObject()方法来实现,这里并未使用具有原子写语义的putObjectVolatile()的原因是:JMM会保证获得锁到释放锁之间所有对象的状态更新都会在锁被释放之后更新到主存,从而保证这些变更对其他线程是可见的。 + +1.5 rehash 相对于HashMap的resize,ConcurrentHashMap的rehash原理类似,但是Doug Lea为rehash做了一定的优化,避免让所有的节点都进行复制操作:由于扩容是基于2的幂指来操作,假设扩容前某HashEntry对应到Segment中数组的index为i,数组的容量为capacity,那么扩容后该HashEntry对应到新数组中的index只可能为i或者i+capacity,因此大多数HashEntry节点在扩容前后index可以保持不变。基于此,rehash方法中会定位第一个后续所有节点在扩容后index都保持不变的节点,然后将这个节点之前的所有节点重排即可。这部分代码如下: + +private void rehash(HashEntry node) { HashEntry[] oldTable = table; int oldCapacity = oldTable.length; int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); HashEntry[] newTable = (HashEntry[]) new HashEntry[newCapacity]; int sizeMask = newCapacity - 1; for (int i = 0; i < oldCapacity ; i++) { HashEntry e = oldTable[i]; if (e != null) { HashEntry next = e.next; int idx = e.hash & sizeMask; if (next == null) // Single node on list newTable[idx] = e; else { // Reuse consecutive sequence at same slot HashEntry lastRun = e; int lastIdx = idx; for (HashEntry last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // Clone remaining nodes for (HashEntry p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry n = newTable[k]; newTable[k] = new HashEntry(h, p.key, v, n); } } } } int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; } 1.6 remove 和put类似,remove在真正获得锁之前,也会对链表进行遍历以提高缓存命中率。 + +1.7 get与containsKey get与containsKey两个方法几乎完全一致:他们都没有使用锁,而是通过Unsafe对象的getObjectVolatile()方法提供的原子读语义,来获得Segment以及对应的链表,然后对链表遍历判断是否存在key相同的节点以及获得该节点的value。但由于遍历过程中其他线程可能对链表结构做了调整,因此get和containsKey返回的可能是过时的数据,这一点是ConcurrentHashMap在弱一致性上的体现。如果要求强一致性,那么必须使用Collections.synchronizedMap()方法。 + +1.8 size、containsValue 这些方法都是基于整个ConcurrentHashMap来进行操作的,他们的原理也基本类似:首先不加锁循环执行以下操作:循环所有的Segment(通过Unsafe的getObjectVolatile()以保证原子读语义),获得对应的值以及所有Segment的modcount之和。如果连续两次所有Segment的modcount和相等,则过程中没有发生其他线程修改ConcurrentHashMap的情况,返回获得的值。 + +当循环次数超过预定义的值时,这时需要对所有的Segment依次进行加锁,获取返回值后再依次解锁。值得注意的是,加锁过程中要强制创建所有的Segment,否则容易出现其他线程创建Segment并进行put,remove等操作。代码如下: + +for(int j =0; j < segments.length; ++j) + +ensureSegment(j).lock();// force creation 一般来说,应该避免在多线程环境下使用size和containsValue方法。 + +注1:modcount在put, replace, remove以及clear等方法中都会被修改。 + +注2:对于containsValue方法来说,如果在循环过程中发现匹配value的HashEntry,则直接返回true。 + +最后,与HashMap不同的是,ConcurrentHashMap并不允许key或者value为null,按照Doug Lea的说法,这么设计的原因是在ConcurrentHashMap中,一旦value出现null,则代表HashEntry的key/value没有映射完成就被其他线程所见,需要特殊处理。在JDK6中,get方法的实现中就有一段对HashEntry.value == null的防御性判断。但Doug Lea也承认实际运行过程中,这种情况似乎不可能发生(参考:http://cs.oswego.edu/pipermail/concurrency-interest/2011-March/007799.html)。 + +JDK8中的实现 ConcurrentHashMap在JDK8中进行了巨大改动,很需要通过源码来再次学习下Doug Lea的实现方法。 +它摒弃了Segment(锁段)的概念,而是启用了一种全新的方式实现,利用CAS算法。它沿用了与它同时期的HashMap版本的思想,底层依然由“数组”+链表+红黑树的方式思想(JDK7与JDK8中HashMap的实现),但是为了做到并发,又增加了很多辅助的类,例如TreeBin,Traverser等对象内部类。 + +2.1 重要的属性 首先来看几个重要的属性,与HashMap相同的就不再介绍了,这里重点解释一下sizeCtl这个属性。可以说它是ConcurrentHashMap中出镜率很高的一个属性,因为它是一个控制标识符,在不同的地方有不同用途,而且它的取值不同,也代表不同的含义。 + +负数代表正在进行初始化或扩容操作 -1代表正在初始化 -N 表示有N-1个线程正在进行扩容操作 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,这一点类似于扩容阈值的概念。还后面可以看到,它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。 /** * 盛装Node元素的数组 它的大小是2的整数次幂 * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node[] table; + + /** + * Table initialization and resizing control. When negative, the + * table is being initialized or resized: -1 for initialization, + * else -(1 + the number of active resizing threads). Otherwise, + * when table is null, holds the initial table size to use upon + * creation, or 0 for default. After initialization, holds the + * next element count value upon which to resize the table. + hash表初始化或扩容时的一个控制位标识量。 + 负数代表正在进行初始化或扩容操作 + -1代表正在初始化 + -N 表示有N-1个线程正在进行扩容操作 + 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小 + + */ +private transient volatile int sizeCtl; +// 以下两个是用来控制扩容的时候 单线程进入的变量 + /** + * The number of bits used for generation stamp in sizeCtl. + * Must be at least 6 for 32bit arrays. + */ +private static int RESIZE_STAMP_BITS = 16; + /** + * The bit shift for recording size stamp in sizeCtl. + */ +private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; + + +/* + * Encodings for Node hash fields. See above for explanation. + */ +static final int MOVED = -1; // hash值是-1,表示这是一个forwardNode节点 +static final int TREEBIN = -2; // hash值是-2 表示这时一个TreeBin节点 +2.2 重要的类 2.2.1 Node Node是最核心的内部类,它包装了key-value键值对,所有插入ConcurrentHashMap的数据都包装在这里面。它与HashMap中的定义很相似,但是但是有一些差别它对value和next属性设置了volatile同步锁(与JDK7的Segment相同),它不允许调用setValue方法直接改变Node的value域,它增加了find方法辅助map.get()方法。 + +2.2.2 TreeNode 树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。 + +2.2.3 TreeBin 这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。 + +这里仅贴出它的构造方法。可以看到在构造TreeBin节点时,仅仅指定了它的hash值为TREEBIN常量,这也就是个标识为。同时也看到我们熟悉的红黑树构造方法 + +2.2.4 ForwardingNode + +一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。 + +/** * A node inserted at head of bins during transfer operations. */ static final class ForwardingNode extends Node { final Node[] nextTable; ForwardingNode(Node[] tab) { super(MOVED, null, null, null); this.nextTable = tab; } + + Node find(int h, Object k) { + // loop to avoid arbitrarily deep recursion on forwarding nodes + outer: for (Node[] tab = nextTable;;) { + Node e; int n; + if (k == null || tab == null || (n = tab.length) == 0 || + (e = tabAt(tab, (n - 1) & h)) == null) + return null; + for (;;) { + int eh; K ek; + if ((eh = e.hash) == h && + ((ek = e.key) == k || (ek != null && k.equals(ek)))) + return e; + if (eh < 0) { + if (e instanceof ForwardingNode) { + tab = ((ForwardingNode)e).nextTable; + continue outer; + } + else + return e.find(h, k); + } + if ((e = e.next) == null) + return null; + } + } + } +} +2.3 Unsafe与CAS 在ConcurrentHashMap中,随处可以看到U, 大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN的思想是比较类似的。 + +2.3.1 unsafe静态块 unsafe代码块控制了一些属性的修改工作,比如最常用的SIZECTL 。在这一版本的concurrentHashMap中,大量应用来的CAS方法进行变量、属性的修改工作。利用CAS进行无锁操作,可以大大提高性能。 + +private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; + +static { + try { + U = sun.misc.Unsafe.getUnsafe(); + Class k = ConcurrentHashMap.class; + SIZECTL = U.objectFieldOffset + (k.getDeclaredField("sizeCtl")); + TRANSFERINDEX = U.objectFieldOffset + (k.getDeclaredField("transferIndex")); + BASECOUNT = U.objectFieldOffset + (k.getDeclaredField("baseCount")); + CELLSBUSY = U.objectFieldOffset + (k.getDeclaredField("cellsBusy")); + Class ck = CounterCell.class; + CELLVALUE = U.objectFieldOffset + (ck.getDeclaredField("value")); + Class ak = Node[].class; + ABASE = U.arrayBaseOffset(ak); + int scale = U.arrayIndexScale(ak); + if ((scale & (scale - 1)) != 0) + throw new Error("data type scale not a power of two"); + ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); + } catch (Exception e) { + throw new Error(e); + } +} +2.3.2 三个核心方法 ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。 + +//获得在i位置上的Node节点 static final Node tabAt(Node[] tab, int i) { return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少 //在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改 //因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果 有点类似于SVN static final boolean casTabAt(Node[] tab, int i, Node c, Node v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //利用volatile方法设置节点位置的值 static final void setTabAt(Node[] tab, int i, Node v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); } 2.4 初始化方法initTable 对于ConcurrentHashMap来说,调用它的构造方法仅仅是设置了一些参数而已。而整个table的初始化是在向ConcurrentHashMap中插入元素的时候发生的。如调用put、computeIfAbsent、compute、merge等方法的时候,调用时机是检查table==null。 + +初始化方法主要应用了关键属性sizeCtl 如果这个值〈0,表示其他线程正在进行初始化,就放弃这个操作。在这也可以看出ConcurrentHashMap的初始化只能由一个线程完成。如果获得了初始化权限,就用CAS方法将sizeCtl置为-1,防止其他线程进入。初始化数组后,将sizeCtl的值改为0.75*n。 + +/** * Initializes table, using the size recorded in sizeCtl. / private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //sizeCtl表示有其他线程正在进行初始化操作,把线程挂起。对于table的初始化工作,只能有一个线程在进行。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//利用CAS方法把sizectl的值置为-1 表示本线程正在进行初始化 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n]; table = tab = nt; sc = n - (n >>> 2);//相当于0.75n 设置一个扩容的阈值 } } finally { sizeCtl = sc; } break; } } return tab; } 2.5 扩容方法 transfer 当ConcurrentHashMap容量不足的时候,需要对table进行扩容。这个方法的基本思想跟HashMap是很像的,但是由于它是支持并发扩容的,所以要复杂的多。原因是它支持多线程进行扩容操作,而并没有加锁。我想这样做的目的不仅仅是为了满足concurrent的要求,而是希望利用并发处理去减少扩容带来的时间影响。因为在扩容的时候,总是会涉及到从一个“数组”到另一个“数组”拷贝的操作,如果这个操作能够并发进行,那真真是极好的了。 + +整个扩容操作分为两个部分 + +第一部分是构建一个nextTable,它的容量是原来的两倍,这个操作是单线程完成的。这个单线程的保证是通过RESIZE_STAMP_SHIFT这个常量经过一次运算来保证的,这个地方在后面会有提到; + +第二个部分就是将原来table中的元素复制到nextTable中,这里允许多线程进行操作。 先来看一下单线程是如何完成的: + +它的大体思想就是遍历、复制的过程。首先根据运算得到需要遍历的次数i,然后利用tabAt方法获得i位置的元素: + +如果这个位置为空,就在原table中的i位置放入forwardNode节点,这个也是触发并发扩容的关键点; + +如果这个位置是Node节点(fh>=0),如果它是一个链表的头节点,就构造一个反序链表,把他们分别放在nextTable的i和i+n的位置上 + +如果这个位置是TreeBin节点(fh<0),也做一个反序处理,并且判断是否需要untreefi,把处理的结果分别放在nextTable的i和i+n的位置上 + +遍历过所有的节点以后就完成了复制工作,这时让nextTable作为新的table,并且更新sizeCtl为新容量的0.75倍 ,完成扩容。 再看一下多线程是如何完成的: + +在代码的69行有一个判断,如果遍历到的节点是forward节点,就向后继续遍历,再加上给节点上锁的机制,就完成了多线程的控制。多线程遍历节点,处理了一个节点,就把对应点的值set为forward,另一个线程看到forward,就向后遍历。这样交叉就完成了复制工作。而且还很好的解决了线程安全的问题。 这个方法的设计实在是让我膜拜。 + +/** * 一个过渡的table表 只有在扩容的时候才会使用 */ private transient volatile Node[] nextTable; + +/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. */ private final void transfer(Node[] tab, Node[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node[] nt = (Node[])new Node[n << 1];//构造一个nextTable对象 它的容量是原来的两倍 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode fwd = new ForwardingNode(nextTab);//构造一个连节点指针 用于标志位 boolean advance = true;//并发扩容的关键属性 如果等于true 说明这个节点已经处理过 boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node f; int fh; //这个while循环体的作用就是在控制i-- 通过i--可以依次遍历原hash表中的节点 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //如果所有的节点都已经完成复制工作 就把nextTable赋值给table 清空临时对象nextTable nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1);//扩容阈值设置为原来容量的1.5倍 依然相当于现在容量的0.75倍 return; } //利用CAS方法更新这个扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //如果遍历到的节点为空 则放入ForwardingNode指针 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果遍历到ForwardingNode节点 说明这个点已经被处理过了 直接跳过 这里是控制并发扩容的核心 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //节点上锁 synchronized (f) { if (tabAt(tab, i) == f) { Node ln, hn; //如果fh>=0 证明这是一个Node节点 if (fh >= 0) { int runBit = fh & n; //以下的部分在完成的工作是构造两个链表 一个是原链表 另一个是原链表的反序排列 Node lastRun = f; for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node(ph, pk, pv, ln); else hn = new Node(ph, pk, pv, hn); } //在nextTable的i位置上插入一个链表 setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一个链表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode节点 表示已经处理过该节点 setTabAt(tab, i, fwd); //设置advance为true 返回到上面的while循环中 就可以执行i--操作 advance = true; } //对TreeBin对象进行处理 与上面的过程类似 else if (f instanceof TreeBin) { TreeBin t = (TreeBin)f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; //构造正序和反序两个链表 for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //如果扩容后已经不再需要tree的结构 反向转换为链表结构 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin(hi) : t; //在nextTable的i位置上插入一个链表 +setTabAt(nextTab, i, ln); //在nextTable的i+n的位置上插入另一个链表 setTabAt(nextTab, i + n, hn); //在table的i位置上插入forwardNode节点 表示已经处理过该节点 setTabAt(tab, i, fwd); //设置advance为true 返回到上面的while循环中 就可以执行i--操作 advance = true; } } } } } } 2.6 Put方法 前面的所有的介绍其实都为这个方法做铺垫。ConcurrentHashMap最常用的就是put和get两个方法。现在来介绍put方法,这个put方法依然沿用HashMap的put方法的思想,根据hash值计算这个新插入的点在table中的位置i,如果i位置是空的,直接放进去,否则进行判断,如果i位置是树节点,按照树的方式插入新的节点,否则把i插入到链表的末尾。ConcurrentHashMap中依然沿用这个思想,有一个最重要的不同点就是ConcurrentHashMap不允许key或value为null值。另外由于涉及到多线程,put方法就要复杂一点。在多线程中可能有以下两个情况 + +如果一个或多个线程正在对ConcurrentHashMap进行扩容操作,当前线程也要进入扩容的操作中。这个扩容的操作之所以能被检测到,是因为transfer方法中在空结点上插入forward节点,如果检测到需要插入的位置被forward节点占有,就帮助进行扩容; + +如果检测到要插入的节点是非空且不是forward节点,就对这个节点加锁,这样就保证了线程安全。尽管这个有一些影响效率,但是还是会比hashTable的synchronized要好得多。 + +整体流程就是首先定义不允许key或value为null的情况放入 对于每一个放入的值,首先利用spread方法对key的hashcode进行一次hash计算,由此来确定这个值在table中的位置。 + +如果这个位置是空的,那么直接放入,而且不需要加锁操作。 + +如果这个位置存在结点,说明发生了hash碰撞,首先判断这个节点的类型。如果是链表节点(fh>0),则得到的结点就是hash值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到hash值与key值都与新加入节点是一致的情况,则只需要更新value值即可。否则依次向后遍历,直到链表尾插入这个结点。如果加入这个节点以后链表长度大于8,就把这个链表转换成红黑树。如果这个节点的类型已经是树节点的话,直接调用树节点的插入方法进行插入新的值。 + +public V put(K key, V value) { return putVal(key, value, false); } + +/** Implementation for put and putIfAbsent */ +final V putVal(K key, V value, boolean onlyIfAbsent) { + //不允许 key或value为null + if (key == null || value == null) throw new NullPointerException(); + //计算hash值 + int hash = spread(key.hashCode()); + int binCount = 0; + //死循环 何时插入成功 何时跳出 + for (Node[] tab = table;;) { + Node f; int n, i, fh; + //如果table为空的话,初始化table + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + //根据hash值计算出在table里面的位置 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + //如果这个位置没有值 ,直接放进去,不需要加锁 + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + //当遇到表连接点时,需要进行整合表的操作 + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + else { + V oldVal = null; + //结点上锁 这里的结点可以理解为hash值相同组成的链表的头结点 + synchronized (f) { + if (tabAt(tab, i) == f) { + //fh〉0 说明这个节点是一个链表的节点 不是树的节点 + if (fh >= 0) { + binCount = 1; + //在这里遍历链表所有的结点 + for (Node e = f;; ++binCount) { + K ek; + //如果hash值和key值相同 则修改对应结点的value值 + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + Node pred = e; + //如果遍历到了最后一个结点,那么就证明新的节点需要插入 就把它插入在链表尾部 + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + //如果这个节点是树节点,就按照树的方式插入值 + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + if (binCount != 0) { + //如果链表长度已经达到临界值8 就需要把链表转换为树结构 + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + if (oldVal != null) + return oldVal; + break; + } + } + } + //将当前ConcurrentHashMap的元素数量+1 + addCount(1L, binCount); + return null; +} +我们可以发现JDK8中的实现也是锁分离的思想,只是锁住的是一个Node,而不是JDK7中的Segment,而锁住Node之前的操作是无锁的并且也是线程安全的,建立在之前提到的3个原子操作上。 + +2.6.1 helpTransfer方法 这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用transfer方法。回看上面的transfer方法可以看到,当本线程进入扩容方法的时候会直接进入复制阶段。 + +2.6.2 treeifyBin方法 这个方法用于将过长的链表转换为TreeBin对象。但是他并不是直接转换,而是进行一次容量判断,如果容量没有达到转换的要求,直接进行扩容操作并返回;如果满足条件才链表的结构抓换为TreeBin ,这与HashMap不同的是,它并没有把TreeNode直接放入红黑树,而是利用了TreeBin这个小容器来封装所有的TreeNode. + +2.7 get方法 get方法比较简单,给定一个key来确定value的时候,必须满足两个条件 key相同 hash值相同,对于节点可能在链表或树上的情况,需要分别去查找。 + +public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; //计算hash值 int h = spread(key.hashCode()); //根据hash值确定节点位置 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { //如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点 +if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果eh<0 说明这个节点在树上 直接寻找 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //否则遍历链表 找到对应的值并返回 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } 2.8 Size相关的方法 对于ConcurrentHashMap来说,这个table里到底装了多少东西其实是个不确定的数量,因为不可能在调用size()方法的时候像GC的“stop the world”一样让其他线程都停下来让你去统计,因此只能说这个数量是个估计值。对于这个估计值,ConcurrentHashMap也是大费周章才计算出来的。 + +2.8.1 辅助定义 为了统计元素个数,ConcurrentHashMap定义了一些变量和一个内部类 + +/** * A padded cell for distributing counts. Adapted from LongAdder * and Striped64. See their internal docs for explanation. */ @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } } + +/**/ + +/** + * 实际上保存的是hashmap中的元素个数 利用CAS锁进行更新 + 但它并不用返回当前hashmap的元素个数 + + */ +private transient volatile long baseCount; +/** + * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. + */ +private transient volatile int cellsBusy; + +/** + * Table of counter cells. When non-null, size is a power of 2. + */ +private transient volatile CounterCell[] counterCells; +2.8.2 mappingCount与Size方法 mappingCount与size方法的类似 从Java工程师给出的注释来看,应该使用mappingCount代替size方法 两个方法都没有直接返回basecount 而是统计一次这个值,而这个值其实也是一个大概的数值,因此可能在统计的时候有其他线程正在执行插入或删除操作。 + +public int size() { long n = sumCount(); return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } /** * Returns the number of mappings. This method should be used * instead of {@link #size} because a ConcurrentHashMap may * contain more mappings than can be represented as an int. The * value returned is an estimate; the actual count may differ if * there are concurrent insertions or removals. * * @return the number of mappings * @since 1.8 */ public long mappingCount() { long n = sumCount(); return (n < 0L) ? 0L : n; // ignore transient negative values } + + final long sumCount() { + CounterCell[] as = counterCells; CounterCell a; + long sum = baseCount; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value;//所有counter的值求和 + } + } + return sum; +} +2.8.3 addCount方法 在put方法结尾处调用了addCount方法,把当前ConcurrentHashMap的元素个数+1这个方法一共做了两件事,更新baseCount的值,检测是否进行扩容。 + +private final void addCount(long x, int check) { CounterCell[] as; long b, s; //利用CAS方法更新baseCount的值 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //如果check值大于等于0 则需要检验是否需要进行扩容操作 if (check >= 0) { Node[] tab, nt; int n, sc; while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); // if (sc < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; //如果已经有其他线程在执行扩容操作 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } //当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } } 总结 JDK6,7中的ConcurrentHashmap主要使用Segment来实现减小锁粒度,把HashMap分割成若干个Segment,在put的时候需要锁住Segment,get时候不加锁,使用volatile来保证可见性,当要统计全局时(比如size),首先会尝试多次计算modcount来确定,这几次尝试中,是否有其他线程进行了修改操作,如果没有,则直接返回size。如果有,则需要依次锁住所有的Segment来计算。 + +jdk7中ConcurrentHashmap中,当长度过长碰撞会很频繁,链表的增改删查操作都会消耗很长的时间,影响性能,所以jdk8 中完全重写了concurrentHashmap,代码量从原来的1000多行变成了 6000多 行,实现上也和原来的分段式存储有很大的区别。 + +主要设计上的变化有以下几点: + +不采用segment而采用node,锁住node来实现减小锁粒度。 设计了MOVED状态 当resize的中过程中 线程2还在put数据,线程2会帮助resize。 使用3个CAS操作来确保node的一些操作的原子性,这种方式代替了锁。 sizeCtl的不同值来代表不同含义,起到了控制的作用。 至于为什么JDK8中使用synchronized而不是ReentrantLock,我猜是因为JDK8中对synchronized有了足够的优化吧。 \ No newline at end of file diff --git a/week_04/11/ConcurrentLinkedQueue.md b/week_04/11/ConcurrentLinkedQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..ca099d7a5f57d6696fee9a6dce8309b43e5cbafd --- /dev/null +++ b/week_04/11/ConcurrentLinkedQueue.md @@ -0,0 +1,67 @@ +队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(front)进行删除操作,而在表的后端(rear)进行插入操作,和栈一样,队列是一种操作受限制的线性表。进行插入操作的端称为队尾,进行删除操作的端称为队头。队列中没有元素时,称为空队列是一种容器。 + +队列模型 ConcurrentLinkedQueue队列 ConcurrentLinkedQueue:是一种线程安全的非阻塞队列,一个基于链表的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序。队列的头部是队列中时间最长的元素。队列的尾部是队列中时间最短的元素。新的元素插入到队列的尾部,队列获取操作从队列头部获得元素 + +阻塞和非阻塞 阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态. + +阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。 + +非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程 + +ConcurrentLinkedQueue 的非阻塞实现原理是:使用循环CAS(CAS是一种原子操作方式,Java中的Unsafe类是他的一个抽象,该类中的每个方法都相当与一条机器指令,CAS+volatile构成Java并发包)的方式来添加,获取,修改数据,保证数据的可见性,也就保证了线程安全。由于不断循环CAS操作,即使不同的线程操作同一个CAS方法,只要有一个线程操作成功,就会立即刷新内存,其他线程就会立即得到最新的数据,而不会导致在多线程环境下数据不一致的问题。线程也不会出现阻塞的情况。 + +ConcurrentLinkedQueue原理 首先ConcurrentLinkedQueue中的链表结构(该篇文章中使用JDK1.8环境) + +Java利用内存存储数据的方式有两种,一种是数组,在内存中划出一段连续的空间存储数据,使用下标维护数据之间的联系,一种是链表可以利用分散内存空间,使用索引来维护数据之前的联系 + +使用Node作为节点 item:表示该节点的内容,next:表示下一个节点的索引 + +在ConcurrentLinkedQueue中定义了一个head节点用来出队列使用,一个tail节点用来入队列使用,初始化的时候head = tail 都是一个空节点 + +下面通过ConcurrentLinkedQueue的入队列来解释为什么是线程安全的。 + +ConcurrentLinkedQueue的入队列 模拟四个元素添加到ConcurrentLinkedQueue中 + +添加元素1。队列更新head节点的next节点为元素1节点。又因为tail节点默认情况下等于head节点,所以它们的next节点都指向元素1节点。 + +添加元素2。队列首先设置元素1节点的next节点为元素2节点,然后更新tail节点指向元素2节点。 + +添加元素3,设置tail节点的next节点为元素3节点。 + +添加元素4,设置元素3的next节点为元素4节点,然后将tail节点指向元素4节点。 + +通过调试入队过程并观察head节点和tail节点的变化,发现入队主要做两件事情:第一是将入队节点设置成当前队列尾节点的下一个节点;第二是更新tail节点,如果tail节点的next节点不为空,则将入队节点设置成tail节点,如果tail节点的next节点为空,则将入队节点设置成tail的next节点,所以tail节点不总是尾节点。 + +下面查看入队的源代码来解释上面这句话 + +offer()源码 从源代码角度来看,整个入队过程主要做两件事情:第一是定位出尾节点;第二是使用CAS算法将入队节点设置成尾节点的next节点,如不成功则重试。 + +为什么offer()方法是线程安全的 + +原因在于CAS算法,在p.casNext(null, newNode)这个方法中 + +nextOffset:是一个内存的物理位置,表示该节点的next节点的内存地址 + +如果nextOffset的内存位置的数据和cmp相同,就将nextOffset的内存位置的数据换成val,这个是一个原子操作,线程安全。 + +当一个线程A进入这个方法的时候,说明找到了尾节点p(尾节点的next为null)。即使这个时候A线程挂起,B线程进行添加,或者修改了尾节点,导致A线程的尾节点发生改变,这个时候A线程恢复继续执行p.casNext(null, newNode)方法由于第一个参数为null 但是尾节点发生改变使得线程A的p节点(不能说是尾节点,因为尾节点在B线程已经修改)的nextOffset指向的物理地址不是null的物理地址,所以不能替换,该方法返回false,线程A继续循环寻找尾节点。 + +常用API add(E e):插入一个元素到队尾,返回boolean的插入结果 + +contains(Object o):是否包含某个元素,包含返回true + +isEmpty():判断是否为空,为空返回true + +iterator():遍历队列元素,返回Iterator + +offer(E e):插入一个元素到队尾,返回boolean的插入结果 + +peek():检索但不移除此队列的头部,如果此队列为空,则返回null + +poll():检索并删除此队列的头部,如果此队列为空,则返回null + +remove(Object o):从此队列中删除指定元素的单个实例(如果存在) + +size():返回此队列中的元素数 + +toArray():以适当的顺序返回包含此队列中所有元素的数组。 \ No newline at end of file diff --git a/week_04/11/CopyOnWriteArrayList.md b/week_04/11/CopyOnWriteArrayList.md new file mode 100644 index 0000000000000000000000000000000000000000..d0e28a363a68b85fe4b17f07b39ce0fd2fae3086 --- /dev/null +++ b/week_04/11/CopyOnWriteArrayList.md @@ -0,0 +1,117 @@ + 写入时复制(CopyOnWrite,简称COW)思想是计算机程序设计领域中的一种优化策略。其核心思想是,如果有多个调用者(Callers)同时要求相同的资源(如内存或者是磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者视图修改资源内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的(transparently)。此做法主要的优点是如果调用者没有修改资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。 CopyOnWriteArrayList的实现原理   在使用CopyOnWriteArrayList之前,我们先阅读其源码了解下它是如何实现的。以下代码是向CopyOnWriteArrayList中add方法的实现(向CopyOnWriteArrayList里添加元素),可以发现在添加的时候是需要加锁的,否则多线程写的时候会Copy出N个副本出来。 复制代码 /** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return true (as specified by {@link Collection#add}) */ public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } } 复制代码   读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的CopyOnWriteArrayList。 + +public E get(int index) { return get(getArray(), index); }   JDK中并没有提供CopyOnWriteMap,我们可以参考CopyOnWriteArrayList来实现一个,基本代码如下: + +复制代码 import java.util.Collection; import java.util.Map; import java.util.Set; + +public class CopyOnWriteMap implements Map, Cloneable { private volatile Map internalMap; + +public CopyOnWriteMap() { + internalMap = new HashMap(); +} + +public V put(K key, V value) { + + synchronized (this) { + Map newMap = new HashMap(internalMap); + V val = newMap.put(key, value); + internalMap = newMap; + return val; + } +} + +public V get(Object key) { + return internalMap.get(key); +} + +public void putAll(Map newData) { + synchronized (this) { + Map newMap = new HashMap(internalMap); + newMap.putAll(newData); + internalMap = newMap; + } +} +} 复制代码   实现很简单,只要了解了CopyOnWrite机制,我们可以实现各种CopyOnWrite容器,并且在不同的应用场景中使用。 + + 几个要点 实现了List接口 内部持有一个ReentrantLock lock = new ReentrantLock(); 底层是用volatile transient声明的数组 array 读写分离,写时复制出一个新的数组,完成插入、修改或者移除操作后将新数组赋值给array 注: + +  volatile (挥发物、易变的):变量修饰符,只能用来修饰变量。volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变 化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。 + +  transient (暂短的、临时的):修饰符,只能用来修饰字段。在对象序列化的过程中,标记为transient的变量不会被序列化。 + + 增删改查   1)增 + +复制代码 public boolean add(E e) { final ReentrantLock lock = this.lock; //获得锁 lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //复制一个新的数组 Object[] newElements = Arrays.copyOf(elements, len + 1); //插入新值 newElements[len] = e; //将新的数组指向原来的引用 setArray(newElements); return true; } finally { //释放锁 lock.unlock(); } } + +public void add(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (index > len || index < 0) throw new IndexOutOfBoundsException("Index: "+index+ ", Size: "+len); Object[] newElements; int numMoved = len - index; if (numMoved == 0) newElements = Arrays.copyOf(elements, len + 1); else { newElements = new Object[len + 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index, newElements, index + 1, numMoved); } newElements[index] = element; setArray(newElements); } finally { lock.unlock(); } } 复制代码   2)删 + +复制代码 public E remove(int index) { final ReentrantLock lock = this.lock; //获得锁 lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) //如果删除的元素是最后一个,直接复制该元素前的所有元素到新的数组 setArray(Arrays.copyOf(elements, len - 1)); else { //创建新的数组 Object[] newElements = new Object[len - 1]; //将index+1至最后一个元素向前移动一格 System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { lock.unlock(); } } 复制代码 3)改 + +复制代码 public E set(int index, E element) { final ReentrantLock lock = this.lock; //获得锁 lock.lock(); try { Object[] elements = getArray(); E oldValue = get(elements, index); + + if (oldValue != element) { + int len = elements.length; + //创建新数组 + Object[] newElements = Arrays.copyOf(elements, len); + //替换元素 + newElements[index] = element; + //将新数组指向原来的引用 + setArray(newElements); + } else { + // Not quite a no-op; ensures volatile write semantics + setArray(elements); + } + return oldValue; +} finally { + //释放锁 + lock.unlock(); +} +} 复制代码 4)查 + +//直接获取index对应的元素 public E get(int index) {return get(getArray(), index);} private E get(Object[] a, int index) {return (E) a[index];} CopyOnWrite的应用场景   CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下: + +复制代码 import java.util.Map; + +import com.ifeve.book.forkjoin.CopyOnWriteMap; + +/** + +黑名单服务 + +@author fangtengfei + +/ public class BlackListServiceImpl { + +private static CopyOnWriteMap blackListMap = new CopyOnWriteMap( + + 1000); +public static boolean isBlackList(String id) { + + return blackListMap.get(id) == null ? false : true; +} + +public static void addBlackList(String id) { + + blackListMap.put(id, Boolean.TRUE); +} + +/** + +批量添加黑名单 +@param ids +/ public static void addBlackList(Map ids) { blackListMap.putAll(ids); } +} 复制代码 代码很简单,但是使用CopyOnWriteMap需要注意两件事情: + +  1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。 + +  2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。 + +CopyOnWrite的缺点  CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。 + +  内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。 + +  针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。 + +  数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。 + +CopyOnWriteArrayList为什么并发安全且性能比Vector好  我知道Vector是增删改查方法都加了synchronized,保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于Vector,CopyOnWriteArrayList支持读多写少的并发情况。 \ No newline at end of file diff --git a/week_04/11/DelayQueue.md b/week_04/11/DelayQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..93243db1f3ecffe5933475e14c6bb738598da614 --- /dev/null +++ b/week_04/11/DelayQueue.md @@ -0,0 +1,19 @@ +DelayQueue是一个无界阻塞队列,只有在延迟期满时,才能从中提取元素。 队列的头部,是延迟期满后保存时间最长的delay元素。 2.使用场景: 缓存系统设计:使用DelayQueue保存缓存元素的有效期,用一个线程循环查询DelayQueue,一旦从DelayQueue中取出元素,就表示有元素到期。 定时任务调度:使用DelayQueue保存当天要执行的任务和执行的时间,一旦从DelayQueue中获取到任务,就开始执行,比如Timer,就是基于DelayQueue实现的。 3.使用条件: 存放DelayQueue的元素,必须继承Delay接口,Delay接口使对象成为延迟对象。 该接口强制实现两个方法: 1.CompareTo(Delayed o):用于比较延时,队列里元素的排序依据,这个是Comparable接口的方法,因为Delay实现了Comparable接口,所以需要实现。 2.getDelay(TimeUnit unit):这个接口返回到激活日期的--剩余时间,时间单位由单位参数指定。 此队列不允许使用null元素。 4.源码详解: 4.1 DelayQueue + +public class DelayQueue extends AbstractQueue implements BlockingQueue 该类继承了AbstractQueue并实现了BlockingQueue DelayQueue里边重要的字段有: + +private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue q = new PriorityQueue();//无界队列 private Thread leader = null; private final Condition available = lock.newCondition(); lock:全局独占锁,用于实现线程安全 q:优先队列,用于存储元素,并按优先顺序 leader:用于优化内部阻塞通知的线程 available:用于实现阻塞的Condition对象 4.2 delay + +public interface Delayed extends Comparable { long getDelay(TimeUnit unit); } 只有一个getDelay(TimeUnit unit) 方法,继承了Comparable,所以要实现一个compareTo方法 + +4.3 入队 add: + +public boolean add(E e) { return offer(e); } 调用了offer来添加元素: + +public boolean offer(E e) { // 获取全局独占锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 向优先队列中插入元素 q.offer(e); // 如果队首元素是刚插入的元素,则设置leader为null,并唤醒阻塞在available上的线程 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { // 释放全局独占锁 lock.unlock(); } } leader是等待获取队列头元素的线程,主从式设计减少不必要的等待。 如果leader != null,表示已经有线程在等待获取队列的头元素,会通过await()方法让出当前线程等待信号。 如果leader==null,则把当前线程设置为leader,当一个线程为leader时,会使用awaitNanos()让当前线程等待接受信号,或等待delay时间。 4.4 出队 会阻塞的take()方法: + +public E take() throws InterruptedException { // 获取全局独占锁 final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 获取队首元素 E first = q.peek(); // 队首为空,则阻塞当前线程 if (first == null) available.await(); else { // 获取队首元素的超时时间 long delay = first.getDelay(NANOSECONDS); // 已超时,直接出队 if (delay <= 0) return q.poll(); // 释放first的引用,避免内存泄漏 first = null; // don't retain ref while waiting // leader != null表明有其他线程在操作,阻塞当前线程 if (leader != null) available.await(); else { // leader指向当前线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { // 超时阻塞 available.awaitNanos(delay); } finally { // 释放leader if (leader == thisThread) leader = null; } } } } } finally { // leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列 if (leader == null && q.peek() != null) available.signal(); // 释放全局独占锁 lock.unlock(); } } 这里为什么如果不设置first = null,则会引起内存泄漏呢?线程A到达,列首元素没有到期,设置leader = 线程A,这是线程B来了因为leader != null,则会阻塞,线程C一样。假如线程阻塞完毕了,获取列首元素成功,出列。这个时候列首元素应该会被回收掉,但是问题是它还被线程B、线程C持有着,所以不会回收,这里只有两个线程,如果有线程D、线程E…呢?这样会无限期的不能回收,就会造成内存泄漏 + +BlockingQueue中take、offer、put、add | | 抛出异常 | 特殊值 | 阻塞 | 超时 | | 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) | | 移除 | remove() | poll() | take() | poll(time, unit) | | 检查 | element() | peek() | 不可用 | 不可用 | + +offer:插入到队列,成功返回true,如果当前没有可用空间,返回false。 add:入队,如果没有可用空间,抛出异常,IllegalStateException。 put:插入队列,等待可用空间,阻塞,直到能够有空间插入元素。 take:获取并移除队列头部,在元素变的可用之前一直等待。 \ No newline at end of file