diff --git a/week_03/25/synchronized_25.md b/week_03/25/synchronized_25.md index 11ac694039025d49e482f5bb1de15edad39a64c3..2cb55a0e6dc8ca181fcba312af330dbc0a0238ad 100644 --- a/week_03/25/synchronized_25.md +++ b/week_03/25/synchronized_25.md @@ -1,7 +1,48 @@ # synchronized synchronized关键字是Java中最基本的同步手段,声明synchronized的代码块或者方法会在编译时在代码块或方法的前后添加monitorenter、monitorexit字节码指令,这两个字节码指令都需要一个引用类型的参数来指明要锁定和解锁的对象。 -### 实现原理 +### 线程安全 +线程安全是多线程环境下保证了共享的、可修改变量状态的正确性。这就意味着如果不共享、非可修改变量也就不存在线程安全问题,因此除了加锁以外可以通过封装、声明为final使变量不共享、不可修改来保证变量的线程安全。 + +线程安全需要保证下面三个特性: +1. 原子性:一次完整(一个和多个)的操作不会中途被其他线程干扰 +2. 可见性:一个线程修改了某个共享变量,其状态能够被其他所有的线程立即知晓 +3. 有序性:保证线程内串行语义,避免指令重排 + +线程不安全的代码实例如下所示: +``` +public class Test implements Runnable{ + private int stock = 30; + + @Override + public void run() { + while (stock > 0) { + //模拟延时 + try { + Thread.sleep(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println(Thread.currentThread().getName() + " " + stock--); + } + } + + public static void main(String[] args) { + Test d = new Test(); + new Thread(d,"whl1").start(); + new Thread(d,"whl2").start(); + } +} +``` +执行结果可以确定是不正确的,这是因为一次完整的抢票操作包含了一次读写过程,线程whl1、whl2可能同时在主内存读取到同一份资源并拷贝到工作内存。假设同时读取剩余票数为21,whl1将stock修改为20的操作由于是在工作内存执行,外部其他线程是不可见的,因此whl1将stock=20回写到主内存时,whl2并不知道主内存中的stock值已经改变了,还是依旧以stock=21的状态继续执行,这就可能导致两次修改操作却只得到一次修改的结果。 + +根据上面的分析,或许可以通过对stock添加volatile保证两个线程读写操作的可见性以此实现线程安全的目的,那么在尝试添加volatile了之后执行结果依旧是不正确的。这是因为:```stock--;```这个操作存在读取、赋值的动作,读取和赋值分开来看都是原子操作,但是加在一起就不再具备原子性了。 + +假设whl1在读取了stock=21之后被阻塞,此时还未执行修改动作,whl2也读取到了stock=21。whl2对stock进行-1后回写到主存。此时whl1阻塞结束,由于whl1读取这个原子操作已经结束,因此即便主存中stock值更新了也不会再执行读取操作了,whl1会继续执行后序的修改动作,那么结果依旧是两次修改操作却只得到一次修改的结果。 + +由此看来,单单靠volatile是无法保证线程安全的,常见的做法是volatile保证可见性有序性的同时通过CAS操作保证原子性,以此实现一致性。 + +### synchronized实现原理 在Java内存模型中,存在lock和unlock指令,分别用于主内存变量的锁定和解锁。lock会把主内存中的变量标识为一条线程独占状态;unlock会把锁定的变量释放,以使其能被其他线程锁定。但是lock和unlock并没有直接提供给用户使用,而是提供了两个更高层次的指令monitorenter和monitorexit来隐式调用lock和unlock。 根据JVM规范的要求,在执行monitorenter指令时,首先会去尝试获取对象的锁,如果该对象没有被锁定,或当前线程已经持有该对象的锁,就把锁的计数器+1;在执行monitorexit时会把计数器-1,当计数器减小为0时,锁就释放了。下面通过代码进行测试,尝试对Test.class对象加上两次synchronized锁,并观察其字节码: @@ -86,13 +127,17 @@ public class Test { ``` 如果是公平锁,那么按照等待顺序获取锁的结果应该是:1,2,3,4,但结果是无序的,可以得知synchronized是非公平锁。 -### 锁优化 -JDK1.6之后,synchronized与ReenTrantLock的性能基本持平,而且虚拟机在未来的性能改进中会更偏向于原生的synchronized,所以还是提倡在synchronized能满足需求的情况下,优先考虑使用synchronized关键字来进行同步。优化后的synchronized和ReenTrantLock一样,在很多地方都是用到了CAS操作。下面是一些synchronized的状态: -(1)偏向锁:在大多数情况下,锁不存在多线程竞争,总是由同一线程多次获得。因此为了减少同一个线程多次获取锁导致的性能开销,引入了偏向锁。 -如果某个对象的锁被一个线程获取,那么该锁就进入偏向锁模式,当线程执行完毕后该锁还未被其他线程获取,则同一个线程再次请求该锁时,无需做任何同步操作。 -(2)轻量级锁:偏向锁不适用与锁竞争激烈的场合,因此当第二个线程加入锁争用的时候,偏向锁就会升级为轻量级锁。 -线程会通过自旋的方式尝试获取轻量级锁,该操作不会阻塞,性能比起重量级锁有显著提升。但是在锁争用激烈的情况下,线程会一直自旋,如果没有一直获取到锁,就会导致性能比起重量级锁还低。 -(3)重量级锁:当自旋的线程自旋到达一定次数还没获取到锁,就会进入阻塞状态,该锁升级为重量级锁,它会使其他线程阻塞,导致性能降低。 +### synchronized的优化 +在Java6之前,synchronized是依赖操作系统内部的互斥锁实现的,因此需要进行用户态到内核态的转换,因此早期的synchronized是一个开销较大的操作。而现代JDK中,对synchronized提供了三种优化机制,大大提高了其性能。所谓锁的升、降级也就是JVM对synchronized运行机制的优化,当JVM检测到不同的竞争环境时,会切换到合适的锁实现,这种切换就是锁的升、降级。下面synchronized的三种不同模式: +1. 偏向锁:在大多数情况下,锁不存在多线程竞争,总是由同一线程多次获得。因此为了减少同一个线程多次获取锁导致的性能开销,引入了偏向锁。 +如果某个对象的锁被一个线程获取,那么该锁就进入偏向锁模式,当线程执行完毕后该锁还未被其他线程获取,依旧由同一个线程再次请求该锁时,无需做任何同步操作。偏向锁不适用与锁竞争激烈的场合,因此当第二个线程加入锁争用的时候,偏向锁就会升级为轻量级锁。 +2. 轻量级锁:线程会通过自旋的方式尝试获取轻量级锁,该操作不会阻塞,性能比起重量级锁有显著提升。但是在锁争用激烈的情况下,线程会一直自旋,如果没有一直获取到锁,就会导致性能比起重量级锁还低。 +3. 重量级锁:当自旋的线程自旋到达一定次数还没获取到锁,就会进入阻塞状态,该锁升级为重量级锁,它会使其他线程阻塞,导致性能降低。 + +#### 自旋锁 +自旋锁采用让当前线程不停循环,当循环条件被其他线程改变时,才能进入临界区。由于自旋锁只是使当前线程不断执行循环体,不涉及上下文切换,因此响应更快,但如果锁争用激烈,线程循环了很久也没有获取到锁,是非常消耗CPU的,为此引入了自适应自旋锁。 + +需要注意的是,在单核CPU上,自旋锁是无效的,因为自旋锁尝试获取锁不成功会一直自旋,也就是一直占用着CPU,这就意味着在单核CPU上,其他线程是不可能被分配到执行时间片的。 ### 总结 (1)synchronized在编译时会在同步块前后生成monitorenter和monitorexit指令。 @@ -101,4 +146,16 @@ JDK1.6之后,synchronized与ReenTrantLock的性能基本持平,而且虚拟 (4)synchronized是可重入锁 (5)synchronized是非公平锁 (6)synchronized同时保证原子、可见、有序性 - (7)synchronized有偏向锁、轻量级锁、重量级锁三种模式 \ No newline at end of file + (7)synchronized有偏向锁、轻量级锁、重量级锁三种模式 + + + + + + + + + + + + \ No newline at end of file diff --git a/week_04/25/ArrayBlockingQueue_25.md b/week_04/25/ArrayBlockingQueue_25.md new file mode 100644 index 0000000000000000000000000000000000000000..6d5d1e040182ca9b337648bd13305fa9391f22ac --- /dev/null +++ b/week_04/25/ArrayBlockingQueue_25.md @@ -0,0 +1,331 @@ +# ArrayBlockingQueue +ArrayBlockingQueue是juc包下以数组实现的线程安全的阻塞队列。 + +### 属性 +```java +//使用数组存储元素 +final Object[] items; + +//读取元素的指针 +int takeIndex; + +//放入元素的指针 +int putIndex; + +//队列中的元素数量 +int count; + +//使用ReenTrantLock保证并发安全 +final ReentrantLock lock; + +//非空条件 +private final Condition notEmpty; + +//非满条件 +private final Condition notFull; +``` + +### 构造方法 +在创建时必须指定队列大小,并且可以通过指定可重入锁是否公平。 +```java +/** + * 1. 通过指定容量创建队列 + */ +public ArrayBlockingQueue(int capacity) { + this(capacity, false); +} + +/** + * 2. 通过指定容量、可重入锁是否公平创建队列 + */ +public ArrayBlockingQueue(int capacity, boolean fair) { + if (capacity <= 0) + throw new IllegalArgumentException(); + this.items = new Object[capacity]; + //初始化可重入锁以及两个条件 + lock = new ReentrantLock(fair); + notEmpty = lock.newCondition(); + notFull = lock.newCondition(); +} +``` + +### 入队 +入队存在四个方法,分别是:offer(E e)、put(E e)、add(E e)、offer(E e, long timeout, TimeUnit unit) + +#### add(E e) +```java +//ArrayBlockingQueue.add(E e) +public boolean add(E e) { + //调用AbstractQueue的add方法 + return super.add(e); +} + +//AbstractQueue.add(E e) +public boolean add(E e) { + //调用offer(E e)方法, 执行成功return true, else抛出队列已满的异常 + if (offer(e)) + return true; + else + throw new IllegalStateException("Queue full"); +} +``` + +#### offer(E e) +```java +public boolean offer(E e) { + //元素不允许为空 + checkNotNull(e); + //加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //若数组已满return false + if (count == items.length) + return false; + else { + //若数组未满调用入队方法enqueue + enqueue(e); + return true; + } + } finally { + //释放锁 + lock.unlock(); + } +} + +private void enqueue(E x) { + //获取队列存储元素的数组 + final Object[] items = this.items; + //在入队指针位置赋值添加元素x + items[putIndex] = x; + //若入队指针指向数组下标位置+1处, 说明队列已满 + if (++putIndex == items.length) + //入队指针置为0 + putIndex = 0; + //队列元素个数++ + count++; + //唤醒notEmpty的条件, 使得正在阻塞且执行删除操作的线程得到被唤醒的信号 + notEmpty.signal(); +} +``` + +#### put(E e) +```java +public void put(E e) throws InterruptedException { + //入队元素不能为null + checkNotNull(e); + //加锁 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + //如果队列已满, 则阻塞当前线程等待, 等到不满的时候唤醒当前线程 + while (count == items.length) + notFull.await(); + enqueue(e); + } finally { + lock.unlock(); + } +} +``` +下面是一段测试样例,第一个线程执行到put(3)的时候会一直阻塞,直到1秒后第二个线程执行poll()后,第一个线程被唤醒执行剩下的put(3)操作。 +```java +public class Test{ + public static void main(String[] args) throws InterruptedException { + ArrayBlockingQueue arr = new ArrayBlockingQueue<>(2); + new Thread(() -> { + try { + arr.put(1); + arr.put(2); + arr.put(3); + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }).start(); + Thread.sleep(1000); + new Thread(() -> { + arr.poll(); + }).start(); + } +} +``` + +#### offer(E e, long timeout, TimeUnit unit) +```java +public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { + //入队元素不能为null + checkNotNull(e); + long nanos = unit.toNanos(timeout); + //加锁 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + //如果队列满了, 就阻塞nanos纳秒 + //如果在nanos纳秒后唤醒这个线程后,依然没有空间则返回false + while (count == items.length) { + if (nanos <= 0) + return false; + nanos = notFull.awaitNanos(nanos); + } + //入队 + enqueue(e); + return true; + } finally { + lock.unlock(); + } +} +``` +四者的区别在于: +(1)add(E e)时若队列满则抛出异常 +(2)offer(E e)时若队列满了则返回false +(3)put(E e)时如果队列已满, 则阻塞当前线程等待, 等到不满的时候唤醒当前线程 +(4)offer(E e, long timeout, TimeUnit unit)时若队列满了则等待一段时间, 若时间过了队列依然满就返回false +(5)利用入队指针循环使用数组存储元素 + +### 出队 +出队存在五个方法,分别是:remove()、remove(Object o)、poll()、take()、poll(long timeout, TimeUnit unit) + +#### remove() +```java +//AbstractQueue.remove() +public E remove() { + //调用poll()出队 + E x = poll(); + //若有元素出队就返回该元素 + if (x != null) + return x; + else//否则抛出异常 + throw new NoSuchElementException(); +} +``` + +#### remove(Object o) +```java +public boolean remove(Object o) { + //不允许remove空值 + if (o == null) return false; + //获取队列存储元素的数组 + final Object[] items = this.items; + //加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //若队列中存在元素 + if (count > 0) { + //获取入队指针 + final int putIndex = this.putIndex; + //获取读取元素的指针 + int i = takeIndex; + //注意:这里必须使用do-while循环, 因为如果数组已经满了的话, putIndex指针会归0 + //那么takeIndex指针是从0开始的, 为了继续执行判断逻辑, 必须要通过do-while先使得takeIndex++ + //直到takeIndex == item.length时, takeIndex会归0, 与putIndex相等退出循环 + do {//当读取指针还未等于入队指针时, 判断删除元素是否等于读取指针指向的元素 + if (o.equals(items[i])) { + //如果相同, 则删除后return true + removeAt(i); + return true; + } + //若读取指针自增到数组长度值, 说明已经完成所有遍历, 读取指针归0循环利用 + if (++i == items.length) + i = 0; + } while (i != putIndex); + } + //若队列不存在元素, return false + return false; + } finally { + //释放锁 + lock.unlock(); + } +} +``` + +### poll() +```java +public E poll() { + //获取锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //若队列不存在元素返回null, 否则出队 + return (count == 0) ? null : dequeue(); + } finally { + //释放锁 + lock.unlock(); + } +} +``` + +### take() +```java +public E take() throws InterruptedException { + /加锁 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + //若队列不存在元素, 则阻塞当前线程, 直到其他线程在队列中添加了元素, 当前线程被唤醒 + while (count == 0) + notEmpty.await(); + //执行出队操作 + return dequeue(); + } finally { + lock.unlock(); + } +} +``` + +### E poll(long timeout, TimeUnit unit) +```java +public E poll(long timeout, TimeUnit unit) throws InterruptedException { + long nanos = unit.toNanos(timeout); + //加锁 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + //当队列不存在元素, 则阻塞当前线程nanos纳秒, 若到达时间后还未有其他线程往队列中添加元素, 那么return null + while (count == 0) { + if (nanos <= 0) + return null; + nanos = notEmpty.awaitNanos(nanos); + } + //执行出队操作 + return dequeue(); + } finally { + lock.unlock(); + } +} +``` + +#### dequeue() +```java +private E dequeue() { + // assert lock.getHoldCount() == 1; + // assert items[takeIndex] != null; + //获取队列存储元素的数组 + final Object[] items = this.items; + //获取takeIndex指针指向的元素 + @SuppressWarnings("unchecked") + E x = (E) items[takeIndex]; + //将takeIndex位置置为空 + items[takeIndex] = null; + //takeIndex自增直到指向数组末尾后归0 + if (++takeIndex == items.length) + takeIndex = 0; + //数组中的元素个数-1 + count--; + if (itrs != null) + itrs.elementDequeued(); + //唤醒notFull条件, 使得正在阻塞且执行添加操作的线程得到被唤醒的信号 + notFull.signal(); + return x; +} +``` +(1)remove()时, 如果队列为空则抛出异常 +(2)poll()时, 如果队列为空则返回null +(3)take()时, 如果队列为空, 则线程阻塞等待条件notEmpty满足后被唤醒, 继续执行出队操作 +(4)poll(), take的计时器版本 +(5)利用取指针takeIndex循环从数组中去除原宿 + +### 总结 +(1)ArrayBlockingQueue不需要扩容,因为是初始化时指定容量,并且会通过putIndex、takeIndex循环利用数组 +(2)利用重入锁和两个条件notEmpty、notFull保证并发安全 +(3)由于put或take方法会无限阻塞,若消费或生产的速度不一致,就会导致阻塞线程越积越多 \ No newline at end of file diff --git a/week_04/25/ConcurrentHashMap_25.md b/week_04/25/ConcurrentHashMap_25.md new file mode 100644 index 0000000000000000000000000000000000000000..8d21a56c272ad095ebd61094da29afc7259e519c --- /dev/null +++ b/week_04/25/ConcurrentHashMap_25.md @@ -0,0 +1,160 @@ +# ConcurrentHashMap +ConcurrentHashMap是HashMap的线程安全版本,在JDK1.7时采用分段数组Segment+链表实现,每一把锁只锁容器中的其中一部分数据,若多个线程访问的是不同的Segment,也就不存在锁竞争;在JDK1.8之后采用了数组+链表/红黑树实现,通过CAS+synchronized实现并发控制,以桶数组的节点数为单位,锁的粒度更加精细。 + +在JDK1.8版本,ConcurrentHashMap总体结构上与HashMap非常相似,其内部仍然存在Segment定义,但仅仅是为了保证序列化时的兼容性,不再有任何结构上的用处。因为不再使用Segment,初始化操作则与HashMap相同,即Lazy-load机制,这样可以有效避免初始开销。数据存储通过volatile和CAS操作保证线程安全。 + +### sizeCtl +sizeCtl是ConcurrentHashMap的一个volatile属性, 用于Map扩容时信号标识量, 当其值为-1表示正在初始化, 当值为-n表示有n-1个线程正在进行扩容操作, 当值为0表示第一次初始化, 当值为n表示扩容门槛, 若map的size到达扩容门槛时会执行扩容操作, 默认会扩容为原长度的2倍。 + +#### put +(1)若桶数组未初始化,则初始化 +(2)若待插入节点所在的桶为空,则自旋插入newNode +(3)若map正在扩容,则当前线程也加入到扩容过程 +(4)若待插入节点所在的桶不为空,且不存在迁移元素,则对该桶加锁,执行并发安全的put操作 +(5)若该桶的头节点是链表节点,那么遍历链表执行插入或更新操作 +(6)若该桶的头节点是树节点,那么遍历红黑树执行插入或更新操作 +(7)在插入或更新完成后,观察是否需要对桶进行树化 +(8)成功更新,return oldVal +(9)成功插入,map元素个数+1,并检查是否需要扩容 +```java +public V put(K key, V value) { + return putVal(key, value, false); +} + +final V putVal(K key, V value, boolean onlyIfAbsent) { + //若key或value为空, 则抛出空指针异常 + if (key == null || value == null) throw new NullPointerException(); + //计算hash值 + int hash = spread(key.hashCode()); + //用于记录插入元素所在桶的节点个数 + int binCount = 0; + //死循环, 结合CAS进行自旋操作, 若CAS操作失败, 则会重新获取桶进行下面的流程 + for (Node[] tab = table;;) { + Node f; + int n, i, fh; + //若桶未被初始化或者桶的长度为0, 则初始化桶 + if (tab == null || (n = tab.length) == 0) + tab = initTable(); + //若要put的节点所在桶还没有元素, 则把这个节点插入到这个桶 + else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { + //使用CAS插入元素, 如果插入失败, 则自旋重新插入 + if (casTabAt(tab, i, null, new Node(hash, key, value, null))) + //如果插入成功, 结束本次循环 + break;// no lock when adding to empty bin + } + //若要put的节点所在桶的第一个元素的hash值是MOVED, 则让当前线程帮忙迁移元素 + else if ((fh = f.hash) == MOVED) + tab = helpTransfer(tab, f); + else { + V oldVal = null; + //若这个桶不为空且不存在需要迁移的元素, 则锁住这个桶(分段锁) + synchronized (f) { + //再次检测桶的第一个元素是否有变化, 如果变化则进行下一次自旋 + if (tabAt(tab, i) == f) { + //若桶中头节点的hash值大于0, 说明不需要迁移, 也不是树节点 + if (fh >= 0) { + //桶中元素个数赋值为1 + binCount = 1; + //遍历整个桶, 每次结束后binCount++ + for (Node e = f;; ++binCount) { + K ek; + //遍历桶找到了相同key值的节点, 那么替换相同key节点的value值 + if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { + //当put为更新操作时, 记录旧值 + oldVal = e.val; + //若找到了相同key值的节点, 进行更新操作 + if (!onlyIfAbsent) + e.val = value; + //成功更新后break + break; + } + Node pred = e; + //若遍历到链表尾部还没有找到相同key的节点, 那么就插入到链表结尾并break + if ((e = e.next) == null) { + pred.next = new Node(hash, key, value, null); + break; + } + } + } + //若桶中第一个节点是树节点 + else if (f instanceof TreeBin) { + Node p; + binCount = 2; + //则调用红黑树的插入方式插入newNode + if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { + oldVal = p.val; + //若找到了相同key节点, 则赋予新值 + if (!onlyIfAbsent) + p.val = value; + } + } + } + } + //若binCount不为0, 说明成功插入或更新 + if (binCount != 0) { + //判断插入后是否需要树化 + if (binCount >= TREEIFY_THRESHOLD) + treeifyBin(tab, i); + //若oldValue不为空的情况下, 则表示为更新操作, 返回旧值 + if (oldVal != null) + return oldVal; + //结束自旋 + break; + } + } + } + //成功插入, map元素个数+1, 并检查是否需要扩容 + addCount(1L, binCount); + //若插入元素则返回null + return null; +} +``` + +### initTable() 初始化桶数组 +(1)使用volatile字段sizeCtl+CAS操作保证当前只有一个线程初始化桶数组 +(2)sizeCtl在初始化操作后存储为扩容门槛 +(3)扩容门槛写死为桶数组大小的0.75倍 +```java +/** + * Initializes table, using the size recorded in sizeCtl. + */ +private final Node[] initTable() { + Node[] tab; + int sc; + while ((tab = table) == null || tab.length == 0) { + //若sizeCtl小于0, 说明该map正在进行初始化或扩容 + if ((sc = sizeCtl) < 0) + //若sizeCtl小于-1, 说明存在多个线程在初始化或扩容, 那么当前线程调用yield让出cpu + //若sizeCtl等于-1, 说明只有当前线程正在自旋初始化, 当前线程线程调用yield后还是由当前线程继续执行扩容操作。 + Thread.yield(); // lost initialization race; just spin + //Unsafe.CAS操作, 参数分别为:this对象、SIZECTL的偏移量、 更新时偏移量处的预期值、更新的值 + //若sizeCtl原子更新为-1成功, 告知其他线程当前线程进入初始化; 若原子更新失败, 则自旋直到table.length!=0 + else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { + try { + //检查table是否为空 + if ((tab = table) == null || tab.length == 0) { + //若sc == 0, 则代表是第一次初始化, 使用默认值16进行初始化 + int n = (sc > 0) ? sc : DEFAULT_CAPACITY; + //新建Node数组 + @SuppressWarnings("unchecked") + Node[] nt = (Node[])new Node[n]; + //赋值给table桶数组 + table = tab = nt; + //设置sc = 数组长度的0.75倍 + sc = n - (n >>> 2); + } + } finally { + //将sc赋值给sizeCtl, 这里存储的是扩容门槛 + sizeCtl = sc; + } + break; + } + } + return tab; +} +``` + +### HashMap、HashTable、ConcurrentHashMap三者区别 +1. HashMap线程不安全, 底层实现为数组+链表+红黑树 +2. HashTable线程安全, 底层实现为数组+链表, 采用全局锁 +3. ConcurrentHashMap, 底层实现为数组+链表+红黑树, 采用分段锁, 锁的是位桶数组的每一个桶, 以此对锁进行细粒度化, 在更新时采用自旋CAS操作+synchronized, 性能高效。 \ No newline at end of file diff --git a/week_04/25/ConcurrentLinkedQueue_25.md b/week_04/25/ConcurrentLinkedQueue_25.md new file mode 100644 index 0000000000000000000000000000000000000000..8899dfc5358bd073b17902408a7a8ba03e7faf0b --- /dev/null +++ b/week_04/25/ConcurrentLinkedQueue_25.md @@ -0,0 +1,179 @@ +# ConcurrentLinkedQueue +ConcurrentLinkedQueue只实现了Queue接口,并没有实现BlockingQueue接口,因此它不是阻塞队列,也不能用于线程池中,但是它是线程安全的,可用于多线程环境中。 + +### 主要属性 +```java +//头节点 +private transient volatile Node head; + +//尾结点 +private transient volatile Node tail; +``` + +### 节点内部类 +```java +private static class Node { + volatile E item; + volatile Node next; + + //通过Unsafe提供的CAS更新进行节点初始化 + //参数分别是this节点对象、itemOffset偏移地址、初始化的data值 + //它会通过直接在节点对象的item偏移地址上直接写入data值 + Node(E item) { + UNSAFE.putObject(this, itemOffset, item); + } + + //替换item值, 同样的也是通过Unsafe提供的CAS操作实现, 核心思想是比较期望值和初始值是否相同 + boolean casItem(E cmp, E val) { + return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); + } + + //设置next指针指向的节点 + //同样是通过Unsafe直接在内存偏移地址写入值实现 + void lazySetNext(Node val) { + UNSAFE.putOrderedObject(this, nextOffset, val); + } + + //与casItem类似的方式更新 + boolean casNext(Node cmp, Node val) { + return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); + } + + //初始化Unsafe以及相关属性 + private static final sun.misc.Unsafe UNSAFE; + private static final long itemOffset; + private static final long nextOffset; + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class k = Node.class; + itemOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("item")); + nextOffset = UNSAFE.objectFieldOffset + (k.getDeclaredField("next")); + } catch (Exception e) { + throw new Error(e); + } + } +} +``` + +### 构造方法 +```java +//1. 默认构造方法,初始化了一个空节点作为头尾节点 +public ConcurrentLinkedQueue() { + head = tail = new Node(null); +} + +//2. 通过传入容器初始化 +public ConcurrentLinkedQueue(Collection c) { + Node h = null, t = null; + //遍历集合c的所有元素 + for (E e : c) { + //不允许值为空 + checkNotNull(e); + //将元素转化为Node + Node newNode = new Node(e); + //如果头节点为空, 说明队列为空, 将newNode置为头、尾节点 + if (h == null) + h = t = newNode; + else { + //否则将newNode置为currNode节点的next节点 + t.lazySetNext(newNode); + //将newNode置为当前节点 + t = newNode; + } + } + //若转换结束后, 头节点依旧为null, 说明集合c不存在元素 + if (h == null) + //那么以默认构造方法的方式创建集合 + h = t = new Node(null); + head = h; + tail = t; +} +``` + +### 入队 +因为不是阻塞队列,因此只存在两个入队方法,分别是:add()、offer(),因为ConcurrentLinkedQueue无界的特性,add方法也不需要抛出越界的异常 +```java +public boolean add(E e) { + return offer(e); +} + +public boolean offer(E e) { + //检查入队元素e是否为空 + checkNotNull(e); + //创建newNode + final Node newNode = new Node(e); + //入队到链表尾部 + for (Node t = tail, p = t;;) { + Node q = p.next; + //如果q==null, 说明已经指向链表尾部, 入队 + if (q == null) { + //调用Node.casNext设置尾节点next指向的节点为newNode + if (p.casNext(null, newNode)) { + //如果p!=t, 说明可能有其他线程先一步更新了tail + if (p != t) // hop two nodes at a time + //此时把tail节点更新为最新 + casTail(t, newNode); // Failure is OK. + //返回入队成功 + return true; + } + // Lost CAS race to another thread; re-read next + } + else if (p == q) + //如果p.next==q, 说明p已经出队 + //重新设置p的值 + p = (t != (t = tail)) ? t : head; + else + //t后面还有值, 重新设置p的值 + p = (p != t && t != (t = tail)) ? t : q; + } +} +``` +整个入队流程就是: +(1)定位到链表尾部,尝试把新节点放在tail.next位置 +(2)如果在入队过程中, 发现其他线程已经先一步更新了尾结点, 那么重新获取最新的尾结点,继续尝试放入 + +### 出队 +同样的,因为非阻塞队列,出队方法只存在remove、poll +```java +public E poll() { + restartFromHead: + for (;;) { + //尝试弹出链表的头节点, 如果失败就继续循环 + for (Node h = head, p = h, q;;) { + E item = p.item; + if (item != null && p.casItem(item, null)) { + if (p != h) // hop two nodes at a time + updateHead(h, ((q = p.next) != null) ? q : p); + return item; + } + else if ((q = p.next) == null) { + updateHead(h, p); + return null; + } + else if (p == q) + continue restartFromHead; + else + p = q; + } + } +} + +//更新头节点的方法 +final void updateHead(Node h, Node p) { + //原子更新头节点为p成功后, 延迟更新头节点的next为它自己 + if (h != p && casHead(h, p)) + h.lazySetNext(h); +} +``` +整个出队流程就是: +(1)定位到头节点,尝试更新值为null +(2)如果更新成功就成功出队 +(3)如果失败了获取头节点已经被其他线程先一步更新,重试 + +### 总结 +(1)ConcurrentLinkedQueue非阻塞队列 +(2)ConcurrentLinkedQueue使用CAS+自旋的方式更新头节点控制出队入队的操作。 + diff --git a/week_04/25/CopyOnWriteArrayList_25.md b/week_04/25/CopyOnWriteArrayList_25.md new file mode 100644 index 0000000000000000000000000000000000000000..87dac17887122ec7d7627b1220c16b9245c38b93 --- /dev/null +++ b/week_04/25/CopyOnWriteArrayList_25.md @@ -0,0 +1,244 @@ +# CopyOnWriteArrayList +CopyOnWriteArrayList是ArrayList的线程安全版本,内部也是通过Object数组实现,每一次对数组的写操作都是通过拷贝一份新数组进行修改,修改完成后再替换老的数组,这样保证了并发场景下只阻塞写操作而不会阻塞读操作。 +```java +public class CopyOnWriteArrayList + implements List, RandomAccess, Cloneable, java.io.Serializable { +``` +(1)实现了List接口,提供了List基础的添加、删除、遍历等操作 +(2)实现了RandomAccess,标识CopyOnWriteArrayList具备随机访问的能力 +(3)实现了Cloneable,标识CopyOnWriteArrayList可以被克隆 +(4)实现了Serializable,标识CopyOnWriteArrayList可以被序列化 + +### 属性 +```java +//底层采用ReenTrantLock进行修改时的加锁 +final transient ReentrantLock lock = new ReentrantLock(); + +//真正存储元素的数组, 用volatile声明保证其可见性, 只能通过getArray()/setArray()访问 +private transient volatile Object[] array; +``` + +### 构造方法 +```java +/** + * 1. 默认构造:创建一个空数组 + */ +public CopyOnWriteArrayList() { + setArray(new Object[0]); +} + +/** + * 2. 传入集合c进行构造 + */ +public CopyOnWriteArrayList(Collection c) { + Object[] elements; + //若c是CopyOnWriteArrayList类型的, 直接将它的数组赋值给当前数组 + if (c.getClass() == CopyOnWriteArrayList.class) + elements = ((CopyOnWriteArrayList)c).getArray(); + //如果不是, 则进行拷贝 + else { + + elements = c.toArray(); + // c.toArray或许不能正确地转换为Object数组 + if (elements.getClass() != Object[].class) + //通过数组拷贝进行赋值操作 + elements = Arrays.copyOf(elements, elements.length, Object[].class); + } + setArray(elements); +} + +/** + * 3. 传入数组进行构造 + */ +public CopyOnWriteArrayList(E[] toCopyIn) { + //通过数组拷贝进行赋值操作 + setArray(Arrays.copyOf(toCopyIn, toCopyIn.length, Object[].class)); +} +``` + +### add(E e) 添加元素到末尾 +(1)加锁 +(2)获取旧数组,并通过旧数组创建新数组,长度为旧数组长度+1 +(3)新数组末尾赋值为待添加元素 +(4)执行完毕后释放锁 +```java +public boolean add(E e) { + //获取锁 + final ReentrantLock lock = this.lock; + //加锁 + lock.lock(); + try { + //获取旧数组 + Object[] elements = getArray(); + //获取旧数组长度 + int len = elements.length; + //创建新数组, 值为旧数组长度加一的拷贝 + Object[] newElements = Arrays.copyOf(elements, len + 1); + //新数组末尾添加上值e + newElements[len] = e; + //将新数组设置为底层存储元素的array + setArray(newElements); + //成功执行到此, 返回true + return true; + } finally { + //无论失败与否, 释放锁 + lock.unlock(); + } +} +``` + +### add(int index, E e) 指定index添加元素 +(1)加锁 +(2)获取旧数组 +(3)判断是否越界 +(4)创建新数组,长度为旧数组的长度+1 +(5)如果是在尾部插入,新数组末尾赋值为待添加元素即可 +(6)如果是在其他位置插入,先将插入位置的后序元素全部右移之后,在插入位置赋值 +(7)执行完毕释放锁 +```java +/** + * Inserts the specified element at the specified position in this + * list. Shifts the element currently at that position (if any) and + * any subsequent elements to the right (adds one to their indices). + * + * @throws IndexOutOfBoundsException {@inheritDoc} + */ +public void add(int index, E element) { + //获取锁 + final ReentrantLock lock = this.lock; + //加锁 + lock.lock(); + try { + //获取到旧数组 + Object[] elements = getArray(); + //获取旧数组的长度 + int len = elements.length; + //判断是否越界 + if (index > len || index < 0) + throw new IndexOutOfBoundsException("Index: "+index+ + ", Size: "+len); + //创建新数组 + Object[] newElements; + //插入之后, 要右移的元素个数 + int numMoved = len - index; + //如果是在尾部插入 + if (numMoved == 0) + //直接通过数组拷贝, 然后末尾赋值即可 + newElements = Arrays.copyOf(elements, len + 1); + else { + //如果插入位置不是尾部, 那么新建一个长度为len+1的数组 + newElements = new Object[len + 1]; + //将旧数组拷贝到新数组 + System.arraycopy(elements, 0, newElements, 0, index); + //将插入元素位置的右边所有元素通过数组拷贝右移一位 + System.arraycopy(elements, index, newElements, index + 1, numMoved); + } + //插入元素位置赋值 + newElements[index] = element; + //将新数组设置为底层存储元素的array + setArray(newElements); + } finally { + //释放锁 + lock.unlock(); + } +} +``` + +### get(int index) 获取指定索引位置的元素 +(1)先通过getArray()获取到底层存储元素的数组 +(2)再从该数组中返回index位置的元素 +```java +public E get(int index) { + //get()底层是通过getArray()获取数组 + return get(getArray(), index); +} + +final Object[] getArray() { + return array; +} + +@SuppressWarnings("unchecked") +private E get(Object[] a, int index) { + return (E) a[index]; +} +``` + +### remove(int index) 删除指定索引位置的元素 +(1)加锁 +(2)获取待删除元素的值 +(3)如果待删除元素是末尾元素,那么直接通过拷贝一份旧数组长度-1的新数组返回即可 +(4)如果不是末尾元素,那么先拷贝[0, index-1]部分, 再拷贝[index+1, oldArray.len-1], 覆盖掉待删除元素 +(5)将新数组赋值给底层数组 +(6)解锁并返回旧值 +```java +/** + * Removes the element at the specified position in this list. + * Shifts any subsequent elements to the left (subtracts one from their + * indices). Returns the element that was removed from the list. + * + * @throws IndexOutOfBoundsException {@inheritDoc} + */ +public E remove(int index) { + //获取锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //获取旧数组 + Object[] elements = getArray(); + int len = elements.length; + //获取待删除位置的旧址 + E oldValue = get(elements, index); + //删除元素后, 后序需要左移的元素个数 + int numMoved = len - index - 1; + if (numMoved == 0) + //如果删除的是末尾节点, 那么将底层数组array赋值为旧数组长度-1的拷贝即可 + setArray(Arrays.copyOf(elements, len - 1)); + else { + //否则新建一个数组, 长度为旧数组的长度-1 + Object[] newElements = new Object[len - 1]; + //将旧数组[0, index - 1]部分的元素拷贝到新数组 + System.arraycopy(elements, 0, newElements, 0, index); + //将旧数组[index + 1, oldArray.len - 1]部分的元素以index为起始位置拷贝到新数组 + System.arraycopy(elements, index + 1, newElements, index, numMoved); + setArray(newElements); + } + return oldValue; + } finally { + lock.unlock(); + } +} +``` + +### 总结 +(1)CopyOnWriteArrayList通过ReenTrantLock加锁,以此保证线程安全 +(2)CopyOnWriteArrayList的写操作都需要通过原数组拷贝一份新数组,在新数组上作修改后再赋值给原数组,因此每次写操作都涉及到新建数组,时间复杂度为O(n) +(3)CopyOnWriteArrayList采用读写分离的思想,读操作不加锁,通过volatile保证可见性;写操作加锁,且写操作会占用较大的内存空间,因此适合读多写少的场景 +(4)CopyOnWriteArrayList只能保证最终一致性,不能保证实时一致性 + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/week_04/25/DelayQueue_25.md b/week_04/25/DelayQueue_25.md new file mode 100644 index 0000000000000000000000000000000000000000..02493afa29348004af4a058f23ce582c40c0bf36 --- /dev/null +++ b/week_04/25/DelayQueue_25.md @@ -0,0 +1,126 @@ +# DelayQueue +DelayQueue是java并发包下的延时阻塞队列,常用于实现定时任务。 + +```java +public class DelayQueue extends AbstractQueue implements BlockingQueue { +``` + +(1)继承自AbstractQueue,具备Queue的所有基本实现 +(2)实现了BlockingQueue接口,具备阻塞队列的所有特性 +(3)组合了Delayed接口,DelayQueue中存储的所有元素都必须实现这个接口 + +```java +public interface Delayed extends Comparable { + //获取剩余的时间, 如果时间到期应当返回小于等于0的数值 + long getDelay(TimeUnit unit); +} +``` + +### 基本使用 + + +### 属性 +延时队列主要使用优先队列实现,并通过可重入锁以及条件来控制并发安全。 +```java +//用于控制并发的锁 +private final transient ReentrantLock lock = new ReentrantLock(); + +//优先队列 +private final PriorityQueue q = new PriorityQueue(); + +//用于标记当前是否有线程在排队(仅用于获取元素时) +private Thread leader = null; + +//条件, 用于标识当前是否存在可获取元素 +private final Condition available = lock.newCondition(); +``` + +### 构造方法 +```java +/** + * 默认构造 + */ +public DelayQueue() {} + +/** + * 初始化添加集合c中所有元素的构造方法 + */ +public DelayQueue(Collection c) { + this.addAll(c); +} +``` + +### 入队 +因为DelayQueue底层通过优先队列实现,且优先队列是无界的,因此入队不会阻塞超时,因此他的四个入队方法都是一样的。 + + +### 出队 + +#### poll() +```java +public E poll() { + //加锁 + final ReentrantLock lock = this.lock; + lock.lock(); + try { + //获取队列头元素(即将出队的元素) + E first = q.peek(); + //如果头元素为空或延时还没到期, return null + if (first == null || first.getDelay(NANOSECONDS) > 0) + return null; + else + //如果头元素到期了就调用优先队列的poll()弹出第一个元素 + return q.poll(); + } finally { + //释放锁 + lock.unlock(); + } +} +``` + +### take() +```java +public E take() throws InterruptedException { + //加锁 + final ReentrantLock lock = this.lock; + lock.lockInterruptibly(); + try { + for (;;) { + //获取队列头元素 + E first = q.peek(); + //若头元素为空, 说明队列还没有元素, 阻塞当前线程等待其他线程添加元素到队列 + if (first == null) + available.await(); + else { + //获取头元素的到期时间 + long delay = first.getDelay(NANOSECONDS); + //若已经到期, 头元素出队 + if (delay <= 0) + return q.poll(); + //help GC + first = null; // don't retain ref while waiting + //若前面还有其他线程在等待, 直接进入等待 + if (leader != null) + available.await(); + else { + //当前面不存在等待线程, 则获取到当前线程并把值赋给leader + Thread thisThread = Thread.currentThread(); + leader = thisThread; + try { + //等待delay时间后被唤醒 + available.awaitNanos(delay); + } finally { + if (leader == thisThread) + leader = null; + } + } + } + } + } finally { + //成功出队后,如果leader为空且堆顶还有元素,就唤醒下一个等待的线程 + if (leader == null && q.peek() != null) + available.signal(); + lock.unlock(); + } +} +```