diff --git a/week_04/28/ArrayBlockingQueue.md b/week_04/28/ArrayBlockingQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..5910d1df3287ff7a55adeb3bbbbf977f4f18e5d6 --- /dev/null +++ b/week_04/28/ArrayBlockingQueue.md @@ -0,0 +1,206 @@ +# ArrayBlockingQueue 源码分析 + +采用数组实现的线程安全的阻塞队列,队列放满了或者取没了就会阻塞。 + +同类的队列还有 +1.ArrayBlockingQueue 数组实现的有界阻塞队列, 此队列按照先进先出(FIFO)的原则对元素进行排序。 +2.LinkedBlockingQueue 链表实现的有界阻塞队列, 此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序 +3.PriorityBlockingQueue 支持优先级排序的无界阻塞队列, 默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo()方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。 +4.DelayQueue 优先级队列实现的无界阻塞队列 +5.SynchronousQueue 不存储元素的阻塞队列, 每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。 +6.LinkedTransferQueue 链表实现的无界阻塞队列 +7.LinkedBlockingDeque 链表实现的双向阻塞队列 +8.其他开源及实现, tomcat,netty,redis,guava 等实现。 + +插入与移除提供了4种处理方式 +## 插入操作 +add(e) :添加元素到队列中,如果队列满了,继续插入元素会报错,IllegalStateException。 +offer(e) : 添加元素到队列,同时会返回元素是否插入成功的状态,如果成功则返回 true +put(e) :当阻塞队列满了以后,生产者继续通过 put添加元素,队列会一直阻塞生产者线程,直到队列可用 +offer(e,time,unit) :当阻塞队列满了以后继续添加元素,生产者线程会被阻塞指定时间,如果超时,则线程直接退出 + +## 移除操作 +remove():当队列为空时,调用 remove 会返回 false,如果元素移除成功,则返回 true +poll(): 当队列中存在元素,则从队列中取出一个元素,如果队列为空,则直接返回 null +take():基于阻塞的方式获取队列中的元素,如果队列为空,则 take 方法会一直阻塞,直到队列中有新的数据可以消费 +poll(time,unit):带超时机制的获取数据,如果队列为空,则会等待指定的时间再去获取元素返回 + +## 构造方法 +ArrayBlockingQueue 提供了三个构造方法 +capacity: 表示数组的长度,也就是队列的长度 +fair:表示是否为公平的阻塞队列,默认情况下构造的是非公平的阻塞队列。 +其中第三个构造方法,它提供了接收一个几个参数作为数据初始化的方法 + +```java +public ArrayBlockingQueue(int capacity) { + this(capacity, false); +} +public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + lock = new ReentrantLock(fair); //重入锁,出队和入队持有这一把锁 + notEmpty = lock.newCondition(); //初始化非空等待队列 + notFull = lock.newCondition(); //初始化非满等待队列 +} +``` + +## Add 方法 +调用父类的 add 方法 +```java +public boolean add(E e) { + return super.add(e); +} +// 父类方法 +public boolean add(E e) { + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); +} +``` +## offer 方法 +add 方法最终还是调用 offer 方法来添加数据,返回一个添加成功或者失败的布尔值反馈 +这段代码做了几个事情 + +1.判断添加的数据是否为空 +2.添加重入锁 +3.判断队列长度,如果队列长度等于数组长度,表示满了直接返回 false。 否则,直接调用 enqueue 将元素添加到队列中 + +```java +public boolean offer(E e) { + checkNotNull(e); //对请求数据做判断 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + f (count == items.length){ + return false; + } else { + enqueue(e); + return true; + } + } finally { + lock.unlock(); + } +} +``` + +## enqueue +这个是最核心的逻辑,方法内部通过 putIndex 索引直接将元素添加到数组 items + +因为 ArrayBlockingQueue 是一个 FIFO 的队列,队列添加元素时,是从队尾获取 putIndex 来存储元素,当 putIndex等于数组长度时,下次就需要从数组头部开始添加了 + +```java +private void enqueue(E x) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + final Object[] items = this.items; + items[putIndex] = x; //通过 putIndex 对数据赋值 + if (++putIndex == items.length) // 当putIndex 等于数组长度时,将 putIndex 重置为 0 + putIndex = 0; + count++;//记录队列元素的个数 + notEmpty.signal();//唤醒处于等待状态下的线程,表示当前队列中的元素不为空,如果存在消费者线程阻塞,就可以开始取出元素 +} +``` + +## put 方法 +put 方法和 add 方法功能一样,差异是 put 方法如果队列满了,会阻塞。 +```java +public void put(E e) throws InterruptedException { + checkNotNull(e); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); //这个也是获得锁,但是和 lock 的区别是,这个方法优先允许在等待时由其他线程调用等待线程的 interrupt 方法来中断等待直接返回。而 lock方法是尝试获得锁成功后才响应中断 + try { + while (count == items.length) + notFull.await();//队列满了的情况下,当前线程将会被 notFull 条件对象挂起加到等待队列中 + enqueue(e); + } finally { + lock.unlock(); + } +} +``` + +## take 方法 + +take 方法是一种阻塞获取队列中元素的方法 +它的实现原理很简单,有就删除没有就阻塞,注意这个阻塞是可以中断的,如果队列没有数据那么就加入 notEmpty条件队列等待(有数据就直接取走,方法结束),如果有新的put 线程添加了数据,那么 put 操作将会唤醒 take 线程,执行 take 操作。 +如果队列中添加了元素,那么这个时候,会在 enqueue 中调用 notempty.signal 唤醒 take 线程来获得元素 +```java +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == 0) + notEmpty.await(); //如果队列为空的情况下,直接通过 await 方法阻塞 + return dequeue(); + } finally { + lock.unlock(); + } +} +``` + +## dequeue 方法 + +这个是出队列的方法,主要是删除队列头部的元素并发返回给客户端 +takeIndex,是用来记录拿数据的索引值 +```java +private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + final Object[] items = this.items; + @SuppressWarnings("unchecked") + E x = (E) items[takeIndex]; //默认获取 0 位置的元素 + items[takeIndex] = null;//将该位置的元素设置为空 + if (++takeIndex == items.length)//这里的作用也是一样,如果拿到数组的最大值,那么重置为 0,继续从头部位置开始获取数据 + takeIndex = 0; + count--;//记录 元素个数递减 + if (itrs != null) + itrs.elementDequeued();//同时更新迭代器中的元素数据 + notFull.signal();//触发 因为队列满了以后导致的被阻塞的线程 + return x; +} + +//itrs.elementDequeued() +//ArrayBlockingQueue 中,实现了迭代器的功能,也就是可以通过迭代器来遍历阻塞队列中的元素 +//所以 itrs.elementDequeued() 是用来更新迭代器中的元素数据的 +//takeIndex 的索引变化图如下,同时随着数据的移除,会唤醒处于 put 阻塞状态下的线程来继续添加数据 +``` + +## remove 方法 +remove 方法是移除一个指定元素 +```java +public boolean remove(Object o) { + if (o == null) return false; + final Object[] items = this.items; //获取数组元素 + final ReentrantLock lock = this.lock; + lock.lock(); //获得锁 + try { + if (count > 0) { //如果队列不为空 + final int putIndex = this.putIndex; + //获取下一个要添加元素时的索引 + int i = takeIndex;//获取当前要被移除的元素的索引 + do { + if (o.equals(items[i])) {//从takeIndex 下标开始,找到要被删除的元素 + removeAt(i);//移除指定元素 + return true;//返回执行结果 + } + //当前删除索引执行加 1 后判断是否与数组长度相等 + //若为 true,说明索引已到数组尽头,将 i 设置为 0 + if (++i == items.length) + i = 0; + } while (i != putIndex);//继续查找,直到找到最后一个元素 + } + return false; + } finally { + lock.unlock(); + } +} +``` +## 总结 + +1.ArrayBlockingQueue是有界的阻塞队列,不接受null +2.底层数据接口是数组,下标putIndex/takeIndex,构成一个环形FIFO队列 +3.所有的增删改查数组公用了一把锁ReentrantLock,入队和出队数组下标和count变更都是靠这把锁来维护安全的。 +4.阻塞的场景:1获取lock锁,2进入和取出还要满足condition 满了或者空了都等待出队和加入唤醒,ArrayBlockingQueue我们主要是put和take真正用到的阻塞方法。 +5.成员cout /putIndex、takeIndex是共享的,所以一些查询方法size、peek、toString、方法也是加上锁保证线程安全,但没有了并发损失了性能。 + diff --git a/week_04/28/ConcurrentHashMap.md b/week_04/28/ConcurrentHashMap.md new file mode 100644 index 0000000000000000000000000000000000000000..dc310a83e11cc948e08a5e2a74763cf835fbfdcf --- /dev/null +++ b/week_04/28/ConcurrentHashMap.md @@ -0,0 +1,560 @@ +# ConcurrentHashMap 源码分析 + + +## 属性 + +添加了Unsafe实例,主要用于反射获取对象相应的字段。 + +```java +public class ConcurrentHashMap extends AbstractMap + implements ConcurrentMap, Serializable { + private static final long serialVersionUID = 7249069246763182397L; + // 表的最大容量 + private static final int MAXIMUM_CAPACITY = 1 << 30; + // 默认表的大小 + private static final int DEFAULT_CAPACITY = 16; + // 最大数组大小 + static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + // 默认并发数 + private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + // 装载因子 + private static final float LOAD_FACTOR = 0.75f; + // 转化为红黑树的阈值 + static final int TREEIFY_THRESHOLD = 8; + // 由红黑树转化为链表的阈值 + static final int UNTREEIFY_THRESHOLD = 6; + // 转化为红黑树的表的最小容量 + static final int MIN_TREEIFY_CAPACITY = 64; + // 每次进行转移的最小值 + private static final int MIN_TRANSFER_STRIDE = 16; + // 生成sizeCtl所使用的bit位数 + private static int RESIZE_STAMP_BITS = 16; + // 进行扩容所允许的最大线程数 + private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; + // 记录sizeCtl中的大小所需要进行的偏移位数 + private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; + // 一系列的标识 + static final int MOVED = -1; // hash for forwarding nodes + static final int TREEBIN = -2; // hash for roots of trees + static final int RESERVED = -3; // hash for transient reservations + static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash + // + /** Number of CPUS, to place bounds on some sizings */ + // 获取可用的CPU个数 + static final int NCPU = Runtime.getRuntime().availableProcessors(); + // + /** For serialization compatibility. */ + // 进行序列化的属性 + private static final ObjectStreamField[] serialPersistentFields = { + new ObjectStreamField("segments", Segment[].class), + new ObjectStreamField("segmentMask", Integer.TYPE), + new ObjectStreamField("segmentShift", Integer.TYPE) + }; + + // 表 + transient volatile Node[] table; + // 下一个表 + private transient volatile Node[] nextTable; + // + /** + * Base counter value, used mainly when there is no contention, + * but also as a fallback during table initialization + * races. Updated via CAS. + */ + // 基本计数 + private transient volatile long baseCount; + // + /** + * Table initialization and resizing control. When negative, the + * table is being initialized or resized: -1 for initialization, + * else -(1 + the number of active resizing threads). Otherwise, + * when table is null, holds the initial table size to use upon + * creation, or 0 for default. After initialization, holds the + * next element count value upon which to resize the table. + */ + // 对表初始化和扩容控制 + private transient volatile int sizeCtl; + + /** + * The next table index (plus one) to split while resizing. + */ + // 扩容下另一个表的索引 + private transient volatile int transferIndex; + + /** + * Spinlock (locked via CAS) used when resizing and/or creating CounterCells. + */ + // 旋转锁 + private transient volatile int cellsBusy; + + /** + * Table of counter cells. When non-null, size is a power of 2. + */ + // counterCell表 + private transient volatile CounterCell[] counterCells; + + // views + // 视图 + private transient KeySetView keySet; + private transient ValuesView values; + private transient EntrySetView entrySet; + + // Unsafe mechanics + private static final sun.misc.Unsafe U; + private static final long SIZECTL; + private static final long TRANSFERINDEX; + private static final long BASECOUNT; + private static final long CELLSBUSY; + private static final long CELLVALUE; + private static final long ABASE; + private static final int ASHIFT; + + static { + try { + U = sun.misc.Unsafe.getUnsafe(); + Class k = ConcurrentHashMap.class; + SIZECTL = U.objectFieldOffset + (k.getDeclaredField("sizeCtl")); + TRANSFERINDEX = U.objectFieldOffset + (k.getDeclaredField("transferIndex")); + BASECOUNT = U.objectFieldOffset + (k.getDeclaredField("baseCount")); + CELLSBUSY = U.objectFieldOffset + (k.getDeclaredField("cellsBusy")); + Class ck = CounterCell.class; + CELLVALUE = U.objectFieldOffset + (ck.getDeclaredField("value")); + Class ak = Node[].class; + ABASE = U.arrayBaseOffset(ak); + int scale = U.arrayIndexScale(ak); + if ((scale & (scale - 1)) != 0) + throw new Error("data type scale not a power of two"); + ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); + } catch (Exception e) { + throw new Error(e); + } + } +} +``` +## 构造函数 +默认构造函数创建一个带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射 +```java +public ConcurrentHashMap() { +} + +public ConcurrentHashMap(int initialCapacity) { + + if (initialCapacity < 0) // 初始容量小于0,抛出异常 + throw new IllegalArgumentException(); + int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? + MAXIMUM_CAPACITY : + tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); // 找到最接近该容量的2的幂次方数 + // 初始化 + this.sizeCtl = cap; +} + +public ConcurrentHashMap(Map m) { + this.sizeCtl = DEFAULT_CAPACITY; + // 将集合m的元素全部放入 + putAll(m); +} + +public ConcurrentHashMap(int initialCapacity, float loadFactor) { + this(initialCapacity, loadFactor, 1); +} + + +//concurrencyLevel 对于构造函数而言,会根据输入的initialCapacity的大小来确定一个最小的且大于等于initialCapacity大小的2的n次幂,如initialCapacity为15,则sizeCtl为16,若initialCapacity为16,则sizeCtl为16。若initialCapacity大小超过了允许的最大值,则sizeCtl为最大值。值得注意的是,构造函数中的concurrencyLevel参数已经在JDK1.8中的意义发生了很大的变化,其并不代表所允许的并发数,其只是用来确定sizeCtl大小,在JDK1.8中的并发控制都是针对具体的桶而言,即有多少个桶就可以允许多少个并发数。 +public ConcurrentHashMap(int initialCapacity, + float loadFactor, int concurrencyLevel) { + if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 合法性判断 + throw new IllegalArgumentException(); + if (initialCapacity < concurrencyLevel) // Use at least as many bins + initialCapacity = concurrencyLevel; // as estimated threads + long size = (long)(1.0 + (long)initialCapacity / loadFactor); + int cap = (size >= (long)MAXIMUM_CAPACITY) ? + MAXIMUM_CAPACITY : tableSizeFor((int)size); + this.sizeCtl = cap; +} + +``` + +## 核心函数 + +### putVal函数 +```java +final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) throw new NullPointerException(); // 键或值为空,抛出异常 + // 键的hash值经过计算获得hash值 + int hash = spread(key.hashCode()); + int binCount = 0; + for (Node[] tab = table;;) { // 无限循环 + Node f; int n, i, fh; + if (tab == null || (n = tab.length) == 0) // 表为空或者表的长度为0 + // 初始化表 + tab = initTable(); + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 表不为空并且表的长度大于0,并且该桶不为空 + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) // 比较并且交换值,如tab的第i项为空则用新生成的node替换 + break; // no lock when adding to empty bin + } + else if ((fh = f.hash) == MOVED) // 该结点的hash值为MOVED + // 进行结点的转移(在扩容的过程中) + tab = helpTransfer(tab, f); + else { + V oldVal = null; + synchronized (f) { // 加锁同步 + if (tabAt(tab, i) == f) { // 找到table表下标为i的节点 + if (fh >= 0) { // 该table表中该结点的hash值大于0 + // binCount赋值为1 + binCount = 1; + for (Node e = f;; ++binCount) { // 无限循环 + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { // 结点的hash值相等并且key也相等 + // 保存该结点的val值 + oldVal = e.val; + if (!onlyIfAbsent) // 进行判断 + // 将指定的value保存至结点,即进行了结点值的更新 + e.val = value; + break; + } + // 保存当前结点 + Node pred = e; + if ((e = e.next) == null) { // 当前结点的下一个结点为空,即为最后一个结点 + // 新生一个结点并且赋值给next域 + pred.next = new Node(hash, key, + value, null); + // 退出循环 + break; + } + } + } + else if (f instanceof TreeBin) { // 结点为红黑树结点类型 + Node p; + // binCount赋值为2 + binCount = 2; + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { // 将hash、key、value放入红黑树 + // 保存结点的val + oldVal = p.val; + if (!onlyIfAbsent) // 判断 + // 赋值结点value值 + p.val = value; + } + } + } + } + if (binCount != 0) { // binCount不为0 + if (binCount >= TREEIFY_THRESHOLD) // 如果binCount大于等于转化为红黑树的阈值 + // 进行转化 + treeifyBin(tab, i); + if (oldVal != null) // 旧值不为空 + // 返回旧值 + return oldVal; + break; + } + } + } + // 增加binCount的数量 + addCount(1L, binCount); + return null; +} +``` +### initTable +```java +private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { // 无限循环 + if ((sc = sizeCtl) < 0) // sizeCtl小于0,则进行线程让步等待 + Thread.yield(); // lost initialization race; just spin + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 比较sizeCtl的值与sc是否相等,相等则用-1替换 + try { + if ((tab = table) == null || tab.length == 0) { // table表为空或者大小为0 + // sc的值是否大于0,若是,则n为sc,否则,n为默认初始容量 + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + @SuppressWarnings("unchecked") + // 新生结点数组 + Node[] nt = (Node[])new Node[n]; + // 赋值给table + table = tab = nt; + // sc为n * 3/4 + sc = n - (n >>> 2); + } + } finally { + // 设置sizeCtl的值 + sizeCtl = sc; + } + break; + } + } + // 返回table表 + return tab; +} + +final Node[] helpTransfer(Node[] tab, Node f) { + Node[] nextTab; int sc; + if (tab != null && (f instanceof ForwardingNode) && + (nextTab = ((ForwardingNode)f).nextTable) != null) { // table表不为空并且结点类型使ForwardingNode类型,并且结点的nextTable不为空 + int rs = resizeStamp(tab.length); + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { // 条件判断 + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) // + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { // 比较并交换 + // 将table的结点转移到nextTab中 + transfer(tab, nextTab); + break; + } + } + return nextTab; + } + return table; +} + + +final TreeNode putTreeVal(int h, K k, V v) { + Class kc = null; + boolean searched = false; + for (TreeNode p = root;;) { + int dir, ph; K pk; + if (p == null) { + first = root = new TreeNode(h, k, v, null, null); + break; + } + else if ((ph = p.hash) > h) + dir = -1; + else if (ph < h) + dir = 1; + else if ((pk = p.key) == k || (pk != null && k.equals(pk))) + return p; + else if ((kc == null && + (kc = comparableClassFor(k)) == null) || + (dir = compareComparables(kc, k, pk)) == 0) { + if (!searched) { + TreeNode q, ch; + searched = true; + if (((ch = p.left) != null && + (q = ch.findTreeNode(h, k, kc)) != null) || + ((ch = p.right) != null && + (q = ch.findTreeNode(h, k, kc)) != null)) + return q; + } + dir = tieBreakOrder(k, pk); + } + + TreeNode xp = p; + if ((p = (dir <= 0) ? p.left : p.right) == null) { + TreeNode x, f = first; + first = x = new TreeNode(h, k, v, f, xp); + if (f != null) + f.prev = x; + if (dir <= 0) + xp.left = x; + else + xp.right = x; + if (!xp.red) + x.red = true; + else { + lockRoot(); + try { + root = balanceInsertion(root, x); + } finally { + unlockRoot(); + } + } + break; + } + } + assert checkInvariants(root); + return null; +} + +private final void treeifyBin(Node[] tab, int index) { + Node b; int n, sc; + if (tab != null) { // 表不为空 + if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // table表的长度小于最小的长度 + // 进行扩容,调整某个桶中结点数量过多的问题(由于某个桶中结点数量超出了阈值,则触发treeifyBin) + tryPresize(n << 1); + else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 桶中存在结点并且结点的hash值大于等于0 + synchronized (b) { // 对桶中第一个结点进行加锁 + if (tabAt(tab, index) == b) { // 第一个结点没有变化 + TreeNode hd = null, tl = null; + for (Node e = b; e != null; e = e.next) { // 遍历桶中所有结点 + // 新生一个TreeNode结点 + TreeNode p = + new TreeNode(e.hash, e.key, e.val, + null, null); + if ((p.prev = tl) == null) // 该结点前驱为空 + // 设置p为头结点 + hd = p; + else + // 尾节点的next域赋值为p + tl.next = p; + // 尾节点赋值为p + tl = p; + } + // 设置table表中下标为index的值为hd + setTabAt(tab, index, new TreeBin(hd)); + } + } + } + } +} + +private final void addCount(long x, int check) { + CounterCell[] as; long b, s; + if ((as = counterCells) != null || + !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { // counterCells不为空或者比较交换失败 + CounterCell a; long v; int m; + // 无竞争标识 + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = + U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { // + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + s = sumCount(); + } + if (check >= 0) { + Node[] tab, nt; int n, sc; + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { + int rs = resizeStamp(n); + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } +} +``` +### get +```java +public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + // 计算key的hash值 + int h = spread(key.hashCode()); + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空 + if ((eh = e.hash) == h) { // 表中的元素的hash值与key的hash值相等 + if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 键相等 + // 返回值 + return e.val; + } + else if (eh < 0) // 结点hash值小于0 + // 在桶(链表/红黑树)中查找 + return (p = e.find(h, key)) != null ? p.val : null; + while ((e = e.next) != null) { // 对于结点hash值大于0的情况 + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; +} +``` +### replaceNode +```java +final V replaceNode(Object key, V value, Object cv) { + // 计算key的hash值 + int hash = spread(key.hashCode()); + for (Node[] tab = table;;) { // 无限循环 + Node f; int n, i, fh; + if (tab == null || (n = tab.length) == 0 || + (f = tabAt(tab, i = (n - 1) & hash)) == null) // table表为空或者表长度为0或者key所对应的桶为空 + // 跳出循环 + break; + else if ((fh = f.hash) == MOVED) // 桶中第一个结点的hash值为MOVED + // 转移 + tab = helpTransfer(tab, f); + else { + V oldVal = null; + boolean validated = false; + synchronized (f) { // 加锁同步 + if (tabAt(tab, i) == f) { // 桶中的第一个结点没有发生变化 + if (fh >= 0) { // 结点hash值大于0 + validated = true; + for (Node e = f, pred = null;;) { // 无限循环 + K ek; + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { // 结点的hash值与指定的hash值相等,并且key也相等 + V ev = e.val; + if (cv == null || cv == ev || + (ev != null && cv.equals(ev))) { // cv为空或者与结点value相等或者不为空并且相等 + // 保存该结点的val值 + oldVal = ev; + if (value != null) // value为null + // 设置结点value值 + e.val = value; + else if (pred != null) // 前驱不为空 + // 前驱的后继为e的后继,即删除了e结点 + pred.next = e.next; + else + // 设置table表中下标为index的值为e.next + setTabAt(tab, i, e.next); + } + break; + } + pred = e; + if ((e = e.next) == null) + break; + } + } + else if (f instanceof TreeBin) { // 为红黑树结点类型 + validated = true; + // 类型转化 + TreeBin t = (TreeBin)f; + TreeNode r, p; + if ((r = t.root) != null && + (p = r.findTreeNode(hash, key, null)) != null) { // 根节点不为空并且存在与指定hash和key相等的结点 + // 保存p结点的value + V pv = p.val; + if (cv == null || cv == pv || + (pv != null && cv.equals(pv))) { // cv为空或者与结点value相等或者不为空并且相等 + oldVal = pv; + if (value != null) + p.val = value; + else if (t.removeTreeNode(p)) // 移除p结点 + setTabAt(tab, i, untreeify(t.first)); + } + } + } + } + } + if (validated) { + if (oldVal != null) { + if (value == null) + // baseCount值减一 + addCount(-1L, -1); + return oldVal; + } + break; + } + } + } + return null; +} +``` + +## 总结 +JDK1.7 中使用的是segment,每一个segment其实就是一个独立的小的hashtable,并且因为它继承了reentrantlock,所以每一个分段都已经拥有了锁。 +结构性修改方法(比如put)会在确定分段后只锁住某一个segment,而不影响剩余段的操作。不过有一些方法存在跨段操作,比如size/containsValue等。这些方法就要按顺序锁住整个表来进行操作。 + +在 JDK1.8 中,采用了新的设计思想,没有继续沿用 1.7 segment 分段锁。而是采用了 CAS + synchronized 关键字实现。整个数组的结构上在取消了 segment 之后,又和 hashMap 有了一些相似度。 + diff --git a/week_04/28/ConcurrentLinkedQueue.md b/week_04/28/ConcurrentLinkedQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..346edadb327fe0857a6b3ce796d4a29628e34c39 --- /dev/null +++ b/week_04/28/ConcurrentLinkedQueue.md @@ -0,0 +1,122 @@ +# ConcurrentLinkedQueue 源码分析 + +非阻塞的方式实现的线程安全队列ConcurrentLinkedQueue,类名就可以看的出来实现队列的数据结构是链式。 +## 链表节点 内部类 Node + +```java + private static class Node { + // 节点中的元素 + volatile E item; + // 下一个节点,没有上一个节点,表示它是一个单向链表的形式 + volatile Node next; + // 构造一个节点 + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + // 使用 CAS 的方式设置节点的元素 + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + // 设置下一个节点 + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + // 采用 CAS 的方式设置下一个节点 + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + // Unsafe 类的一些初始化 +} + + +``` + +## 属性 +```java +// 头节点, +private transient volatile Node head; +// 尾节点,尾节点不一定是链表的最后一个节点 +private transient volatile Node tail; +// 构造 +public ConcurrentLinkedQueue() { + head = tail = new Node(null); +} +``` + +## 添加节点 +```java +public boolean add(E e) { + return offer(e); +} + +public boolean offer(E e) { + // 判空,为空则抛出空指针异常 + checkNotNull(e); + // 创建要添加的节点 + final Node newNode = new Node(e); + + // 无限循环,入队不成功,则反复入队 + // t 表示 tail 节点 + // p 表示链表的尾节点,默认等于 tail 节点 + for (Node t = tail, p = t;;) { + // q 为尾节点的下一个节点 + Node q = p.next; + // 如果尾节点的下一个节点为空,则表示 p 为尾节点 + if (q == null) { + // CAS 设置尾节点的下一个节点为新添加的节点,如果设置失败,在再次尝试 + if (p.casNext(null, newNode)) { + // 如果tail节点有大于等于1个的 next 节点,则更新 tail 节点,将新添加的节点设置为 tail 节点 + if (p != t) // 相当于循环两次更新一次 tail 节点 + casTail(t, newNode); // 新添加的节点设置为tail节点,允许失败,失败了表示有其他线程成功更新了tail节点 + return true; + } + } + else if (p == q) // 只有在尾节点和尾节点的下一个节点为空的情况下成立 + p = (t != (t = tail)) ? t : head; + else + // 把 tail节点设置为为尾节点,再次循环设置下一个节点 + p = (p != t && t != (t = tail)) ? t : q; + } +} + + +``` + +## 获取 + +```java +public E poll() { + // 循环跳转,goto语法 + restartFromHead: + for (;;) { + // p 表示要出队的节点,默认为 head节点 + for (Node h = head, p = h, q;;) { + // 出队的元素 + E item = p.item; + // 如果出队的元素不为空,则把要出队的元素设置null,不更新head节点;如果出队元素为null或者cas设置失败,则表示有其他线程已经进行修改,则需要重写获取 + if (item != null && p.casItem(item, null)) { + if (p != h) // 当head元素为空,才会更新head节点,这里循环两次,更新一次head节点 + updateHead(h, ((q = p.next) != null) ? q : p); // 更新head节点 + return item; + } + // 队列为空,返回null + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + else if (p == q) + continue restartFromHead; + // 把 p 的next节点赋值给p + else + p = q; + } + } +} + +``` + +## 总结 + +size()方法的时候,会遍历一遍集合,判空使用isEmpty()方法。 +ConcurrentLinkedQueue是无界的,注意容量防止溢出。 + diff --git a/week_04/28/CopyOnWriteArrayList.md b/week_04/28/CopyOnWriteArrayList.md new file mode 100644 index 0000000000000000000000000000000000000000..57be7fbd8ddf472dbb5c0fee57c21bcf5f732bdd --- /dev/null +++ b/week_04/28/CopyOnWriteArrayList.md @@ -0,0 +1,168 @@ +# CopyOnWriteArrayList 源码分析 + +1、 线程安全 +2、 通过锁+数组拷贝+volatile关键字保证了线程安全 +3、 每次数组操作,都会把数组拷贝一份出来,在新数组上进行操作,操作成功之后再赋值回去 + +## 添加 +```java +public boolean add(E e) { + //获取该对象的锁 + final ReentrantLock lock = this.lock; + // 获取“锁”,每次只有一个线程可进入临界区 + lock.lock(); + try { + // 获取原始”volatile数组“中的数据和数据长度。 + Object[] elements = getArray(); + int len = elements.length; + // 新建一个数组newElements,并将原始数据拷贝到newElements中; + // newElements数组的长度=“原始数组的长度”+1 + Object[] newElements = Arrays.copyOf(elements, len + 1); + // 将“新增加的元素”保存到newElements中。 + newElements[len] = e; + // 将”volatile数组“引用指向newElements数组,这样旧数组就被GC回收了 + setArray(newElements); + return true; + } finally { + // 释放“锁” + lock.unlock(); + } +} + +``` + +## 移除 +```java +public E remove(int index) { + final ReentrantLock lock = l.lock; + //获得“锁” + lock.lock(); + try { + //检测是否“数组越界” + rangeCheck(index); + checkForComodification(); + //删除元素 + E result = l.remove(index+offset); + expectedArray = l.getArray(); + size--; + return result; + } finally { + //释放“锁” + lock.unlock(); + } +} + +// 真正实现 +public E remove(int index) { + final ReentrantLock lock = this.lock; + //获得“锁” + lock.lock(); + try { + // 获取原始”volatile数组“中的数据和数据长度。 + Object[] elements = getArray(); + int len = elements.length; + //// 获取elements数组中的第index个数据。 + E oldValue = get(elements, index); + int numMoved = len - index - 1; + // 如果被删除的是最后一个元素,则直接通过Arrays.copyOf()进行处理,而不需要新建数组。 + if (numMoved == 0) + setArray(Arrays.copyOf(elements, len - 1)); + else { + // 否则,新建数组,然后将”volatile数组中被删除元素之外的其它元素“拷贝到新数组中; + // 最后,将”volatile数组“引用指向newElements数组,这样旧数组就被GC回收了。 + Object[] newElements = new Object[len - 1]; + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index + 1, newElements, index, numMoved); + setArray(newElements); + } + return oldValue; + } finally { + //释放“锁” + lock.unlock(); + } +} +``` +## 遍历 +```java +public Iterator iterator() { + return new COWIterator(getArray(), 0); +} + +static final class COWIterator implements ListIterator { + /** Snapshot(快照) of the array */ + private final Object[] snapshot; + /** Index of element to be returned by subsequent call to next. */ + private int cursor; // 游标 + + private COWIterator(Object[] elements, int initialCursor) { + cursor = initialCursor; + snapshot = elements; + } + + public boolean hasNext() { + return cursor < snapshot.length; + } + + public boolean hasPrevious() { + return cursor > 0; + } + + // 获取下一个元素 + @SuppressWarnings("unchecked") + public E next() { + if (! hasNext()) + throw new NoSuchElementException(); + return (E) snapshot[cursor++]; + } + + // 获取上一个元素 + @SuppressWarnings("unchecked") + public E previous() { + if (! hasPrevious()) + throw new NoSuchElementException(); + return (E) snapshot[--cursor]; + } + + public int nextIndex() { + return cursor; + } + + public int previousIndex() { + return cursor-1; + } + + + //不支持remove + public void remove() { + throw new UnsupportedOperationException(); + } + + //不支持set + public void set(E e) { + throw new UnsupportedOperationException(); + } + + + //不支持add + public void add(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public void forEachRemaining(Consumer action) { + Objects.requireNonNull(action); + Object[] elements = snapshot; + final int size = elements.length; + for (int i = cursor; i < size; i++) { + @SuppressWarnings("unchecked") E e = (E) elements[i]; + action.accept(e); + } + cursor = size; + } +} +``` + +## 总结 +创建迭代器的时候, 会保存数组元素的快照(有一个引用指向原数组)。 +COWIterator不支持修改元素的操作。对于remove(), set(), add()等操作,COWIterator都会抛出异常! +CopyOnWriteArrayList,数组原值(包括增加和删除)被改变也不会抛出异常,因为其迭代器持有的老数组的引用,每次都是拷贝出新数组,不会影响老数组。 \ No newline at end of file diff --git a/week_04/28/DelayQueue.md b/week_04/28/DelayQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..4a6ab899ffa672f68ae7ccc97f11b4f272936885 --- /dev/null +++ b/week_04/28/DelayQueue.md @@ -0,0 +1,219 @@ +# DelayQueue 源码分析 + +DelayQueue是一个无界的阻塞队列,并且是一个延迟队列,还是一个优先级队列。该队列中每个元素都有一个过期时间,只有在过期时间到期的时候,才可以从中获取元素;也就是说往队列中插入数据时(生产者)是不会被阻塞的,而只有在获取数据的时候(消费者)才会被阻塞。 +通过Condiiton接口的await()和signal()/signalAll() 组合实现线程间的通信 +## 属性 + +```java +private final transient ReentrantLock lock = new ReentrantLock();//可重入锁ReentrantLock用于多线程下的加锁操作; +private final PriorityQueue q = new PriorityQueue();//优先级队列PriorityQueue,将会根据Delay对象中的时间排序 +private Thread leader = null;//线程leader(也就是Leader-Follower 模型中的 leader),用于优化阻塞通知的线程对象;leader线程用于减少线程间获取数据竞争用的,如果leader不为空,说明已经有线程在获取数据了,然后当前线程进入等待状态即可 +private final Condition available = lock.newCondition();//Condition对象,用于线程阻塞和通知的关键对象;当队首有新元素可用或者有新线程成为 leader 时会触发Condition条件 +``` +## 添加 +```java + +public boolean add(E e) { + return offer(e); +} +public void put(E e) { + offer(e); +} +public boolean offer(E e, long timeout, TimeUnit unit) { + return offer(e); +} + +public boolean offer(E e) { + // 获取可重入锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 往优先级队列中添加元素 + q.offer(e); + // 如果队列的头部元素(优先级最高的元素)等于添加的元素 + if (q.peek() == e) { + // 将线程leader设置为null + leader = null; + // 唤醒一个等待的线程 + available.signal(); + } + // 无界队列都返回true + return true; + } finally { + lock.unlock(); + } +} +``` + +## poll +```java + +public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 获取队头元素 + E first = q.peek(); + // 判断队头元素是否为空,或者不为空的情况下队头元素是否到期 + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + else + return q.poll(); + } finally { + lock.unlock(); + } +} + +public E poll(long timeout, TimeUnit unit) throws InterruptedException { + // 首先,将超时等待时间转换为纳秒单位 + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + // 获取锁,如果没有获取到,线程进入等待状态,但该线程能够响应中断,即中断线程的等待状态 + lock.lockInterruptibly(); + try { + // 无限循环操作 + for (;;) { + E first = q.peek(); + // 队列是否为空 + if (first == null) { + // 如果队列是空的,并且超时时间不大于0,直接返回 + if (nanos <= 0) + return null; + else + // 如果队列为空,并且超时时间大于0,进入等待状态 + nanos = available.awaitNanos(nanos); + } else { + // 如果队列不为空,获取队头元素剩余的过期时间 + long delay = first.getDelay(NANOSECONDS); + // 队头元素过期了,出队 + if (delay <= 0) + return q.poll(); + // 队头元素没有过期,但超时时间不大于0,返回null + if (nanos <= 0) + return null; + first = null; // don't retain ref while waiting + // 如果超时等待时间 < 元素过期时间 或者有其他的线程在获取数据 + if (nanos < delay || leader != null) + // 进入等待 + nanos = available.awaitNanos(nanos); + else { + // 这时候 超时等待时间 >= 元素过期时间,并且没有其他线程获取数据 + // 那么把当前线程作为leader,表示该leader线程最早开始等待获取元素 + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + // 等待元素过期,这时候会释放锁;等过期后再重新获取锁 + long timeLeft = available.awaitNanos(delay); + // 重新计算最新的超时时间 + nanos -= delay - timeLeft; + } finally { + // 如果当前线程仍然是leader线程,操作完成,清除掉leader + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + // 唤醒对应的线程 + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } +} +``` + +## take +```java +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + // 队列是否为空,为空则进入等待,直到被唤醒 + if (first == null) + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + // 队头元素是否过期 + if (delay <= 0) + return q.poll(); + first = null; // don't retain ref while waiting + // 是否有其他元素在获取元素,如果有,进行等待; + if (leader != null) + available.await(); + else { + // 如果没有,将当前线程设置为leader,并等待元素过期 + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + // 唤醒Conditon条件 + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } +} + +``` + +## peek +```java +public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return q.peek(); + } finally { + lock.unlock(); + } +} +``` + +## drainTo +```java +public int drainTo(Collection c) { + // 传入的集合不能为空 + if (c == null) + throw new NullPointerException(); + // 传入的集合不能时当前对象 + if (c == this) + throw new IllegalArgumentException(); + final ReentrantLock lock = this.lock; + lock.lock(); + try { + int n = 0; + // 遍历队列,获取队列队头元素 + for (E e; (e = peekExpired()) != null;) { + c.add(e); // In this order, in case add() throws. + // 获取到对头元素后,出队操作 + q.poll(); + ++n; + } + return n; + } finally { + lock.unlock(); + } +} + +private E peekExpired() { + // assert lock.isHeldByCurrentThread(); + E first = q.peek(); + return (first == null || first.getDelay(NANOSECONDS) > 0) ? + null : first; +} +``` + +## 总结 +DelayQueue是一个无界的可延迟的阻塞队列,往队列中插入数据时不会阻塞,只有在获取数据到时候才会被阻塞; +该队列中每个元素都有一个过期时间,内部维护了一个优先级队列PriorityQueue,然后通过元素的过期时间进行优先级的排序; +该队列中的元素要实现Delayed接口,并实现它的getDelay方法,并且由于该对象扩展自Comparable,所以放入该队列的元素还要实现对应的compareTo方法; \ No newline at end of file