diff --git a/week_04/54/ArrayBlockingQueue-054.md b/week_04/54/ArrayBlockingQueue-054.md new file mode 100644 index 0000000000000000000000000000000000000000..2cff6283d40263d6b4c512f9e2e1afae245b8e40 --- /dev/null +++ b/week_04/54/ArrayBlockingQueue-054.md @@ -0,0 +1,368 @@ +## ArrayBlockingQueue + +#### 简介 +ArrayBlockingQueue是一个基于数组实现的阻塞有界队列 + +#### 属性 +```java +/** The queued items */ +//队列元素 +final Object[] items; + +/** items index for next take, poll, peek or remove */ +//下一个take, poll, peek or remove操作的index位置 +int takeIndex; + +/** items index for next put, offer, or add */ +//下一个put, offer, or add操作的index位置 +int putIndex; + +/** Number of elements in the queue */ +//队列里面的元素数量 +int count; + +/* + * Concurrency control uses the classic two-condition algorithm + * found in any textbook. + */ + +/** Main lock guarding all access */ +//ReentrantLock锁 +final ReentrantLock lock; + +/** Condition for waiting takes */ +//take等待条件,队列空了就挂起,等到放入元素后唤醒 +private final Condition notEmpty; + +/** Condition for waiting puts */ +//puts等待条件,队列满了就挂起,等到取出一个元素后唤醒 +private final Condition notFull; + +/** + * Shared state for currently active iterators, or null if there + * are known not to be any. Allows queue operations to update + * iterator state. + */ + //当前活动迭代器的共享状态,如果已知不存在,则为空。允许队列操作更新迭代器状态。 +transient Itrs itrs = null; +``` + +####构造方法 +```java + /** + * Creates an {@code ArrayBlockingQueue} with the given (fixed) + * capacity and default access policy. + * + * @param capacity the capacity of this queue + * @throws IllegalArgumentException if {@code capacity < 1} + */ +//传入队列大小,默认非公平锁 +public ArrayBlockingQueue(int capacity) { + this(capacity, false); +} +``` + + +```java +/** + * Creates an {@code ArrayBlockingQueue} with the given (fixed) + * capacity and the specified access policy. + * + * @param capacity the capacity of this queue + * @param fair if {@code true} then queue accesses for threads blocked + * on insertion or removal, are processed in FIFO order; + * if {@code false} the access order is unspecified. + * @throws IllegalArgumentException if {@code capacity < 1} + */ +//容量大小,以及是否公平锁 +public ArrayBlockingQueue(int capacity, boolean fair) { + //容量小于等于0,抛出IllegalArgumentException异常 + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity];//新建一个capacity大小的Object数组 + lock = new ReentrantLock(fair);//初始化ReentrantLock + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); +} +``` + + +```java +/** + * Creates an {@code ArrayBlockingQueue} with the given (fixed) + * capacity, the specified access policy and initially containing the + * elements of the given collection, + * added in traversal order of the collection's iterator. + * + * @param capacity the capacity of this queue + * @param fair if {@code true} then queue accesses for threads blocked + * on insertion or removal, are processed in FIFO order; + * if {@code false} the access order is unspecified. + * @param c the collection of elements to initially contain + * @throws IllegalArgumentException if {@code capacity} is less than + * {@code c.size()}, or less than 1. + * @throws NullPointerException if the specified collection or any + * of its elements are null + */ +//放入集合,保存到当前数组里面 +public ArrayBlockingQueue(int capacity, boolean fair, + Collection c) { + this(capacity, fair); + + final ReentrantLock lock = this.lock; + lock.lock(); // Lock only for visibility, not mutual exclusion + try { + int i = 0; + //循环放入 + try { + for (E e : c) { + checkNotNull(e);//检查是否为null + items[i++] = e; + } + } catch (ArrayIndexOutOfBoundsException ex) { + //数组越界异常 + throw new IllegalArgumentException(); + } + //更改数量 + count = i; + //更改的数量,如果等于给定容量则为0 否则给为插入的最后位置 + putIndex = (i == capacity) ? 0 : i; + } finally { + lock.unlock(); + } +} +``` + +####主要方法 + +**add** +```java +/** + * Inserts the specified element at the tail of this queue if it is + * possible to do so immediately without exceeding the queue's capacity, + * returning {@code true} upon success and throwing an + * {@code IllegalStateException} if this queue is full. + * + * @param e the element to add + * @return {@code true} (as specified by {@link Collection#add}) + * @throws IllegalStateException if this queue is full + * @throws NullPointerException if the specified element is null + */ +public boolean add(E e) { + //实际上是调用offer方法实现,队列满会抛出IllegalStateException异常 + return super.add(e); +} +``` + +```java +/** + * Inserts the specified element at the tail of this queue if it is + * possible to do so immediately without exceeding the queue's capacity, + * returning {@code true} upon success and {@code false} if this queue + * is full. This method is generally preferable to method {@link #add}, + * which can fail to insert an element only by throwing an exception. + * + * @throws NullPointerException if the specified element is null + */ +public boolean offer(E e) { + checkNotNull(e);//检查是否为null + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //如果数量相等,表示队列已满,返回false + if (count == items.length) + return false; + else { + enqueue(e); + return true; + } + } finally { + lock.unlock(); + } +} +``` +```java +/** + * Inserts element at current put position, advances, and signals. + * Call only when holding lock. + */ +private void enqueue(E x) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + final Object[] items = this.items; + items[putIndex] = x; + if (++putIndex == items.length)//判断队列满没有 + putIndex = 0; + count++; + notEmpty.signal();//唤醒阻塞在take上面的线程 +} +``` +**remove** + +```java + /** + * Removes a single instance of the specified element from this queue, + * if it is present. More formally, removes an element {@code e} such + * that {@code o.equals(e)}, if this queue contains one or more such + * elements. + * Returns {@code true} if this queue contained the specified element + * (or equivalently, if this queue changed as a result of the call). + * + *

Removal of interior elements in circular array based queues + * is an intrinsically slow and disruptive operation, so should + * be undertaken only in exceptional circumstances, ideally + * only when the queue is known not to be accessible by other + * threads. + * + * @param o element to be removed from this queue, if present + * @return {@code true} if this queue changed as a result of the call + */ +public boolean remove(Object o) { + if (o == null) return false; + final Object[] items = this.items; + final ReentrantLock lock = this.lock; + lock.lock(); + try { + if (count > 0) { + final int putIndex = this.putIndex; + int i = takeIndex;//初始化取出位置 + do { + if (o.equals(items[i])) { + removeAt(i);//取出成功 + return true; + } + if (++i == items.length) + i = 0; + } while (i != putIndex); + } + return false; + } finally { + lock.unlock(); + } +} +public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return (count == 0) ? null : dequeue(); + } finally { + lock.unlock(); + } +} + /** + * Extracts element at current take position, advances, and signals. + * Call only when holding lock. + */ +private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + final Object[] items = this.items; + @SuppressWarnings("unchecked") + E x = (E) items[takeIndex]; + items[takeIndex] = null;//把取出的位置设置成null + if (++takeIndex == items.length) + takeIndex = 0; + count--; + if (itrs != null) + itrs.elementDequeued(); + notFull.signal(); + return x; +} +//取出元素,空了就等待插入 +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == 0) + notEmpty.await();//等待队列中放入元素 + return dequeue(); + } finally { + lock.unlock(); + } +} + +//poll超时 +public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == 0) { + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + return dequeue(); + } finally { + lock.unlock(); + } +} + +``` +```java +/** + * Deletes item at array index removeIndex. + * Utility for remove(Object) and iterator.remove. + * Call only when holding lock. + */ +void removeAt(final int removeIndex) { + // assert lock.getHoldCount() == 1; + // assert items[removeIndex] != null; + // assert removeIndex >= 0 && removeIndex < items.length; + final Object[] items = this.items; + if (removeIndex == takeIndex) { + // removing front item; just advance + items[takeIndex] = null; + if (++takeIndex == items.length) + takeIndex = 0; + count--; + if (itrs != null) + itrs.elementDequeued(); + } else { + // an "interior" remove + + // slide over all others up through putIndex. + final int putIndex = this.putIndex; + for (int i = removeIndex;;) { + int next = i + 1; + if (next == items.length) + next = 0; + if (next != putIndex) { + items[i] = items[next]; + i = next; + } else { + items[i] = null; + this.putIndex = i; + break; + } + } + count--; + if (itrs != null) + itrs.removedAt(removeIndex); + } + notFull.signal(); +} +``` +**put** +```java +/** + * Inserts the specified element at the tail of this queue, waiting + * for space to become available if the queue is full. + * + * @throws InterruptedException {@inheritDoc} + * @throws NullPointerException {@inheritDoc} + */ +//添加元素,满了就等待 +public void put(E e) throws InterruptedException { + checkNotNull(e);//检查是否为null + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == items.length) + notFull.await();//满了等待删除 + enqueue(e); + } finally { + lock.unlock(); + } +} +``` diff --git a/week_04/54/ConcurrentHashMap-054.md b/week_04/54/ConcurrentHashMap-054.md new file mode 100644 index 0000000000000000000000000000000000000000..82373b1f6feb45bd30b81d1d00733b1c5ca0453a --- /dev/null +++ b/week_04/54/ConcurrentHashMap-054.md @@ -0,0 +1,499 @@ +## ConcurrentHashMap源码分析 + +###成员 + +```java +/** + * The largest possible table capacity. This value must be + * exactly 1<<30 to stay within Java array allocation and indexing + * bounds for power of two table sizes, and is further required + * because the top two bits of 32bit hash fields are used for + * control purposes. + */ + //最大容量 +private static final int MAXIMUM_CAPACITY = 1 << 30; + +/** + * The default initial table capacity. Must be a power of 2 + * (i.e., at least 1) and at most MAXIMUM_CAPACITY. + */ + //默认初始容量 +private static final int DEFAULT_CAPACITY = 16; + +/** + * The largest possible (non-power of two) array size. + * Needed by toArray and related methods. + */ + //最大数组长度 +static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + +/** + * The default concurrency level for this table. Unused but + * defined for compatibility with previous versions of this class. + */ + //此表的默认并发级别。未使用但为与该类的早期版本兼容而定义。 +private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + +/** + * The load factor for this table. Overrides of this value in + * constructors affect only the initial table capacity. The + * actual floating point value isn't normally used -- it is + * simpler to use expressions such as {@code n - (n >>> 2)} for + * the associated resizing threshold. + */ + //加载因子 +private static final float LOAD_FACTOR = 0.75f; + +/** + * The bin count threshold for using a tree rather than list for a + * bin. Bins are converted to trees when adding an element to a + * bin with at least this many nodes. The value must be greater + * than 2, and should be at least 8 to mesh with assumptions in + * tree removal about conversion back to plain bins upon + * shrinkage. + */ + //转变为红黑树阈值 +static final int TREEIFY_THRESHOLD = 8; + +/** + * The bin count threshold for untreeifying a (split) bin during a + * resize operation. Should be less than TREEIFY_THRESHOLD, and at + * most 6 to mesh with shrinkage detection under removal. + */ + //变为链表阈值 +static final int UNTREEIFY_THRESHOLD = 6; + +/** + * The smallest table capacity for which bins may be treeified. + * (Otherwise the table is resized if too many nodes in a bin.) + * The value should be at least 4 * TREEIFY_THRESHOLD to avoid + * conflicts between resizing and treeification thresholds. + */ + //存放table的最小容量 +static final int MIN_TREEIFY_CAPACITY = 64; + +/** + * Minimum number of rebinnings per transfer step. Ranges are + * subdivided to allow multiple resizer threads. This value + * serves as a lower bound to avoid resizers encountering + * excessive memory contention. The value should be at least + * DEFAULT_CAPACITY. + */ + //每个传输步骤的最小重新绑定数 +private static final int MIN_TRANSFER_STRIDE = 16; + +/** + * The number of bits used for generation stamp in sizeCtl. + * Must be at least 6 for 32bit arrays. + */ + //sizeCtl中用于生成戳的位数 +private static int RESIZE_STAMP_BITS = 16; + +/** + * The maximum number of threads that can help resize.可以帮助调整大小的最大线程数 + * Must fit in 32 - RESIZE_STAMP_BITS bits. + */ +private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; + +/** + * The bit shift for recording size stamp in sizeCtl.sizeCtl中记录尺寸戳的位移 + */ +private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; + +/* + * Encodings for Node hash fields. See above for explanation. + */ + //表示正在转移 +static final int MOVED = -1; // hash for forwarding nodes +//表示正在转换成树 +static final int TREEBIN = -2; // hash for roots of trees +//临时保存的hash +static final int RESERVED = -3; // hash for transient reservations +//正常hash可用位 +static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash + +/** Number of CPUS, to place bounds on some sizings */ +//cpu的数量 +static final int NCPU = Runtime.getRuntime().availableProcessors(); + +``` + +####构造方法 +```java + +/** + * Creates a new, empty map with the default initial table size (16). + */ +public ConcurrentHashMap() { +} + +/** + * Creates a new, empty map with an initial table size + * accommodating the specified number of elements without the need + * to dynamically resize. + * + * @param initialCapacity The implementation performs internal + * sizing to accommodate this many elements. + * @throws IllegalArgumentException if the initial capacity of + * elements is negative + */ + //传入初始化容量大小 +public ConcurrentHashMap(int initialCapacity) { + if (initialCapacity < 0) + throw new IllegalArgumentException(); + //判断超出最大容量 + int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? + MAXIMUM_CAPACITY : + tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); + //初始化sizeCtl + this.sizeCtl = cap; +} + +/** + * Creates a new map with the same mappings as the given map. + * + * @param m the map + */ + //当出入一个Map的时候,先设定sizeCtl为默认容量,在添加元素 +public ConcurrentHashMap(Map m) { + this.sizeCtl = DEFAULT_CAPACITY; + putAll(m); +} + +/** + * Creates a new, empty map with an initial table size based on + * the given number of elements ({@code initialCapacity}) and + * initial table density ({@code loadFactor}). + * + * @param initialCapacity the initial capacity. The implementation + * performs internal sizing to accommodate this many elements, + * given the specified load factor. + * @param loadFactor the load factor (table density) for + * establishing the initial table size + * @throws IllegalArgumentException if the initial capacity of + * elements is negative or the load factor is nonpositive + * + * @since 1.6 + */ + //初始化容量以及加载因子 +public ConcurrentHashMap(int initialCapacity, float loadFactor) { + this(initialCapacity, loadFactor, 1); +} + +/** + * Creates a new, empty map with an initial table size based on + * the given number of elements ({@code initialCapacity}), table + * density ({@code loadFactor}), and number of concurrently + * updating threads ({@code concurrencyLevel}). + * + * @param initialCapacity the initial capacity. The implementation + * performs internal sizing to accommodate this many elements, + * given the specified load factor. + * @param loadFactor the load factor (table density) for + * establishing the initial table size + * @param concurrencyLevel the estimated number of concurrently + * updating threads. The implementation may use this value as + * a sizing hint. + * @throws IllegalArgumentException if the initial capacity is + * negative or the load factor or concurrencyLevel are + * nonpositive + */ +public ConcurrentHashMap(int initialCapacity, + float loadFactor, int concurrencyLevel) { + //判断加载因子 线程大小 + if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) + throw new IllegalArgumentException(); + //判断初始化容量和线程大小 + if (initialCapacity < concurrencyLevel) // Use at least as many bins + initialCapacity = concurrencyLevel; // as estimated threads + long size = (long)(1.0 + (long)initialCapacity / loadFactor); + int cap = (size >= (long)MAXIMUM_CAPACITY) ? + MAXIMUM_CAPACITY : tableSizeFor((int)size); + this.sizeCtl = cap; +} + +``` + +####主要方法 + +**get系列** +```java +/** + * Returns the value to which the specified key is mapped, + * or {@code null} if this map contains no mapping for the key. + * + *

More formally, if this map contains a mapping from a key + * {@code k} to a value {@code v} such that {@code key.equals(k)}, + * then this method returns {@code v}; otherwise it returns + * {@code null}. (There can be at most one such mapping.) + * + * @throws NullPointerException if the specified key is null + */ +public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + //根据hashcode计算hash + int h = spread(key.hashCode()); + //(tab = table) != null表示赋值给tab判断是否为null + //(n = tab.length) > 0赋值tab大小给n 并判断是否为空 + //(e = tabAt(tab, (n - 1) & h)) != null 表示key的桶不为空 + if ((tab = table) != null && (n = tab.length) > 0 && + (e = tabAt(tab, (n - 1) & h)) != null) { + //表示不存在树只有一个元素 + if ((eh = e.hash) == h) { + //表示对应的key就是目标key + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + else if (eh < 0)//表示 节点的hash小于0 + //在桶或者链表中查找 + return (p = e.find(h, key)) != null ? p.val : null; + //当节点hash大于0 + while ((e = e.next) != null) { + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; +} + +static final int spread(int h) { + return (h ^ (h >>> 16)) & HASH_BITS; +} + +static final Node tabAt(Node[] tab, int i) { + return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); +} + +/** +* Virtualized support for map.get(); overridden in subclasses. +*/ +Node find(int h, Object k) { + Node e = this; + if (k != null) { + do { + K ek; + if (e.hash == h && + ((ek = e.key) == k || (ek != null && k.equals(ek)))) + return e; + } while ((e = e.next) != null); + } + return null; +} +``` +**put系列** + +```java +//实现是完全基于putVal方法 +public V put(K key, V value) { + return putVal(key, value, false); +} + +//核心实现put方法 +final V putVal(K key, V value, boolean onlyIfAbsent) { + //判断key和value的值为null抛出异常 + if (key == null || value == null) throw new NullPointerException(); + //计算hash + int hash = spread(key.hashCode()); + int binCount = 0; + //无限循环 + for (Node[] tab = table;;) { + Node f; int n, i, fh; + //tab为空初始化table + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + //tab不为空,且该桶为空 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + //cas设置新的node + if (casTabAt(tab, i, null, + new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + //表示hash为MOVED + else if ((fh = f.hash) == MOVED) + //进行节点的转移(在扩容中) + tab = helpTransfer(tab, f); + else { + V oldVal = null; + //使用synchronized加锁 防止其他写入 + synchronized (f) { + //找到table为i的节点,再次检测元素是否有变化 + if (tabAt(tab, i) == f) { + //取出来的hash大于0(说明不是在迁移也不是树) + if (fh >= 0) { + //桶中元素赋值 + binCount = 1; + //遍历桶,加binCount + for (Node e = f;; ++binCount) { + K ek; + //如果存在,重新赋值 + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + Node pred = e; + //如果没有找到,就放入链表结尾 + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + //如果为红黑树 + else if (f instanceof TreeBin) { + + Node p; + //binCount赋值为2 + binCount = 2; + //放入红黑树 + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + //表示插入成功 + if (binCount != 0) { + //判断阈值 + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + //如果插入的元素存在 返回旧值 + if (oldVal != null) + return oldVal; + break; + } + } + } + //对元素个数加1(是否扩容也在里面) + addCount(1L, binCount); + return null; +} + +//初始化table +private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { + if ((sc = sizeCtl) < 0) + //初始化失败 + Thread.yield(); // lost initialization race; just spin + //cas设置SIZECTL值 + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + //再次检查table是否为空,防止ABA问题 + if ((tab = table) == null || tab.length == 0) { + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; + table = tab = nt; + sc = n - (n >>> 2); + } + } finally { + sizeCtl = sc; + } + break; + } + } + return tab; +} + + /** + * Adds to count, and if table is too small and not already + * resizing, initiates transfer. If already resizing, helps + * perform transfer if work is available. Rechecks occupancy + * after a transfer to see if another resize is already needed + * because resizings are lagging additions. + * + * @param x the count to add + * @param check if <0, don't check resize, if <= 1 only check if uncontended + */ +private final void addCount(long x, int check) { + CounterCell[] as; long b, s; + //把数量加在BASECOUNT上面,如果失败就放入counterCells里面 + if ((as = counterCells) != null || + !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { + CounterCell a; long v; int m; + boolean uncontended = true; + //判断as数组是否为空或者当前线程所在段为null或者在当前线程段上加数失败 + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = + U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { + //强制添加数量 + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + //计算个数 + s = sumCount(); + } + if (check >= 0) { + Node[] tab, nt; int n, sc; + //如果达到了扩容门槛,进行扩容 + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { + int rs = resizeStamp(n); + //表示正在扩容 + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + //扩容完成 + break; + //未完成,加入迁移元素中,扩容线程+1 + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + //表示触发扩容线程 + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } +} + +/** + * Helps transfer if a resize is in progress. + */ +final Node[] helpTransfer(Node[] tab, Node f) { + Node[] nextTab; int sc; + //tab != null 桶不为空 + //f instanceof ForwardingNode 桶的第一个元素为ForwardingNode类型 + //nextTab = ((ForwardingNode)f).nextTable) != null nextTab不为空 + //说明当前桶已经迁移完毕了,才去帮忙迁移其它桶的元素 + //扩容时会把旧桶的第一个元素置为ForwardingNode,并让其nextTab指向新桶数组 + if (tab != null && (f instanceof ForwardingNode) && + (nextTab = ((ForwardingNode)f).nextTable) != null) { + int rs = resizeStamp(tab.length); + //正在扩容 + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { + transfer(tab, nextTab); + break; + } + } + return nextTab; + } + return table; +} + +//循环调用putVal方法 +public void putAll(Map m) { + tryPresize(m.size()); + for (Map.Entry e : m.entrySet()) + putVal(e.getKey(), e.getValue(), false); +} +``` diff --git a/week_04/54/ConcurrentLinkedQueue-054.md b/week_04/54/ConcurrentLinkedQueue-054.md new file mode 100644 index 0000000000000000000000000000000000000000..9d2076a08d1e1363403327631e2d11eb7dc086ac --- /dev/null +++ b/week_04/54/ConcurrentLinkedQueue-054.md @@ -0,0 +1,168 @@ +## ConcurrentLinkedQueue基于jdk1.8源码分析 + +####主要属性 +一个头节点一个尾节点,用volatile进行修饰 +```java +/** + * A node from which the first live (non-deleted) node (if any) + * can be reached in O(1) time. + * Invariants: + * - all live nodes are reachable from head via succ() + * - head != null + * - (tmp = head).next != tmp || tmp != head + * Non-invariants: + * - head.item may or may not be null. + * - it is permitted for tail to lag behind head, that is, for tail + * to not be reachable from head! + */ +private transient volatile Node head; + +/** + * A node from which the last node on list (that is, the unique + * node with node.next == null) can be reached in O(1) time. + * Invariants: + * - the last node is always reachable from tail via succ() + * - tail != null + * Non-invariants: + * - tail.item may or may not be null. + * - it is permitted for tail to lag behind head, that is, for tail + * to not be reachable from head! + * - tail.next may or may not be self-pointing to tail. + */ +private transient volatile Node tail; +``` + +####构造方法 +```java + /** + * Creates a {@code ConcurrentLinkedQueue} that is initially empty. + */ + //初始化头尾部为null + public ConcurrentLinkedQueue() { + head = tail = new Node(null); + } + + /** + * Creates a {@code ConcurrentLinkedQueue} + * initially containing the elements of the given collection, + * added in traversal order of the collection's iterator. + * + * @param c the collection of elements to initially contain + * @throws NullPointerException if the specified collection or any + * of its elements are null + */ + public ConcurrentLinkedQueue(Collection c) { + Node h = null, t = null; + //循环插入 + for (E e : c) { + checkNotNull(e); + Node newNode = new Node(e); + if (h == null) + h = t = newNode; + else { + t.lazySetNext(newNode); + t = newNode; + } + } + if (h == null) + h = t = new Node(null); + head = h; + tail = t; + } +``` + +####主要方法 + +**入列** +```java +/** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never throw + * {@link IllegalStateException} or return {@code false}. + * + * @return {@code true} (as specified by {@link Collection#add}) + * @throws NullPointerException if the specified element is null + */ + //主要是offer实现 +public boolean add(E e) { + return offer(e); +} + +/** + * Inserts the specified element at the tail of this queue. + * As the queue is unbounded, this method will never return {@code false}. + * + * @return {@code true} (as specified by {@link Queue#offer}) + * @throws NullPointerException if the specified element is null + */ +public boolean offer(E e) { + //检查是否为null + checkNotNull(e); + //新建一个node + final Node newNode = new Node(e); + + //遍历链表 + for (Node t = tail, p = t;;) { + Node q = p.next; + //表示为末尾 + if (q == null) { + // p is last node + //插入末尾 + if (p.casNext(null, newNode)) { + // Successful CAS is the linearization point + // for e to become an element of this queue, + // and for newNode to become "live". + if (p != t) // hop two nodes at a time + casTail(t, newNode); // Failure is OK. + return true; + } + // Lost CAS race to another thread; re-read next + } + //表名已经出列了重新设置p的值 + else if (p == q) + // We have fallen off list. If tail is unchanged, it + // will also be off-list, in which case we need to + // jump to head, from which all live nodes are always + // reachable. Else the new tail is a better bet. + p = (t != (t = tail)) ? t : head; + else + //表示t后面还有值 重新设置p的值 + // Check for tail updates after two hops. + p = (p != t && t != (t = tail)) ? t : q; + } +} +``` + +**出列** + +```java + +public E poll() { + restartFromHead: + for (;;) { + //获取链表的头节点 + for (Node h = head, p = h, q;;) { + E item = p.item; + //表示节点的值不为null且设置成null + if (item != null && p.casItem(item, null)) { + // Successful CAS is the linearization point + // for item to be removed from this queue. + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + //说明头节点变了 + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + //说明已经出列了 + else if (p == q) + continue restartFromHead;、 + //设置p的值为q + else + p = q; + } + } +} +``` diff --git a/week_04/54/CopyOnWriteArrayList-054.md b/week_04/54/CopyOnWriteArrayList-054.md new file mode 100644 index 0000000000000000000000000000000000000000..efb093e2c1a70f6d7a0ea5512146d2b7b032692a --- /dev/null +++ b/week_04/54/CopyOnWriteArrayList-054.md @@ -0,0 +1,451 @@ +## 基于jdk1.8的CopyOnWriteArrayList源码分析 + +#### 简介 + +CopyOnWriteArrayList是List集合里面的线程安全的ArrayList,对其操作元素都是 +在底层创建一个拷贝数组(快照)上进行的,也就是写时拷贝策略。 +主要是利用ReentrantLock来锁住其他的操作。 +#### 构造方法 +```java + /** + * Creates an empty list.创建一个空的list集合 + */ +public CopyOnWriteArrayList() { + //创建一个大小为0的Object数组 + setArray(new Object[0]); +} +/** + * Sets the array. + */ +final void setArray(Object[] a) { + array = a; +} +``` + +```java +/** + * Creates a list containing the elements of the specified + * collection, in the order they are returned by the collection's + * iterator. + * + * @param c the collection of initially held elements + * @throws NullPointerException if the specified collection is null + */ +//入参为集合,拷贝集合里面的元素到list里面 +public CopyOnWriteArrayList(Collection c) { + //初始化一个Object数组elements + Object[] elements; + //如果为同类 + if (c.getClass() == CopyOnWriteArrayList.class) + elements = ((CopyOnWriteArrayList)c).getArray();//直接转变为数组赋值给elements + else { + //其他类 + elements = c.toArray();//用toArray()方法转为数组 + // c.toArray might (incorrectly) not return Object[] (see 6260652) + if (elements.getClass() != Object[].class) + //取出元素给Object数组 + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements); +} +``` +```java +/** + * Creates a list holding a copy of the given array. + * + * @param toCopyIn the array (a copy of this array is used as the + * internal array) + * @throws NullPointerException if the specified array is null + */ +public CopyOnWriteArrayList(E[] toCopyIn) { + //创建list,拷贝数组 + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); +} +``` + +#### 主要方法 + +**add** + +```java +/** + * Appends the specified element to the end of this list. + * + * @param e element to be appended to this list + * @return {@code true} (as specified by {@link Collection#add}) + */ +//末尾添加一个元素 +public boolean add(E e) { + //使用ReentrantLock进行加锁,然后扩容数组,最后添加数组 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + Object[] newElements = Arrays.copyOf(elements, len + 1); + newElements[len] = e; + setArray(newElements); + return true; + } finally { + lock.unlock(); + } +} +``` +```java +/** + * Inserts the specified element at the specified position in this + * list. Shifts the element currently at that position (if any) and + * any subsequent elements to the right (adds one to their indices). + * + * @throws IndexOutOfBoundsException {@inheritDoc} + */ +//指定位置添加一个元素 +public void add(int index, E element) { + //使用ReentrantLock加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + //计算添加的位置是否在数组中,不在抛出IndexOutOfBoundsException异常 + int len = elements.length; + if (index > len || index < 0) + throw new IndexOutOfBoundsException("Index: "+index+ + ", Size: "+len); + Object[] newElements; + int numMoved = len - index;//需要移动的数组位置 + if (numMoved == 0) + newElements = Arrays.copyOf(elements, len + 1);//刚好在末尾,直接扩容数组 + else { + //新建一个Object + newElements = new Object[len + 1]; + //拷贝index前面和后面的元素到新数组里面 + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index, newElements, index + 1, + numMoved); + } + newElements[index] = element; + setArray(newElements); + } finally { + lock.unlock(); + } +} +``` +**get** +```java +//获取指定Object的指定位置的元素,通过数组的索引获取 +@SuppressWarnings("unchecked") +private E get(Object[] a, int index) { + return (E) a[index]; +} +``` +```java + /** + * {@inheritDoc} + * + * @throws IndexOutOfBoundsException {@inheritDoc} + */ +//获取指定位置的元素 +public E get(int index) { + //传入数组进去 + return get(getArray(), index); +} +``` +```java +/** + * static version of indexOf, to allow repeated calls without + * needing to re-acquire array each time. + * @param o element to search for + * @param elements the array + * @param index first index to search + * @param fence one past last index to search + * @return index of element, or -1 if absent + */ +//indexOf的静态版本,允许重复调用每次都需要重新获取数组。 +private static int indexOf(Object o, Object[] elements, + int index, int fence) { + if (o == null) { + for (int i = index; i < fence; i++) + if (elements[i] == null) + return i; + } else { + for (int i = index; i < fence; i++) + if (o.equals(elements[i])) + return i; + } + return -1; +} +``` +```java + /** + * static version of lastIndexOf. + * @param o element to search for + * @param elements the array + * @param index first index to search + * @return index of element, or -1 if absent + */ +//lastIndexOf的静态版本,允许重复调用每次都需要重新获取数组。 +private static int lastIndexOf(Object o, Object[] elements, int index) { + //循环给定数组查询位置 + if (o == null) { + for (int i = index; i >= 0; i--) + if (elements[i] == null) + return i; + } else { + for (int i = index; i >= 0; i--) + if (o.equals(elements[i])) + return i; + } + return -1; +} +``` +```java +/** + * {@inheritDoc} + */ +//获取指定元素的第一次出现的位置 +public int indexOf(Object o) { + Object[] elements = getArray(); + return indexOf(o, elements, 0, elements.length); +} +``` +```java +/** + * Returns {@code true} if this list contains the specified element. + * More formally, returns {@code true} if and only if this list contains + * at least one element {@code e} such that + * (o==null ? e==null : o.equals(e)). + * + * @param o element whose presence in this list is to be tested + * @return {@code true} if this list contains the specified element + */ +//数组中是否包含给定元素 +public boolean contains(Object o) { + Object[] elements = getArray(); + return indexOf(o, elements, 0, elements.length) >= 0; +} +``` +```java +/** + * Returns the index of the first occurrence of the specified element in + * this list, searching forwards from {@code index}, or returns -1 if + * the element is not found. + * More formally, returns the lowest index {@code i} such that + * (i >= index && (e==null ? get(i)==null : e.equals(get(i)))), + * or -1 if there is no such index. + * + * @param e element to search for + * @param index index to start searching from + * @return the index of the first occurrence of the element in + * this list at position {@code index} or later in the list; + * {@code -1} if the element is not found. + * @throws IndexOutOfBoundsException if the specified index is negative + */ +//从开始位置到index位置出现给定元素的位置 +public int indexOf(E e, int index) { + Object[] elements = getArray(); + return indexOf(e, elements, index, elements.length); +} +``` +```java + /** + * {@inheritDoc} + */ +//最后一次出现给定元素的位置 +public int lastIndexOf(Object o) { + Object[] elements = getArray(); + return lastIndexOf(o, elements, elements.length - 1); +} +``` +```java +/** + * Returns the index of the last occurrence of the specified element in + * this list, searching backwards from {@code index}, or returns -1 if + * the element is not found. + * More formally, returns the highest index {@code i} such that + * (i <= index && (e==null ? get(i)==null : e.equals(get(i)))), + * or -1 if there is no such index. + * + * @param e element to search for + * @param index index to start searching backwards from + * @return the index of the last occurrence of the element at position + * less than or equal to {@code index} in this list; + * -1 if the element is not found. + * @throws IndexOutOfBoundsException if the specified index is greater + * than or equal to the current size of this list + */ +//从最后到给定位置index出现给定元素的位置 +public int lastIndexOf(E e, int index) { + Object[] elements = getArray(); + return lastIndexOf(e, elements, index); +} +``` +**remove** + +```java + /** + * Removes the element at the specified position in this list. + * Shifts any subsequent elements to the left (subtracts one from their + * indices). Returns the element that was removed from the list. + * + * @throws IndexOutOfBoundsException {@inheritDoc} + */ +//删除指定位置的元素,返回旧值 +public E remove(int index) { + //加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + E oldValue = get(elements, index);//获取到旧值 + int numMoved = len - index - 1;//获取删除的数组位置 + if (numMoved == 0) + setArray(Arrays.copyOf(elements, len - 1));//表示为尾部 + else { + //新建一个Object数组,移动index前后的位置 + Object[] newElements = new Object[len - 1]; + System.arraycopy(elements, 0, newElements, 0, index); + System.arraycopy(elements, index + 1, newElements, index, + numMoved); + setArray(newElements); + } + //返回旧值 + return oldValue; + } finally { + lock.unlock(); + } +} +``` +```java +/** + * Removes the first occurrence of the specified element from this list, + * if it is present. If this list does not contain the element, it is + * unchanged. More formally, removes the element with the lowest index + * {@code i} such that + * (o==null ? get(i)==null : o.equals(get(i))) + * (if such an element exists). Returns {@code true} if this list + * contained the specified element (or equivalently, if this list + * changed as a result of the call). + * + * @param o element to be removed from this list, if present + * @return {@code true} if this list contained the specified element + */ +//删除指定元素 +public boolean remove(Object o) { + Object[] snapshot = getArray();//获取数组 + //获取元素的位置 + int index = indexOf(o, snapshot, 0, snapshot.length); + //调用remove方法 + return (index < 0) ? false : remove(o, snapshot, index); +} +``` +```java + /** + * A version of remove(Object) using the strong hint that given + * recent snapshot contains o at the given index. + */ +//使用以下强提示的remove(Object)版本最近的快照在给定索引处包含o。 +private boolean remove(Object o, Object[] snapshot, int index) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] current = getArray(); + int len = current.length; + if (snapshot != current) findIndex: { + int prefix = Math.min(index, len); + for (int i = 0; i < prefix; i++) { + if (current[i] != snapshot[i] && eq(o, current[i])) { + index = i; + break findIndex; + } + } + if (index >= len) + return false; + if (current[index] == o) + break findIndex; + index = indexOf(o, current, index, len); + if (index < 0) + return false; + } + Object[] newElements = new Object[len - 1]; + System.arraycopy(current, 0, newElements, 0, index); + System.arraycopy(current, index + 1, + newElements, index, + len - index - 1); + setArray(newElements); + return true; + } finally { + lock.unlock(); + } +} +``` +```java +/** + * Removes from this list all of the elements whose index is between + * {@code fromIndex}, inclusive, and {@code toIndex}, exclusive. + * Shifts any succeeding elements to the left (reduces their index). + * This call shortens the list by {@code (toIndex - fromIndex)} elements. + * (If {@code toIndex==fromIndex}, this operation has no effect.) + * + * @param fromIndex index of first element to be removed + * @param toIndex index after last element to be removed + * @throws IndexOutOfBoundsException if fromIndex or toIndex out of range + * ({@code fromIndex < 0 || toIndex > size() || toIndex < fromIndex}) + */ +//删除给定区间内的元素 +void removeRange(int fromIndex, int toIndex) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + + if (fromIndex < 0 || toIndex > len || toIndex < fromIndex) + throw new IndexOutOfBoundsException(); + int newlen = len - (toIndex - fromIndex); + int numMoved = len - toIndex; + if (numMoved == 0) + setArray(Arrays.copyOf(elements, newlen)); + else { + Object[] newElements = new Object[newlen]; + System.arraycopy(elements, 0, newElements, 0, fromIndex); + System.arraycopy(elements, toIndex, newElements, + fromIndex, numMoved); + setArray(newElements); + } + } finally { + lock.unlock(); + } +} +``` +**set** +```java +/** + * Replaces the element at the specified position in this list with the + * specified element. + * + * @throws IndexOutOfBoundsException {@inheritDoc} + */ +//修改指定位置的元素并返回旧值 +public E set(int index, E element) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + E oldValue = get(elements, index); + + if (oldValue != element) { + int len = elements.length; + Object[] newElements = Arrays.copyOf(elements, len); + newElements[index] = element; + setArray(newElements); + } else { + // Not quite a no-op; ensures volatile write semantics + setArray(elements); + } + return oldValue; + } finally { + lock.unlock(); + } +} +``` diff --git a/week_04/54/DelayQueue-054.md b/week_04/54/DelayQueue-054.md new file mode 100644 index 0000000000000000000000000000000000000000..a2e02f8eb53a5b72409ac475d91a3ce481d47a8d --- /dev/null +++ b/week_04/54/DelayQueue-054.md @@ -0,0 +1,183 @@ +## DelayQueue基于jdk1.8的源码分析 + +#### 属性 + +```java +// 用于控制并发的锁 +private final transient ReentrantLock lock = new ReentrantLock(); +// 优先级队列 +private final PriorityQueue q = new PriorityQueue(); + +/** + * Thread designated to wait for the element at the head of + * the queue. This variant of the Leader-Follower pattern + * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to + * minimize unnecessary timed waiting. When a thread becomes + * the leader, it waits only for the next delay to elapse, but + * other threads await indefinitely. The leader thread must + * signal some other thread before returning from take() or + * poll(...), unless some other thread becomes leader in the + * interim. Whenever the head of the queue is replaced with + * an element with an earlier expiration time, the leader + * field is invalidated by being reset to null, and some + * waiting thread, but not necessarily the current leader, is + * signalled. So waiting threads must be prepared to acquire + * and lose leadership while waiting. + */ + // 用于标记当前是否有线程在排队(仅用于取元素时) +private Thread leader = null; + +/** + * Condition signalled when a newer element becomes available + * at the head of the queue or a new thread may need to + * become leader. + */ + // lock条件,用于表示现在是否有可取的元素 +private final Condition available = lock.newCondition(); +``` + +#### 构造方法 + +```java +/** + * Creates a new {@code DelayQueue} that is initially empty. + */ +public DelayQueue() {} + +/** + * Creates a {@code DelayQueue} initially containing the elements of the + * given collection of {@link Delayed} instances. + * + * @param c the collection of elements to initially contain + * @throws NullPointerException if the specified collection or any + * of its elements are null + */ + //使用addAll方法插入一个collection +public DelayQueue(Collection c) { + this.addAll(c); +} +``` + +#### 主要方法 + +**入列** + +```java +/** + * Inserts the specified element into this delay queue. + * + * @param e the element to add + * @return {@code true} (as specified by {@link Collection#add}) + * @throws NullPointerException if the specified element is null + */ +public boolean add(E e) { + return offer(e); +} + +/** + * Inserts the specified element into this delay queue. + * + * @param e the element to add + * @return {@code true} + * @throws NullPointerException if the specified element is null + */ + +public boolean offer(E e) { + //引入ReentrantLock可重入锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //获取到元素至优先级队列里面 + q.offer(e); + //如果是堆顶元素就置空leader,唤醒available上面的线程 + if (q.peek() == e) { + leader = null; + available.signal(); + } + return true; + } finally { + //释放锁 + lock.unlock(); + } +} +``` + +**出列** + +```java +/** + * Retrieves and removes the head of this queue, or returns {@code null} + * if this queue has no elements with an expired delay. + * + * @return the head of this queue, or {@code null} if this + * queue has no elements with an expired delay + */ +public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //获取优先级队列的头部 + E first = q.peek(); + //还没有到期返回为Null + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + //到期了就调用poll弹出队列 + else + return q.poll(); + } finally { + lock.unlock(); + } +} +``` +```java +/** + * Retrieves and removes the head of this queue, waiting if necessary + * until an element with an expired delay is available on this queue. + * + * @return the head of this queue + * @throws InterruptedException {@inheritDoc} + */ + //会抛出中断异常 +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + //获取堆顶元素 + E first = q.peek(); + if (first == null) + //available阻塞等待 + available.await(); + else { + //堆顶元素的到期时间 + long delay = first.getDelay(NANOSECONDS); + //直接弹出 + if (delay <= 0) + return q.poll(); + first = null; // don't retain ref while waiting + //如果存在其他线程等待,直接进入等待状态 + if (leader != null) + available.await(); + else { + //等待delay时候后自动醒过来 + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + //成功出队,不需要阻塞了 + if (leader == null && q.peek() != null) + //signal()只是把等待的线程放到AQS的队列里面,并不是真正的唤醒 + available.signal(); + //解锁,真正唤醒 + lock.unlock(); + } +} +``` \ No newline at end of file