From cef63801361bb3265e6a7c3e64d74d7b7946d9fa Mon Sep 17 00:00:00 2001 From: lhc Date: Mon, 13 Jan 2020 10:37:07 +0800 Subject: [PATCH 1/2] week 06 --- ...20\347\240\201\350\247\243\346\236\220.md" | 188 ++++++++++++++++++ 1 file changed, 188 insertions(+) create mode 100644 "week_04/62/ArrayBlockingQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" diff --git "a/week_04/62/ArrayBlockingQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" "b/week_04/62/ArrayBlockingQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" new file mode 100644 index 0000000..717f4b2 --- /dev/null +++ "b/week_04/62/ArrayBlockingQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" @@ -0,0 +1,188 @@ +# ArrayBlockingQueue 源码分析 + +public class ArrayBlockingQueue extends AbstractQueue implements BlockingQueue, java.io.Serializable +继承了 AbstractQueue 实现 BlockingQueue是阻塞队列,是固定大小的队列,可实现按照FIFO原则排序 +内部由默认非公平锁实现加锁 +内部维持一个数组,存储元素,创建对象时指定数组大小 +PriorityQueue 维持一个无界优先级队列 + + 为空 +private final Condition notEmpty; + +未装满队列 +private final Condition notFull; + +如果队列已满,再塞元素会抛出异常 + +放入对象: + add(object)队列没满的话,放入成功。否则抛出异常。 + offer(object):表示如果可能的话,将object加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程) + offer(E o, long timeout, TimeUnit unit)可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。 + put(object)把object加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程阻塞。直到BlockingQueue里面有空间再继续. + +取出对象: + poll()取走BlockingQueue里排在首位的对象, + poll(long timeout, TimeUnit unit)从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。 + take()取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; + drainTo()一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。 + +private class Itr implements Iterator { + // 队列中剩余元素的个数 + private int remaining; // Number of elements yet to be returned + // 下一次调用next()返回的元素的索引 + private int nextIndex; // Index of element to be returned by next + // 下一次调用next()返回的元素 + private E nextItem; // Element to be returned by next call to next + // 上一次调用next()返回的元素 + private E lastItem; // Element returned by last call to next + // 上一次调用next()返回的元素的索引 + private int lastRet; // Index of last element returned, or -1 if none + + Itr() { + // 获取“阻塞队列”的锁 + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + lastRet = -1; + if ((remaining = count) > 0) + nextItem = itemAt(nextIndex = takeIndex); + } finally { + // 释放“锁” + lock.unlock(); + } + } + + public boolean hasNext() { + return remaining > 0; + } + + public E next() { + // 获取“阻塞队列”的锁 + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + // 若“剩余元素<=0”,则抛出异常。 + if (remaining <= 0) + throw new NoSuchElementException(); + lastRet = nextIndex; + // 获取第nextIndex位置的元素 + E x = itemAt(nextIndex); // check for fresher value + if (x == null) { + x = nextItem; // we are forced to report old value + lastItem = null; // but ensure remove fails + } + else + lastItem = x; + while (--remaining > 0 && // skip over nulls + (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) + ; + return x; + } finally { + lock.unlock(); + } + } + + public void remove() { + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + int i = lastRet; + if (i == -1) + throw new IllegalStateException(); + lastRet = -1; + E x = lastItem; + lastItem = null; + // only remove if item still at index + if (x != null && x == items[i]) { + boolean removingHead = (i == takeIndex); + removeAt(i); + if (!removingHead) + nextIndex = dec(nextIndex); + } + } finally { + lock.unlock(); + } + } +} + +遍历 +```java +private class Itr implements Iterator { + // 队列中剩余元素的个数 + private int remaining; // Number of elements yet to be returned + // 下一次调用next()返回的元素的索引 + private int nextIndex; // Index of element to be returned by next + // 下一次调用next()返回的元素 + private E nextItem; // Element to be returned by next call to next + // 上一次调用next()返回的元素 + private E lastItem; // Element returned by last call to next + // 上一次调用next()返回的元素的索引 + private int lastRet; // Index of last element returned, or -1 if none + + Itr() { + // 获取“阻塞队列”的锁 + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + lastRet = -1; + if ((remaining = count) > 0) + nextItem = itemAt(nextIndex = takeIndex); + } finally { + // 释放“锁” + lock.unlock(); + } + } + + public boolean hasNext() { + return remaining > 0; + } + + public E next() { + // 获取“阻塞队列”的锁 + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + // 若“剩余元素<=0”,则抛出异常。 + if (remaining <= 0) + throw new NoSuchElementException(); + lastRet = nextIndex; + // 获取第nextIndex位置的元素 + E x = itemAt(nextIndex); // check for fresher value + if (x == null) { + x = nextItem; // we are forced to report old value + lastItem = null; // but ensure remove fails + } + else + lastItem = x; + while (--remaining > 0 && // skip over nulls + (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) + ; + return x; + } finally { + lock.unlock(); + } + } + + public void remove() { + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + int i = lastRet; + if (i == -1) + throw new IllegalStateException(); + lastRet = -1; + E x = lastItem; + lastItem = null; + // only remove if item still at index + if (x != null && x == items[i]) { + boolean removingHead = (i == takeIndex); + removeAt(i); + if (!removingHead) + nextIndex = dec(nextIndex); + } + } finally { + lock.unlock(); + } + } +} +``` \ No newline at end of file -- Gitee From 341e4a9fc8bc0d5af19b179ee92c9478b8ee9259 Mon Sep 17 00:00:00 2001 From: lhc Date: Mon, 13 Jan 2020 17:50:23 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E8=A1=A5=E5=85=85=E4=B8=8A=E5=91=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ...20\347\240\201\350\247\243\346\236\220.md" | 180 +++++++----------- 1 file changed, 74 insertions(+), 106 deletions(-) diff --git "a/week_04/62/ArrayBlockingQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" "b/week_04/62/ArrayBlockingQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" index 717f4b2..39dc94c 100644 --- "a/week_04/62/ArrayBlockingQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" +++ "b/week_04/62/ArrayBlockingQueue\346\272\220\347\240\201\350\247\243\346\236\220.md" @@ -6,10 +6,11 @@ public class ArrayBlockingQueue extends AbstractQueue implements BlockingQ 内部维持一个数组,存储元素,创建对象时指定数组大小 PriorityQueue 维持一个无界优先级队列 - 为空 +队列阻塞时条件 + 为空条件 private final Condition notEmpty; -未装满队列 +未装满队列条件 private final Condition notFull; 如果队列已满,再塞元素会抛出异常 @@ -26,6 +27,7 @@ private final Condition notFull; take()取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; drainTo()一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。 +node private class Itr implements Iterator { // 队列中剩余元素的个数 private int remaining; // Number of elements yet to be returned @@ -38,74 +40,9 @@ private class Itr implements Iterator { // 上一次调用next()返回的元素的索引 private int lastRet; // Index of last element returned, or -1 if none - Itr() { - // 获取“阻塞队列”的锁 - final ReentrantLock lock = ArrayBlockingQueue.this.lock; - lock.lock(); - try { - lastRet = -1; - if ((remaining = count) > 0) - nextItem = itemAt(nextIndex = takeIndex); - } finally { - // 释放“锁” - lock.unlock(); - } - } - - public boolean hasNext() { - return remaining > 0; - } - - public E next() { - // 获取“阻塞队列”的锁 - final ReentrantLock lock = ArrayBlockingQueue.this.lock; - lock.lock(); - try { - // 若“剩余元素<=0”,则抛出异常。 - if (remaining <= 0) - throw new NoSuchElementException(); - lastRet = nextIndex; - // 获取第nextIndex位置的元素 - E x = itemAt(nextIndex); // check for fresher value - if (x == null) { - x = nextItem; // we are forced to report old value - lastItem = null; // but ensure remove fails - } - else - lastItem = x; - while (--remaining > 0 && // skip over nulls - (nextItem = itemAt(nextIndex = inc(nextIndex))) == null) - ; - return x; - } finally { - lock.unlock(); - } - } - - public void remove() { - final ReentrantLock lock = ArrayBlockingQueue.this.lock; - lock.lock(); - try { - int i = lastRet; - if (i == -1) - throw new IllegalStateException(); - lastRet = -1; - E x = lastItem; - lastItem = null; - // only remove if item still at index - if (x != null && x == items[i]) { - boolean removingHead = (i == takeIndex); - removeAt(i); - if (!removingHead) - nextIndex = dec(nextIndex); - } - } finally { - lock.unlock(); - } } -} -遍历 +迭代器 ```java private class Itr implements Iterator { // 队列中剩余元素的个数 @@ -119,23 +56,71 @@ private class Itr implements Iterator { // 上一次调用next()返回的元素的索引 private int lastRet; // Index of last element returned, or -1 if none - Itr() { - // 获取“阻塞队列”的锁 - final ReentrantLock lock = ArrayBlockingQueue.this.lock; - lock.lock(); - try { - lastRet = -1; - if ((remaining = count) > 0) - nextItem = itemAt(nextIndex = takeIndex); - } finally { - // 释放“锁” - lock.unlock(); +Itr() { + // assert lock.getHoldCount() == 0; + lastRet = NONE; + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + if (count == 0) { + // assert itrs == null; + cursor = NONE; + nextIndex = NONE; + prevTakeIndex = DETACHED; + } else { + final int takeIndex = ArrayBlockingQueue.this.takeIndex; + prevTakeIndex = takeIndex; + nextItem = itemAt(nextIndex = takeIndex); + cursor = incCursor(takeIndex); + if (itrs == null) { + itrs = new Itrs(this); + } else { + itrs.register(this); // in this order + itrs.doSomeSweeping(false); + } + prevCycles = itrs.cycles; + // assert takeIndex >= 0; + // assert prevTakeIndex == takeIndex; + // assert nextIndex >= 0; + // assert nextItem != null; + } + } finally { + lock.unlock(); + } } - } +count等于0的时候,创建的这个迭代器是个无用的迭代器,可以直接移除,进入detach模式。 +否则就把当前队列的读取位置给迭代器当做下一个元素,cursor存储下个元素的位置。 + + +private void noNext() { + final ReentrantLock lock = ArrayBlockingQueue.this.lock; + lock.lock(); + try { + // assert cursor == NONE; + // assert nextIndex == NONE; + if (!isDetached()) { + // assert lastRet >= 0; + incorporateDequeues(); // might update lastRet + if (lastRet >= 0) { + lastItem = itemAt(lastRet); + // assert lastItem != null; + detach(); + } + } + // assert isDetached(); + // assert lastRet < 0 ^ lastItem != null; + } finally { + lock.unlock(); + } + } +这里引用:https://www.cnblogs.com/lighten/p/7427763.html +hashNext()先判断nextItem有没有值了,没有值的时候触发noNext函数,这个函数做的就是调整修正队列。 +这里先提一下并发中迭代器会发生什么问题。ArrayBlockingQueue的实现是一个循环数组,使用takeIndex和putIndex来控制元素的出入队列。 +这样就产生了一个问题,迭代器在创建的时候,其位置已经确定,但是队列可能在不断的出入队列,这样迭代器会受到严重影响, +可能造成队列实际上入出循环了数组一圈,而迭代器记录的是上一圈的情况,只有下标,这样遍历就会造成很大的问题。所以才需要上面所说的2点来保证, +第一个就是每个迭代器记录当前其循环的圈数,第二个就是队列元素出队列时影响所有的迭代器。这个修正队列就是在当前迭代器遍历完成之后, +比较一下圈数来判断具体情况,圈数和队列读取下标和迭代器读取下标来判断是否要废弃该迭代器。 - public boolean hasNext() { - return remaining > 0; - } public E next() { // 获取“阻塞队列”的锁 @@ -162,27 +147,10 @@ private class Itr implements Iterator { lock.unlock(); } } +next方法也会在迭代器没有被废弃的时候比较一下当前队列的情况。之后就是更新上次的下标和游标已经下一个值。 - public void remove() { - final ReentrantLock lock = ArrayBlockingQueue.this.lock; - lock.lock(); - try { - int i = lastRet; - if (i == -1) - throw new IllegalStateException(); - lastRet = -1; - E x = lastItem; - lastItem = null; - // only remove if item still at index - if (x != null && x == items[i]) { - boolean removingHead = (i == takeIndex); - removeAt(i); - if (!removingHead) - nextIndex = dec(nextIndex); - } - } finally { - lock.unlock(); - } - } -} -``` \ No newline at end of file +``` +```java +Node extends WeakReference +实现弱引用 +``` -- Gitee