From 8288463eebfb97fd89e8c3a74ad748516c0082dd Mon Sep 17 00:00:00 2001 From: sljie1988 Date: Sun, 22 Dec 2019 23:53:15 +0800 Subject: [PATCH 1/4] =?UTF-8?q?week=5F02=20=E5=8E=9F=E5=AD=90=E7=B3=BB?= =?UTF-8?q?=E5=88=97=E6=8F=90=E4=BA=A4=20unsafe=E3=80=81AtomicInteger?= =?UTF-8?q?=E3=80=81AtomicStampedReference=E3=80=81LongAdder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week_02/12/AtomicInteger-012.md | 90 ++++++++++ week_02/12/AtomicStampedReference-012.md | 87 +++++++++ week_02/12/LongAdder-012.md | 220 +++++++++++++++++++++++ week_02/12/unsafe-012.md | 159 ++++++++++++++++ 4 files changed, 556 insertions(+) create mode 100644 week_02/12/AtomicInteger-012.md create mode 100644 week_02/12/AtomicStampedReference-012.md create mode 100644 week_02/12/LongAdder-012.md create mode 100644 week_02/12/unsafe-012.md diff --git a/week_02/12/AtomicInteger-012.md b/week_02/12/AtomicInteger-012.md new file mode 100644 index 0000000..92f0ee7 --- /dev/null +++ b/week_02/12/AtomicInteger-012.md @@ -0,0 +1,90 @@ +#### 问题 +高并发的情况下,为何对AtomicInteger的操作是线程安全的 + +#### 简介 +高并发的情况下,使得数据操作具有原子性,保证线程安全 + +#### 源码解析 +##### 构造方法 +public AtomicInteger(int initialValue) { + value = initialValue; +} +public AtomicInteger() {} + +##### 主要方法 +``` +private static final Unsafe unsafe = Unsafe.getUnsafe(); +private static final long valueOffset; + +// 在类初始化的时候,计算value变量在对象中的偏移 +static { + try { + valueOffset = unsafe.objectFieldOffset + (AtomicInteger.class.getDeclaredField("value")); + } catch (Exception ex) { throw new Error(ex); } +} + +private volatile int value; + +public final void set(int newValue) { + value = newValue; +} + +public final int get() { + return value; +} +``` + +设置、获取值的相关操作,unsafe的相关应用 +``` +public final int getAndSet(int newValue) { + return unsafe.getAndSetInt(this, valueOffset, newValue); +} + +public final int getAndIncrement() { + return unsafe.getAndAddInt(this, valueOffset, 1); +} + +public final int getAndDecrement() { + return unsafe.getAndAddInt(this, valueOffset, -1); +} + +public final int getAndAdd(int delta) { + return unsafe.getAndAddInt(this, valueOffset, delta); +} + +public int intValue() { + return get(); +} + +public long longValue() { + return (long)get(); +} +``` + +#### 总结 +高并发采用AtomicInteger可以保证线程安全的原因: +volatile修饰,使数据对多线程保持可见性。因为操作未必是原子性,所以不能保证线程安全; +AtomicInteger操作采用unsafe cas,操作是原子性的。 +两者共同保证了AtomicInteger的操作在多线程下是线程安全的。 + +AtomicInteger操作属于非阻塞同步,当存在数据冲突时不用使线程挂起,采取重试机制直到成功,因为unsafe cas操作和冲突检测具备原子性。 +AtomicInteger只能在指定环境下使用,volatile修饰会限制指令重排,影响效率 + +#### 延伸 +AtomicInteger native深入理解: +https://www.jianshu.com/p/4ed887664b13 +原子操作类AtomicInteger应用: +https://blog.csdn.net/fanrenxiang/article/details/80623884 +https://www.cnblogs.com/zhaoyan001/p/8885360.html + +volatile相关知识: +1.修饰数据时,使得数据对多个线程保持可见性 +read、load、use动作必须连续出现,保证每次读取前必须先从主内存刷新最新的值; +assign、store、write动作必须连续出现,每次写入后必须立即同步回主内存当中 +volatile关键字修饰的变量看到的随时是自己的最新值,线程1中对变量v的最新修改,对线程2是可见的。 +2.防止指令重排 +在每个volatile写操作的前面插入一个StoreStore屏障。 +在每个volatile写操作的后面插入一个StoreLoad屏障。 +在每个volatile读操作的后面插入一个LoadLoad屏障。 +在每个volatile读操作的后面插入一个LoadStore屏障 \ No newline at end of file diff --git a/week_02/12/AtomicStampedReference-012.md b/week_02/12/AtomicStampedReference-012.md new file mode 100644 index 0000000..8ad5bb1 --- /dev/null +++ b/week_02/12/AtomicStampedReference-012.md @@ -0,0 +1,87 @@ +#### 问题 +如何解决ABA问题 +解决ABA问题还有哪些措施 + +#### 简介 +通过引入版本戳stamp,解决ABA问题 + +#### 源码解析 +##### 构造方法 +``` +public AtomicStampedReference(V initialRef, int initialStamp) { + pair = Pair.of(initialRef, initialStamp); +} +``` +##### 主要方法 +``` +private static class Pair { + // 属性值 + final T reference; + // 版本戳 + final int stamp; + private Pair(T reference, int stamp) { + this.reference = reference; + this.stamp = stamp; + } + static Pair of(T reference, int stamp) { + return new Pair(reference, stamp); + } +} + +private volatile Pair pair; +``` + +``` +// 数据更新,old与new数据分别加上版本戳,通过cas进行更新 +public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) { + Pair current = pair; + return + expectedReference == current.reference && + expectedStamp == current.stamp && + ((newReference == current.reference && + newStamp == current.stamp) || + casPair(current, Pair.of(newReference, newStamp))); +} + +// 若当前值是期望值,更新版本戳 +public boolean attemptStamp(V expectedReference, int newStamp) { + Pair current = pair; + return + expectedReference == current.reference && + (newStamp == current.stamp || + casPair(current, Pair.of(expectedReference, newStamp))); +} +``` + +``` +private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); +private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class); + +// 获取'pair'属性在内存中的位置 +static long objectFieldOffset(sun.misc.Unsafe UNSAFE, String field, Class klazz) { + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } +} + +// 通过unsafe casObject原子性操作,实现数据的更新 +private boolean casPair(Pair cmp, Pair val) { + return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val); +} + +``` +#### 总结 +给数据加上版本戳封装成对象,定位需要更新的数据; +casObject实现原子性操作。 +两者结合解决了ABA问题 + +#### 延伸 +用AtomicStampedReference解决ABA问题: +https://www.cnblogs.com/java20130722/p/3206742.html +AtomicStampedReference深入理解: +https://blog.csdn.net/lizc_lizc/article/details/102989288 \ No newline at end of file diff --git a/week_02/12/LongAdder-012.md b/week_02/12/LongAdder-012.md new file mode 100644 index 0000000..205348b --- /dev/null +++ b/week_02/12/LongAdder-012.md @@ -0,0 +1,220 @@ +#### 问题 +LongAdder与AtomicLong区别是什么 +LongAdder在高并发下为什么效率比AtomicLog高 + +#### 简介 +LongAdder在高并发下原子性操作数据时效率是比较稳定的,优化AtomicLong + +#### 继承体系 +父类:Striped64 + +#### 源码解析 +##### 主要方法 +``` +public void increment() { + add(1L); +} + +public void decrement() { + add(-1L); +} + +public void add(long x) { + Cell[] as; long b, v; int m; Cell a; + // 无并发冲突,与AtomicLong相同 + // casBase()设置值,sum()取值 + if ((as = cells) != null || !casBase(b = base, b + x)) { + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[getProbe() & m]) == null || + !(uncontended = a.cas(v = a.value, v + x))) + longAccumulate(x, null, uncontended); + } +} + +final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { + int h; + if ((h = getProbe()) == 0) { + ThreadLocalRandom.current(); // force initialization + h = getProbe(); + wasUncontended = true; + } + boolean collide = false; // True if last slot nonempty + for (;;) { + Cell[] as; Cell a; int n; long v; + if ((as = cells) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) == null) { + if (cellsBusy == 0) { // Try to attach new Cell + Cell r = new Cell(x); // Optimistically create + if (cellsBusy == 0 && casCellsBusy()) { + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + cellsBusy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } + } + collide = false; + } + else if (!wasUncontended) // CAS already known to fail + wasUncontended = true; // Continue after rehash + else if (a.cas(v = a.value, ((fn == null) ? v + x : + fn.applyAsLong(v, x)))) + break; + else if (n >= NCPU || cells != as) + collide = false; // At max size or stale + else if (!collide) + collide = true; + else if (cellsBusy == 0 && casCellsBusy()) { + try { + if (cells == as) { // Expand table unless stale + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + cellsBusy = 0; + } + collide = false; + continue; // Retry with expanded table + } + h = advanceProbe(h); + } + else if (cellsBusy == 0 && cells == as && casCellsBusy()) { + boolean init = false; + try { // Initialize table + if (cells == as) { + Cell[] rs = new Cell[2]; + rs[h & 1] = new Cell(x); + cells = rs; + init = true; + } + } finally { + cellsBusy = 0; + } + if (init) + break; + } + else if (casBase(v = base, ((fn == null) ? v + x : + fn.applyAsLong(v, x)))) + break; // Fall back on using base + } +} + +public long sum() { + Cell[] as = cells; Cell a; + long sum = base; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; +} + +// 重置数据 +public void reset() { + Cell[] as = cells; Cell a; + base = 0L; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + a.value = 0L; + } + } +} + +public long longValue() { + return sum(); +} + +public int intValue() { + return (int)sum(); +} +``` + +#### 父类 Striped64 +##### 静态方法 +``` +@sun.misc.Contended static final class Cell { + volatile long value; + Cell(long x) { value = x; } + final boolean cas(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } +} + +// Unsafe mechanics +private static final sun.misc.Unsafe UNSAFE; +private static final long BASE; +private static final long CELLSBUSY; +private static final long PROBE; +static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class sk = Striped64.class; + BASE = UNSAFE.objectFieldOffset + (sk.getDeclaredField("base")); + CELLSBUSY = UNSAFE.objectFieldOffset + (sk.getDeclaredField("cellsBusy")); + Class tk = Thread.class; + PROBE = UNSAFE.objectFieldOffset + (tk.getDeclaredField("threadLocalRandomProbe")); + } catch (Exception e) { + throw new Error(e); + } +} +``` + +##### 属性 +``` +// Table of cells. When non-null, size is a power of 2 +transient volatile Cell[] cells; +transient volatile long base; +// Spinlock (locked via CAS) used when resizing and/or creating Cells +transient volatile int cellsBusy; + +``` + +##### 主要方法 +``` +final boolean casBase(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); +} +final boolean casCellsBusy() { + return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); +} +static final int getProbe() { + return UNSAFE.getInt(Thread.currentThread(), PROBE); +} +``` + +#### 总结 + +#### 延伸 \ No newline at end of file diff --git a/week_02/12/unsafe-012.md b/week_02/12/unsafe-012.md new file mode 100644 index 0000000..889e802 --- /dev/null +++ b/week_02/12/unsafe-012.md @@ -0,0 +1,159 @@ +#### 问题 +unsafe是什么 +unsafe应用有哪些 + +#### 简介 +Unsafe类是rt.jar中sun.misc包下的类,不属于java标准,使用Unsafe可用来直接访问系统内存资源并进行自主管理,Unsafe类在提升Java运行效率,增强Java语言底层操作能力方面起了很大的作用,提供了一些低层次操作,如直接内存访问、线程调度等。 +Unsafe大部分API都是native的方法,主要包括以下几类,Object相关、Class相关、数组相关、并发相关、内存相关、系统相关。 +1)Class相关。主要提供Class和它的静态字段的操作方法。 +2)Object相关。主要提供Object和它的字段的操作方法。 +3)Arrray相关。主要提供数组及其中元素的操作方法。 +4)并发相关。主要提供低级别同步原语,如CAS、线程调度、volatile、内存屏障等。 +5)Memory相关。提供了直接内存访问方法(绕过Java堆直接操作本地内存),可做到像C一样自由利用系统内存资源。 +6)系统相关。主要返回某些低级别的内存信息,如地址大小、内存页大小。 + +#### 类结构说明 +##### 并发相关 +Class相关 +``` +//静态属性的偏移量,用于在对应的Class对象中读写静态属性 +public native long staticFieldOffset(Field f); + +public native Object staticFieldBase(Field f); +//判断是否需要初始化一个类 +public native boolean shouldBeInitialized(Class c); +//确保类被初始化 +public native void ensureClassInitialized(Class c); +//定义一个类,可用于动态创建类 +public native Class defineClass(String name, byte[] b, int off, int len, ClassLoader loader, ProtectionDomain protectionDomain); +//定义一个匿名类,可用于动态创建类 +public native Class defineAnonymousClass(Class hostClass, byte[] data, Object[] cpPatches); + +``` + +Object相关 +``` +//获得对象的字段偏移量 +public native long objectFieldOffset(Field f); +//获得给定对象地址偏移量的int值 +public native int getInt(Object o, long offset); +//设置给定对象地址偏移量的int值 +public native void putInt(Object o, long offset, int x); + +//创建对象,但并不会调用其构造方法。如果类未被初始化,将初始化类。 +public native Object allocateInstance(Class cls) throws InstantiationException; +``` + +数组相关 +``` +//返回数组中第一个元素的偏移地址 +public native int arrayBaseOffset(Class arrayClass); +//返回数组中每一个元素占用的大小 +public native int arrayIndexScale(Class arrayClass); +``` + +CAS相关 +``` +var1 当前对象; var2 当前数据内存偏移地址; var4 预期值; var5 新值 +public final native boolean compareAndSwapObject(Object this, long offset, Object expectValue, Object newValue); +public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); +public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6); +``` + +volatile相关读写 +``` +public native Object getObjectVolatile(Object o, long offset); +public native void putObjectVolatile(Object o, long offset, Object x); +``` +线程调度相关 +``` +//取消阻塞线程 +public native void unpark(Object thread); +//阻塞线程 +public native void park(boolean isAbsolute, long time); +//获得对象锁 +public native void monitorEnter(Object o); +//释放对象锁 +public native void monitorExit(Object o); +//尝试获取对象锁,返回true或false表示是否获取成功 +public native boolean tryMonitorEnter(Object o); +``` +内存屏障相关 +``` +//内存屏障,禁止load操作重排序 +public native void loadFence(); +//内存屏障,禁止store操作重排序 +public native void storeFence(); +//内存屏障,禁止load、store操作重排序 +public native void fullFence(); +``` + +##### 内存操作 +直接内存访问(非堆内存) +``` +//(boolean、byte、char、short、int、long、float、double)都有以下get、put两个方法。 +//获得给定地址上的int值 +public native int getInt(long address); +//设置给定地址上的int值 +public native void putInt(long address, int x); +//获得本地指针 +public native long getAddress(long address); +//存储本地指针到给定的内存地址 +public native void putAddress(long address, long x); + +//分配内存 +public native long allocateMemory(long bytes); +//重新分配内存 +public native long reallocateMemory(long address, long bytes); +//初始化内存内容 +public native void setMemory(Object o, long address, long bytes, byte value); +//初始化内存内容 +public void setMemory(long address, long bytes, byte value) { + setMemory(null, address, bytes, value); +} +//内存内容拷贝 +public native void copyMemory(Object srcBase, long address,Object destBase, long destOffset,long bytes); +//内存内容拷贝 +public void copyMemory(long srcAddress, long destAddress, long bytes) { + copyMemory(null, srcAddress, null, destAddress, bytes); +} +//释放内存 +public native void freeMemory(long address); +``` + +##### 系统相关 +``` +// 获取指针的大小,单位是字节。 +// 对于64位系统,返回8,表示指针大小是8字节 +// 对于32位系统,返回4,表示指针大小是4字节 +public native int addressSize(); +// 返回内存页的大小,单位是字节。返回值一定是2的多少次幂 +public native int pageSize(); +``` + +#### 总结 +绕过类初始化方法,实例化一个类 +修改私有字段的值 +内存修改 +实现Java浅复制 +包装受检异常为运行时异常 +在非Java堆中分配内存 +CAS操作 +阻塞/唤醒线程 +动态加载类 + +#### 相关知识 +offset:给定属性在内存中的位置 + +#### 延伸 +一篇看懂Java中的Unsafe类: +https://www.jb51.net/article/140726.htm +https://www.jianshu.com/p/db8dce09232d +https://blog.csdn.net/qq_34436819/article/details/102723579 +https://www.jianshu.com/p/cda24891f9e4 +unsafe类相关应用: +https://blog.csdn.net/hotpots/article/details/85933312 +volatile关键字的作用、原理: +https://www.cnblogs.com/monkeysayhi/p/7654460.html +cas使用unsafe的实现原理、产生的问题、与synchronized的比较: +https://blog.csdn.net/tiandao321/article/details/80811103 \ No newline at end of file -- Gitee From e50b44c6cd4fc14312731a2a5fe758cb231f2b2b Mon Sep 17 00:00:00 2001 From: sljie1988 Date: Mon, 23 Dec 2019 01:13:35 +0800 Subject: [PATCH 2/4] =?UTF-8?q?LongAdder=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week_02/12/LongAdder-012.md | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/week_02/12/LongAdder-012.md b/week_02/12/LongAdder-012.md index 205348b..1d30df3 100644 --- a/week_02/12/LongAdder-012.md +++ b/week_02/12/LongAdder-012.md @@ -3,7 +3,8 @@ LongAdder与AtomicLong区别是什么 LongAdder在高并发下为什么效率比AtomicLog高 #### 简介 -LongAdder在高并发下原子性操作数据时效率是比较稳定的,优化AtomicLong +LongAdder是高并发下效率高效且线程安全的数据操作类。原理为,最初竞争无冲突时与AtomicLong相同,采用cas增加;当多线程竞争时,采用分段思想,不同线程更新不同段的段,最后将这些段相加求和即是最终结果。 +LongAdder在高并发下原子性操作数据时效率是比较稳定的,优于AtomicLong #### 继承体系 父类:Striped64 @@ -42,7 +43,9 @@ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; + // cells已经初始化过 if ((as = cells) != null && (n = as.length) > 0) { + // 当前线程cell未初始化,初始化cell if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create @@ -75,6 +78,7 @@ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) collide = false; // At max size or stale else if (!collide) collide = true; + // 上面为false则说明冲突了,尝试占有锁并扩容 else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale @@ -89,8 +93,10 @@ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) collide = false; continue; // Retry with expanded table } + // 重新给线程生成一个hash值,降低hash冲突,减少映射到同一个Cell导致CAS竞争的情况 h = advanceProbe(h); } + // cells未初始化、尝试占有锁,进行初始化 else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table @@ -106,6 +112,7 @@ final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) if (init) break; } + // 尝试占有锁,尝试更新base else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base @@ -175,6 +182,7 @@ private static final sun.misc.Unsafe UNSAFE; private static final long BASE; private static final long CELLSBUSY; private static final long PROBE; +// 获取属性内存地址 static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); @@ -195,9 +203,12 @@ static { ##### 属性 ``` // Table of cells. When non-null, size is a power of 2 +// 存储数据的段组 transient volatile Cell[] cells; +// 多线程分段操作数据时其中一个特殊的段 transient volatile long base; // Spinlock (locked via CAS) used when resizing and/or creating Cells +// 扩容、创建段组时用的锁,通过cas实现 transient volatile int cellsBusy; ``` @@ -207,14 +218,27 @@ transient volatile int cellsBusy; final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } + +// 使用CAS将cells自旋标识更新为1 final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } +// 线程本身的hash值 static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); } + +// 相当于rehash,重新算一遍线程的hash值 +static final int advanceProbe(int probe) { + probe ^= probe << 13; // xorshift + probe ^= probe >>> 17; + probe ^= probe << 5; + UNSAFE.putInt(Thread.currentThread(), PROBE, probe); + return probe; +} ``` #### 总结 - -#### 延伸 \ No newline at end of file +LongAdder通过base和cells数组分段来存储值,分段求和方式获取最终结果;前面已经累加到sum上的Cell的value有修改通过最终一致性获得最终结果 +不同的线程会hash到不同的cell上去更新,减少了竞争; +LongAdder的性能非常高,最终会达到一种无竞争的状态; \ No newline at end of file -- Gitee From 006786c2782e85a81b2d01b1ab1306c3bbd9407d Mon Sep 17 00:00:00 2001 From: sljie1988 Date: Mon, 23 Dec 2019 01:27:36 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E5=88=A0=E9=99=A4LongAdder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week_02/12/LongAdder-012.md | 244 ------------------------------------ 1 file changed, 244 deletions(-) delete mode 100644 week_02/12/LongAdder-012.md diff --git a/week_02/12/LongAdder-012.md b/week_02/12/LongAdder-012.md deleted file mode 100644 index 1d30df3..0000000 --- a/week_02/12/LongAdder-012.md +++ /dev/null @@ -1,244 +0,0 @@ -#### 问题 -LongAdder与AtomicLong区别是什么 -LongAdder在高并发下为什么效率比AtomicLog高 - -#### 简介 -LongAdder是高并发下效率高效且线程安全的数据操作类。原理为,最初竞争无冲突时与AtomicLong相同,采用cas增加;当多线程竞争时,采用分段思想,不同线程更新不同段的段,最后将这些段相加求和即是最终结果。 -LongAdder在高并发下原子性操作数据时效率是比较稳定的,优于AtomicLong - -#### 继承体系 -父类:Striped64 - -#### 源码解析 -##### 主要方法 -``` -public void increment() { - add(1L); -} - -public void decrement() { - add(-1L); -} - -public void add(long x) { - Cell[] as; long b, v; int m; Cell a; - // 无并发冲突,与AtomicLong相同 - // casBase()设置值,sum()取值 - if ((as = cells) != null || !casBase(b = base, b + x)) { - boolean uncontended = true; - if (as == null || (m = as.length - 1) < 0 || - (a = as[getProbe() & m]) == null || - !(uncontended = a.cas(v = a.value, v + x))) - longAccumulate(x, null, uncontended); - } -} - -final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { - int h; - if ((h = getProbe()) == 0) { - ThreadLocalRandom.current(); // force initialization - h = getProbe(); - wasUncontended = true; - } - boolean collide = false; // True if last slot nonempty - for (;;) { - Cell[] as; Cell a; int n; long v; - // cells已经初始化过 - if ((as = cells) != null && (n = as.length) > 0) { - // 当前线程cell未初始化,初始化cell - if ((a = as[(n - 1) & h]) == null) { - if (cellsBusy == 0) { // Try to attach new Cell - Cell r = new Cell(x); // Optimistically create - if (cellsBusy == 0 && casCellsBusy()) { - boolean created = false; - try { // Recheck under lock - Cell[] rs; int m, j; - if ((rs = cells) != null && - (m = rs.length) > 0 && - rs[j = (m - 1) & h] == null) { - rs[j] = r; - created = true; - } - } finally { - cellsBusy = 0; - } - if (created) - break; - continue; // Slot is now non-empty - } - } - collide = false; - } - else if (!wasUncontended) // CAS already known to fail - wasUncontended = true; // Continue after rehash - else if (a.cas(v = a.value, ((fn == null) ? v + x : - fn.applyAsLong(v, x)))) - break; - else if (n >= NCPU || cells != as) - collide = false; // At max size or stale - else if (!collide) - collide = true; - // 上面为false则说明冲突了,尝试占有锁并扩容 - else if (cellsBusy == 0 && casCellsBusy()) { - try { - if (cells == as) { // Expand table unless stale - Cell[] rs = new Cell[n << 1]; - for (int i = 0; i < n; ++i) - rs[i] = as[i]; - cells = rs; - } - } finally { - cellsBusy = 0; - } - collide = false; - continue; // Retry with expanded table - } - // 重新给线程生成一个hash值,降低hash冲突,减少映射到同一个Cell导致CAS竞争的情况 - h = advanceProbe(h); - } - // cells未初始化、尝试占有锁,进行初始化 - else if (cellsBusy == 0 && cells == as && casCellsBusy()) { - boolean init = false; - try { // Initialize table - if (cells == as) { - Cell[] rs = new Cell[2]; - rs[h & 1] = new Cell(x); - cells = rs; - init = true; - } - } finally { - cellsBusy = 0; - } - if (init) - break; - } - // 尝试占有锁,尝试更新base - else if (casBase(v = base, ((fn == null) ? v + x : - fn.applyAsLong(v, x)))) - break; // Fall back on using base - } -} - -public long sum() { - Cell[] as = cells; Cell a; - long sum = base; - if (as != null) { - for (int i = 0; i < as.length; ++i) { - if ((a = as[i]) != null) - sum += a.value; - } - } - return sum; -} - -// 重置数据 -public void reset() { - Cell[] as = cells; Cell a; - base = 0L; - if (as != null) { - for (int i = 0; i < as.length; ++i) { - if ((a = as[i]) != null) - a.value = 0L; - } - } -} - -public long longValue() { - return sum(); -} - -public int intValue() { - return (int)sum(); -} -``` - -#### 父类 Striped64 -##### 静态方法 -``` -@sun.misc.Contended static final class Cell { - volatile long value; - Cell(long x) { value = x; } - final boolean cas(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); - } - - // Unsafe mechanics - private static final sun.misc.Unsafe UNSAFE; - private static final long valueOffset; - static { - try { - UNSAFE = sun.misc.Unsafe.getUnsafe(); - Class ak = Cell.class; - valueOffset = UNSAFE.objectFieldOffset - (ak.getDeclaredField("value")); - } catch (Exception e) { - throw new Error(e); - } - } -} - -// Unsafe mechanics -private static final sun.misc.Unsafe UNSAFE; -private static final long BASE; -private static final long CELLSBUSY; -private static final long PROBE; -// 获取属性内存地址 -static { - try { - UNSAFE = sun.misc.Unsafe.getUnsafe(); - Class sk = Striped64.class; - BASE = UNSAFE.objectFieldOffset - (sk.getDeclaredField("base")); - CELLSBUSY = UNSAFE.objectFieldOffset - (sk.getDeclaredField("cellsBusy")); - Class tk = Thread.class; - PROBE = UNSAFE.objectFieldOffset - (tk.getDeclaredField("threadLocalRandomProbe")); - } catch (Exception e) { - throw new Error(e); - } -} -``` - -##### 属性 -``` -// Table of cells. When non-null, size is a power of 2 -// 存储数据的段组 -transient volatile Cell[] cells; -// 多线程分段操作数据时其中一个特殊的段 -transient volatile long base; -// Spinlock (locked via CAS) used when resizing and/or creating Cells -// 扩容、创建段组时用的锁,通过cas实现 -transient volatile int cellsBusy; - -``` - -##### 主要方法 -``` -final boolean casBase(long cmp, long val) { - return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); -} - -// 使用CAS将cells自旋标识更新为1 -final boolean casCellsBusy() { - return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); -} -// 线程本身的hash值 -static final int getProbe() { - return UNSAFE.getInt(Thread.currentThread(), PROBE); -} - -// 相当于rehash,重新算一遍线程的hash值 -static final int advanceProbe(int probe) { - probe ^= probe << 13; // xorshift - probe ^= probe >>> 17; - probe ^= probe << 5; - UNSAFE.putInt(Thread.currentThread(), PROBE, probe); - return probe; -} -``` - -#### 总结 -LongAdder通过base和cells数组分段来存储值,分段求和方式获取最终结果;前面已经累加到sum上的Cell的value有修改通过最终一致性获得最终结果 -不同的线程会hash到不同的cell上去更新,减少了竞争; -LongAdder的性能非常高,最终会达到一种无竞争的状态; \ No newline at end of file -- Gitee From 898737f8c10623f9b023f4ca9d9e248ba3c765fa Mon Sep 17 00:00:00 2001 From: sljie1988 Date: Mon, 23 Dec 2019 01:29:40 +0800 Subject: [PATCH 4/4] =?UTF-8?q?LongAdder=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week_02/12/LongAdder-012.md | 244 ++++++++++++++++++++++++++++++++++++ 1 file changed, 244 insertions(+) create mode 100644 week_02/12/LongAdder-012.md diff --git a/week_02/12/LongAdder-012.md b/week_02/12/LongAdder-012.md new file mode 100644 index 0000000..1d30df3 --- /dev/null +++ b/week_02/12/LongAdder-012.md @@ -0,0 +1,244 @@ +#### 问题 +LongAdder与AtomicLong区别是什么 +LongAdder在高并发下为什么效率比AtomicLog高 + +#### 简介 +LongAdder是高并发下效率高效且线程安全的数据操作类。原理为,最初竞争无冲突时与AtomicLong相同,采用cas增加;当多线程竞争时,采用分段思想,不同线程更新不同段的段,最后将这些段相加求和即是最终结果。 +LongAdder在高并发下原子性操作数据时效率是比较稳定的,优于AtomicLong + +#### 继承体系 +父类:Striped64 + +#### 源码解析 +##### 主要方法 +``` +public void increment() { + add(1L); +} + +public void decrement() { + add(-1L); +} + +public void add(long x) { + Cell[] as; long b, v; int m; Cell a; + // 无并发冲突,与AtomicLong相同 + // casBase()设置值,sum()取值 + if ((as = cells) != null || !casBase(b = base, b + x)) { + boolean uncontended = true; + if (as == null || (m = as.length - 1) < 0 || + (a = as[getProbe() & m]) == null || + !(uncontended = a.cas(v = a.value, v + x))) + longAccumulate(x, null, uncontended); + } +} + +final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { + int h; + if ((h = getProbe()) == 0) { + ThreadLocalRandom.current(); // force initialization + h = getProbe(); + wasUncontended = true; + } + boolean collide = false; // True if last slot nonempty + for (;;) { + Cell[] as; Cell a; int n; long v; + // cells已经初始化过 + if ((as = cells) != null && (n = as.length) > 0) { + // 当前线程cell未初始化,初始化cell + if ((a = as[(n - 1) & h]) == null) { + if (cellsBusy == 0) { // Try to attach new Cell + Cell r = new Cell(x); // Optimistically create + if (cellsBusy == 0 && casCellsBusy()) { + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + cellsBusy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } + } + collide = false; + } + else if (!wasUncontended) // CAS already known to fail + wasUncontended = true; // Continue after rehash + else if (a.cas(v = a.value, ((fn == null) ? v + x : + fn.applyAsLong(v, x)))) + break; + else if (n >= NCPU || cells != as) + collide = false; // At max size or stale + else if (!collide) + collide = true; + // 上面为false则说明冲突了,尝试占有锁并扩容 + else if (cellsBusy == 0 && casCellsBusy()) { + try { + if (cells == as) { // Expand table unless stale + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + cellsBusy = 0; + } + collide = false; + continue; // Retry with expanded table + } + // 重新给线程生成一个hash值,降低hash冲突,减少映射到同一个Cell导致CAS竞争的情况 + h = advanceProbe(h); + } + // cells未初始化、尝试占有锁,进行初始化 + else if (cellsBusy == 0 && cells == as && casCellsBusy()) { + boolean init = false; + try { // Initialize table + if (cells == as) { + Cell[] rs = new Cell[2]; + rs[h & 1] = new Cell(x); + cells = rs; + init = true; + } + } finally { + cellsBusy = 0; + } + if (init) + break; + } + // 尝试占有锁,尝试更新base + else if (casBase(v = base, ((fn == null) ? v + x : + fn.applyAsLong(v, x)))) + break; // Fall back on using base + } +} + +public long sum() { + Cell[] as = cells; Cell a; + long sum = base; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; +} + +// 重置数据 +public void reset() { + Cell[] as = cells; Cell a; + base = 0L; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + a.value = 0L; + } + } +} + +public long longValue() { + return sum(); +} + +public int intValue() { + return (int)sum(); +} +``` + +#### 父类 Striped64 +##### 静态方法 +``` +@sun.misc.Contended static final class Cell { + volatile long value; + Cell(long x) { value = x; } + final boolean cas(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } +} + +// Unsafe mechanics +private static final sun.misc.Unsafe UNSAFE; +private static final long BASE; +private static final long CELLSBUSY; +private static final long PROBE; +// 获取属性内存地址 +static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class sk = Striped64.class; + BASE = UNSAFE.objectFieldOffset + (sk.getDeclaredField("base")); + CELLSBUSY = UNSAFE.objectFieldOffset + (sk.getDeclaredField("cellsBusy")); + Class tk = Thread.class; + PROBE = UNSAFE.objectFieldOffset + (tk.getDeclaredField("threadLocalRandomProbe")); + } catch (Exception e) { + throw new Error(e); + } +} +``` + +##### 属性 +``` +// Table of cells. When non-null, size is a power of 2 +// 存储数据的段组 +transient volatile Cell[] cells; +// 多线程分段操作数据时其中一个特殊的段 +transient volatile long base; +// Spinlock (locked via CAS) used when resizing and/or creating Cells +// 扩容、创建段组时用的锁,通过cas实现 +transient volatile int cellsBusy; + +``` + +##### 主要方法 +``` +final boolean casBase(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); +} + +// 使用CAS将cells自旋标识更新为1 +final boolean casCellsBusy() { + return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); +} +// 线程本身的hash值 +static final int getProbe() { + return UNSAFE.getInt(Thread.currentThread(), PROBE); +} + +// 相当于rehash,重新算一遍线程的hash值 +static final int advanceProbe(int probe) { + probe ^= probe << 13; // xorshift + probe ^= probe >>> 17; + probe ^= probe << 5; + UNSAFE.putInt(Thread.currentThread(), PROBE, probe); + return probe; +} +``` + +#### 总结 +LongAdder通过base和cells数组分段来存储值,分段求和方式获取最终结果;前面已经累加到sum上的Cell的value有修改通过最终一致性获得最终结果 +不同的线程会hash到不同的cell上去更新,减少了竞争; +LongAdder的性能非常高,最终会达到一种无竞争的状态; \ No newline at end of file -- Gitee