From 39eb0ff331eadd2f06f75ace6d6d2909a751fb6d Mon Sep 17 00:00:00 2001 From: qjwxpz Date: Fri, 10 Jan 2020 23:55:27 +0800 Subject: [PATCH] 032-week-05 --- week_05/32/Executors.md | 82 ++++++++ week_05/32/Thread.md | 351 +++++++++++++++++++++++++++++++ week_05/32/ThreadLocal.md | 304 ++++++++++++++++++++++++++ week_05/32/ThreadPoolExecutor.md | 328 +++++++++++++++++++++++++++++ week_05/32/test1115-1116.md | 210 ++++++++++++++++++ 5 files changed, 1275 insertions(+) create mode 100644 week_05/32/Executors.md create mode 100644 week_05/32/Thread.md create mode 100644 week_05/32/ThreadLocal.md create mode 100644 week_05/32/ThreadPoolExecutor.md create mode 100644 week_05/32/test1115-1116.md diff --git a/week_05/32/Executors.md b/week_05/32/Executors.md new file mode 100644 index 0000000..51ae159 --- /dev/null +++ b/week_05/32/Executors.md @@ -0,0 +1,82 @@ +## Executors源码阅读 + +### 1.1 Executors: + 1.1.1:Executors是一个快速创建线程池的工具类。 + 1.1.2:Executors主要提供很多newXXX()来创建线程池, + +### 1.2 Executors方法: +```java + + /** + *实际创建线程池 + */ + ThreadPoolExecutor(int corePoolSize, //核心线程大小,标示线程此维护线程的最少数量 + int maximumPoolSize, //表示线程池维护线程的最大数量 + long keepAliveTime, //线程池中允许空闲时间 + TimeUnit unit, //空闲时间的单位 + BlockingQueue workQueue, //阻塞队列,表示如果任务数量超过核心池大小,多余的任务添加到阻塞队列中 + ThreadFactory threadFactory, //线程工厂,用来创建线程 + RejectedExecutionHandler handler) //线程池对拒绝任务的处理策略 + + public static ExecutorService newFixedThreadPool(int nThreads) { //创建一个nThreads个线程的线程池 + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); + } + + public static ExecutorService newWorkStealingPool(int parallelism) { //创建一个并发parallelism个线程数量的具有抢占式操作的线程池 + return new ForkJoinPool + (parallelism, + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); + } + + public static ExecutorService newWorkStealingPool() { //创建一个具有抢占式操作的线程池 + return new ForkJoinPool + (Runtime.getRuntime().availableProcessors(), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); + } + + public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { //创建一个nThreads个线程的,threadFactory的线程工厂的线程池 + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); + } + + public static ExecutorService newSingleThreadExecutor() { //创建 一个单线程的线程池 + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); + } + + public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { //创建 一个单线程的线程池 + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory)); + } + + public static ExecutorService newCachedThreadPool() { //创建一个可缓存的线程池 + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); + } + + public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { //创建一个可缓存的线程池 + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); + } + + public static Callable callable(Runnable task, T result) { //将Runnable装饰为Callabl + if (task == null) + throw new NullPointerException(); + return new RunnableAdapter(task, result); + } + +``` \ No newline at end of file diff --git a/week_05/32/Thread.md b/week_05/32/Thread.md new file mode 100644 index 0000000..cba637b --- /dev/null +++ b/week_05/32/Thread.md @@ -0,0 +1,351 @@ +## Thread源码学习 + +### 1.1 Thread属性: +```java + private volatile String name; //线程名称 + + private int priority; //优先级 + + private Thread threadQ; + + private long eetop; + + private boolean single_step; //是否单步执行 + + private boolean daemon = false; //是否守护线程 + + private boolean stillborn = false; //虚拟机状态 + + private Runnable target; //将会被执行的runnable + + private ThreadGroup group; //线程组 + + private ClassLoader contextClassLoader; //线程的上下文 + + private AccessControlContext inheritedAccessControlContext; //继承的请求控制 + + private static int threadInitNumber; //默认线程的自动编号 + + ThreadLocal.ThreadLocalMap threadLocals = null; //当前线程的附属ThreadLocal + + ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; + + private long stackSize; //该现场请求的堆栈大小 + + private long nativeParkEventPointer; + + private long tid; //线程专属ID + + private static long threadSeqNumber; //线程流水号 + + private volatile int threadStatus = 0; //线程状态标识 + + volatile Object parkBlocker; //中断阻塞器 + + private volatile Interruptible blocker; //阻塞器锁,处理阻塞情况 + + private final Object blockerLock = new Object(); + + public final static int MIN_PRIORITY = 1; //最小优先级 + + public final static int NORM_PRIORITY = 5; //第二优先级 + + public final static int MAX_PRIORITY = 10; //最高优先级 + + + public enum State { //线程状态枚举 + + NEW, //创建后未分配资源的状态,没有start的线程 + + //可运行,调用start()方法,线程进入Runnable状态, + //此时该线程可能在运行,可能没运行,看操作系统给线程提供的运行的时间 + RUNNABLE, + + /** + *被阻塞,当一个线程获取一个内部的对象锁,而这个锁被其他线程持有,则该线程进入阻塞状态。当所有线程释放掉 + *该锁,且线程调度器允许该线程持有这个对象锁时,该线程进入到非阻塞状态。 + */ + BLOCKED, + + WAITING, //等待,等待被唤醒 + + TIMED_WAITING, //时间等待,等待时间够了就起来 + + TERMINATED; //被终止 + } + +``` + +### 1.2 Thread构造器: +```java + + private void init(ThreadGroup g, Runnable target, String name, + long stackSize) { //实际调用下面init方法创建 + init(g, target, name, stackSize, null, true); + } + + private void init(ThreadGroup g, Runnable target, String name, + long stackSize, AccessControlContext acc, + boolean inheritThreadLocals) { //实际创建线程方法 + if (name == null) { //线程名非空效验 + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + + Thread parent = currentThread(); //获取当前正在执行的线程对象的引用 + SecurityManager security = System.getSecurityManager(); + if (g == null) { //获取线程组 + if (security != null) { //从securityManager拿线程组 + g = security.getThreadGroup(); + } + + if (g == null) { //如果从securityManager没有拿到线程组,就从当前线程拿 + g = parent.getThreadGroup(); + } + } + + g.checkAccess(); //检查是否可获取 + + if (security != null) { //权限控制检查 + if (isCCLOverridden(getClass())) { + security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); + } + } + + g.addUnstarted(); + + this.group = g; + this.daemon = parent.isDaemon(); + this.priority = parent.getPriority(); + if (security == null || isCCLOverridden(parent.getClass())) + this.contextClassLoader = parent.getContextClassLoader(); + else + this.contextClassLoader = parent.contextClassLoader; + this.inheritedAccessControlContext = + acc != null ? acc : AccessController.getContext(); + this.target = target; + setPriority(priority); + if (inheritThreadLocals && parent.inheritableThreadLocals != null) //从服现场继承可继承的threadLocal + this.inheritableThreadLocals = + ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); + this.stackSize = stackSize; + + tid = nextThreadID(); //设置线程ID + } + + public Thread() { //全默认 + init(null, null, "Thread-" + nextThreadNum(), 0); + } + + public Thread(Runnable target) { //传入runnable实现 + init(null, target, "Thread-" + nextThreadNum(), 0); + } + + Thread(Runnable target, AccessControlContext acc) { //传入runnable和上下文加载器 + init(null, target, "Thread-" + nextThreadNum(), 0, acc, false); + } + + public Thread(ThreadGroup group, Runnable target) { //传入线程组和runnable + init(group, target, "Thread-" + nextThreadNum(), 0); + } + + public Thread(String name) { //传入线程名称 + init(null, null, name, 0); + } + + public Thread(ThreadGroup group, String name) { //传入线程组和线程名称 + init(group, null, name, 0); + } + + public Thread(Runnable target, String name) { //传入runnable和线程名称 + init(null, target, name, 0); + } + + public Thread(ThreadGroup group, Runnable target, String name) { //传入线程组和runnable和线程名称 + init(group, target, name, 0); + } + + public Thread(ThreadGroup group, Runnable target, String name, + long stackSize) { //传入线程组,runnable,线程名称,给线程分配的栈深度 + init(group, target, name, stackSize); + } +``` +### 1.3 Thread重要方法: +```java + public synchronized void start() { //线程启动 + if (threadStatus != 0) //如果线程状态不是new状态,抛异常 + throw new IllegalThreadStateException(); + + group.add(this); //把此线程加入线程组 + + boolean started = false; // + try { + start0(); //启动 + started = true; + } finally { + try { + if (!started) { + group.threadStartFailed(this); + } + } catch (Throwable ignore) { + /* do nothing. If start0 threw a Throwable then + it will be passed up the call stack */ + } + } + } + + public void run() { //Runnable接口的实现方法 + if (target != null) { + target.run(); + } + } + + private void exit() { //用于Run方法结束后结束线程的 + if (group != null) { + group.threadTerminated(this); //通知线程组这个线程已经停止 + group = null; + } + target = null; + threadLocals = null; + inheritableThreadLocals = null; + inheritedAccessControlContext = null; + blocker = null; + uncaughtExceptionHandler = null; + } + + public void interrupt() { //中断当前线程 + if (this != Thread.currentThread()) + checkAccess(); + + synchronized (blockerLock) { //对阻塞锁使用同步机制 + Interruptible b = blocker; + if (b != null) { + interrupt0(); //只是为了设定中断标识位 + b.interrupt(this); //在好哦功能段当前线程 + return; + } + } + interrupt0(); + } + + public final void suspend() {//将一个线程挂起 + checkAccess(); + suspend0(); + } + + public final void resume() { //将一个悬挂的线程回复 + checkAccess(); + resume0(); + } + + public final void setPriority(int newPriority) { //设置线程优先级 + ThreadGroup g; + checkAccess(); + if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) { + throw new IllegalArgumentException(); + } + if((g = getThreadGroup()) != null) { //此线程必须有线程组 + if (newPriority > g.getMaxPriority()) { + newPriority = g.getMaxPriority(); + } + setPriority0(priority = newPriority); + } + } + + public final int getPriority() { //获取线程的优先级 + return priority; + } + + public final synchronized void setName(String name) { //设置线程名称 + checkAccess(); + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + if (threadStatus != 0) { + setNativeName(name); + } + } + + public final synchronized void join(long millis) + throws InterruptedException { //最多等待millis(ms)时长当前线程就会死亡,如果millis为0则一直等待 + long base = System.currentTimeMillis(); + long now = 0; + + if (millis < 0) { //如果等待时间小于0,抛异常 + throw new IllegalArgumentException("timeout value is negative"); + } + + if (millis == 0) { //如果等待时间为0,一直等待 + while (isAlive()) { + wait(0); + } + } else { + while (isAlive()) { + long delay = millis - now; + if (delay <= 0) { + break; + } + wait(delay); + now = System.currentTimeMillis() - base; + } + } + } + + public StackTraceElement[] getStackTrace() { //获取堆栈中的元素数组 + if (this != Thread.currentThread()) { //判断此线程是否当前线程 + SecurityManager security = System.getSecurityManager(); //检查getStackTrace许可情况 + if (security != null) { //如果安全管理器不为空,就检查 + security.checkPermission( + SecurityConstants.GET_STACK_TRACE_PERMISSION); + } + if (!isAlive()) { + return EMPTY_STACK_TRACE; + } + StackTraceElement[][] stackTraceArray = dumpThreads(new Thread[] {this}); + StackTraceElement[] stackTrace = stackTraceArray[0]; + if (stackTrace == null) { //在前面的isAlive检查中1各活着的线程可能已经终结,所以此时不会在有栈跟踪 + stackTrace = EMPTY_STACK_TRACE; + } + return stackTrace; + } else { + //此处不需要JVM的帮忙 + return (new Exception()).getStackTrace(); + } + } + + private static class Caches { //缓存 + static final ConcurrentMap subclassAudits = + new ConcurrentHashMap<>(); //子类安全审核结果的缓存值 + + + static final ReferenceQueue> subclassAuditsQueue = + new ReferenceQueue<>(); //审核子类的弱引用队列 + } + + public static native Thread currentThread();//获取当前正在执行线程对象的引用 + + public static native void yield();// + + public static native void sleep(long millis) throws InterruptedException; //让线程睡眠,交出CPU,但不释放当前持有的锁 + + public static void sleep(long millis, int nanos) + throws InterruptedException { //和上面一样 + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + + if (nanos < 0 || nanos > 999999) { + throw new IllegalArgumentException( + "nanosecond timeout value out of range"); + } + + if (nanos >= 500000 || (nanos != 0 && millis == 0)) { + millis++; + } + + sleep(millis); + } +``` \ No newline at end of file diff --git a/week_05/32/ThreadLocal.md b/week_05/32/ThreadLocal.md new file mode 100644 index 0000000..539860b --- /dev/null +++ b/week_05/32/ThreadLocal.md @@ -0,0 +1,304 @@ +## ThreadLocal源码阅读 + +### 1.1 ThreadLocal的内部类ThreadLocalMap: + + 1.1.1 ThreadLocalMap源码: +```java + static class ThreadLocalMap { //储存线程所有的threadlocal和value对应关系 + + static class Entry extends WeakReference> { + Object value; //与threadlocal关联的值 + Entry(ThreadLocal k, Object v) { + super(k); + value = v; + } + } + + private static final int INITIAL_CAPACITY = 16; //初始容量 + + private Entry[] table; //存放元素的数组Entry + + private int size = 0; //数组中元素个数 + + private int threshold; //进行扩容的阀值 + + private void setThreshold(int len) { //定义扩容的阀值多少 + threshold = len * 2 / 3; + } + + private static int nextIndex(int i, int len) { //增量i根据len取模 + return ((i + 1 < len) ? i + 1 : 0); //i+1小于len返回i+1,否则返回0 + } + + private static int prevIndex(int i, int len) { + return ((i - 1 >= 0) ? i - 1 : len - 1); //i-1不小于len返回i-1,否则返回len-1 + } + + ThreadLocalMap(ThreadLocal firstKey, Object firstValue) { //构造器--如果原inheritableThreadLocals为空才使用这个构造器 + table = new Entry[INITIAL_CAPACITY]; //创建一个INITIAL_CAPACITY大小的ENTRY数组 + int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); + table[i] = new Entry(firstKey, firstValue); //创建一个Entry然后放入数组中 + size = 1; //设置数组大小1 + setThreshold(INITIAL_CAPACITY); //设置重新调整大小的阀值 + } + + private ThreadLocalMap(ThreadLocalMap parentMap) { //构造器--构造一个新的threadLocalMap包含父类中有效的Entry + Entry[] parentTable = parentMap.table; //获取父类中数组 + int len = parentTable.length; //获取父类中数组的长度 + setThreshold(len); //以父类数组长度设置阀值 + table = new Entry[len]; //创建一个新的Entry数组 + + for (int j = 0; j < len; j++) { //循环将父类中有效的Entry放入新Entry数组中 + Entry e = parentTable[j]; + if (e != null) { + @SuppressWarnings("unchecked") + ThreadLocal key = (ThreadLocal) e.get(); + if (key != null) { + Object value = key.childValue(e.value); + Entry c = new Entry(key, value); + int h = key.threadLocalHashCode & (len - 1); + while (table[h] != null) //把entry放入数组时判断计算出的位置的值在数组table中是否已经被其他entry占用,如果占用计算下一个位置的值 + h = nextIndex(h, len); + table[h] = c; + size++; + } + } + } + } + + private Entry getEntry(ThreadLocal key) { //根据key获取关联的Entry + int i = key.threadLocalHashCode & (table.length - 1); //计算index + Entry e = table[i]; //从数组中获取Entry + if (e != null && e.get() == key) //如果Entry不为空,并且Entry中的key和key相等则返回e + return e; + else //可能e中保存的key和参数不同导致进入下面这个方法获取Entry + return getEntryAfterMiss(key, i, e); + } + + private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) { //根据key,key计算出的index,Entry获取Entry + Entry[] tab = table; //获取数组 + int len = tab.length; //获取数组对象 + + while (e != null) { //如果传入参数不为空,就去遍历查询Entry + ThreadLocal k = e.get(); //获取Entry中关联的key + if (k == key) //判断从e中获取的key和参数的key是否相等 + return e; + if (k == null) + expungeStaleEntry(i); //清除过时Entry + else + i = nextIndex(i, len); //计算下一个index然后用该index获取数组中的元素再重复上述判断 + e = tab[i]; + } + return null; + } + + private void set(ThreadLocal key, Object value) { //将key对应的value放入ThreadLocalMap中 + + Entry[] tab = table; //获取数组 + int len = tab.length; //获取数组长度 + int i = key.threadLocalHashCode & (len-1); //计算threadLocal的值用来作为数组下标 + + for (Entry e = tab[i]; + e != null; + e = tab[i = nextIndex(i, len)]) { //根据计算的i来循环判断是否在数组中有相同的key + ThreadLocal k = e.get(); + + if (k == key) { //如果key相同,则替换value + e.value = value; + return; + } + + if (k == null) { //说明这个Entry失效 + replaceStaleEntry(key, value, i); //把这个Entry替换成新的Entry + return; + } + } + + tab[i] = new Entry(key, value); //到这里标示计算出的i在tab中这个位置没有元素,然后将新的Entry放入这个位置 + int sz = ++size; //size自增 + if (!cleanSomeSlots(i, sz) && sz >= threshold) //如果table中已经不存在过时的Entry,并且Entry的个数已经大于阀值,就进行扩容 + rehash(); //扩容 + } + + private void remove(ThreadLocal key) { //通过threadLoca删除Entry + Entry[] tab = table; + int len = tab.length; + int i = key.threadLocalHashCode & (len-1); + for (Entry e = tab[i]; + e != null; + e = tab[i = nextIndex(i, len)]) { + if (e.get() == key) { + e.clear(); + expungeStaleEntry(i); + return; + } + } + } + + private void replaceStaleEntry(ThreadLocal key, Object value, + int staleSlot) { //将过时的Entry替换为新的以key的Entry + Entry[] tab = table; + int len = tab.length; + Entry e; + + for (int i = prevIndex(staleSlot, len); + (e = tab[i]) != null; + i = prevIndex(i, len)) //循环寻找上一个Entry的key为空的 + if (e.get() == null) //标示Entry失效 + slotToExpunge = i; + + for (int i = nextIndex(staleSlot, len); + (e = tab[i]) != null; + i = nextIndex(i, len)) { //循环寻找下一个Entry的key为空的 + ThreadLocal k = e.get(); + + if (k == key) { //如果key相同,则表明参数中key的Entry已经在数组中 + e.value = value; + + tab[i] = tab[staleSlot]; + tab[staleSlot] = e; + + if (slotToExpunge == staleSlot) //如果要被清除的位置与过时的位置相等 + slotToExpunge = i; //过时的位置替换为新Entry的位置 + cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); + return; + } + + if (k == null && slotToExpunge == staleSlot) + slotToExpunge = i; + } + //如果key找不到,放入一个新的Entry在过时位置 + tab[staleSlot].value = null; + tab[staleSlot] = new Entry(key, value); + + if (slotToExpunge != staleSlot) //如果这里有任何其他过时的Entry在run中,清除他们 + cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); + } + + private int expungeStaleEntry(int staleSlot) { //清楚失效的Entry,并返回下一个空位置的索引 + Entry[] tab = table; + int len = tab.length; + + //清楚过时Entry + tab[staleSlot].value = null; + tab[staleSlot] = null; + size--; + + Entry e; + int i; + for (i = nextIndex(staleSlot, len); + (e = tab[i]) != null; + i = nextIndex(i, len)) { //循环数组下一个位置对象 + ThreadLocal k = e.get(); + if (k == null) { //如果下一个还是失效,计算清楚 + e.value = null; + tab[i] = null; + size--; + } else { //没失效 + int h = k.threadLocalHashCode & (len - 1); //计算这个key的hash + if (h != i) { //如果旧的和新的不一样 + tab[i] = null; //把旧的存在的Entry清空 + + while (tab[h] != null) + h = nextIndex(h, len); + tab[h] = e; + } + } + } + return i; + } + + private boolean cleanSomeSlots(int i, int n) { //清除过时的Entry + boolean removed = false; + Entry[] tab = table; + int len = tab.length; + do { + i = nextIndex(i, len); + Entry e = tab[i]; + if (e != null && e.get() == null) { + n = len; + removed = true; + i = expungeStaleEntry(i); + } + } while ( (n >>>= 1) != 0); + return removed; + } + + private void rehash() { //调整数组 + expungeStaleEntries(); + + if (size >= threshold - threshold / 4) + resize(); + } + + private void resize() { //扩容 + Entry[] oldTab = table; //获取旧的数组 + int oldLen = oldTab.length; //获取旧的数组的长度 + int newLen = oldLen * 2; + Entry[] newTab = new Entry[newLen]; //创建一个新的数组长度为旧的2倍 + int count = 0; + + for (int j = 0; j < oldLen; ++j) { //循环将旧的Entry移动到新的数组中 + Entry e = oldTab[j]; + if (e != null) { + ThreadLocal k = e.get(); + if (k == null) { //如果Entry失效就不移动到新数组中 + e.value = null; // Help the GC + } else { + int h = k.threadLocalHashCode & (newLen - 1); + while (newTab[h] != null) + h = nextIndex(h, newLen); + newTab[h] = e; + count++; + } + } + } + + setThreshold(newLen); + size = count; + table = newTab; + } + + private void expungeStaleEntries() { //移除数组中所有失效的Entry + Entry[] tab = table; + int len = tab.length; + for (int j = 0; j < len; j++) { + Entry e = tab[j]; + if (e != null && e.get() == null) + expungeStaleEntry(j); + } + } + } +``` + +### 1.2 ThreadLocal的重要方法: +```java + public T get() { //获取当前线程保存在ThreadLocalMap中的val + Thread t = Thread.currentThread(); //获取当前线程 + ThreadLocalMap map = getMap(t); //获取当前线程的ThreadLocalMap + if (map != null) { //如果map为空,就创建一个ThreadLocalMap + ThreadLocalMap.Entry e = map.getEntry(this); + if (e != null) { + @SuppressWarnings("unchecked") + T result = (T)e.value; + return result; + } + } + return setInitialValue(); + } + + public void set(T value) { //将当前线程对应一个value放入ThreadLocalMap中 + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) + map.set(this, value); + else + createMap(t, value); + } + + public void remove() { //将ThreadLocalMap中当前线程的数据删除 + ThreadLocalMap m = getMap(Thread.currentThread()); + if (m != null) + m.remove(this); + } +``` \ No newline at end of file diff --git a/week_05/32/ThreadPoolExecutor.md b/week_05/32/ThreadPoolExecutor.md new file mode 100644 index 0000000..a65569b --- /dev/null +++ b/week_05/32/ThreadPoolExecutor.md @@ -0,0 +1,328 @@ +## ThreadPoolExecutor源码阅读 + +### 1.1 ThreadPoolExecutor属性: +```java + /**运行状态标志位*/ + private static final int RUNNING = -1 << COUNT_BITS; + private static final int SHUTDOWN = 0 << COUNT_BITS; + private static final int STOP = 1 << COUNT_BITS; + private static final int TIDYING = 2 << COUNT_BITS; + private static final int TERMINATED = 3 << COUNT_BITS; + + //线程缓冲队列,当线程池线程运行超过一定线程时并满足一定的条件,待运行的线程会放入到这个队列 + private final BlockingQueue workQueue; + //重入锁,更新核心线程池大小,最大线程池大小时要加锁 + private final ReentrantLock mainLock = new ReentrantLock(); + //重入锁状态 + private final Condition termination = mainLock.newCondition(); + //工作任务set集合 + private final HashSet workers = new HashSet(); + //用来记录线程池中曾经出现过的最大线程数 + private int largestPoolSize; + //用来记录已经执行完毕的任务个数 + private long completedTaskCount; + //线程工厂,用来创建线程 + private volatile ThreadFactory threadFactory; + //当缓冲队列也放不下线程时的拒绝策略 + private volatile RejectedExecutionHandler handler; + //线程执行完成后空闲时间 + private volatile long keepAliveTime; + //核心线程池大小 + private volatile int corePoolSize; + //最大线程池大小 + private volatile int maximumPoolSize; + +``` + +### 1.2 ThreadPoolExecutor内部类: +```java + private final class Worker + extends AbstractQueuedSynchronizer + implements Runnable + { + /** + * This class will never be serialized, but we provide a + * serialVersionUID to suppress a javac warning. + */ + private static final long serialVersionUID = 6138294804551838833L; + + final Thread thread; //工作线程 + + Runnable firstTask; //提交任务 + + volatile long completedTasks; // + + /** + * Creates with given first task and thread from ThreadFactory. + * @param firstTask the first task (null if none) + */ + Worker(Runnable firstTask) { + setState(-1); // inhibit interrupts until runWorker + this.firstTask = firstTask; + this.thread = getThreadFactory().newThread(this); //每次创建一个线程处理任务 + } + + public void run() { + runWorker(this); //执行委托给runWorker的方法 + } + + protected boolean isHeldExclusively() { + return getState() != 0; + } + + protected boolean tryAcquire(int unused) { + if (compareAndSetState(0, 1)) { + setExclusiveOwnerThread(Thread.currentThread()); + return true; + } + return false; + } + + protected boolean tryRelease(int unused) { + setExclusiveOwnerThread(null); + setState(0); + return true; + } + + public void lock() { acquire(1); } + public boolean tryLock() { return tryAcquire(1); } + public void unlock() { release(1); } + public boolean isLocked() { return isHeldExclusively(); } + + void interruptIfStarted() { + Thread t; + if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } + } + } + } + +``` + +### 1.3 ThreadPoolExecutor构造器: +```java + + public ThreadPoolExecutor(int corePoolSize, //核心线程数 + int maximumPoolSize, //最大线程数 + long keepAliveTime, //线程空闲时间 + TimeUnit unit, //空闲时间的单位 + BlockingQueue workQueue, //任务队列 + RejectedExecutionHandler handler) { //拒绝策略 + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), handler); //创建一个线程池,线程工厂用默认的,其他的参数由创建时提供 + } + + public ThreadPoolExecutor(int corePoolSize, //核心线程数 + int maximumPoolSize, //最大线程数 + long keepAliveTime, //线程空闲时间 + TimeUnit unit, //空闲时间的单位 + BlockingQueue workQueue, //任务队列 + ThreadFactory threadFactory) { //线程工厂 + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + threadFactory, defaultHandler); //创建一个线程池,拒绝策略用默认的,其他的参数由创建时提供 + } + public ThreadPoolExecutor(int corePoolSize, //核心线程数 + int maximumPoolSize, //最大线程数 + long keepAliveTime, //线程空闲时间 + TimeUnit unit, //空闲时间的单位 + BlockingQueue workQueue) { //任务队列 + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), defaultHandler); //创建一个线程池,拒绝策略和线程工厂用默认的,其他的参数由创建时提供 + } + + //上面构造器最后都调用这个方法,这个方法就是设置各个参数,并且效验参数是否正确 + public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + if (corePoolSize < 0 || + maximumPoolSize <= 0 || + maximumPoolSize < corePoolSize || + keepAliveTime < 0) //参数合法效验 + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) //参数非空判断 + throw new NullPointerException(); + //下面进行赋值操作 + this.acc = System.getSecurityManager() == null ? + null : + AccessController.getContext(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; + } +``` +### 1.4 ThreadPoolExecutor方法: +```java + + public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + + int c = ctl.get(); + if (workerCountOf(c) < corePoolSize) { //线程数量小于corePoolSize,直接创建线程执行 + if (addWorker(command, true)) + return; + c = ctl.get(); + } + if (isRunning(c) && workQueue.offer(command)) { //任务加入队列中去 + int recheck = ctl.get(); + if (! isRunning(recheck) && remove(command)) //非运行状态,拒绝任务 + reject(command); + else if (workerCountOf(recheck) == 0) //线程池识running状态,但是没有线程,则创建线程 + addWorker(null, false); + } + else if (!addWorker(command, false)) //创建线程执行失败,丢弃任务 + reject(command); + } + + private boolean addWorker(Runnable firstTask, boolean core) { //创建一个worker线程 + retry: + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + + if (rs >= SHUTDOWN && + ! (rs == SHUTDOWN && + firstTask == null && + ! workQueue.isEmpty())) + return false; + + for (;;) { + int wc = workerCountOf(c); + //检查工作线程数,如果大于线程池最大上线CAPACITY,或者更边界值比较已经达到边界值都返回false + if (wc >= CAPACITY || + wc >= (core ? corePoolSize : maximumPoolSize)) + return false; + if (compareAndIncrementWorkerCount(c)) //cas更新增加工作线程数量 + break retry; + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + + boolean workerStarted = false; + boolean workerAdded = false; + Worker w = null; + try { + w = new Worker(firstTask); //创建worker + final Thread t = w.thread; + if (t != null) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + //再次检查线程池状态 + int rs = runStateOf(ctl.get()); + //如果识运行态直接执行,或如果是shutdown状态 但是传进来是个null,即移除队列失败情况 + if (rs < SHUTDOWN || + (rs == SHUTDOWN && firstTask == null)) { + if (t.isAlive()) //检查这个对象是否被其他线程执行过 + throw new IllegalThreadStateException(); + workers.add(w); //加入工作对象中 + int s = workers.size(); + if (s > largestPoolSize) //如果大于曾经执行过的最大线程数则最大线程数加1 + largestPoolSize = s; + workerAdded = true; + } + } finally { + mainLock.unlock(); + } + if (workerAdded) { + t.start(); //启动线程执行,调用worker中的run方法 + workerStarted = true; + } + } + } finally { + if (! workerStarted) //如果启动失败从workers中移除 + addWorkerFailed(w); + } + return workerStarted; + } + + final void runWorker(Worker w) { //循环从队列中获取任务并执行 + Thread wt = Thread.currentThread(); //当前线程 + Runnable task = w.firstTask; //待处理任务 + w.firstTask = null; + w.unlock(); //将state置为0 + boolean completedAbruptly = true; + try { + while (task != null || (task = getTask()) != null) { //先处理task,然后不停的获取队列中的任务 + w.lock(); //加锁保证调用中断后运行的任务可以正常完成 + if ((runStateAtLeast(ctl.get(), STOP) || + (Thread.interrupted() && + runStateAtLeast(ctl.get(), STOP))) && + !wt.isInterrupted()) //判断线程池是不是运行状态 + wt.interrupt(); //尝试终止正在执行的任务 + try { + beforeExecute(wt, task); + Throwable thrown = null; + try { + task.run(); //执行任务,在当前线程中执行 + } catch (RuntimeException x) { + thrown = x; throw x; + } catch (Error x) { + thrown = x; throw x; + } catch (Throwable x) { + thrown = x; throw new Error(x); + } finally { + afterExecute(task, thrown); + } + } finally { + task = null; + w.completedTasks++; + w.unlock(); + } + } + completedAbruptly = false; + } finally { + processWorkerExit(w, completedAbruptly); + } + } + + private Runnable getTask() { //获取工作任务 + boolean timedOut = false; // Did the last poll() time out? + + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + + + if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { + decrementWorkerCount(); //减少工作线程数 + return null; + } + + int wc = workerCountOf(c); + + boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; + + if ((wc > maximumPoolSize || (timed && timedOut)) + && (wc > 1 || workQueue.isEmpty())) { + if (compareAndDecrementWorkerCount(c)) //工作线程-1 + return null; + continue; + } + + try { + Runnable r = timed ? + workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //没有任务会挂起指定时间 + workQueue.take();//没有任务会阻塞知道有任务来 + if (r != null) + return r; + timedOut = true; + } catch (InterruptedException retry) { + timedOut = false; + } + } + } +``` diff --git a/week_05/32/test1115-1116.md b/week_05/32/test1115-1116.md new file mode 100644 index 0000000..620ec3e --- /dev/null +++ b/week_05/32/test1115-1116.md @@ -0,0 +1,210 @@ +package test; + +/*** + * 1115题 + * @author DEV031698 + * + */ +public class Test20200109 { + + public static void main(String[] args) { + final FooBar foobar = new Test20200109.FooBar(3); + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + try { + foobar.foo(this); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + Thread t2 = new Thread(new Runnable() { + + @Override + public void run() { + try { + foobar.bar(this); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + t2.start(); + try { + System.out.println("等待10秒"); + Thread.sleep(10000); + } catch (Exception e) { + } + t1.start(); + } + + + static class FooBar{ + int n = 0; + public volatile boolean flag = true; + private Object obj = new Object(); + public FooBar(int num){ + n = num; + } + public void foo(Runnable foo) throws InterruptedException{ + for(int i = 0;i < n ;i++){ + synchronized (obj) { + if(!flag) { + obj.wait(); + } + System.out.print("foo"); + flag = false; + obj.notify(); + } + } + } + + public void bar(Runnable bar)throws InterruptedException{ + for(int i = 0;i < n ;i++){ + synchronized (obj) { + if(flag) { + obj.wait(); + } + System.out.print("bar"); + flag = true; + obj.notify(); + } + } + } + } +} + + + +package test; + +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/*** + * 1116题 + * @author DEV031698 + * + */ +public class Test20200110 { + + class ZeroEvenOdd{ + int startNum = 1; + int endNum; + boolean flag = true; + private Lock lock = new ReentrantLock(); + Condition c0 = lock.newCondition(); + Condition c1 = lock.newCondition(); + Condition c2 = lock.newCondition(); + + + public ZeroEvenOdd(int num){ + this.endNum = num; + } + public void zero(){ + for(;startNum < endNum ;){ + try { + lock.lock(); + while (!flag) { + c0.await(); + } + System.out.println(0); + flag = false; + if(startNum % 2 == 1){ + c1.signal(); + }else{ + c2.signal(); + } + } catch (Exception e) { + + }finally{ + lock.unlock(); + } + } + } + + public void even(){ + for(;startNum < endNum ;){ + try { + lock.lock(); + while ( startNum %2 == 0 || flag) { + c1.await(); + } + System.out.println(startNum); + startNum++; + flag = true; + c0.signal(); + } catch (Exception e) { + + }finally{ + lock.unlock(); + } + } + } + + public void odd(){ + for(;startNum < endNum ;){ + try { + lock.lock(); + while (startNum % 2 == 1) { + c2.await(); + } + System.out.println(startNum); + startNum++; + flag = true; + c0.signal(); + } catch (Exception e) { + + }finally{ + lock.unlock(); + } + } + } + } + + public static void main(String[] args){ + final ZeroEvenOdd zeo = new Test20200110().new ZeroEvenOdd(3); + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + try { + zeo.zero(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + Thread t2 = new Thread(new Runnable() { + + @Override + public void run() { + try { + zeo.even(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + Thread t3 = new Thread(new Runnable() { + + @Override + public void run() { + try { + zeo.odd(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + t3.start(); + t2.start(); + t1.start(); + + + } +} -- Gitee