From 1908e92468beaac82b584ff85fa529e961dc121c Mon Sep 17 00:00:00 2001 From: puhaiyang Date: Mon, 23 Mar 2020 09:27:22 +0800 Subject: [PATCH 1/6] =?UTF-8?q?1.=E6=B7=BB=E5=8A=A0=E6=9C=AC=E5=91=A8?= =?UTF-8?q?=E5=AD=A6=E4=B9=A0=E5=86=85=E5=AE=B9=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- second/week_04/70/ArrayBlockingQueue.adoc | 0 second/week_04/70/CLH.adoc | 0 second/week_04/70/ConcurrentHashMap.adoc | 0 second/week_04/70/ConcurrentLinkQueue.adoc | 0 second/week_04/70/CopyOnWriteArrayList.adoc | 0 second/week_04/70/DelayQueue.adoc | 0 6 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 second/week_04/70/ArrayBlockingQueue.adoc create mode 100644 second/week_04/70/CLH.adoc create mode 100644 second/week_04/70/ConcurrentHashMap.adoc create mode 100644 second/week_04/70/ConcurrentLinkQueue.adoc create mode 100644 second/week_04/70/CopyOnWriteArrayList.adoc create mode 100644 second/week_04/70/DelayQueue.adoc diff --git a/second/week_04/70/ArrayBlockingQueue.adoc b/second/week_04/70/ArrayBlockingQueue.adoc new file mode 100644 index 0000000..e69de29 diff --git a/second/week_04/70/CLH.adoc b/second/week_04/70/CLH.adoc new file mode 100644 index 0000000..e69de29 diff --git a/second/week_04/70/ConcurrentHashMap.adoc b/second/week_04/70/ConcurrentHashMap.adoc new file mode 100644 index 0000000..e69de29 diff --git a/second/week_04/70/ConcurrentLinkQueue.adoc b/second/week_04/70/ConcurrentLinkQueue.adoc new file mode 100644 index 0000000..e69de29 diff --git a/second/week_04/70/CopyOnWriteArrayList.adoc b/second/week_04/70/CopyOnWriteArrayList.adoc new file mode 100644 index 0000000..e69de29 diff --git a/second/week_04/70/DelayQueue.adoc b/second/week_04/70/DelayQueue.adoc new file mode 100644 index 0000000..e69de29 -- Gitee From 1babd3cc94f8de2e63382b6e7e3f0f352b252859 Mon Sep 17 00:00:00 2001 From: puhaiyang Date: Mon, 23 Mar 2020 14:35:50 +0800 Subject: [PATCH 2/6] =?UTF-8?q?1.=E5=AE=8C=E6=88=90CopyOnWriteArrayList?= =?UTF-8?q?=E5=AD=A6=E4=B9=A0=E7=AC=94=E8=AE=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- second/week_04/70/CLH.adoc | 1 + second/week_04/70/CopyOnWriteArrayList.adoc | 262 ++++++++++++++++++++ 2 files changed, 263 insertions(+) diff --git a/second/week_04/70/CLH.adoc b/second/week_04/70/CLH.adoc index e69de29..adcd116 100644 --- a/second/week_04/70/CLH.adoc +++ b/second/week_04/70/CLH.adoc @@ -0,0 +1 @@ +CLH的发明人是:Craig,Landin and Hagersten \ No newline at end of file diff --git a/second/week_04/70/CopyOnWriteArrayList.adoc b/second/week_04/70/CopyOnWriteArrayList.adoc index e69de29..e248700 100644 --- a/second/week_04/70/CopyOnWriteArrayList.adoc +++ b/second/week_04/70/CopyOnWriteArrayList.adoc @@ -0,0 +1,262 @@ += CopyOnWriteArrayList基础入门学习笔记 +Doc Writer +v1.0, 2020-03-23 +:toc-title: 目录 +:toc: left + +== 基本要求 + +=== 源码学习要求 + +- 至少提交2个类的源码分析笔记 + +=== jdk环境要求 + +jdk1.8 + +=== 学习后需要至少掌握的知识点 + +- CopyOnWriteArrayList与ArrayList有什么区别? +- CopyOnWriteArrayList是基本原理 + +== 部分源码分析笔记 + +=== ArrayList可能带来的并发问题示例 + +[source,java] +---- +import java.util.ArrayList; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ArrayListNoSafe { + public static void main(String[] args) { + ArrayList xxx = new ArrayList<>(); + ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(50, 50, 10L, TimeUnit.SECONDS, new LinkedBlockingDeque<>(1000)); + for (int i = 0; i < 50; i++) { + threadPoolExecutor.execute(new Runnable() { + @Override + public void run() { + long i = 0; + for (; ; ) { + i++; + xxx.add(i + ""); + } + } + }); + } + + } +} +---- + +输出结果为: + +---- +Exception in thread "pool-1-thread-1" Exception in thread "pool-1-thread-7" Exception in thread "pool-1-thread-2" Exception in thread "pool-1-thread-3" java.lang.ArrayIndexOutOfBoundsException: 49 + at java.util.ArrayList.add(ArrayList.java:463) + at com.example.springbootstu.ArrayListNoSafe$1.run(ArrayListNoSafe.java:20) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at java.lang.Thread.run(Thread.java:748) +java.lang.ArrayIndexOutOfBoundsException: 15 + at java.util.ArrayList.add(ArrayList.java:463) + at com.example.springbootstu.ArrayListNoSafe$1.run(ArrayListNoSafe.java:20) + at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) + at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) + at java.lang.Thread.run(Thread.java:748) +---- + +因为扩容不及时,会导致outOfBoundsException + +=== CopyOnWriteArrayList构造方法 + +[source,adoc] +---- + /** The array, accessed only via getArray/setArray. */ + private transient volatile Object[] array; + + /** + * Creates an empty list. + */ + public CopyOnWriteArrayList() { + setArray(new Object[0]); + } + /** + * Sets the array. + */ + final void setArray(Object[] a) { + array = a; + } +---- + +=== CopyOnWriteArrayList的add方法 + +[source,adoc] +---- + /** The lock protecting all mutators */ + final transient ReentrantLock lock = new ReentrantLock(); + + /** + * Appends the specified element to the end of this list. + * + * @param e element to be appended to this list + * @return {@code true} (as specified by {@link Collection#add}) + */ + public boolean add(E e) { + //拿到lock + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + //在需要写的时候再copy数组 + Object[] newElements = Arrays.copyOf(elements, len + 1); + newElements[len] = e; + setArray(newElements); + return true; + } finally { + lock.unlock(); + } + } +---- + +=== CopyOnWriteArrayList的remove方法 + +[source,adoc] +---- + /** + * Removes the element at the specified position in this list. + * Shifts any subsequent elements to the left (subtracts one from their + * indices). Returns the element that was removed from the list. + * + * @throws IndexOutOfBoundsException {@inheritDoc} + */ + public E remove(int index) { + //获取锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + Object[] elements = getArray(); + int len = elements.length; + //获取到原数据 + E oldValue = get(elements, index); + //原始数据大小-index-1=需要移动的数组节点数 + int numMoved = len - index - 1; + if (numMoved == 0) + //len-index=1,移除的是老末节点的数据 + setArray(Arrays.copyOf(elements, len - 1)); + else { + //新的数组 + Object[] newElements = new Object[len - 1]; + //前半段 + System.arraycopy(elements, 0, newElements, 0, index); + //后半段 + System.arraycopy(elements, index + 1, newElements, index, + numMoved); + //新数组 + setArray(newElements); + } + return oldValue; + } finally { + lock.unlock(); + } + } + + @SuppressWarnings("unchecked") + private E get(Object[] a, int index) { + return (E) a[index]; + } +---- +=== CopyOnWriteArrayList的iterator方法 +[source,java] +---- + static final class COWIterator implements ListIterator { + /** Snapshot of the array */ + private final Object[] snapshot; + /** Index of element to be returned by subsequent call to next. */ + private int cursor; + + private COWIterator(Object[] elements, int initialCursor) { + cursor = initialCursor; + snapshot = elements; + } + + public boolean hasNext() { + return cursor < snapshot.length; + } + + public boolean hasPrevious() { + return cursor > 0; + } + + @SuppressWarnings("unchecked") + public E next() { + if (! hasNext()) + throw new NoSuchElementException(); + return (E) snapshot[cursor++]; + } + + @SuppressWarnings("unchecked") + public E previous() { + if (! hasPrevious()) + throw new NoSuchElementException(); + return (E) snapshot[--cursor]; + } + + public int nextIndex() { + return cursor; + } + + public int previousIndex() { + return cursor-1; + } + + /** + * Not supported. Always throws UnsupportedOperationException. + * @throws UnsupportedOperationException always; {@code remove} + * is not supported by this iterator. + */ + public void remove() { + throw new UnsupportedOperationException(); + } + + /** + * Not supported. Always throws UnsupportedOperationException. + * @throws UnsupportedOperationException always; {@code set} + * is not supported by this iterator. + */ + public void set(E e) { + throw new UnsupportedOperationException(); + } + + /** + * Not supported. Always throws UnsupportedOperationException. + * @throws UnsupportedOperationException always; {@code add} + * is not supported by this iterator. + */ + public void add(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public void forEachRemaining(Consumer action) { + Objects.requireNonNull(action); + Object[] elements = snapshot; + final int size = elements.length; + for (int i = cursor; i < size; i++) { + @SuppressWarnings("unchecked") E e = (E) elements[i]; + action.accept(e); + } + cursor = size; + } + } +---- +[NOTE] +.通过查看CopyOnWriteArrayList的代码中可以看出 +==== +查看源码后可知如下内容: +. CopyOnWriteArrayList是通过ReentrantLock来实现的,在add的remove方法中都通过锁来实现 +. CopyOnWriteArrayList的迭代器不允许进行数据进行删除,如想删除需要调用remove,从而实现并发控制 +==== \ No newline at end of file -- Gitee From d1f66a4b00bc073304e47b2a5fbccce3dd56831f Mon Sep 17 00:00:00 2001 From: puhaiyang Date: Wed, 25 Mar 2020 10:34:52 +0800 Subject: [PATCH 3/6] =?UTF-8?q?1.=E5=AE=8C=E6=88=90ArrayBlockingQueue?= =?UTF-8?q?=E5=AD=A6=E4=B9=A0=E7=AC=94=E8=AE=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- second/week_04/70/ArrayBlockingQueue.adoc | 240 ++++++++++++++++++++++ 1 file changed, 240 insertions(+) diff --git a/second/week_04/70/ArrayBlockingQueue.adoc b/second/week_04/70/ArrayBlockingQueue.adoc index e69de29..616d17f 100644 --- a/second/week_04/70/ArrayBlockingQueue.adoc +++ b/second/week_04/70/ArrayBlockingQueue.adoc @@ -0,0 +1,240 @@ += ArrayBlockingQueue基础入门学习笔记 +Doc Writer +v1.0, 2020-03-24 +:toc-title: 目录 +:toc: left + +== 基本要求 + +=== 源码学习要求 + +- 至少提交2个类的源码分析笔记 + +=== jdk环境要求 + +jdk1.8 + +=== 学习后需要至少掌握的知识点 + +- ArrayBlockingQueue与ArrayList有什么区别? +- ArrayBlockingQueue的基本原理 + +== 部分源码分析笔记 +=== ArrayBlockingQueue的构造函数 +[souce,adoc] +---- + //capacity指定大小 + /** + * Creates an {@code ArrayBlockingQueue} with the given (fixed) + * capacity and default access policy. + * + * @param capacity the capacity of this queue + * @throws IllegalArgumentException if {@code capacity < 1} + */ + public ArrayBlockingQueue(int capacity) { + this(capacity, false); + } + /** Condition for waiting takes */ + private final Condition notEmpty; + + /** Condition for waiting puts */ + private final Condition notFull; + + /** + * Creates an {@code ArrayBlockingQueue} with the given (fixed) + * capacity and the specified access policy. + * + * @param capacity the capacity of this queue + * @param fair if {@code true} then queue accesses for threads blocked + * on insertion or removal, are processed in FIFO order; + * if {@code false} the access order is unspecified. + * @throws IllegalArgumentException if {@code capacity < 1} + */ + public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); + } +---- + +=== ArrayBlockingQueue的add方法 +[souce,adoc] +---- + /** + * Inserts the specified element at the tail of this queue if it is + * possible to do so immediately without exceeding the queue's capacity, + * returning {@code true} upon success and throwing an + * {@code IllegalStateException} if this queue is full. + * + * @param e the element to add + * @return {@code true} (as specified by {@link Collection#add}) + * @throws IllegalStateException if this queue is full + * @throws NullPointerException if the specified element is null + */ + public boolean add(E e) { + return super.add(e); + } + + //AbstractQueue中的add + public boolean add(E e) { + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); + } + + //位于ArrayBlockingQueue中的offer方法 + public boolean offer(E e) { + //元素不能为空 + checkNotNull(e); + final ReentrantLock lock = this.lock; + //加锁 + lock.lock(); + try { + // int count; count从0开始 + if (count == items.length) + return false; + else { + enqueue(e); + return true; + } + } finally { + lock.unlock(); + } + } + + /** + * Inserts element at current put position, advances, and signals. + * Call only when holding lock. + */ + private void enqueue(E x) { + //int putIndex; putIndex从0开始的 + // assert lock.getHoldCount() == 1; + // assert items[putIndex] == null; + final Object[] items = this.items; + //进行赋值 + items[putIndex] = x; + //添加后putIndex自增一位 + if (++putIndex == items.length) + //如果自增后的结果与items.length相等,则让putIndex归0,因为是一个队列 + putIndex = 0; + //count自增,记录添加过的数据数 + count++; + //notEmpty的condition执行singnal,通知notEmpty等待的线程 + notEmpty.signal(); + } +---- + + +=== ArrayBlockingQueue的pool方法 +[souce,adoc] +---- + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return (count == 0) ? null : dequeue(); + } finally { + lock.unlock(); + } + } + + /** + * Extracts element at current take position, advances, and signals. + * Call only when holding lock. + */ + private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + final Object[] items = this.items; + @SuppressWarnings("unchecked") + //takeIndex从0开始,x代表队首元素 + E x = (E) items[takeIndex]; + //items[takeIndex]设为空 + items[takeIndex] = null; + //takeIndex自增后判断是否到达队尾了 + if (++takeIndex == items.length) + //如果已到达队尾,则takeIndex重置为0 + takeIndex = 0; + //count-- + count--; + if (itrs != null) + // /** + // * Shared state for currently active iterators, or null if there + // * are known not to be any. Allows queue operations to update + // * iterator state. + // */ + // transient Itrs itrs = null; + itrs.elementDequeued(); + //激活notFull执行了wait的线程 + notFull.signal(); + return x; + } +---- + +=== ArrayBlockingQueue的peek方法 +[souce,adoc] +---- + /** items index for next take, poll, peek or remove */ + int takeIndex; + + public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return itemAt(takeIndex); // null when queue is empty + } finally { + lock.unlock(); + } + } + + /** + * Returns item at index i. + */ + @SuppressWarnings("unchecked") + final E itemAt(int i) { + return (E) items[i]; + } +---- +.NOTE +peek方法会获取锁,然后返回takeIndex位置的元素 + +=== ArrayBlockingQueue的clear方法 +[souce,adoc] +---- + /** + * Atomically removes all of the elements from this queue. + * The queue will be empty after this call returns. + */ + public void clear() { + final Object[] items = this.items; + final ReentrantLock lock = this.lock; + //加锁 + lock.lock(); + try { + int k = count; + if (k > 0) { + final int putIndex = this.putIndex; + int i = takeIndex; + do { + //从队头开始,进行清空 + items[i] = null; + if (++i == items.length) + i = 0; + } while (i != putIndex); + takeIndex = putIndex; + count = 0; + if (itrs != null) + //将迭代器置空 + itrs.queueIsEmpty(); + for (; k > 0 && lock.hasWaiters(notFull); k--) + notFull.signal(); + } + } finally { + lock.unlock(); + } + } +---- \ No newline at end of file -- Gitee From e28604ba75770bf53593470b965014f774b945b2 Mon Sep 17 00:00:00 2001 From: puhaiyang Date: Wed, 25 Mar 2020 18:14:44 +0800 Subject: [PATCH 4/6] =?UTF-8?q?1.=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- second/week_04/70/ConcurrentLinkQueue.adoc | 189 +++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/second/week_04/70/ConcurrentLinkQueue.adoc b/second/week_04/70/ConcurrentLinkQueue.adoc index e69de29..3450f90 100644 --- a/second/week_04/70/ConcurrentLinkQueue.adoc +++ b/second/week_04/70/ConcurrentLinkQueue.adoc @@ -0,0 +1,189 @@ += ConcurrentLinkQueue基础入门学习笔记 +Doc Writer +v1.0, 2020-03-25 +:toc-title: 目录 +:toc: left + +== 基本要求 + +=== 源码学习要求 + +- 至少提交2个类的源码分析笔记 + +=== jdk环境要求 + +jdk1.8 + +=== 学习后需要至少掌握的知识点 + +- ConcurrentLinkQueue与与LinkQueue有什么区别? +- ConcurrentLinkQueue与ArrayBlockingQueue有什么区别? +- ConcurrentLinkQueue的基本原理 + +== 部分源码分析笔记 +=== ConcurrentLinkQueue的构造函数 +[souce,adoc] +---- + /** + * Creates a {@code ConcurrentLinkedQueue} that is initially empty. + */ + public ConcurrentLinkedQueue() { + head = tail = new Node(null); + } +---- + +=== ConcurrentLinkQueue的add方法 +[souce,adoc] +---- + public boolean add(E e) { + return offer(e); + } + + public boolean offer(E e) { + checkNotNull(e); + //创建新node + final Node newNode = new Node(e); + + for (Node t = tail, p = t;;) { + //p为tail,tail由volatile修饰 + Node q = p.next; + //tail的下一节点是否为空,为空的话,说明tail是最后的一个节点 + if (q == null) { + // p is last node + if (p.casNext(null, newNode)) { + // 用cas给最后一个节点赋值为newNode;如果赋值成功,则q=newNode,p=t + // Successful CAS is the linearization point + // for e to become an element of this queue, + // and for newNode to become "live". + if (p != t) // hop two nodes at a time + //什么情况下会可能出现p!=t呢?也就是另一个线程拿到q后发现q!=null + casTail(t, newNode); // Failure is OK. + return true; + } + // Lost CAS race to another thread; re-read next + } + else if (p == q) + //执行到了这里,则说明q!=null,p=tail + //p=tail,q=p.next;即p=tail,q=tail.next + // We have fallen off list. If tail is unchanged, it + // will also be off-list, in which case we need to + // jump to head, from which all live nodes are always + // reachable. Else the new tail is a better bet. + p = (t != (t = tail)) ? t : head; + else + // Check for tail updates after two hops. + p = (p != t && t != (t = tail)) ? t : q; + } + } +---- + + +=== ArrayBlockingQueue的pool方法 +[souce,adoc] +---- + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return (count == 0) ? null : dequeue(); + } finally { + lock.unlock(); + } + } + + /** + * Extracts element at current take position, advances, and signals. + * Call only when holding lock. + */ + private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + final Object[] items = this.items; + @SuppressWarnings("unchecked") + //takeIndex从0开始,x代表队首元素 + E x = (E) items[takeIndex]; + //items[takeIndex]设为空 + items[takeIndex] = null; + //takeIndex自增后判断是否到达队尾了 + if (++takeIndex == items.length) + //如果已到达队尾,则takeIndex重置为0 + takeIndex = 0; + //count-- + count--; + if (itrs != null) + // /** + // * Shared state for currently active iterators, or null if there + // * are known not to be any. Allows queue operations to update + // * iterator state. + // */ + // transient Itrs itrs = null; + itrs.elementDequeued(); + //激活notFull执行了wait的线程 + notFull.signal(); + return x; + } +---- + +=== ArrayBlockingQueue的peek方法 +[souce,adoc] +---- + /** items index for next take, poll, peek or remove */ + int takeIndex; + + public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + return itemAt(takeIndex); // null when queue is empty + } finally { + lock.unlock(); + } + } + + /** + * Returns item at index i. + */ + @SuppressWarnings("unchecked") + final E itemAt(int i) { + return (E) items[i]; + } +---- +.NOTE +peek方法会获取锁,然后返回takeIndex位置的元素 + +=== ArrayBlockingQueue的clear方法 +[souce,adoc] +---- + /** + * Atomically removes all of the elements from this queue. + * The queue will be empty after this call returns. + */ + public void clear() { + final Object[] items = this.items; + final ReentrantLock lock = this.lock; + //加锁 + lock.lock(); + try { + int k = count; + if (k > 0) { + final int putIndex = this.putIndex; + int i = takeIndex; + do { + //从队头开始,进行清空 + items[i] = null; + if (++i == items.length) + i = 0; + } while (i != putIndex); + takeIndex = putIndex; + count = 0; + if (itrs != null) + //将迭代器置空 + itrs.queueIsEmpty(); + for (; k > 0 && lock.hasWaiters(notFull); k--) + notFull.signal(); + } + } finally { + lock.unlock(); + } + } +---- \ No newline at end of file -- Gitee From 38919761f4d2081fb697fdd1b926c8c132fcec94 Mon Sep 17 00:00:00 2001 From: puhaiyang Date: Thu, 26 Mar 2020 13:36:10 +0800 Subject: [PATCH 5/6] =?UTF-8?q?1.=E6=9A=82=E5=AD=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- second/week_04/70/ConcurrentLinkQueue.adoc | 127 ++++++++------------- 1 file changed, 50 insertions(+), 77 deletions(-) diff --git a/second/week_04/70/ConcurrentLinkQueue.adoc b/second/week_04/70/ConcurrentLinkQueue.adoc index 3450f90..ed56224 100644 --- a/second/week_04/70/ConcurrentLinkQueue.adoc +++ b/second/week_04/70/ConcurrentLinkQueue.adoc @@ -33,6 +33,7 @@ jdk1.8 ---- === ConcurrentLinkQueue的add方法 +https://www.jianshu.com/p/231caf90f30b [souce,adoc] ---- public boolean add(E e) { @@ -44,6 +45,8 @@ jdk1.8 //创建新node final Node newNode = new Node(e); + //p初始时指向tail节点 + //q始终指向tail的next节点 for (Node t = tail, p = t;;) { //p为tail,tail由volatile修饰 Node q = p.next; @@ -52,33 +55,66 @@ jdk1.8 // p is last node if (p.casNext(null, newNode)) { // 用cas给最后一个节点赋值为newNode;如果赋值成功,则q=newNode,p=t - // Successful CAS is the linearization point - // for e to become an element of this queue, - // and for newNode to become "live". - if (p != t) // hop two nodes at a time - //什么情况下会可能出现p!=t呢?也就是另一个线程拿到q后发现q!=null + if (p != t) + //赋值tail,交替循环更新tail casTail(t, newNode); // Failure is OK. return true; } - // Lost CAS race to another thread; re-read next } else if (p == q) - //执行到了这里,则说明q!=null,p=tail - //p=tail,q=p.next;即p=tail,q=tail.next - // We have fallen off list. If tail is unchanged, it - // will also be off-list, in which case we need to - // jump to head, from which all live nodes are always - // reachable. Else the new tail is a better bet. + // p = (t != (t = tail)) ? t : head; else - // Check for tail updates after two hops. + //让p指向最后一个节点,最后一个节点有可能是t,也有可能是t.next,所以这里要进行判断 p = (p != t && t != (t = tail)) ? t : q; } } ---- +[NOTE] +.通过查看offer方法后记录 +==== +. ConcurrentLinkQueue的入队操作正如源码所说的那样,它采用了wait-free算法来实现,并发非阻塞 +. 与传统的队列相比,ConcurrentLinkQueue的tail可能是不正真的队尾节点,队尾节点也可能为tail->next节点 +==== + +=== ConcurrentLinkQueue的peek方法 +[souce,adoc] +---- + public E peek() { + restartFromHead: + for (;;) { + for (Node h = head, p = h, q;;) { + E item = p.item; + //第一次:item==null的话,q=p.next,q=head.next + if (item != null || (q = p.next) == null) { + //第二次:item=head.next.item,item不为空 + //h + updateHead(h, p); + //返回item + return item; + } + else if (p == q) + continue restartFromHead; + else + //第一次:进行赋值p=q + p = q; + } + } + } + /** + * Tries to CAS head to p. If successful, repoint old head to itself + * as sentinel for succ(), below. + */ + final void updateHead(Node h, Node p) { + //cas将h和p进行更新 + if (h != p && casHead(h, p)) + //head.next=h + h.lazySetNext(h); + } +---- -=== ArrayBlockingQueue的pool方法 +=== ConcurrentLinkQueue的pool方法 [souce,adoc] ---- public E poll() { @@ -124,66 +160,3 @@ jdk1.8 } ---- -=== ArrayBlockingQueue的peek方法 -[souce,adoc] ----- - /** items index for next take, poll, peek or remove */ - int takeIndex; - - public E peek() { - final ReentrantLock lock = this.lock; - lock.lock(); - try { - return itemAt(takeIndex); // null when queue is empty - } finally { - lock.unlock(); - } - } - - /** - * Returns item at index i. - */ - @SuppressWarnings("unchecked") - final E itemAt(int i) { - return (E) items[i]; - } ----- -.NOTE -peek方法会获取锁,然后返回takeIndex位置的元素 - -=== ArrayBlockingQueue的clear方法 -[souce,adoc] ----- - /** - * Atomically removes all of the elements from this queue. - * The queue will be empty after this call returns. - */ - public void clear() { - final Object[] items = this.items; - final ReentrantLock lock = this.lock; - //加锁 - lock.lock(); - try { - int k = count; - if (k > 0) { - final int putIndex = this.putIndex; - int i = takeIndex; - do { - //从队头开始,进行清空 - items[i] = null; - if (++i == items.length) - i = 0; - } while (i != putIndex); - takeIndex = putIndex; - count = 0; - if (itrs != null) - //将迭代器置空 - itrs.queueIsEmpty(); - for (; k > 0 && lock.hasWaiters(notFull); k--) - notFull.signal(); - } - } finally { - lock.unlock(); - } - } ----- \ No newline at end of file -- Gitee From 7f7047f5a1691b17dbd15170c65804aa94e08837 Mon Sep 17 00:00:00 2001 From: puhaiyang <761396462@qq.com> Date: Sun, 29 Mar 2020 20:45:46 +0800 Subject: [PATCH 6/6] =?UTF-8?q?1.=E6=B7=BB=E5=8A=A0DelayQueue=E5=AD=A6?= =?UTF-8?q?=E4=B9=A0=E7=AC=94=E8=AE=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- second/week_04/70/DelayQueue.adoc | 315 ++++++++++++++++++++++++++++++ 1 file changed, 315 insertions(+) diff --git a/second/week_04/70/DelayQueue.adoc b/second/week_04/70/DelayQueue.adoc index e69de29..dd5c0d1 100644 --- a/second/week_04/70/DelayQueue.adoc +++ b/second/week_04/70/DelayQueue.adoc @@ -0,0 +1,315 @@ += DelayQueue基础入门学习笔记 +Doc Writer +v1.0, 2020-03-29 +:toc-title: 目录 +:toc: left + +== 基本要求 + +=== 源码学习要求 + +- 至少提交2个类的源码分析笔记 + +=== jdk环境要求 + +jdk1.8 + +=== 学习后需要至少掌握的知识点 + +- DelayQueue的基本原理 + +== 部分源码分析笔记 + +=== DelayQueue类的结构和构造方法 + +[source,java] +---- +/** + * An unbounded {@linkplain BlockingQueue blocking queue} of + * {@code Delayed} elements, in which an element can only be taken + * when its delay has expired. The head of the queue is that + * {@code Delayed} element whose delay expired furthest in the + * past. If no delay has expired there is no head and {@code poll} + * will return {@code null}. Expiration occurs when an element's + * {@code getDelay(TimeUnit.NANOSECONDS)} method returns a value less + * than or equal to zero. Even though unexpired elements cannot be + * removed using {@code take} or {@code poll}, they are otherwise + * treated as normal elements. For example, the {@code size} method + * returns the count of both expired and unexpired elements. + * This queue does not permit null elements. + * + *

This class and its iterator implement all of the + * optional methods of the {@link Collection} and {@link + * Iterator} interfaces. The Iterator provided in method {@link + * #iterator()} is not guaranteed to traverse the elements of + * the DelayQueue in any particular order. + * + *

This class is a member of the + * + * Java Collections Framework. + * + * @since 1.5 + * @author Doug Lea + * @param the type of elements held in this collection + */ +public class DelayQueue extends AbstractQueue + implements BlockingQueue { + //锁 + private final transient ReentrantLock lock = new ReentrantLock(); + //优先队列,PriorityQueue采用堆排序实现 + private final PriorityQueue q = new PriorityQueue(); + + /** + * Thread designated to wait for the element at the head of + * the queue. This variant of the Leader-Follower pattern + * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to + * minimize unnecessary timed waiting. When a thread becomes + * the leader, it waits only for the next delay to elapse, but + * other threads await indefinitely. The leader thread must + * signal some other thread before returning from take() or + * poll(...), unless some other thread becomes leader in the + * interim. Whenever the head of the queue is replaced with + * an element with an earlier expiration time, the leader + * field is invalidated by being reset to null, and some + * waiting thread, but not necessarily the current leader, is + * signalled. So waiting threads must be prepared to acquire + * and lose leadership while waiting. + */ + private Thread leader = null; + + /** + * Condition signalled when a newer element becomes available + * at the head of the queue or a new thread may need to + * become leader. + */ + private final Condition available = lock.newCondition(); + + /** + * Creates a new {@code DelayQueue} that is initially empty. + */ + public DelayQueue() {} + + /** + * Creates a {@code DelayQueue} initially containing the elements of the + * given collection of {@link Delayed} instances. + * + * @param c the collection of elements to initially contain + * @throws NullPointerException if the specified collection or any + * of its elements are null + */ + public DelayQueue(Collection c) { + this.addAll(c); + } +} + +public interface Delayed extends Comparable { + long getDelay(TimeUnit unit); +} + +public interface Comparable { + public int compareTo(T o); +} +---- + +[NOTE] +.通过上面的代码中可以看出 +==== +. 通过DelayQueue类可知,它是用ReentrantLock实现的一个阻塞队列 +. DelayQueue中的元素必须实现Delayed接口,Delayed又继承于Comparable方法,所以DelayQueue中的对象必须实现getDelay和compareTo方法 +==== + +=== DelayQueue的add方法 + +[source,adoc] +---- + /** + * Inserts the specified element into this delay queue. + * + * @param e the element to add + * @return {@code true} (as specified by {@link Collection#add}) + * @throws NullPointerException if the specified element is null + */ + public boolean add(E e) { + return offer(e); + } + + /** + * Inserts the specified element into this delay queue. + * + * @param e the element to add + * @return {@code true} + * @throws NullPointerException if the specified element is null + */ + public boolean offer(E e) { + final ReentrantLock lock = this.lock; + //通过阻塞,添加数据 + lock.lock(); + try { + q.offer(e); + if (q.peek() == e) { + leader = null; + available.signal(); + } + return true; + } finally { + lock.unlock(); + } + } + + //q.offer是PriorityQueue队列中的方法,实现堆排序 + public boolean offer(E e) { + if (e == null) + throw new NullPointerException(); + modCount++; + int i = size; + if (i >= queue.length) + grow(i + 1); + size = i + 1; + if (i == 0) + queue[0] = e; + else + siftUp(i, e); + return true; + } + + //q.offer + public E peek() { + return (size == 0) ? null : (E) queue[0]; + } +---- + +=== DelayQueue的peek方法 + +[souce,adoc] +---- + /** + * Retrieves, but does not remove, the head of this queue, or + * returns {@code null} if this queue is empty. Unlike + * {@code poll}, if no expired elements are available in the queue, + * this method returns the element that will expire next, + * if one exists. + * + * @return the head of this queue, or {@code null} if this + * queue is empty + */ + public E peek() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //调用优先队列的peek + return q.peek(); + } finally { + lock.unlock(); + } + } +---- + +=== DelayQueue的poll方法 + +[souce,adoc] +---- + //无参poll + public E poll() { + final ReentrantLock lock = this.lock; + lock.lock(); + try { + E first = q.peek(); + if (first == null || first.getDelay(NANOSECONDS) > 0) + //没有节点或第一个节点还没到时间,返回null + return null; + else + //从优先队列中取出一个元素 + return q.poll(); + } finally { + lock.unlock(); + } + } + + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + final ReentrantLock lock = this.lock; + //申请锁 + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) { + if (nanos <= 0) + return null; + else + nanos = available.awaitNanos(nanos); + } else { + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) + return q.poll(); + if (nanos <= 0) + return null; + first = null; // don't retain ref while waiting + if (nanos < delay || leader != null) + nanos = available.awaitNanos(nanos); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + long timeLeft = available.awaitNanos(delay); + nanos -= delay - timeLeft; + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } + } +---- + +=== DelayQueue的tack方法 + +[souce,adoc] +---- + /** + * Retrieves and removes the head of this queue, waiting if necessary + * until an element with an expired delay is available on this queue. + * + * @return the head of this queue + * @throws InterruptedException {@inheritDoc} + */ + public E take() throws InterruptedException { + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + E first = q.peek(); + if (first == null) + available.await(); + else { + long delay = first.getDelay(NANOSECONDS); + if (delay <= 0) + return q.poll(); + first = null; // don't retain ref while waiting + if (leader != null) + available.await(); + else { + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } + } +---- \ No newline at end of file -- Gitee