From 18a30b833c525356146c849781af6be29c82f398 Mon Sep 17 00:00:00 2001 From: qjwxpz Date: Sun, 22 Dec 2019 15:48:16 +0800 Subject: [PATCH] 032-week-02 --- week_02/32/AtomicInteger.md | 159 +++++++++++++++++++ week_02/32/AtomicStampedReference.md | 138 +++++++++++++++++ week_02/32/LongAdder.md | 222 +++++++++++++++++++++++++++ week_02/32/Unsafe.md | 172 +++++++++++++++++++++ 4 files changed, 691 insertions(+) create mode 100644 week_02/32/AtomicInteger.md create mode 100644 week_02/32/AtomicStampedReference.md create mode 100644 week_02/32/LongAdder.md create mode 100644 week_02/32/Unsafe.md diff --git a/week_02/32/AtomicInteger.md b/week_02/32/AtomicInteger.md new file mode 100644 index 0000000..7430972 --- /dev/null +++ b/week_02/32/AtomicInteger.md @@ -0,0 +1,159 @@ +# AtomicInteger源码阅读 + +## 1.1 AtomicInteger继承了Number,提供方法将数值转化为 byte, double 等 + +## 1.2 AtomicInteger: + 1.2.1: 实现java.io.Serializable接口,为了网络传输等的序列化用 + +## 1.3 AtomicInteger属性: + 1.3.1: private volatile int value; //volatile修饰保证每次取到的是最新值 + 1.3.2: private static final Unsafe unsafe = Unsafe.getUnsafe(); //底层是通过Unsafe来实现的 + 1.3.3: private static final long valueOffset; //定位value在AtomicInteger对象中的位置 + +## 1.4 AtomicInteger构造方法: + 1.4.1: +```java + public AtomicInteger(int initialValue) { //初始化AtomicInteger的value = initialValue + value = initialValue; + } +``` + 1.4.2: +```java + public AtomicInteger() { //无参构造 + } +``` +## 1.5 AtomicInteger静态方法: +```java + static { //在类加载的时候运行 + try { + valueOffset = unsafe.objectFieldOffset + (AtomicInteger.class.getDeclaredField("value")); //在类加载的时候存放value在AtomicInteger对象中的位置,个人理解就好比定长报文中第好多到好多位表示XX的值 + } catch (Exception ex) { throw new Error(ex); } + } +``` +## 1.6 AtomicInteger方法 + 1.6.1: +```java + public final int get() { //返回value + return value; + } +``` + 1.6.2: +```java + public final void set(int newValue) { //设置当前值 + value = newValue; + } +``` + 1.6.3: +```java + public final int addAndGet(int delta) { //获取当前值+delta并返回新值 + for (;;) { + int current = get(); + int next = current + delta; + if (compareAndSet(current, next)) + return next; + } + } +``` + 1.6.4: +```java + public final void lazySet(int newValue) { + //最终把值设置为newValue,使用该方法后,其他线程在一段时间内还会获取到旧值 + unsafe.putOrderedInt(this, valueOffset, newValue); + } +``` + 1.6.5: +```java + public final int getAndSet(int newValue) { + for (;;) { + int current = get(); //获取旧值 + if (compareAndSet(current, newValue)) //如果当前值是current,就更新为newValue + return current; //更新成功返回旧值 + } + } +``` + 1.6.6: +```java + public final boolean compareAndSet(int expect, int update) { //如果当前值是expect,就更新为update + return unsafe.compareAndSwapInt(this, valueOffset, expect, update); //对象this的valueOffset位置的当前值是expect时就更新为update + } +``` + 1.6.7: +```java + public final int getAndIncrement() { //获取当前值+1并返回旧值 + for (;;) { + int current = get(); //获取当前值 + int next = current + 1; //当前值+1 + if (compareAndSet(current, next)) //如果当前值是current,就更新为next + return current; //更新成功返回旧值 + } + } +``` + 1.6.8: +```java + public final int getAndDecrement() { //获取当前值-1并返回旧值 + for (;;) { + int current = get(); + int next = current - 1; + if (compareAndSet(current, next)) + return current; + } + } +``` + 1.6.9: +```java + public final int getAndAdd(int delta) {//获取当前值+delta并返回旧值 + for (;;) { + int current = get(); + int next = current + delta; + if (compareAndSet(current, next)) + return current; + } + } +``` + 1.6.10: +```java + public final int incrementAndGet() { //获取当前值+1并返回新值 + for (;;) { + int current = get(); + int next = current + 1; + if (compareAndSet(current, next)) + return next; + } + } +``` + 1.6.11: +```java + public final int decrementAndGet() { //获取当前值-1并返回新值 + for (;;) { + int current = get(); + int next = current - 1; + if (compareAndSet(current, next)) + return next; + } + } + +``` +## 1.7 AtomicInteger总结 + 1.8.1: 使用AtomicInteger操作能够保证线程安全性 + + + + + + + + + + + + + + + + + + + + + diff --git a/week_02/32/AtomicStampedReference.md b/week_02/32/AtomicStampedReference.md new file mode 100644 index 0000000..670cd9f --- /dev/null +++ b/week_02/32/AtomicStampedReference.md @@ -0,0 +1,138 @@ +# AtomicStampedReference源码阅读 + +## 1.1 AtomicStampedReference属性: + 1.3.1: private volatile Pair pair; //volatile修饰保证每次取到的pair是最新值 + 1.3.2: private static final sun.misc.Unsafe UNSAFE = sun.misc.Unsafe.getUnsafe(); //底层是通过Unsafe来实现的 + 1.3.3: private static final long pairOffset = objectFieldOffset(UNSAFE, "pair", AtomicStampedReference.class); //定位pair在AtomicStampedReference对象中的位置 + +## 1.2 AtomicStampedReference构造方法: +```java + public AtomicStampedReference(V initialRef, int initialStamp) { //创建AtomicStampedReference的时候将需要原子操作的对象封装为pair对象 + pair = Pair.of(initialRef, initialStamp); + } +``` +## 1.3 AtomicStampedReference静态内部类: +```java + private static class Pair { //将需要不停的做原子操作的对象和版本号封装成一个pair对象,每次修改内容就是比较版本号后再修改对象 + final T reference; //要进行原子操作的对象 + final int stamp; //当前的版本号 + private Pair(T reference, int stamp) { + this.reference = reference; + this.stamp = stamp; + } + static Pair of(T reference, int stamp) { + return new Pair(reference, stamp); + } + } +``` +## 1.4 AtomicStampedReference方法 + 1.4.1: +```java + public V getReference() { //返回pair对象中需要做原子操作的对象 + return pair.reference; + } +``` + 1.4.2: +```java + public int getStamp() { //获取pair对象中需要做原子操作的对象的版本号 + return pair.stamp; + } +``` + 1.4.3: +```java + public V get(int[] stampHolder) { //获取当前pair的版本号和,pair中需要原子操作的对象 + Pair pair = this.pair; + stampHolder[0] = pair.stamp; + return pair.reference; + } +``` + 1.4.4: +```java + public boolean weakCompareAndSet(V expectedReference, + V newReference, + int expectedStamp, + int newStamp) { //将pair中reference,stamp修改为:newReference,newStamp + return compareAndSet(expectedReference, newReference, + expectedStamp, newStamp); + } +``` + 1.4.5: +```java + public boolean compareAndSet(V expectedReference, + V newReference, + int expectedStamp, + int newStamp) { //修改pair的reference,stamp,需要传入原reference,原expectedStamp,新reference,新stamp + Pair current = pair; //获取pair对象 + //规则是:判断传入的原stamp和reference与当前AtomicStampedReference中pair的stamp,reference相同时做操作 + //如果新的reference,stamp和原来相同直接返回true,如果不同调用方法casPair方法做原子操作 + return + expectedReference == current.reference && + expectedStamp == current.stamp && + ((newReference == current.reference && + newStamp == current.stamp) || + casPair(current, Pair.of(newReference, newStamp))); + } +``` + 1.4.6: +```java + public void set(V newReference, int newStamp) { //如果传入的参数和pair属性有一个不同就设置pair的属性为传入的参数 + Pair current = pair; //获取pair对象 + if (newReference != current.reference || newStamp != current.stamp) //传入的参数和pair的属性比较 + this.pair = Pair.of(newReference, newStamp); //重置pair对象的属性 + } +``` + 1.4.7: +```java + public boolean attemptStamp(V expectedReference, int newStamp) { //根据原子对象来修改版本号 + Pair current = pair; //获取pair对象 + return + expectedReference == current.reference && //比较原子对象是否相同,如果相同比较版本号版本号相同返回true,版本号不同进行cas操作修改版本号 + (newStamp == current.stamp || + casPair(current, Pair.of(expectedReference, newStamp))); + } +``` + 1.4.8: +```java + private boolean casPair(Pair cmp, Pair val) { //将pair由原来的cmp改变成val + return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val); + } +``` + 1.4.9: +```java + static long objectFieldOffset(sun.misc.Unsafe UNSAFE, + String field, Class klazz) { //获取值field在当前类中的位置,个人理解就好比定长报文中第好多到好多位表示XX的值 + try { + return UNSAFE.objectFieldOffset(klazz.getDeclaredField(field)); + } catch (NoSuchFieldException e) { + // Convert Exception to corresponding Error + NoSuchFieldError error = new NoSuchFieldError(field); + error.initCause(e); + throw error; + } + } +``` +## 1.5 AtomicStampedReference总结 + 1.8.1: 将需要做原子操作的对象reference和int类型的标志stamp封装到AtomicStampedReference的pair中, + 然后可以用原子的方式进行更新 + + + + + + + + + + + + + + + + + + + + + + diff --git a/week_02/32/LongAdder.md b/week_02/32/LongAdder.md new file mode 100644 index 0000000..cecf1ea --- /dev/null +++ b/week_02/32/LongAdder.md @@ -0,0 +1,222 @@ +# LongAdder源码阅读 + +## 1.1 LongAdder继承了Striped64,来实现累加功能的,它是实现高并发累加的工具类 + +## 1.2 Striped64属性: + 1.2.1: transient volatile Cell[] cells; // Cell对象的数组,长度一般是2的指数 + 1.2.2: static final int NCPU = Runtime.getRuntime().availableProcessors(); // CPU的数量 + 1.2.3: transient volatile long base; // 基础value值,没有遇到并发的情况,只累加该值 + 1.2.4: transient volatile int cellsBusy; // 创建或者扩容Cells数组时使用的自旋锁变量 +## 1.3 Striped64方法: + 1.3.1: +```java + final boolean casBase(long cmp, long val) { //将base进行cas操作 + return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); + } +``` + 1.3.2: +```java + final void longAccumulate(long x, LongBinaryOperator fn, + boolean wasUncontended) { /开始分段更新:1.base更新失败。2.前面分段更新不行或失败。 + int h;//线程hash值 + if ((h = getProbe()) == 0) { // 看下ThreadLocalRandom是否初始化。如果当前线程的threadLocalRandomProbe为0,说明当前线程是第一次进入该方法, + ThreadLocalRandom.current(); //初始化当前线程的PROBE值不为0, + h = getProbe(); + wasUncontended = true; + } + boolean collide = false; // True if last slot nonempty + for (;;) { + Cell[] as; Cell a; int n; long v; //设置局部变量,以免多线程修改 + if ((as = cells) != null && (n = as.length) > 0) { //已经有分段更新了 + if ((a = as[(n - 1) & h]) == null) { //没有这个线程的Cell,新建 + if (cellsBusy == 0) { // cellsBusy=1表示有人在修改Cells数组(修改Cell从null到new Cell,扩容,初始化),CAS更新一个已经存在的Cell不用判断cellsBusy + Cell r = new Cell(x); // 创建一个Cell值为X + if (cellsBusy == 0 && casCellsBusy()) { //cellsBusy是0就进来,然后变成cellsBusy=1,表示我再修改Cells数组 + boolean created = false; + try { // Recheck under lock + Cell[] rs; int m, j; + //// 再次判断没有这个cell, 前面if判断了是空,走到这里时候有可能别人放进去了并且cellsBusy从0变到1再变到0了。如果不是null了,就不放,下次再来(直接更新)。 + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r;// 赋值 + created = true;// 创建完成,退出,不用重新来了。 + } + } finally { + cellsBusy = 0;//释放锁 + } + if (created) // 创建完成,退出 + break; + continue; // Slot is now non-empty + } + } + collide = false; // cell不存在,但是有人修改cells,collide = false + } + else if (!wasUncontended) // wasUncontended=false表示更新失败了,再来,wasUncontended=true下次不进这里直接去cas更新,否则先不cas先再来一次 + wasUncontended = true; // 重新来 + else if (a.cas(v = a.value, ((fn == null) ? v + x : + fn.applyAsLong(v, x))))//这个线程有Cell,去更新。 + break; // 更新成功,退出 + else if (n >= NCPU || cells != as) // CPU能够并行的CAS操作的最大数量是它的核心数 ,cells被改变了(扩容了肯定重新来) + collide = false; // + else if (!collide) + collide = true; + else if (cellsBusy == 0 && casCellsBusy()) { //wasUncontended=true,更新失败,cells没有初始化扩容,collide=true,占用cells,扩容完成,重新来 + //有这个线程的cell,cas失败,说明2个线程同时更新这个cell,就扩容。既然你不让我加,那么扩容试试看 + try { + if (cells == as) { // //锁住cells了,最开始as = cells,但是现在as不一定=cells,所以判断cells没变扩容 + Cell[] rs = new Cell[n << 1]; // 执行2倍扩容 + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + cellsBusy = 0; // 释放锁 + } + collide = false; // 扩容意向为false + continue; // 扩容后还没有设置值(肯定重新来) + } + h = advanceProbe(h); // 修改当前线程的hash,降低hash冲突 + } + else if (cellsBusy == 0 && cells == as && casCellsBusy()) { //// cellsBusy=1,别的线程就不能动cells + boolean init = false; + try { + if (cells == as) { ////锁住cells了,但是cells不一定=as=空或者null了, 锁住之后一定要再检测一次,如果还是null就初始化 + Cell[] rs = new Cell[2]; // 初始化时只创建两个单元 + rs[h & 1] = new Cell(x); // 对其中一个单元进行累积操作,另一个不管,继续为null + cells = rs; + init = true; + } + } finally { + cellsBusy = 0;// 释放锁 + } + if (init) // 初始化成功退出,初始化失败继续来 + break; + } + else if (casBase(v = base, ((fn == null) ? v + x : + fn.applyAsLong(v, x)))) // 更新base,成功就退出。 + break; + } + } +``` + 1.3.3: +```java + final boolean casCellsBusy() { //将cellsBusy进行cas操作 + return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); + } +``` +## 1.4 Striped64: +```java + static final class Cell { /// 一个Cell里面一个value,可以看成是一个简化的AtomicLong,通过cas操作来更新value的值 + volatile long value;// cas更新其值 + Cell(long x) { + value = x; + } + final boolean cas(long cmp, long val) {// cas更新 + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + static { + try { + UNSAFE = sun.misc.Unsafe.getUnsafe(); + Class ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + } +``` +## 1.5 LongAdder构造方法: + 1.5.1: +```java + public LongAdder() { + } +``` +## 1.6 LongAdder方法: + 1.6.1: +```java + public void add(long x) { //并发计数器LongAdder加X。要么在base+x更新要么在Cell[]数组里面找到对应的Cell+x更新 + Cell[] as; long b, v; int m; Cell a; + if ((as = cells) != null //表明已经启用了分段更新 + || !casBase(b = base, b + x)) { //表示没有启用分段更新但是casBase失败说明出现高并发了,需要启用分段更新 + boolean uncontended = true; + if (as == null //cells=null进去,没有启用分段更新(进来了)表示高并发了。 + || (m = as.length - 1) < 0 ////cells.length<=0,没有启用分段更新(进来了)表示高并发了。 + || (a = as[getProbe() & m]) == null //对as的长度取余,从as中获取这个线程对应的a Cell。=null表示还没有这个线程对应的cell, + || !(uncontended = a.cas(v = a.value, v + x))) //a这个Cell里面的value增加x失败, 更新成功就不会进下面了 + longAccumulate(x, null, uncontended);//uncontended=false表示更新失败了,=true表示没有这个线程的Cell + } + } +``` + 1.6.2: +```java + public long sum() { //将多个cell数组中的值加起来的和,此返回值可能不是绝对准确的 + Cell[] as = cells; Cell a; + long sum = base; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + sum += a.value; + } + } + return sum; + } +``` + 1.6.3: +```java + public void reset() { //获取cell数组中的值的和 + Cell[] as = cells; Cell a; + base = 0L; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) + a.value = 0L; + } + } + } +``` + 1.6.4: +```java + public long sumThenReset() { //计算base和cell数组中的值的和并将它们的值重置为0 + Cell[] as = cells; Cell a; + long sum = base; + base = 0L; + if (as != null) { + for (int i = 0; i < as.length; ++i) { + if ((a = as[i]) != null) { + sum += a.value; + a.value = 0L; + } + } + } + return sum; + } +``` +## 1.7 LongAdder总结 + 1.7.1: 在java1.8中新加入了一个新的原子类LongAdder,该类也可以保证Long类型操作的原子性 + 1.7.2: 并发高时用LongAdder,线程竞争很低的情况下进行计数,使用Atomic还是更简单更直接,并且效率稍微高一些 + 1.7.3: LongAdder是根据ConcurrentHashMap这类为并发设计的类的基本原理——锁分段,来实现的,它里面维护一组按需分配的计数单元,并发计数时,不同的线程可以在不同的计数单元上进行计数,这样减少了线程竞争,提高了并发效率 + + + + + + + + + + + + + + + + + + + + + + diff --git a/week_02/32/Unsafe.md b/week_02/32/Unsafe.md new file mode 100644 index 0000000..0970558 --- /dev/null +++ b/week_02/32/Unsafe.md @@ -0,0 +1,172 @@ +# Unsafe学习 + +## 1.1 Unsafe: + 1.1.1: Unsafe是final修饰,我们不能继承它 + 1.1.2: Unsafe的构造方法用private修饰,我们不能通过new创建Unsafe实例 + 1.1.3: Unsafe类使用了单例模式,需要通过一个静态方法getUnsafe()来获取 + 1.1.4: Unsafe提供了getUnsafe() 获取Unsafe实例,但是直接调用这个方法会抛出一个SecurityException异常,因为Unsafe仅供java内部类使用,外部类不应该使用它,但是我们可以通过反射获取Unsafe实例 + + +## 1.2 Unsafe内存管理: + 1.2.1: public native long allocateMemory(long var1); // 分配内存 + 1.2.2: public native long reallocateMemory(long var1, long var3); // 重新分配内存 + 1.2.3: public native void setMemory(long var1, long var3, byte var5); // 内存初始化 + 1.2.4: public native void copyMemory(Object var1, long var2, Object var4, long var5, long var7); // 内存复制 + 1.2.5: public native void freeMemory(long var1); // 清除内存 +## 1.3 Unsafe实例一个类并修改对象属性: +```java +public class Test { + + public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Unsafe unsafe = (Unsafe) f.get(null); //发射获取Unsafe + + //通过unsafe给TempTest创建一个内存 + TempTest t = (TempTest) unsafe.allocateInstance(TempTest.class); + //输出:null----------------0;说明没有调用构造方法 + System.out.println(t.getStr() + "----------------" + t.getNumber()); + //通过unsafe的putXX()方法可以修改字段值 + Field str = t.getClass().getDeclaredField("str"); + Field number = t.getClass().getDeclaredField("number"); + unsafe.putObject(t, unsafe.objectFieldOffset(str), "这是字符串"); //修改str字段值 + //输出:这是字符串----------------0 + System.out.println(t.getStr() + "----------------" + t.getNumber()); + unsafe.putInt(t, unsafe.objectFieldOffset(number), 111); //修改number字段值 + //输出:这是字符串----------------111 + System.out.println(t.getStr() + "----------------" + t.getNumber()); + } +} + +class TempTest { + private String str; + private int number; + + public TempTest() { + this.str = "aa"; + this.number = 1; + } + + public String getStr() { + return str; + } + + public int getNumber(){ + return number; + } +} +``` +## 1.4 Unsafe的CAS操作: +### compareAndSwapObject(Object arg0, long arg1, Object arg2, Object arg3);//对象,对象的字段,字段的原来值,更新后的值 +### compareAndSwapInt(Object arg0, long arg1, int arg2, int arg3);//对象,对象的字段,字段的原来值,更新后的值 +### compareAndSwapLong(Object arg0, long arg1, long arg2, long arg3);//对象,对象的字段,字段的原来值,更新后的值 +### 执行 CAS 操作的时候,将内存位置的值与预期原值比较,如果相匹配,那么处理器会自动将该位置值更新为新值,否则,处理器不做任何操作 + +```java +package Test; + + + +import java.lang.reflect.Field; + +import sun.misc.Unsafe; + + + + + +public class Test { + + public static void main(String[] args) throws NoSuchFieldException, IllegalAccessException, InstantiationException { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + Unsafe unsafe = (Unsafe) f.get(null); //发射获取Unsafe + + //通过unsafe给TempTest创建一个内存 + TempTest t = (TempTest) unsafe.allocateInstance(TempTest.class); + //通过unsafe的putXX()方法可以修改字段值 + Field str = t.getClass().getDeclaredField("str"); + Field number = t.getClass().getDeclaredField("number"); + unsafe.putObject(t, unsafe.objectFieldOffset(str), "oldStr"); //修改str字段值 + unsafe.putInt(t, unsafe.objectFieldOffset(number), 111); //修改number字段值 + //输出:oldStr----------------111 + System.out.println(t.getStr() + "----------------" + t.getNumber()); + + + unsafe.compareAndSwapObject(t, unsafe.objectFieldOffset(str), "abc", "aaa");//更新失败abc != oldStr + unsafe.compareAndSwapInt(t, unsafe.objectFieldOffset(number), 222, 333);//更新失败111 != 222 + //输出:oldStr----------------111 + System.out.println(t.getStr() + "----------------" + t.getNumber()); + + unsafe.compareAndSwapObject(t, unsafe.objectFieldOffset(str), "oldStr", "aaa");//更新成功 + unsafe.compareAndSwapInt(t, unsafe.objectFieldOffset(number), 111, 333);//更新成功 + //输出:aaa----------------333 + System.out.println(t.getStr() + "----------------" + t.getNumber()); + } +} + +class TempTest { + private String str; + private int number; + + public TempTest() { + this.str = "aa"; + this.number = 1; + } + + public String getStr() { + return str; + } + + public int getNumber(){ + return number; + } +} + +``` +## 1.5 Unsafe的线程调度: + 在LockSupport中通过Unsafe来唤醒和挂起 +```java +// 挂起线程 +public static void park(Object blocker) { + Thread t = Thread.currentThread(); + setBlocker(t, blocker); // 通过Unsafe的putObject方法设置阻塞阻塞当前线程的blocker + UNSAFE.park(false, 0L); // 通过Unsafe的park方法来阻塞当前线程,注意此方法将当前线程阻塞后,当前线程就不会继续往下走了,直到其他线程unpark此线程 + setBlocker(t, null); // 清除blocker +} + +// 唤醒线程 +public static void unpark(Thread thread) { + if (thread != null) + UNSAFE.unpark(thread); +} +``` + +## 1.6 Unsafe总结 + 1.6.1: 通过Unsafe可以申请和释放内存 + 1.6.2: 通过Unsafe可以实例一个对象 + 1.6.3: 通过Unsafe可以修改对象的私有字段的值 + 1.6.4: 通过Unsafe可以CAS操作 + 1.6.5: 通过Unsafe可以唤醒和挂起线程 + + + + + + + + + + + + + + + + + + + + + + -- Gitee