diff --git a/week_04/12/ArrayBlockingQueue.md b/week_04/12/ArrayBlockingQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..c5fc30e1c9e3c9618de91491f1f461f1bc4c3b21 --- /dev/null +++ b/week_04/12/ArrayBlockingQueue.md @@ -0,0 +1,411 @@ +#### 问题 +``` +怎么实现的 +是否需要扩容 +有什么缺点 +``` + +#### 简介 +``` +通过数组实现的FIFO阻塞队列,是线程安全的,有界 +``` + +#### 继承体系 +``` +public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable +``` +BlockingQueue: +操作 | 抛出异常 | 返回特定值 | 阻塞 | 超时 +---|---|---|---|--- +入队 | add(e) | offer(e)——false | put(e) | offer(e, timeout, unit) +出队 | remove() | poll()——null | take() | poll(timeout, unit) +检查 | element()| peek()——null || + +#### 类结构说明 +``` +private class Itr implements Iterator +class Itrs { + private class Node extends WeakReference { + Node next; + Node(Itr iterator, Node next) { + super(iterator); + this.next = next; + } + } + + Itrs(Itr initial) { + register(initial); + } + + void register(Itr itr) { + // assert lock.getHoldCount() == 1; + head = new Node(itr, head); + } +} +``` + +#### 源码解析 +##### 属性 +``` +/** The queued items */ +final Object[] items; + +// 存取组成了先进先出的阻塞队列 +/** items index for next take, poll, peek or remove */ +// 默认0,从队列索引为0处开始累加取,累加长度与队列长度相等时,取索引更新为0 +int takeIndex; + +/** items index for next put, offer, or add */ +// 默认0,从队列索引为0处开始累加存,累加长度与队列长度相等时,存索引更新为0 +int putIndex; + +/** Number of elements in the queue */ +int count; + +final ReentrantLock lock; + +/** Condition for waiting takes */ +private final Condition notEmpty; + +/** Condition for waiting puts */ +private final Condition notFull; + +/** + * Shared state for currently active iterators, or null if there + * are known not to be any. Allows queue operations to update + * iterator state. + */ +transient Itrs itrs = null; +``` + +##### 构造方法 +``` +// 必须初始化容量,默认非公平锁 +public ArrayBlockingQueue(int capacity) { + this(capacity, false); +} + +// 自定义锁类型 +public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); +} + +// 初始化时集合入列 +public ArrayBlockingQueue(int capacity, boolean fair, + Collection c) { + this(capacity, fair); + + final ReentrantLock lock = this.lock; + lock.lock(); // Lock only for visibility, not mutual exclusion + try { + int i = 0; + try { + for (E e : c) { + checkNotNull(e); + items[i++] = e; + } + } catch (ArrayIndexOutOfBoundsException ex) { + throw new IllegalArgumentException(); + } + count = i; + putIndex = (i == capacity) ? 0 : i; + } finally { + lock.unlock(); + } +} +``` + +##### 主要方法 +###### 存 +add +``` +// 队列满时,抛异常 {@code IllegalStateException} +public boolean add(E e) { + return super.add(e); +} +``` + +offer +``` +// 队列满时,返回false +public boolean offer(E e) { + checkNotNull(e); + final ReentrantLock lock = this.lock; + // 加锁 + lock.lock(); + try { + // 队列满时,直接返回 + if (count == items.length) + return false; + else { + // 入列 + enqueue(e); + return true; + } + } finally { + lock.unlock(); + } +} +``` + +offer(E e, long timeout, TimeUnit unit) +``` +// 队列满时,给与一定时间的请求,若超时则返回false +public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + + checkNotNull(e); + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + // 队列满时,超时判断 + while (count == items.length) { + // 超时,直接返回 + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + enqueue(e); + return true; + } finally { + lock.unlock(); + } +} +``` + +put +``` +// 队列满时,阻塞;若线程中断,抛异常 +public void put(E e) throws InterruptedException { + checkNotNull(e); + final ReentrantLock lock = this.lock; + // 中断锁,线程若中断直接抛异常 + lock.lockInterruptibly(); + try { + // 队列满时,阻塞线程 + while (count == items.length) + notFull.await(); + enqueue(e); + } finally { + lock.unlock(); + } +} +``` + +enqueue +``` +private void enqueue(E x) { + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + final Object[] items = this.items; + items[putIndex] = x; + // 存索引+1;存索引与长度相等.存索引更新为0,与 取索引与长度相等.取索引更新为0相对应,二者构成FIFO功能 + if (++putIndex == items.length) + putIndex = 0; + count++; + notEmpty.signal(); +} +``` +###### 取 +remove +``` +// 队列为空,抛异常 +public E remove() { + E x = poll(); + if (x != null) + return x; + else + throw new NoSuchElementException(); +} +``` + +poll +``` +// 队列空时,返回null +public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return (count == 0) ? null : dequeue(); + } finally { + lock.unlock(); + } +} +``` + +poll(long timeout, TimeUnit unit) +``` +// 队列空时,给与一定时间的请求,若超时返回null +public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == 0) { + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + return dequeue(); + } finally { + lock.unlock(); + } +} +``` + +take +``` +// 队列空时,等待 +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + while (count == 0) + notEmpty.await(); + return dequeue(); + } finally { + lock.unlock(); + } +} +``` + +dequeue +``` +private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + final Object[] items = this.items; + @SuppressWarnings("unchecked") + E x = (E) items[takeIndex]; + items[takeIndex] = null; + if (++takeIndex == items.length) + takeIndex = 0; + count--; + if (itrs != null) + itrs.elementDequeued(); + notFull.signal(); + return x; +} +``` + +##### 其他方法 +iterator +``` +public Iterator iterator() { + return new Itr(); +} +``` + +Itrs +``` +class Itrs { + private class Node extends WeakReference { + Node next; + + Node(Itr iterator, Node next) { + super(iterator); + this.next = next; + } + } +} +``` + +Itr +``` +private class Itr implements Iterator { + private int cursor; + private E nextItem; + private int nextIndex; + private E lastItem; + private int lastRet; + private int prevTakeIndex; + private int prevCycles; + private static final int NONE = -1; + private static final int REMOVED = -2; + private static final int DETACHED = -3; + + Itr() { + // assert lock.getHoldCount() == 0; + lastRet = NONE; + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + if (count == 0) { + // assert itrs == null; + cursor = NONE; + nextIndex = NONE; + prevTakeIndex = DETACHED; + } else { + final int takeIndex = ArrayBlockingQueue.this.takeIndex; + prevTakeIndex = takeIndex; + nextItem = itemAt(nextIndex = takeIndex); + cursor = incCursor(takeIndex); + if (itrs == null) { + itrs = new Itrs(this); + } else { + itrs.register(this); // in this order + itrs.doSomeSweeping(false); + } + prevCycles = itrs.cycles; + // assert takeIndex >= 0; + // assert prevTakeIndex == takeIndex; + // assert nextIndex >= 0; + // assert nextItem != null; + } + } finally { + lock.unlock(); + } + } + + public boolean hasNext() { + // assert lock.getHoldCount() == 0; + if (nextItem != null) + return true; + noNext(); + return false; + } + + public E next() { + // assert lock.getHoldCount() == 0; + final E x = nextItem; + if (x == null) + throw new NoSuchElementException(); + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + if (!isDetached()) + incorporateDequeues(); + // assert nextIndex != NONE; + // assert lastItem == null; + lastRet = nextIndex; + final int cursor = this.cursor; + if (cursor >= 0) { + nextItem = itemAt(nextIndex = cursor); + // assert nextItem != null; + this.cursor = incCursor(cursor); + } else { + nextIndex = NONE; + nextItem = null; + } + } finally { + lock.unlock(); + } + return x; + } +} +``` +#### 总结 +``` +利用takeIndex和putIndex循环利用数组,实现队列的先进先出 +有界队列(即初始化时指定的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作) +入队和出队各定义了四组方法为满足不同的用途 + +注意: +队列长度固定且必须在初始化时指定,所以使用之前一定要慎重考虑好容量 +如果消费速度跟不上入队速度,则会导致提供者线程一直阻塞,且越阻塞越多,非常危险 +``` \ No newline at end of file diff --git a/week_04/12/ConcurrentHashMap.md b/week_04/12/ConcurrentHashMap.md new file mode 100644 index 0000000000000000000000000000000000000000..f6cb5b5623cae42dddb25c751e17d033cb5e7c3d --- /dev/null +++ b/week_04/12/ConcurrentHashMap.md @@ -0,0 +1,570 @@ +#### 问题 +``` +HashMap在多线程环境下何时会出现并发安全问题 +怎么解决并发安全问题的 +使用了哪些锁 +怎么进行扩容的 +是否是强一致性的 +不能解决哪些问题 +``` + +#### 简介 +``` +线程安全的容器,采用cas+synchronized方式保证线程安全,保证安全的同时效率比较稳定 +``` + +#### 继承体系 +``` +public class ConcurrentHashMap extends AbstractMap implements ConcurrentMap, Serializable +``` + +#### 类结构说明 +Node +``` +static class Node implements Map.Entry { + final int hash; + final K key; + volatile V val; + volatile Node next; + + Node(int hash, K key, V val, Node next) { + this.hash = hash; + this.key = key; + this.val = val; + this.next = next; + } + + Node find(int h, Object k) +} +``` + +TreeNode +``` +static final class TreeNode extends Node { + TreeNode parent; // red-black tree links + TreeNode left; + TreeNode right; + TreeNode prev; // needed to unlink next upon deletion + boolean red; + + TreeNode(int hash, K key, V val, Node next, TreeNode parent) { + super(hash, key, val, next); + this.parent = parent; + } + + Node find(int h, Object k) { + return findTreeNode(h, k, null); + } + + final TreeNode findTreeNode(int h, Object k, Class kc) +} +``` + +TreeBin +``` +static final class TreeBin extends Node { + TreeNode root; + volatile TreeNode first; + volatile Thread waiter; + volatile int lockState; + // values for lockState + static final int WRITER = 1; // set while holding write lock + static final int WAITER = 2; // set when waiting for write lock + static final int READER = 4; // increment value for setting read lock + + TreeBin(TreeNode b) + private final void lockRoot() + final TreeNode putTreeVal(int h, K k, V v) + final boolean removeTreeNode(TreeNode p) + static TreeNode rotateLeft(TreeNode root, TreeNode p) + static TreeNode rotateRight(TreeNode root, TreeNode p) + static TreeNode balanceInsertion(TreeNode root, TreeNode x) + static TreeNode balanceDeletion(TreeNode root, TreeNode x) + static boolean checkInvariants(TreeNode t) +} +``` + +#### 源码解析 +##### 常量 +Constants +``` +private static final int MAXIMUM_CAPACITY = 1 << 30; + +private static final int DEFAULT_CAPACITY = 16; + +static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + +private static final int DEFAULT_CONCURRENCY_LEVEL = 16; + +private static final float LOAD_FACTOR = 0.75f; + +static final int TREEIFY_THRESHOLD = 8; + +static final int UNTREEIFY_THRESHOLD = 6; + +static final int MIN_TREEIFY_CAPACITY = 64; + +private static final int MIN_TRANSFER_STRIDE = 16; + +private static int RESIZE_STAMP_BITS = 16; + +private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; + +private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; + +static final int MOVED = -1; // hash for forwarding nodes +static final int TREEBIN = -2; // hash for roots of trees +static final int RESERVED = -3; // hash for transient reservations +static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash + +/** Number of CPUS, to place bounds on some sizings */ +static final int NCPU = Runtime.getRuntime().availableProcessors(); +``` + +##### 属性 +Fields +``` +transient volatile Node[] table; + +private transient volatile Node[] nextTable; + +private transient volatile long baseCount; + +private transient volatile int sizeCtl; + +private transient volatile int transferIndex; + +private transient volatile int cellsBusy; + +private transient volatile CounterCell[] counterCells; + +// views +private transient KeySetView keySet; +private transient ValuesView values; +private transient EntrySetView entrySet; +``` + +##### 构造方法 +``` +public ConcurrentHashMap() {} +public ConcurrentHashMap(int initialCapacity) +public ConcurrentHashMap(int initialCapacity, float loadFactor) +public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) +public ConcurrentHashMap(Map m) +``` + +##### 主要方法 +put +``` +public V put(K key, V value) { + return putVal(key, value, false); +} + +final V putVal(K key, V value, boolean onlyIfAbsent) { + if (key == null || value == null) throw new NullPointerException(); + // 重新计算hash,防止hash冲突 + int hash = spread(key.hashCode()); + int binCount = 0; + for (Node[] tab = table;;) { + Node f; int n, i, fh; + // 初始化table + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + // 桶组添加值时,通过循环cas + // 角标位置为((n - 1) & hash) + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + if (casTabAt(tab, i, null, new Node(hash, key, value, null))) + break; // no lock when adding to empty bin + } + // 如果要插入的元素所在的桶的第一个元素的hash是MOVED,则当前线程帮忙一起迁移元素,因为此时有线程正在扩容 + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + else { + // 桶中添加值时,通过加锁实现 + V oldVal = null; + synchronized (f) { + // 桶组存在hash冲突,桶中添加;往链表或树中添加 + if (tabAt(tab, i) == f) { + // fh值根据(tabAt(tab, i = (n - 1) & hash)).hash求得 + // fh >= 0,说明不在迁移,也不是树 + // 链表中添加 + if (fh >= 0) { + binCount = 1; + for (Node e = f;; ++binCount) { + K ek; + // 链表中有值,修改 + if (e.hash == hash && + ((ek = e.key) == key || + (ek != null && key.equals(ek)))) { + oldVal = e.val; + if (!onlyIfAbsent) + e.val = value; + break; + } + // 链表中没值,添加 + Node pred = e; + if ((e = e.next) == null) { + pred.next = new Node(hash, key, + value, null); + break; + } + } + } + // 树中添加 + // 如果第一个元素是树节点 + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + // 树中有值则修改,返回旧值;无值则添加,返回下方null + if ((p = ((TreeBin)f).putTreeVal(hash, key, + value)) != null) { + oldVal = p.val; + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + // 成功插了元素或者寻找到了元素 + if (binCount != 0) { + // 链表数量大于阀值8,树化 + // 上面把元素插入到树中时,binCount只赋值了2,并没有计算整个树中元素的个数,所以不会重复树化 + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + // 修改后,返回旧值 + if (oldVal != null) + return oldVal; + break; + } + } + } + + // 成功插入元素,元素个数加1(是否要扩容在这个里面),size中解释 + addCount(1L, binCount); + // 成功插入元素返回null + return null; +} + +// 重新计算hash,防止hash冲突 +static final int spread(int h) { + return (h ^ (h >>> 16)) & HASH_BITS; +} + +// 初始化table +private final Node[] initTable() { + Node[] tab; int sc; + while ((tab = table) == null || tab.length == 0) { + if ((sc = sizeCtl) < 0) + Thread.yield(); // lost initialization race; just spin + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + if ((tab = table) == null || tab.length == 0) { + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; + table = tab = nt; + sc = n - (n >>> 2); + } + } finally { + sizeCtl = sc; + } + break; + } + } + return tab; +} + +// 当前有线程正在扩容,当前线程帮忙迁移元素 +final Node[] helpTransfer(Node[] tab, Node f) { + Node[] nextTab; int sc; + if (tab != null && (f instanceof ForwardingNode) && + (nextTab = ((ForwardingNode)f).nextTable) != null) { + int rs = resizeStamp(tab.length); + while (nextTab == nextTable && table == tab && + (sc = sizeCtl) < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { + transfer(tab, nextTab); + break; + } + } + return nextTab; + } + return table; +} + +private final void treeifyBin(Node[] tab, int index) { + Node b; int n, sc; + if (tab != null) { + // 桶组长度小于64,直接扩容 + if ((n = tab.length) < MIN_TREEIFY_CAPACITY) + tryPresize(n << 1); + // 进行树化 + else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { + // 加锁 + synchronized (b) { + // 再次对上面赋值后的数据进行判断,防止别的线程进行修改 + if (tabAt(tab, index) == b) { + TreeNode hd = null, tl = null; + // Node转换为TreeNode + for (Node e = b; e != null; e = e.next) { + TreeNode p = + new TreeNode(e.hash, e.key, e.val, + null, null); + if ((p.prev = tl) == null) + hd = p; + else + tl.next = p; + tl = p; + } + // 将根节点传入,进行树化;树化后对桶中值由链表替换为树 + setTabAt(tab, index, new TreeBin(hd)); + } + } + } + } +} + +// 树化 +TreeBin(TreeNode b) { + super(TREEBIN, null, null, null); + this.first = b; + TreeNode r = null; + // 循环遍历TreeNode + for (TreeNode x = b, next; x != null; x = next) { + next = (TreeNode)x.next; + x.left = x.right = null; + // 根节点赋值 + if (r == null) { + x.parent = null; + x.red = false; + r = x; + } + else { + K k = x.key; + int h = x.hash; + Class kc = null; + // 通过循环将值插入树应该属于的位置 + for (TreeNode p = r;;) { + int dir, ph; + K pk = p.key; + if ((ph = p.hash) > h) + dir = -1; + else if (ph < h) + dir = 1; + else if ((kc == null && + (kc = comparableClassFor(k)) == null) || + (dir = compareComparables(kc, k, pk)) == 0) + dir = tieBreakOrder(k, pk); + TreeNode xp = p; + // 循环一直查找应该插入的位置,为空,则插入并停止循环 + if ((p = (dir <= 0) ? p.left : p.right) == null) { + x.parent = xp; + if (dir <= 0) + xp.left = x; + else + xp.right = x; + // 平衡树 + r = balanceInsertion(r, x); + break; + } + } + } + } + this.root = r; + assert checkInvariants(root); +} + +// 插入时平衡树 +static TreeNode balanceInsertion(TreeNode root, TreeNode x) { + x.red = true; + for (TreeNode xp, xpp, xppl, xppr;;) { + if ((xp = x.parent) == null) { + x.red = false; + return x; + } + else if (!xp.red || (xpp = xp.parent) == null) + return root; + // 父亲在爷爷左边 + if (xp == (xppl = xpp.left)) { + // 右叔有值 + if ((xppr = xpp.right) != null && xppr.red) { + xppr.red = false; + xp.red = false; + xpp.red = true; + x = xpp; + } + // 右叔没值 + else { + // 在父亲右边 + if (x == xp.right) { + // 以父亲为新节点左旋 + root = rotateLeft(root, x = xp); + xpp = (xp = x.parent) == null ? null : xp.parent; + } + // todo 为什么下面代码说明是在父亲左边? + // 在父亲左边 + if (xp != null) { + xp.red = false; + if (xpp != null) { + xpp.red = true; + // 以爷爷为新节点右旋 + root = rotateRight(root, xpp); + } + } + } + } + else { + if (xppl != null && xppl.red) { + xppl.red = false; + xp.red = false; + xpp.red = true; + x = xpp; + } + else { + if (x == xp.left) { + root = rotateRight(root, x = xp); + xpp = (xp = x.parent) == null ? null : xp.parent; + } + if (xp != null) { + xp.red = false; + if (xpp != null) { + xpp.red = true; + root = rotateLeft(root, xpp); + } + } + } + } + } +} + +static TreeNode rotateLeft(TreeNode root, TreeNode p) { + TreeNode r, pp, rl; + if (p != null && (r = p.right) != null) { + if ((rl = p.right = r.left) != null) + rl.parent = p; + if ((pp = r.parent = p.parent) == null) + (root = r).red = false; + else if (pp.left == p) + pp.left = r; + else + pp.right = r; + r.left = p; + p.parent = r; + } + return root; +} + +static TreeNode rotateRight(TreeNode root, TreeNode p) { + TreeNode l, pp, lr; + if (p != null && (l = p.left) != null) { + if ((lr = p.left = l.right) != null) + lr.parent = p; + if ((pp = l.parent = p.parent) == null) + (root = l).red = false; + else if (pp.right == p) + pp.right = l; + else + pp.left = l; + l.right = p; + p.parent = l; + } + return root; +} +``` + +##### 其他方法 +size +``` +public int size() { + long n = sumCount(); + return ((n < 0L) ? 0 : + (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : + (int)n); +} + +final long sumCount() { + CounterCell[] as = counterCells; CounterCell a; + long sum = baseCount; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; +} + +// todo +private final void addCount(long x, int check) { + CounterCell[] as; long b, s; + if ((as = counterCells) != null || + !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { + CounterCell a; long v; int m; + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[ThreadLocalRandom.getProbe() & m]) == null || + !(uncontended = + U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { + fullAddCount(x, uncontended); + return; + } + if (check <= 1) + return; + s = sumCount(); + } + if (check >= 0) { + Node[] tab, nt; int n, sc; + while (s >= (long)(sc = sizeCtl) && (tab = table) != null && + (n = tab.length) < MAXIMUM_CAPACITY) { + int rs = resizeStamp(n); + if (sc < 0) { + if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || + sc == rs + MAX_RESIZERS || (nt = nextTable) == null || + transferIndex <= 0) + break; + if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) + transfer(tab, nt); + } + else if (U.compareAndSwapInt(this, SIZECTL, sc, + (rs << RESIZE_STAMP_SHIFT) + 2)) + transfer(tab, null); + s = sumCount(); + } + } +} +``` + +get +``` +public V get(Object key) { + Node[] tab; Node e, p; int n, eh; K ek; + int h = spread(key.hashCode()); + // 桶中有 + if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { + // 第一个元素就是要查找的值 + if ((eh = e.hash) == h) { + if ((ek = e.key) == key || (ek != null && key.equals(ek))) + return e.val; + } + // hash小于0,说明是树或者正在扩容 + // 使用find寻找元素,find的寻找方式依据Node的不同子类有不同的实现方式 + else if (eh < 0) + return (p = e.find(h, key)) != null ? p.val : null; + // 链表中查找 + while ((e = e.next) != null) { + if (e.hash == h && + ((ek = e.key) == key || (ek != null && key.equals(ek)))) + return e.val; + } + } + return null; +} +``` +#### 总结 +``` +与HashMap插入数据时相同,以桶组、链表、树方式存储数据 +并发处理采用cas+synchronized方式,桶组插入数据时一直循环采用cas方式尝试插入,链表和树中存值时通过加锁方式保证原子性 +``` \ No newline at end of file diff --git a/week_04/12/ConcurrentLinkedQueue.md b/week_04/12/ConcurrentLinkedQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..db7f4d4420f75394fa265110c412d4d90235ee82 --- /dev/null +++ b/week_04/12/ConcurrentLinkedQueue.md @@ -0,0 +1,161 @@ +#### 问题 +``` +是阻塞队列吗 +如何保证并发安全 +``` +#### 简介 +``` +非阻塞队列,通过链表实现,无界,是线程安全的,通过cas+自旋保证线程安全 +``` +#### 继承体系 +``` +public class ConcurrentLinkedQueue extends AbstractQueue implements Queue, java.io.Serializable +``` +#### 类结构说明 +Node +``` +private static class Node { + volatile E item; + volatile Node next; + + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } +} +``` +``` +private class Itr implements Iterator +static final class CLQSpliterator implements Spliterator +``` +#### 源码解析 +##### 属性 +``` +private transient volatile Node head; +private transient volatile Node tail; +``` + +##### 构造方法 +``` +public ConcurrentLinkedQueue() { + // 见Node + head = tail = new Node(null); +} + +public ConcurrentLinkedQueue(Collection c) { + // 定义头尾部,实现节点插入 + Node h = null, t = null; + for (E e : c) { + checkNotNull(e); + Node newNode = new Node(e); + if (h == null) + h = t = newNode; + else { + // 见Node + t.lazySetNext(newNode); + t = newNode; + } + } + if (h == null) + h = t = new Node(null); + head = h; + tail = t; +} +``` + +##### 主要方法 +add +``` +public boolean add(E e) { + return offer(e); +} +``` + +offer +``` +public boolean offer(E e) { + checkNotNull(e); + final Node newNode = new Node(e); + + for (Node t = tail, p = t;;) { + Node q = p.next; + if (q == null) { + if (p.casNext(null, newNode)) { + // p!=t,说明别的线程已经提前更新tail + // p取到的可能是t前面或后面的值 + // todo p=t就不更新尾部了吗? + if (p != t) // hop two nodes at a time + casTail(t, newNode); // Failure is OK. + return true; + } + } + // p已经被删除了,别的线程可能取出了一个节点,导致tail前移,需要更新p + else if (p == q) + p = (t != (t = tail)) ? t : head; + // t后面还有值,别的线程可能存入了一个节点,导致tail后移,需要更新p + else + p = (p != t && t != (t = tail)) ? t : q; + } +} +``` + +poll +``` +public E poll() { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + E item = p.item; + + // head节点没变,Node中数据置为空 + if (item != null && p.casItem(item, null)) { + // p!=h,说明head节点发生变化,别的线程已经存入或取出一个节点 + // todo p=h,就不更新head节点了吗 + if (p != h) // hop two nodes at a time + // 更新head节点 + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + // head节点发生变化,更新p + // 别的线程已取出一个元素且队列中没数据了 + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + // 别的线程已存入一个元素 + else if (p == q) + continue restartFromHead; + // 别的线程已取出一个元素,队列中还有数据 + else + p = q; + } + } +} +``` +#### 总结 +``` +非阻塞队列,通过链表实现,无界,是线程安全的,通过cas+自旋保证线程安全,不允许使用null元素 +``` +#### 延伸 +``` +ConcurrentLinkedQueue与LinkedBlockingQueue对比? +两者都是线程安全的队列; +两者都可以实现取元素时队列为空直接返回null,后者的poll()方法可以实现此功能; +前者全程无锁,后者全部都是使用重入锁控制的; +前者效率较高,后者效率较低; +前者无法实现如果队列为空等待元素到来的操作; +前者是非阻塞队列,后者是阻塞队列; +// todo why +前者无法用在线程池中,后者可以; +``` \ No newline at end of file diff --git a/week_04/12/CopyOnWriteArrayList.md b/week_04/12/CopyOnWriteArrayList.md new file mode 100644 index 0000000000000000000000000000000000000000..e0459391b36e913801496f729405d8e92fa2896e --- /dev/null +++ b/week_04/12/CopyOnWriteArrayList.md @@ -0,0 +1,238 @@ +#### 问题 +并发操作数据时,如何实现读写分离
+有哪些优缺点,缺点如何解决 + +#### 简介 +应用于并发时读多写少的情况,背后是写时复制的思想,实现读写分离。
+会有内存占用问题和数据一致性问题 + + +#### 继承体系 +public class CopyOnWriteArrayList + implements List, RandomAccess, Cloneable, java.io.Serializable + +#### 类结构说明 +``` +static final class COWIterator implements ListIterator +private static class COWSubList extends AbstractList implements RandomAccess +private static class COWSubListIterator implements ListIterator +``` + +#### 源码解析 +##### 属性 +``` +final transient ReentrantLock lock = new ReentrantLock(); +/** The array, accessed only via getArray/setArray. */ +private transient volatile Object[] array; +``` + +##### 构造方法 +``` +public CopyOnWriteArrayList() { + setArray(new Object[0]); +} + +public CopyOnWriteArrayList(Collection c) { + Object[] elements; + if (c.getClass() == CopyOnWriteArrayList.class) + elements = ((CopyOnWriteArrayList)c).getArray(); + else { + elements = c.toArray(); + // c.toArray might (incorrectly) not return Object[] (see 6260652) + if (elements.getClass() != Object[].class) + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements); +} + +public CopyOnWriteArrayList(E[] toCopyIn) { + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); +} +``` + +##### 主要方法 +add +``` +public boolean add(E e) { + // 加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 获取原数组 + Object[] elements = getArray(); + int len = elements.length; + // 创建原数组长度+1的新数组,原数组数据复制到新数组中 + Object[] newElements = Arrays.copyOf(elements, len + 1); + // 新数组添加值 + newElements[len] = e; + // array指向新数组 + setArray(newElements); + return true; + } finally { + lock.unlock(); + } +} + +public static T[] copyOf(T[] original, int newLength) { + return (T[]) copyOf(original, newLength, original.getClass()); +} + +public static T[] copyOf(U[] original, int newLength, Class newType) { + @SuppressWarnings("unchecked") + T[] copy = ((Object)newType == (Object)Object[].class) + ? (T[]) new Object[newLength] : (T[]) Array.newInstance(newType.getComponentType(), newLength); + System.arraycopy(original, 0, copy, 0, Math.min(original.length, newLength)); + return copy; +} +``` + +addAll +``` +// 对单个复制的优化 +public boolean addAll(Collection c) { + Object[] cs = (c.getClass() == CopyOnWriteArrayList.class) ? + ((CopyOnWriteArrayList)c).getArray() : c.toArray(); + if (cs.length == 0) + return false; + // 加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + // array未初始化 + if (len == 0 && cs.getClass() == Object[].class) + setArray(cs); + else { + // 创建长度为(原数组长度+参数集合长度)的新数组,原数组数据复制到新数组中 + Object[] newElements = Arrays.copyOf(elements, len + cs.length); + // 参数集合复制到新数组中 + System.arraycopy(cs, 0, newElements, len, cs.length); + // array指向新数组 + setArray(newElements); + } + return true; + } finally { + lock.unlock(); + } +} + +// 返回新数组,将原数据复制到新数组中 +public Object[] toArray() { + Object[] elements = getArray(); + return Arrays.copyOf(elements, elements.length); +} +``` + +get +``` +public E get(int index) { + return get(getArray(), index); +} + +private E get(Object[] a, int index) { + return (E) a[index]; +} +``` + +##### 其他方法 +``` +// Support for resetting lock while deserializing +private void resetLock() { + UNSAFE.putObjectVolatile(this, lockOffset, new ReentrantLock()); +} + +private static final sun.misc.Unsafe UNSAFE; +private static final long lockOffset; +static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = CopyOnWriteArrayList.class; + lockOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("lock")); + } catch (Exception e) { + throw new Error(e); + } +} +``` + +#### 总结 +``` +应用于并发时读多写少的情况,背后是写时复制的思想,实现读写分离。 + +优化: +减少扩容开销,初始化容器大小 +减少容器复制次数,批量参加 + +缺点:内存占用问题和数据一致性问题 +内存占用问题:进行写操作的时候,频繁地进行替换会消耗内存。因为内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象,当对象占用内存比较大时,有可能造成频繁Yong GC和Full GC +针对内存占用解决方案:如果元素全是10进制的数字,可以考虑把它压缩成32进制或64进制,或使用其他的并发容器,如ConcurrentHashMap +数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,在添加到拷贝数据而还没进行替换的时候,读到的仍然是旧数据,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。 +``` + +#### 延伸 + +实现CopyOnWriteMap容器 +``` +public class CopyOnWriteMap implements Map, Cloneable { + private volatile Map internalMap; + + public CopyOnWriteMap() { + internalMap = new HashMap(); + } + + public V put(K key, V value) { + + synchronized (this) { + Map newMap = new HashMap(internalMap); + V val = newMap.put(key, value); + internalMap = newMap; + return val; + } + } + + public V get(Object key) { + return internalMap.get(key); + } + + public void putAll(Map newData) { + synchronized (this) { + Map newMap = new HashMap(internalMap); + newMap.putAll(newData); + internalMap = newMap; + } + } +} +``` + +CopyOnWriteMap应用 +``` +public class BlackListServiceImpl { + private static CopyOnWriteMap blackListMap = new CopyOnWriteMap( + 1000); + + public static boolean isBlackList(String id) { + return blackListMap.get(id) == null ? false : true; + } + + public static void addBlackList(String id) { + blackListMap.put(id, Boolean.TRUE); + } + + /** + * 批量添加黑名单 + * + * @param ids + */ + public static void addBlackList(Map ids) { + blackListMap.putAll(ids); + } + +} +``` + +使用CopyOnWrite容器需注意的两件事: +``` +1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。 +2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。 +``` diff --git a/week_04/12/DelayQueue.md b/week_04/12/DelayQueue.md new file mode 100644 index 0000000000000000000000000000000000000000..d8f3fcda764f7803cd221616013b779d28fb21da --- /dev/null +++ b/week_04/12/DelayQueue.md @@ -0,0 +1,328 @@ +#### 问题 +``` +实现方式什么 +主要有哪些应用场景 +take方法与poll方法区别,为什么有两个出列方法 +java中的线程池(ScheduledThreadPoolExecutor)实现定时任务是直接用的DelayQueue吗 +``` +#### 简介 +``` +DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务 +``` + +#### 继承体系 +``` +public class DelayQueue extends AbstractQueue implements BlockingQueue +``` + +#### 类结构说明 +``` +private class Itr implements Iterator +``` + +#### 源码解析 +##### 属性 +``` +private final transient ReentrantLock lock = new ReentrantLock(); +private final PriorityQueue q = new PriorityQueue(); +private Thread leader = null; +private final Condition available = lock.newCondition(); +``` + +##### 构造方法 +``` +public DelayQueue() {} +public DelayQueue(Collection c) { + this.addAll(c); +} +``` + +##### 主要方法 +存
+``` +public boolean add(E e) { + return offer(e); +} + +public boolean offer(E e, long timeout, TimeUnit unit) { + return offer(e); +} + +public void put(E e) { + offer(e); +} + +public boolean offer(E e) { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + // 优先队列插入 + q.offer(e); + // todo 插入值在队列首位或插入值与队列首位相同? + if (q.peek() == e) { + leader = null; + available.signal(); + } + return true; + } finally { + lock.unlock(); + } +} +``` + +PriorityQueue +``` +public boolean offer(E e) { + if (e == null) + throw new NullPointerException(); + modCount++; + int i = size; + if (i >= queue.length) + // 扩容 + grow(i + 1); + size = i + 1; + if (i == 0) + queue[0] = e; + else + // 通过比较存入 + siftUp(i, e); + return true; +} + +// 扩容 +private void grow(int minCapacity) { + int oldCapacity = queue.length; + // Double size if small; else grow by 50% + int newCapacity = oldCapacity + ((oldCapacity < 64) ? + (oldCapacity + 2) : + (oldCapacity >> 1)); + // overflow-conscious code + if (newCapacity - MAX_ARRAY_SIZE > 0) + newCapacity = hugeCapacity(minCapacity); + queue = Arrays.copyOf(queue, newCapacity); +} + +private void siftUp(int k, E x) { + // 自定义比较器存在 + if (comparator != null) + siftUpUsingComparator(k, x); + // 比较值是Comparable类型 + else + siftUpComparable(k, x); +} + +private void siftUpComparable(int k, E x) { + Comparable key = (Comparable) x; + while (k > 0) { + int parent = (k - 1) >>> 1; + Object e = queue[parent]; + // 传入值比前一个值大,跳出循环直接存入 + if (key.compareTo((E) e) >= 0) + break; + // 传入值与前一个值交换 + queue[k] = e; + k = parent; + } + // 尾部直接存入 + queue[k] = key; +} + +// 自定义比较器 +private void siftUpUsingComparator(int k, E x) { + while (k > 0) { + int parent = (k - 1) >>> 1; + Object e = queue[parent]; + if (comparator.compare(x, (E) e) >= 0) + break; + queue[k] = e; + k = parent; + } + queue[k] = x; +} +``` + +取
+poll +``` +public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + E first = q.peek(); + // 顶部无值或未过期 + // getDelay() @return 0或负数暗示已过期 + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + else + // 优先队列取出,并调整队列头部 + return q.poll(); + } finally { + lock.unlock(); + } +} +``` + +poll(long timeout, TimeUnit unit) +``` +public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) { + if (nanos <= 0) + return null; + else + nanos = available.awaitNanos(nanos); + } else { + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) + return q.poll(); + if (nanos <= 0) + return null; + first = null; // don't retain ref while waiting + if (nanos < delay || leader != null) + nanos = available.awaitNanos(nanos); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + long timeLeft = available.awaitNanos(delay); + nanos -= delay - timeLeft; + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } +} +``` + +PriorityQueue +``` +public E poll() { + if (size == 0) + return null; + int s = --size; + modCount++; + // 返回head节点 + E result = (E) queue[0]; + // tail节点 + E x = (E) queue[s]; + // 删除tail节点 + queue[s] = null; + if (s != 0) + // tail节点找位置存入,调整头部 + siftDown(0, x); + return result; +} + +private void siftDown(int k, E x) { + if (comparator != null) + siftDownUsingComparator(k, x); + else + siftDownComparable(k, x); +} + +// 数据属于Comparable类型 +private void siftDownComparable(int k, E x) { + Comparable key = (Comparable)x; + int half = size >>> 1; // loop while a non-leaf + while (k < half) { + int child = (k << 1) + 1; // assume left child is least + Object c = queue[child]; + int right = child + 1; + if (right < size && ((Comparable) c).compareTo((E) queue[right]) > 0) + c = queue[child = right]; + // todo 尾部值好像和所有值比较,若尾部最小,直接跳出循环,首部直接存入 + if (key.compareTo((E) c) <= 0) + break; + // todo 交换 + queue[k] = c; + k = child; + } + // 头部直接存入 + queue[k] = key; +} + +// 自定义比较器 +private void siftDownUsingComparator(int k, E x) { + int half = size >>> 1; + while (k < half) { + int child = (k << 1) + 1; + Object c = queue[child]; + int right = child + 1; + if (right < size && + comparator.compare((E) c, (E) queue[right]) > 0) + c = queue[child = right]; + if (comparator.compare(x, (E) c) <= 0) + break; + queue[k] = c; + k = child; + } + queue[k] = x; +} +``` + +take +``` +public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) + return q.poll(); + first = null; // don't retain ref while waiting + if (leader != null) + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } +} +``` + +take方法 +``` +加锁; +判断堆顶元素是否为空,为空的话直接阻塞等待; +判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素; +没到期,再判断前面是否有其它线程在等待,有则直接等待; +前面没有其它线程在等待,则把自己当作第一个线程等待delay时间后唤醒,再尝试获取元素; +获取到元素之后再唤醒下一个等待的线程; +解锁; +``` +#### 总结 +``` +阻塞队列 +内部存储结构使用优先级队列 +使用重入锁和条件来控制并发安全 +常用于定时任务 +``` \ No newline at end of file