diff --git a/week_02/05/04-Unsafe.md b/week_02/05/04-Unsafe.md new file mode 100644 index 0000000000000000000000000000000000000000..f28bac1358fa73630e98961192266b351114295a --- /dev/null +++ b/week_02/05/04-Unsafe.md @@ -0,0 +1,193 @@ +# Unsafe 源码分析 + +## TOP 带着问题看源码 + +1. 如何获取 Unsafe 实例 +2. 如何利用 Unsafe API 绕开 JVM的控制 +3. CAS 到底是什么 +4. Unsafe 中的线程调度是怎么回事 + +## 1. 基本介绍 + +Unsafe 是用于在实质上扩展 Java 语言表达能力、便于在 Java 代码里实现原本要在 C 层实现的核心库功能用的。这些功能包括裸内存的申请、释放、访问,低层硬件的 atomic/volatile 支持,创建未初始化对象等。但由于 Unsafe 类使 Java 语言拥有不应该暴露的骚操作,增加了程序出问题的风险。 + +### 1.1 获取 Unsafe 实例 + +```java +public class UnsafeTest { + private static Unsafe unsafe; + public static void main(String[] args) throws Exception { + Class c = UnsafeTest.class.getClassLoader().loadClass("sun.misc.Unsafe"); + Field f = c.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafe = (Unsafe)f.get(c); + + unsafe.xx(); + } +} +``` + +回到 **TOP 1** 可以明白,通过反射获取 unsafe 实例。 + +## 2. 功能介绍 + + + +## 3. 数组相关 + +```java +// 返回数组中第一个元素的偏移地址 +public native int arrayBaseOffset(Class var1); +// 返回数组中一个元素占用的大小 +public native int arrayIndexScale(Class var1); +``` + +通过定位数组第一个元素的偏移地址和每个元素占用的大小。 + +例如第一个元素偏移地址是16,存的是 int 类型,则可以通过要查询的 index * 4 + 16 来获取到对应的值。 + +我们可以在 AtomicIntegerArray 中看到这些操作,不过作者巧妙的通过位运算来计算index对应的偏移量。 + +```java +// 1. first index 偏移量 +private static final int base = unsafe.arrayBaseOffset(int[].class); +// 2. scale = 4; +int scale = unsafe.arrayIndexScale(int[].class); +// 3. 计算 scale 二进制后面有几个0,如scale = 4(0100),shift = 2 +shift = 31 - Integer.numberOfLeadingZeros(scale); +// 4. 根据index对scale进行乘法运算获取偏移量 offset,如index = 1,offset = 4(1 << 2) + 16 = 20 +offset = index << shift + base; +// 5. 通过 offset 原子的获取对应的值 +unsafe.getIntVolatile(array, offset); +``` + +## 4. 内存屏障 + +```java +// 内存屏障,禁止 load 操作重排序 +public native void loadFence(); +// 内存屏障,禁止 store 操作重排序 +public native void storeFence(); +// 内存屏障,禁止 load、store 操作重排序 +public native void fullFence(); +``` + +内存屏障主要是避免 CPU 或者 编译器对代码重排序。 + +如并发包中 StampedLock 解决因代码重排序校验不准确,采用loadFence()。 + +```java +public boolean validate(long stamp) { + U.loadFence(); + return (stamp & SBITS) == (state & SBITS); +} +``` + +## 5. 系统相关 + +```java +// 获取系统指针的大小, 64 位是8 +public native int addressSize(); +// 获取内存页大小,2的幂次方,我本机测试是4096 +public native int pageSize(); +``` + +可以根据内存页大小计算分配页数 + +## 6. 线程调度 + +```java +// 取消阻塞 +public native void unpark(Object var1); +// 阻塞直到超时或中断等条件 +public native void park(boolean var1, long var2); +// 弃用,获取对象锁 +public native void monitorEnter(Object var1); +// 弃用,释放对象锁 +public native void monitorExit(Object var1); +// 弃用,尝试获取对象锁 +public native boolean tryMonitorEnter(Object var1); +``` + +大名鼎鼎的 AQS 就是通过 park、unpark 来对线程阻塞和唤醒的 + +回到 **TOP 4** 可以明白其实就是 park unpark + +## 7. 内存操作 + +```java +// 内存分配,相当于c++的os::malloc +public native long allocateMemory(long var1); +// 扩容内存 +public native long reallocateMemory(long var1, long var3); +// 给定的内存块中设置值 +public native void setMemory(Object var1, long var2, long var4, byte var6); +// 内存拷贝 +public native void copyMemory(Object var1, long var2, Object var4, long var5, long var7); +// 释放内存,相当于c++的os::free +public native void freeMemory(long var1); +// 获取给定地址的XX类型的值 +public native byte getXx(long var1); +// 为给定地址设置XX类型的值 +public native void putXx(long var1, xx var3); +``` + +以上的内存操作针对的都是堆外内存操作,与我们平时自己创建的对象都在堆内不同,堆外不会受到 JVM 内存管理,合理使用可以减少原本堆内内存使 GC 时间减少。 + +`java.nio.DirectByteBuffer` 中利用了堆外内存减少堆内堆外的copy + +回到 **TOP 2** 可以明白使用堆外内存操作可以绕开 JVM 控制 + +## 8. CAS(Compare And Swap, 比较和替换) + +```java +// 根据第二个参数”偏移量”去拿偏移量这么多的属性的值和第三个参数对比,如果相同则将该属性值替换为第四个参数。该偏移量是指某个字段相对Java对象的起始位置的偏移量,可以通过unsafe.objectFieldOffset(param)去获取对应属性的偏移量。 +public final native boolean compareAndSwapXx(Object var1, long var2, Xx var4, Xx var5); +``` + +CAS 是一条 CPU 的原子指令(cmpxchg),如果是多核处理器会加上 LOCK 前缀 + +```c++ +inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) { + int mp = os::is_MP(); + __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)" + : "=a" (exchange_value) + : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp) + : "cc", "memory"); + return exchange_value; +} +``` + +CAS 在并发包中被广泛应用,回到 **TOP 3** 可以明白 CAS 是一条 CPU 的原子指令。 + +## 9. Class 相关 + +```java +// 获取静态字段的内存地址偏移量 +public native long staticFieldOffset(Field var1); +// 获取一个静态类中给定字段的对象指针 +public native Object staticFieldBase(Field var1); +// 判断是否需要初始化一个类,因为有可能类还没初始化却去获取静态属性 +public native boolean shouldBeInitialized(Class var1); +// 检测类是否已经初始化 +public native void ensureClassInitialized(Class var1); +// 定义一个类 +public native Class defineClass(String var1, byte[] var2, int var3, int var4, ClassLoader var5, ProtectionDomain var6); +// 定义一个匿名类 +public native Class defineAnonymousClass(Class var1, byte[] var2, Object[] var3); +``` + +## 10. 对象操作 + +```java +// 返回对象某个属性相对对象内存地址的偏移量 +public native long objectFieldOffset(Field var1); +// 从对象的指定偏移量处获取变量的引用,使用volatile的加载语义 +public native Object getObjectVolatile(Object o, long offset); +// 存储变量的引用到对象的指定的偏移量处,使用volatile的存储语义 +public native void putObjectVolatile(Object o, long offset, Object x); +// 有序、延迟版本的putObjectVolatile方法,不保证值的改变被其他线程立即看到。只有在field被volatile修饰符修饰时有效 +public native void putOrderedObject(Object o, long offset, Object x); +// 绕过构造方法、初始化代码来创建对象 +public native Object allocateInstance(Class cls) throws InstantiationException; +``` \ No newline at end of file diff --git a/week_02/05/05-AtomicInteger.md b/week_02/05/05-AtomicInteger.md new file mode 100644 index 0000000000000000000000000000000000000000..b747cd0377bfc7f766311316f4a7cb7b4f8d6b05 --- /dev/null +++ b/week_02/05/05-AtomicInteger.md @@ -0,0 +1,65 @@ +# AtomicInteger 源码分析 + +## TOP 带着问题看源码 + +1. AtomicInteger 是怎么做到线程安全的 +2. AtomicInteger 是怎么实现自增的 + +## 1. 基本介绍 + +AtomicInteger 扩展了 Number,适用于基于数字的处理,并提供了如原子递增等,适合一些计数场景 + +```java +private static final Unsafe unsafe = Unsafe.getUnsafe(); +private static final long valueOffset; + +static { + try { + valueOffset = unsafe.objectFieldOffset + (AtomicInteger.class.getDeclaredField("value")); + } catch (Exception ex) { throw new Error(ex); } +} + +private volatile int value; +``` + +可以看到 value 是采用 volatile 修饰的,并通过 Unsafe 类获取 value 的偏移量,方便后续使用 CAS 操作 + +## 2. 自增 & 自减 + +```java +// 获取 & 自增 +public final int getAndIncrement() { + return unsafe.getAndAddInt(this, valueOffset, 1); +} +// 自增 & 获取 +public final int incrementAndGet() { + return unsafe.getAndAddInt(this, valueOffset, 1) + 1; +} +// 获取 & 自减 +public final int getAndDecrement() { + return unsafe.getAndAddInt(this, valueOffset, -1); +} +// 自减 & 获取 +public final int decrementAndGet() { + return unsafe.getAndAddInt(this, valueOffset, -1) - 1; +} +``` + +AtomicInteger 提供了自增/自减的两个场景方法,一个返回旧值,一个返回新增/自减后的。 + +实际都是通过Unsafe 的 getAndAddInt 方法来实现的,可以看到实际上 getAndAddInt 就是一个 cas + 自旋操作来实现。 + +```java +public final int getAndAddInt(Object var1, long var2, int var4) { + int var5; + do { + var5 = this.getIntVolatile(var1, var2); + } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4)); + + return var5; +} +``` + +回到 **TOP 问题1 2** 可以看到实际是采用 CAS + 自旋来实现线程安全的自增 + diff --git a/week_02/05/06-AtomicStampedReference.md b/week_02/05/06-AtomicStampedReference.md new file mode 100644 index 0000000000000000000000000000000000000000..d54794d9955a5b7b97eb4e56e8304674767db4a4 --- /dev/null +++ b/week_02/05/06-AtomicStampedReference.md @@ -0,0 +1,64 @@ +# AtomicStampedReference 源码分析 + +## TOP 带着问题看源码 + +1. CAS ABA 是什么 +2. AtomicStampedReference 是怎么解决 CAS ABA问题的 + +## 1. 基本介绍 + +AtomicStampedReference 是对 AtomicReference 的一个补充,解决了在 CAS 场景下 ABA 的问题 + +## 2. CAS ABA 是什么 + +从前面几篇分析,我们已经知道了 CAS 其实是一条 CPU 指令,作用是比较和替换,但是有可能 内存值原本是 A 然后变成 B 最后又变回了 A,这个时候 CAS 比较 A 发现是通过的(认为没有变化或者说是竞争),也就直接更新了,但是实际是有变化的。 + +一个解决思路就是加一个版本戳,每次更新变量同步更新一下版本号。这样就发现 1A != 3A,也就不会更新成功了。 + +回到 **TOP 1** 可以明白在并发情况下出现 ABA 的原因 + +## 3. 内部结构 + +```java +private static class Pair { + final T reference; + final int stamp; + private Pair(T reference, int stamp) { + this.reference = reference; + this.stamp = stamp; + } + static Pair of(T reference, int stamp) { + return new Pair(reference, stamp); + } +} + +private volatile Pair pair; +``` + +我们可以发现,AtomicStampedReference 对比 AtomicReference,全局维护的不是 T reference,而是 Pair。Pair 对象里多维护了一个 stamp 标识。 + +## 4. AtomicStampedReference 的 CAS + +```java +public boolean compareAndSet(V expectedReference, + V newReference, + int expectedStamp, + int newStamp) { + Pair current = pair; + return + // 1 引用没变 + expectedReference == current.reference && + // 2 版本号没变 + expectedStamp == current.stamp && + // 3 新引用等于旧引用 + ((newReference == current.reference && + // 4 新版本号等于旧版本号 + newStamp == current.stamp) || + // 5 构造 Pair 然后 cas + casPair(current, Pair.of(newReference, newStamp))); +} +``` + +可以看到,最后的 return 的逻辑很复杂,我们可以看到多了版本号的校验。 + +回到 **TOP 2** 可以明白多加一个维度来保存版本更新信息即可解决。 \ No newline at end of file diff --git a/week_02/05/07-LongAdder.md b/week_02/05/07-LongAdder.md new file mode 100644 index 0000000000000000000000000000000000000000..cfb5486be75339d51e29a0a24d852f2571ee4337 --- /dev/null +++ b/week_02/05/07-LongAdder.md @@ -0,0 +1,232 @@ +# LongAdder 源码分析 + +## TOP 带着问题看源码 + +1. 有了 AtomicLong 为什么还会有 LongAdder + +## 1. 基本介绍 + +LongAdder 是一个线程安全,JDK 8新加入的一个用来计数的工具类 + +按照作者的说法,LongAdder 在多个线程更新下比 AtomicLong 性能更好,但要消耗更多的空间 + +LongAdder 继承自 Striped64,其对一些简单情况做了处理(cell 存在且更新没有竞争),复杂情况交给 Striped64 的 longAccumulate。 + +## 2. Striped64 + +Striped64 设计思路是把多个线程分散到不同计数单元,减少线程竞争,提高并发效率 + +### 2.1 成员变量分析 + +```java +// 可用 CPU 数量 +static final int NCPU = Runtime.getRuntime().availableProcessors(); +// cell 数组,大小为2的幂次方 +transient volatile Cell[] cells; +// 基础偏移值 +transient volatile long base; +// 0 无锁 1 有锁 +transient volatile int cellsBusy; +``` + +### 2.2 Cell 类分析 + +```java +@sun.misc.Contended static final class Cell { + volatile long value; + Cell(long x) { value = x; } + final boolean cas(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } +} +``` + +`Cell` 类是 `Striped64` 的静态内部类。通过注解 `@sun.misc.Contended` 来自动实现缓存行填充,让 Java 编译器和 JRE 运行时来决定如何填充。本质上是一个填充了的、提供了 CAS 更新的 volatile 变量。 + +### 2.3 longAccumulate() 分析 + +```java +final void longAccumulate(long x, LongBinaryOperator fn, + boolean wasUncontended) { + int h; + // 获取线程的 probe hash值,如果 seed 初始化,probe 为非0 + if ((h = getProbe()) == 0) { + // 如果 probe 为0,就强制初始化一次 + ThreadLocalRandom.current(); // force initialization + // get 到 probe + h = getProbe(); + wasUncontended = true; + } + boolean collide = false; // True if last slot nonempty + for (;;) { + Cell[] as; Cell a; int n; long v; + if ((as = cells) != null && (n = as.length) > 0) { + // 通过 hash值获取数组 cells 一个index + if ((a = as[(n - 1) & h]) == null) { + // 当前位置为空,并且拿到锁(cellsBusy 0是无锁,1是有锁) + if (cellsBusy == 0) { // Try to attach new Cell + // 构建一个 Cell + Cell r = new Cell(x); // Optimistically create + // casCellsBusy 会把 cellsBuy 设置为1,也即是获取锁 + if (cellsBusy == 0 && casCellsBusy()) { + // 创建标识 + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + // 计算hash位置j + rs[j = (m - 1) & h] == null) { + // 把新构建的 Cell 塞到数组cells的index j的地方 + rs[j] = r; + // 更新创建完成状态 + created = true; + } + } finally { + // free lock + cellsBusy = 0; + } + // 如果完成直接退出 + if (created) + break; + // 否则继续创建(失败) + continue; // Slot is now non-empty + } + } + // 执行到这里说明也是失败(没拿到锁),设置碰撞标识为false + collide = false; + } + // hash位置已经有值,则往下走 + else if (!wasUncontended) // CAS already known to fail + wasUncontended = true; // Continue after rehash + // 对当前位置累加,例如原本地方的值是1,要加1,现在则为2。成功就退出 + else if (a.cas(v = a.value, ((fn == null) ? v + x : + fn.applyAsLong(v, x)))) + break; + else if (n >= NCPU || cells != as) + // cells 长度大于cpu数量,设置碰撞标识为false + collide = false; // At max size or stale + else if (!collide) + // 碰撞标识设置为 true + collide = true; + // 说明前面操作没有成功,再次尝试获取锁进行扩容 + else if (cellsBusy == 0 && casCellsBusy()) { + try { + if (cells == as) { // Expand table unless stale + // 扩容2倍,然后数组copy + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + // lock free + cellsBusy = 0; + } + // 扩容后重试 + collide = false; + continue; // Retry with expanded table + } + // 重新计算 hash 值 + h = advanceProbe(h); + } + // 1. 初始化 cells, + else if (cellsBusy == 0 && cells == as && casCellsBusy()) { + boolean init = false; + try { // Initialize table + if (cells == as) { + // 最开始容量是2 + Cell[] rs = new Cell[2]; + // hash对应位置赋值 + rs[h & 1] = new Cell(x); + cells = rs; + init = true; + } + } finally { + cellsBusy = 0; + } + if (init) + break; + } + // 初始化失败,CAS 把 value 累加到 base + else if (casBase(v = base, ((fn == null) ? v + x : + fn.applyAsLong(v, x)))) + break; // Fall back on using base + } +} +``` + +## 3. add() 分析 + +```java +public void add(long x) { + Cell[] as; long b, v; int m; Cell a; + // cells 为空直接使用 cas 赋值,cas成功直接返回 + if ((as = cells) != null || !casBase(b = base, b + x)) { + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[getProbe() & m]) == null || + !(uncontended = a.cas(v = a.value, v + x))) + // cas 失败 || cells 不为空 且 index 处为null || cas 再次修改失败 + // 调用 Striped64 的 longAccumulate + longAccumulate(x, null, uncontended); + } +} +``` + +## 4. sum() 分析 + +熟悉 ConcurrentHashMap 的同鞋看到 sum 相比已经很熟悉,惰性按需计算,可能会不太精准 + +```java +public long sum() { + Cell[] as = cells; Cell a; + // 先统计 base的值 + long sum = base; + if (as != null) { + // 再遍历 cells 中的值进行累加 + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; +} +``` + +## 5. reset() 分析 + +遍历 cells 数组,重置为0 + +```java +public void reset() { + Cell[] as = cells; Cell a; + base = 0L; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + a.value = 0L; + } + } +} +``` + +## 总结 + +可以看到 LongAdder 的核心思路就是保证高并发最坏的情况,通过对线程进行散列分片减少竞争时长,利用上了多核的性能。这种设计方式和 CSAPP 中 [提高并行性]([https://github.com/itliusir/CS_Notes/blob/master/%E6%93%8D%E4%BD%9C%E7%B3%BB%E7%BB%9F/%E6%93%8D%E4%BD%9C%E7%B3%BB%E7%BB%9F(%E5%8D%81%E4%BA%8C).md#%E6%8F%90%E9%AB%98%E5%B9%B6%E8%A1%8C%E6%80%A7](https://github.com/itliusir/CS_Notes/blob/master/操作系统/操作系统(十二).md#提高并行性) 提到的方式是一样的。 + +回到开篇 **TOP 1** 问题,可以看到 LongAdder 主要目的是解决高并发下 AtomicLong 自旋开销问题 。 \ No newline at end of file