diff --git a/second/week_05/85/Executors.md b/second/week_05/85/Executors.md new file mode 100644 index 0000000000000000000000000000000000000000..d99bf9fed99b6b92b1611ed92921b36829a4448b --- /dev/null +++ b/second/week_05/85/Executors.md @@ -0,0 +1,21 @@ +# Executors工具类 + +##### 创建线程池相关函数-总览 + +- newFixedThreadPool:创建一个固定大小的线程池 +- newWorkStealingPool:创建一个固定大小的携程线程池 +- newWorkStealingPool:创建一个默认大小携程池,根据系统内核来默认大小 +- newFixedThreadPool:创建一个固定大小线程池,可自己选择线程工厂 +- newSingleThreadExecutor:创建只有一个线程的线程池 +- newSingleThreadExecutor:创建只有一个线程的线程池,可自己选择线程工厂 +- newCachedThreadPool:创建一个可缓存线程池,如果池长度需要可灵活回收空闲线程 +- newCachedThreadPool:创建一个可缓存线程池,如果池长度需要可灵活回收空闲线程,可自定义线程工厂 +- newSingleThreadScheduledExecutor:创建一个线程的线程池,执行定时及周期性任务执行 +- newSingleThreadScheduledExecutor: + - ​ 创建一个线程的线程池,执行定时及周期性任务执行。可自定义线程工厂 +- newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。 +- newScheduledThreadPool:创建一个定长线程池,支持定时及周期性任务执行。 可自定义线程工厂 +- unconfigurableExecutorService:把ExecutorService包装成DelegatedExecutorService +- unconfigurableScheduledExecutorService: + - ​ 把ScheduledExecutorService包装成DelegatedScheduledExecutorService + diff --git a/second/week_05/85/Thread.md b/second/week_05/85/Thread.md new file mode 100644 index 0000000000000000000000000000000000000000..a5d0c80749e30fdd65740ef55b76e8362f97fb34 --- /dev/null +++ b/second/week_05/85/Thread.md @@ -0,0 +1,286 @@ +# Thread阅读笔记 + +[Thread源码分析 - 简书](https://www.jianshu.com/p/7a2c51635d4c) + + +#### 内部类 + +枚举State + +```java +public enum State { + /** + * 新建状态:线程刚创建,还未执行start方法 + */ + NEW, + + /** + * 可运行状态:已经就绪可运行的状态,处于此状态的线程是正在JVM中运行的, + * 但可能再等待操作系统级别的资源,例如CPU时间片 + */ + RUNNABLE, + + /** + * 阻塞状态:阻塞等待监视器锁,处于此状态的线程正在阻塞等待监视器锁, + * 以进入一个同步块/方法,或者再执行完wait()方法后重入同步块/方法 + */ + BLOCKED, + + /** + * 等待状态:执行完Object.wait无超时参数操作, + * 或者 Thread.join无超时参数操作 + * 或者 LockSupport.park 操作后,线程进入等待状态 + * 一般在等待状态的线程在等待其它线程执行特殊操作,例如:、 + * 等待其它线程操作Object.notify() 唤醒 或 Object.notifyAll() 唤醒所有 + */ + WAITING, + + /** + * 等待状态:Thread.sleep、Object.await待超时时间、Thread.join带超时时间 + * LockSupport.parkNanos、LockSupport.parkUntil这些 操作会使线程进入显示等待 + */ + TIMED_WAITING, + + /** + * 终止状态(线程执行完毕) + */ + TERMINATED; +} +``` + + + +#### 属性 + +```java +// 线程名称 +private volatile String name; +// 线程优先级 +private int priority; +private Thread threadQ; +private long eetop; +// 是否单步执行这个线程 +private boolean single_step; +// 是否为守护线程 +private boolean daemon = false; +// JVM状态值 +private boolean stillborn = false; +// 线程任务 +private Runnable target; +// 线程组 +private ThreadGroup group; +// 此线程的上下文类装入器 +private ClassLoader contextClassLoader; +// 它封装的上下文做出系统资源访问决策 +private AccessControlContext inheritedAccessControlContext; + +// 自动编号,就是当没有设置线程名称的时候,我们看到的thread-0、thread-1 +private static int threadInitNumber; +// 线程堆栈空间大小,默认0, +private long stackSize; +// JVM私有状态,在本机线程终止后仍然存在。 +private long nativeParkEventPointer; +// 线程ID +private long tid; +// 用于生成线程ID +private static long threadSeqNumber; +// 线程初始状态,表示线程尚未启动 +private volatile int threadStatus = 0; +volatile Object parkBlocker; +private volatile Interruptible blocker; +// 用于synchronized关键字的实例锁 +private final Object blockerLock = new Object(); +// 最小优先级 +public final static int MIN_PRIORITY = 1; +// 默认优先级 +public final static int NORM_PRIORITY = 5; +// 最大优先级 +public final static int MAX_PRIORITY = 10; + + + + + +#### 主要方法 + +##### 1、start()-启动方法 + +```java +public synchronized void start() { + + if (threadStatus != 0) + // 必须是就绪状态 + throw new IllegalThreadStateException(); + + // 通知线程组即将启动,以便可以将其添加到线程列表中,并减少线程组的未启动线程数量 + group.add(this); + + // 默认启动失败 + boolean started = false; + try { + // 调用native方法 + start0(); + // 启动成功,设置为true + started = true; + } finally { + try { + if (!started) { + // 如果启动就从线程组任务中移除,并增加未启动线程数量 + group.threadStartFailed(this); + } + } catch (Throwable ignore) { + /* do nothing. If start0 threw a Throwable then + it will be passed up the call stack */ + } + } +} +private native void start0(); +``` + + + +##### 2、run()-运行方法 + +```java +@Override +public void run() { + if (target != null) { + // 直接调用线程任务 + target.run(); + } +} +``` + +##### 3、exit()-退出方法-私有 + +```java +private void exit() { + if (group != null) { + // 从线程组中移除自身,并notifyAll() + group.threadTerminated(this); + // 帮助gc + group = null; + } + // 帮忙 GC + /* Aggressively null out all reference fields: see bug 4006245 */ + target = null; + /* Speed the release of some of these resources */ + threadLocals = null; + inheritableThreadLocals = null; + inheritedAccessControlContext = null; + blocker = null; + uncaughtExceptionHandler = null; +} +``` + +##### 4、interrupt()-中断方法 + +```java +public void interrupt() { + if (this != Thread.currentThread()) + // 如果是当前线程,就检查是否有安全管理器 + // 如果有安全管理器就检查权限 + checkAccess(); + + // 加锁,中断blockerLock锁 + synchronized (blockerLock) { + // 设置中断接口 + Interruptible b = blocker; + if (b != null) { + // 设置中断标记 + interrupt0(); // Just to set the interrupt flag + b.interrupt(this); + return; + } + } + // 调用本地native方法中断 + interrupt0(); +} +``` + + + +##### 5、interrupted()-是否已中断 + +```java +// 判断当前运行的线程是否中断 +public static boolean interrupted() { + return currentThread().isInterrupted(true); +} +// 判断线程是否中断 +public boolean isInterrupted() { + return isInterrupted(false); +} +// 本地native方法 +private native boolean isInterrupted(boolean ClearInterrupted); +``` + + + +##### 6、isAlive()是否被激活 + +- 本地方法,`public final native boolean isAlive();` + +##### 7、yield()-暂停线程 + +- 暂停一下线程,回到就绪状态,让出CPU资源,`public static native void yield();` + +##### 8、sleep - 2个方法 + +- 会占用CPU资源 + +```java + +``` + + + +##### 9、join - 3个方法 + +- 会释放CPU资源 + +```java +// 立即 +public final void join() throws InterruptedException { + join(0); +} +public final synchronized void join(long millis) + throws InterruptedException { + // 加锁执行 + // 设置开始时间 + long base = System.currentTimeMillis(); + long now = 0; + + // 参数效验,低于0 抛参数异常 + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + + if (millis == 0) { + // 如果millis等于0,则判断线程是否激活中, + // 如果是就执行native方法Object.wait(0)使其等待 + while (isAlive()) { + wait(0); + } + } else { + // 循环判断线程是否激活, + while (isAlive()) { + // 获取当前还剩余延迟时间 + long delay = millis - now; + // 如果延迟时间超过了还没休眠线程,则退出不休眠了 + if (delay <= 0) { + break; + } + // 调用native方法Object.wait(delay) 使其等待 + wait(delay); + // 更新已用休眠时间 + now = System.currentTimeMillis() - base; + } + } +} + + +## 四、总结 + +- 利用的操作系统来创建启动线程, +- 内含线程的启动、休眠、阻塞、停止、中断方法,还包含各种线程参数的获取和判断参数 \ No newline at end of file diff --git a/second/week_05/85/ThreadLocal.md b/second/week_05/85/ThreadLocal.md new file mode 100644 index 0000000000000000000000000000000000000000..79c0f9d8e6296acf9acc39cf3f3152e7b37b0257 --- /dev/null +++ b/second/week_05/85/ThreadLocal.md @@ -0,0 +1,269 @@ +# TreadLocal阅读笔记 + +[转-ThreadLocal源码解读(大牛之作) - 简书](https://www.jianshu.com/p/6e4233f96532) + +## 类结构 + +`ThreadLocal`中嵌套内部类`ThreadLocalMap`,这个类本质上是一个Map,和HashMap之类的实现相似,依然是key-value的形式,其中有一个内部类`Entry`,其中key可以看做是ThreadLocal实例,但是本质是持有ThreadLocal实例的弱引用。 + + + +## 存储结构 + + +- 每个Thread线程内部都有一个Map +- Map里面存储线程本地对象(key)和线程的变量副本(value) +- 但是Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向Map获取和设置线程的副本变量值 +- 所以对于不通的线程,每次获取副本值时,别的线程并不能获取到当前线程的副本之,形成了副本的隔离,互不干扰 + +## 源码分析 + + + +###### ThreadLocalMap内部类-子类 + +- ```Java + static class Entry extends WeakReference> { + /** The value associated with this ThreadLocal. */ + Object value; + + Entry(ThreadLocal k, Object v) { + super(k); + value = v; + } + } + ``` + + + +###### ThreadLocalMap内部类-构造器 + +- ```java + /** 初始化构造器:会初始化一个元素*/ + ThreadLocalMap(ThreadLocal firstKey, Object firstValue) { + // 初始化表 + table = new Entry[INITIAL_CAPACITY]; + // 计算索引 + int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); + // 生成Entry对象,并保存咋表的索引中 + table[i] = new Entry(firstKey, firstValue); + // 初始化元素后,默认size = 1 + size = 1; + // 设置下一次扩容的阔值 + setThreshold(INITIAL_CAPACITY); + } + + /** 初始化构造器:根据传入的ThreadLocalMap来初始化 */ + private ThreadLocalMap(ThreadLocalMap parentMap){ + // 根据TreadLocalMap 参数 初始化表 + Entry[] parentTable = parentMap.table; + // 获取参数表的大小 + int len = parentTable.length; + // 根据上面获取的大小初始化 + setThreshold(len); + table = new Entry[len]; + + // 循环获取元素 + for (int j = 0; j < len; j++) { + Entry e = parentTable[j]; + if (e != null) { + @SuppressWarnings("unchecked") + ThreadLocal key = (ThreadLocal) e.get(); + if (key != null) { + Object value = key.childValue(e.value); + Entry c = new Entry(key, value); + int h = key.threadLocalHashCode & (len - 1); + // 如果当前由元素,就循环到最后 + while (table[h] != null) + /* + private static int nextIndex(int i, int len) { + // 下一个就是索引 + 1 ,超出就索引为0 + return ((i + 1 < len) ? i + 1 : 0); + } + */ + // 次数是不会出现索引被占用,因为本身就是创建的一样大的空数组 + h = nextIndex(h, len); + // 此时table[h] 肯定是null,所以直接把c赋值 + table[h] = c; + size++; + } + } + } + } + ``` + + + + ###### ThreadLocalMap内部类-主要方法 + + **环形索引函数** + +- ```java + /** 获取当前索引下一个索引,处理了边界问题 */ + private static int nextIndex(int i, int len) { + return ((i + 1 < len) ? i + 1 : 0); + } + + /** 获取当前索引上一个索引,处理了边界问题 */ + private static int prevIndex(int i, int len) { + return ((i - 1 >= 0) ? i - 1 : len - 1); + } + + ``` + +**getXXX函数** + +- ```java + + /** 根据传入的线程,获取对应的Entry() */ + private Entry getEntry(ThreadLocal key) { + int i = key.threadLocalHashCode & (table.length - 1); + Entry e = table[i]; + if (e != null && e.get() == key) + // 对应的entry存在且未失效且弱引用指向的ThreadLocal就是key,则命中返回 + return e; + else + // 因为用的是线性探测,所有往后找还是有可能能够找到目标Entry值 + return getEntryAfterMiss(key, i, e); + } + + /** 调动getEntry未直接命中搞得时候调用此方法 */ + private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) { + Entry[] tab = table; + int len = tab.length; + + // 基于线性探测法不断向后探测直到遇到空Entry + while (e != null) { + ThreadLocal k = e.get(); + if (k == key) + // 找到目标 + return e; + if (k == null) + // 该entry对应的ThreadLocal已经被回收, + // 调用expungeStaleEntry来清理无效的entry + expungeStaleEntry(i); + else + // 继续找下一个索引 + i = nextIndex(i, len); + e = tab[i]; + } + return null; + } + + /** + * 这个函数是ThreadLocal的核心清理函数,它做的事情很简答: + * 就是从staleSlot开始遍历,将无效(弱引用指向对象被回收)清理, + * 即对应entry中的value置为null,将指向这个entry的table[i]置为null,直到扫到空Entry + * 另外,在过程中还会对费控的entry作rehash + * 可是说这个函数的作用就是从staleSlot开始清理连续段中的slot(断开强引用,rehash slot等) + */ + private int expungeStaleEntry(int staleSlot) { + Entry[] tab = table; + int len = tab.length; + + // 因为entry对应的 ThreadLocal 已被回收,value 设为null,显式断开强引用 + // expunge entry at staleSlot + tab[staleSlot].value = null; + // 显式设置该entry 为 null,以便垃圾回收 + tab[staleSlot] = null; + size--; + + // Rehash until we encounter null + Entry e; + int i; + for (i = nextIndex(staleSlot, len); + (e = tab[i]) != null; + i = nextIndex(i, len)) { + + ThreadLocal k = e.get(); + if (k == null) { + // 清理对应ThreadLocal已被回收的entry + e.value = null; + tab[i] = null; + size--; + } else { + // 对于还没有被回收的情况,需要做一次rehash + // 如果对应的ThreadLocal 的ID 对len取模出来的索引 h 不为当前位置 i, + // 则从h 向后线性探测到第一个空的slot,把当前的entry给挪过去。 + int h = k.threadLocalHashCode & (len - 1); + if (h != i) { + tab[i] = null; + + // Unlike Knuth 6.4 Algorithm R, we must scan until + // null because multiple entries could have been stale. + + while (tab[h] != null) + h = nextIndex(h, len); + tab[h] = e; + } + } + } + // 返回staleSlot之后第一个空的slot索引 + return i; + } + ``` + +- + + + + + + + +#### 主要方法 + +- 基本都是对ThreadLocalMap的操作 + +```java +/** 为当前线程初始化副本变量值 */ +public static ThreadLocal withInitial(Supplier supplier) { + return new SuppliedThreadLocal<>(supplier); +} + +/** 获取当前线程的副本变量值 */ +public T get() { + // 获取当前线程 + Thread t = Thread.currentThread(); + + ThreadLocalMap map = getMap(t); + if (map != null) { + // 从ThreadLocalMap中获取K-V Entry节点 + ThreadLocalMap.Entry e = map.getEntry(this); + if (e != null) { + @SuppressWarnings("unchecked") + // 获取存储的value副本值,并且返回 + T result = (T)e.value; + return result; + } + } + // ThreadLocal类返回的是null,其他子类实现的是对应的hashCode + // 如果找不到map,会初始化ThreadLocalMap,并把当前ThreadLocal和initialValue()副本值放入 + return setInitialValue(); +} + +/** 保存当前线程的副本变量值 */ +public void set(T value) { + Thread t = Thread.currentThread(); + // 获取当前线程的map + ThreadLocalMap map = getMap(t); + if (map != null) + // 如果不为null。则修改当前线程的副本 + map.set(this, value); + else + // 如果map为null 就会初始化ThreadLocalMap,并把当前ThreadLocal和副本value放入map + createMap(t, value); +} + +/** 溢出当前线程的副本变量值 */ +public void remove() { + ThreadLocalMap m = getMap(Thread.currentThread()); + if (m != null) + m.remove(this); +} +``` + + + +## 五、总结 +可以保存线程特有的变量及信息。 diff --git a/second/week_05/85/ThreadPoolExecutor.md b/second/week_05/85/ThreadPoolExecutor.md new file mode 100644 index 0000000000000000000000000000000000000000..5a9245f04c4fe408f6f9eebcc9cb704d986ff057 --- /dev/null +++ b/second/week_05/85/ThreadPoolExecutor.md @@ -0,0 +1,558 @@ +# ThreadPoolExecutor阅读笔记 + +[并发番@ThreadPoolExecutor一文通(1.8版) - 作业部落 Cmd Markdown 编辑阅读器](https://www.zybuluo.com/kiraSally/note/990993) + + +## 简介 + + JDK常用的线程池,里面有一个内部Worker类继承了AQS实现了Runnable + + +## 重要结构 + +- 核心线程 +- 最大线程(超过核心线程的加入队列) +- 超时时间 +- 线程工厂 +- 拒绝策略 + +## 源码分析 + +#### 内部类 + +##### Worker + +```java +private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ + /** + * This class will never be serialized, but we provide a + * serialVersionUID to suppress a javac warning. + */ + private static final long serialVersionUID = 6138294804551838833L; + + /** Thread this worker is running in. Null if factory fails. */ + // 真正工作的线程 + final Thread thread; + /** Initial task to run. Possibly null. */ + // 第一个任务,从个构造方法传进来 + Runnable firstTask; + /** Per-thread task counter */ + // 完成任务数 + volatile long completedTasks; + + /** + * Creates with given first task and thread from ThreadFactory. + * @param firstTask the first task (null if none) + */ + // 构造方法 + Worker(Runnable firstTask) { + // 设置为等待唤醒状态 + setState(-1); // inhibit interrupts until runWorker + // 第一个任务 + this.firstTask = firstTask; + // 使用线程工厂生成一个线程 + // 注意,这里把Worker本身作为Runnable传递给线程工厂 + this.thread = getThreadFactory().newThread(this); + } + + /** Delegates main run loop to outer runWorker */ + // 实现Runbale的run方法 + public void run() { + // 调用ThreadPoolExecutor的runWorker()方法执行线程 + runWorker(this); + } + + // Lock methods + // + // The value 0 represents the unlocked state. + // The value 1 represents the locked state. + + protected boolean isHeldExclusively() { + return getState() != 0; + } + + protected boolean tryAcquire(int unused) { + if (compareAndSetState(0, 1)) { + setExclusiveOwnerThread(Thread.currentThread()); + return true; + } + return false; + } + + protected boolean tryRelease(int unused) { + setExclusiveOwnerThread(null); + setState(0); + return true; + } + + public void lock() { acquire(1); } + public boolean tryLock() { return tryAcquire(1); } + public void unlock() { release(1); } + public boolean isLocked() { return isHeldExclusively(); } + + void interruptIfStarted() { + Thread t; + if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } + } + } +} +``` + + + +#### 属性 + +```java +******************************************************************* +/** + * 下面都是线程池的生命周期的属性 + * + */ + +// 初始状态为RUNNING +private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); +private static final int COUNT_BITS = Integer.SIZE - 3; +private static final int CAPACITY = (1 << COUNT_BITS) - 1; + +/*** 线程池的状态 ***/ + +private static final int RUNNING = -1 << COUNT_BITS; +// 执行shutdown()后会把状态修改为SHUTDOWN +private static final int SHUTDOWN = 0 << COUNT_BITS; +// 执行shutdownNow()方法时,会把线程池修改为stop状态,同时标记所有线程为中断状态 +private static final int STOP = 1 << COUNT_BITS; +// 当执行shutdown()或shotdownNow()之后,所有任务中已中止,且工作线程数量为0,就会进入这个状态 +// tryTerminate() 函数中可见 +private static final int TIDYING = 2 << COUNT_BITS; +// 修改状态为TIDYING后执行terminated()方法,最后修改状态为TERMINATED,标志线程真正消亡了 +// tryTerminate() 函数中可见 +private static final int TERMINATED = 3 << COUNT_BITS; + + +******************************************************************* + + +/** + * 任务队列(构造参数之一) + * 当正在运行的线程数大于或等于核心线程的时候,任务来了先进入任务队列中的 + * 这个队列必须是阻塞队列,所以像ConcurrentHashMap就不能作为参数 + * ConcurrentHashMap是线程安全的队列,但是不是阻塞队列 + */ +private final BlockingQueue workQueue; + +private final ReentrantLock mainLock = new ReentrantLock(); + +/** + * 处于运行中的线程数量 + * 例如判断运行中的线程数量是否大于等于核心线程,如果是,那就入任务队列 + */ +private final HashSet workers = new HashSet(); + +private final Condition termination = mainLock.newCondition(); +/** 工作线程历史最大数量:也是线程池 同时处于运行中的线程数量最大是多少 */ +private int largestPoolSize; +/** 正常完成的任务数量:也是线程池 正常完成的线程数量 */ +private long completedTaskCount; + +/** + * 线程工厂 (构造参数之一) + * {@link java.util.concurrent.Executors} + * 默认使用的是Executors工具类中的DefaultThreadFactory类, + * 这个类有一个缺点,创建的线程名称是自动生成的,无法自定义以区别不同的线程 + * 并且这个类都是非守护线程 + * 如果要自定义工厂,可以自己实现一个ThreadFactory工厂,然后把名称和守护线程作为构造方法当参数 + */ +private volatile ThreadFactory threadFactory; + +/** + * 拒绝策略(构造参数之一) + * 拒绝策略表示当任务队列满了且线程数也达到最大了,这时候再新加任务,线程池已经无法承受了, + * 这些新来的任务应该按什么逻辑来处理 + * 常用的拒绝策略有丢弃当前任务、丢弃最老的任务、抛出异常、调用者自己处理等待 + * 默认的拒绝策略是抛出异常,即线程池无法承载了,调用者再往里面添加任务会抛出异常 + * 默认的拒绝车辆虽然比较简单粗暴,但是相对丢弃任务策略明显要好很多,最起码调用者 + *自己可以捕获这个异常在进行二次处理 + */ +private volatile RejectedExecutionHandler handler; +/** + * 线程保持空闲时间(构造参数之一) + * 默认情况下,此参数仅当正在运行的线程大于核心线程时才有效,即针对非核心线程 + * 但是,如果allowCoreThreadTimeOut被设置了true,针对核心线程也有效 + * 即当任务队列为空时,线程保持多久才会销毁, + * 内部主要通过阻塞队列待超时的poll(timeout,unit)方法来实现 + */ +private volatile long keepAliveTime; + +// 是否允许核心线程设置超时,默认是不允许 +private volatile boolean allowCoreThreadTimeOut; +/** + * 核心线程数(构造参数之一) + * 当正在运行的线程数小于核心线程时,来一个任务就创建一个核心线程 + * 当正在运行的线程大于或等于核心线程数时,任务来了先不创建线程而是丢入阻塞任务队列中 + */ +private volatile int corePoolSize; +/** + * 最大线程数(构造参数之一) + * 当任务队列满了时,来一个任务才创建非核心线程,但不能超过最大线程数 + */ +private volatile int maximumPoolSize; +/** + * 默认的拒绝策略,任务满了会直接抛出RejectedExecutionException异常 + * (构造参数之一) + */ +private static final RejectedExecutionHandler defaultHandler = + new AbortPolicy(); +private static final RuntimePermission shutdownPerm = + new RuntimePermission("modifyThread"); +private final AccessControlContext acc; +``` + + + +#### 主要方法 + +##### 1、线程池生命周期的方法 + +- shudown() 关闭线程池 +- shudownNow() 立即关闭线程池 +- tryTerminate() 尝试终止线程池 + +```java +// 线程池的状态 +private static int runStateOf(int c) { return c & ~CAPACITY; } +// 线程池中工作线程的数量 +private static int workerCountOf(int c) { return c & CAPACITY; } +// 计算ctl的值,等于运行状态“加上” 线程数量 +private static int ctlOf(int rs, int wc) { return rs | wc; } + +/** 关闭线程池,自旋+CAD所以一定可以关闭,并把所有线程设置为中断状态 */ +public void shutdown() { + final ReentrantLock mainLock = this.mainLock; + // 加 主锁 + mainLock.lock(); + try { + // 检查权限 + checkShutdownAccess(); + // 修改状态为SHUTDOWN,采用自旋+CAS + advanceRunState(SHUTDOWN); + // 标记空闲线程为中断状态 + interruptIdleWorkers(); + // 为ScheduledThreadPoolExecutor 预留的 + onShutdown(); // hook for ScheduledThreadPoolExecutor + } finally { + // 释放主锁 + mainLock.unlock(); + } + + tryTerminate(); +} +private void advanceRunState(int targetState) { + for (;;) { + int c = ctl.get(); + // 如果状态大于SHUDOWN,或者修改SHUTDOWN成功了,才会break出自旋 + if (runStateAtLeast(c, targetState) || + ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) + break; + } +} + +/** 立即关闭线程池*/ +public List shutdownNow() { + List tasks; + final ReentrantLock mainLock = this.mainLock; + // 加 主锁 + mainLock.lock(); + try { + // 权限检查 + checkShutdownAccess(); + // 自旋+CAS 让线程进入STOP状态 + advanceRunState(STOP); + // 标记空闲线程为中断状态 + interruptWorkers(); + // 把任务队列加入到一个新的队列保存到 tasks + tasks = drainQueue(); + } finally { + // 释放主锁 + mainLock.unlock(); + } + tryTerminate(); + return tasks; +} + +/** */ +final void tryTerminate() { + for (;;) { + // 获得线程池状态 + int c = ctl.get(); + if (isRunning(c) || + runStateAtLeast(c, TIDYING) || + (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) + // 1.如果线程池是RUNNING状态 + // 2.如果线程池是TIDYING或TERMINATED状态 + // 3.如果是SHOTDOWN状态且任务队列不为空 + // 符合上面任何一点,则直接返回,不进入下面代码 + return; + if (workerCountOf(c) != 0) { // Eligible to terminate + // 工作线程数量不为0, + // 尝试中断空闲线程,然后直接返回 + interruptIdleWorkers(ONLY_ONE); + return; + } + + final ReentrantLock mainLock = this.mainLock; + // 加主锁 + mainLock.lock(); + try { + // CAS更新修改状态为TIDYING + if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { + try { + // 更新成功,执行terminated()钩子方法 + terminated(); + } finally { + // 强制更新状态为TERMINATED,这里不用CAS + ctl.set(ctlOf(TERMINATED, 0)); + termination.signalAll(); + } + return; + } + } finally { + // 释放主锁 + mainLock.unlock(); + } + // else retry on failed CAS + } +} +``` + + + +##### 2、线程的执行方法 + +- execute():线程池提交任务的方法之一,也是最核心的方法,会根据核心线程数量、最大数量、任务队列大小, + - 先进行添加核心线程(添加到线程HashSet集合,此次会考虑容量),添加成功就会采用核心线程方式CAS启动任务 + - 如果添加失败,那么就加入等待队列 + - 加入任务队列成功,就会去CAS添加非核心线程(添加到线程HashSet集合,此次会考虑容量),添加成功,然后就会启动一次线程(加锁启动),启动失败就直接删除添加到HashSet集合中的线程 + - 如果加入任务队列也失败,就会去CAS添加非核心线程(添加到线程HashSet集合),添加成功,然后去启动一次线程(加锁启动),如果启动失败,就直接采用拒绝策略 + - **根据上面情况总结**: + - 只要能加入队列,那么任务早晚会被运行,也不会被拒绝。 + - 加入队列也失败,只要线程加入到工作线程集合,启动一次任务还是失败,那么肯定采用拒绝策略了 +- addWorker():创建一个工作线程,并去启动,会做各种线程池状态和工作线程数量检测 + - 这里会做线程池最大容量检查,true检查核心数量,false检查最大容量,如果不符合要求就会直接返回(但是如果加入到队列的话,是不影响的,所以LinkedBlockingQueue无界的,那么是没有容量限制的,maxSize是无效的) +- runWorker() :真正执行任务的方法 + +```java +public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + + /* + * Proceed in 3 steps: + * + * 1. If fewer than corePoolSize threads are running, try to + * start a new thread with the given command as its first + * task. The call to addWorker atomically checks runState and + * workerCount, and so prevents false alarms that would add + * threads when it shouldn't, by returning false. + * + * 2. If a task can be successfully queued, then we still need + * to double-check whether we should have added a thread + * (because existing ones died since last checking) or that + * the pool shut down since entry into this method. So we + * recheck state and if necessary roll back the enqueuing if + * stopped, or start a new thread if there are none. + * + * 3. If we cannot queue task, then we try to add a new + * thread. If it fails, we know we are shut down or saturated + * and so reject the task. + */ + int c = ctl.get(); + // 1、如果线程少于 corePoolSize,尝试以给定的命令作为第一个任务启动新线程,对于 addWorker 的调用原子地检查 runState 和 workerCount,从而通过返回 false 来防止不应该添加线程的情况下发出错误警报; + if (workerCountOf(c) < corePoolSize) { + // 添加一个工作线程(核心) + if (addWorker(command, true)) + // 核心线程已启动,返回成功 + return; + // 添加工作线程(核心)失败,重新获取控制变量 + c = ctl.get(); + } + // 2、如果任务可以成功排队,那么我们仍然需要仔细检查是否应该添加线程(因为现有线程自上次检查后就已死亡)或自进入此方法以来池已关闭。因此,我么重新检查状态,如果停止,并在必要时回滚排队,如果已停止,或者再没有线程的情况下启动新线程; + if (isRunning(c) && workQueue.offer(command)) { + // 达到了核心线程最大数量,且线程池也是运行状态,就任务入队列 + + // 重新获取线程池状态,防止进入任务后线程池关闭 + int recheck = ctl.get(); + if (! isRunning(recheck) && remove(command)) + // 重新检查线程状态不是运行状态,就从工作队列中删除,然后抵达都用拒绝策略 + reject(command); + else if (workerCountOf(recheck) == 0) + // 容错检查,工作线程数量是否为0,如果为0就创建一个 + addWorker(null, false); + } + // 3、如果无法将任务排队,则尝试添加一个新线程,如果失败了,我们知道我们被关闭或已达到最大线程,因此拒绝该任务 + // 任务入队列失败,尝试创建工作线程(非核心),失败则拒绝 + else if (!addWorker(command, false)) + reject(command); +} + +/** + * 加入工作线程 + * core代表加入的是 核心线程还是非核心线程 + */ +private boolean addWorker(Runnable firstTask, boolean core) { + retry://自定义标记 + for (;;) { + // 获得线程池状态 + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + // 检查线程池状态 + if (rs >= SHUTDOWN && + ! (rs == SHUTDOWN && + firstTask == null && + ! workQueue.isEmpty())) + return false; + //循环检查数量及对工作线程数量+1 + for (;;) { + int wc = workerCountOf(c); + if (wc >= CAPACITY || + wc >= (core ? corePoolSize : maximumPoolSize)) + // 工作线程数量检查 + return false; + if (compareAndIncrementWorkerCount(c)) + // 尝试更新数量,成功跳出自定义标记块retry + break retry; + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + // 再次获取状态 + // 如果线程池状态有变化,跳过不执行下面代码(如果有的话),然后重新for循环 + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + // 如果上面的条件满足,会把工作线程数量+1 + + boolean workerStarted = false;// 标记线程是否已经启动成功,如果没成功会执行移除方法 + boolean workerAdded = false;//标记添加到Woker的HashSet工作集合是否成功,成功了才启动线程 + Worker w = null; + try { + // 创建一个工作线程(Worker继承了AQS,实现了Runnable) + w = new Worker(firstTask); + final Thread t = w.thread; + if (t != null) { + final ReentrantLock mainLock = this.mainLock; + // 加-主锁 + mainLock.lock(); + try { + // Recheck while holding lock. + // Back out on ThreadFactory failure or if + // shut down before lock acquired. + // 加上主锁,再重新获取线程池状态,必变现在线程池状态有变化 + int rs = runStateOf(ctl.get()); + + // 再次检查线程池状态 + if (rs < SHUTDOWN || + (rs == SHUTDOWN && firstTask == null)) { + + // 检测线程是否已经激活(启动),如果启动抛出IllegalThreadStateException异常 + if (t.isAlive()) // precheck that t is startable + throw new IllegalThreadStateException(); + // 加入到HashSet工作线程集合中 + workers.add(w); + + // 获取工作线程集合数量(largestPoolSize只可在主锁下使用) + int s = workers.size(); + if (s > largestPoolSize) + largestPoolSize = s; + // 标记添加成功 + workerAdded = true; + } + } finally { + // 释放-主锁 + mainLock.unlock(); + } + if (workerAdded) { + // 启动线程,此处执行的是Worker里面的run(), + // Worker类中的run调用的是runWorker(); + t.start(); + // 设置为已启动 + workerStarted = true; + } + } + } finally { + if (! workerStarted) + // 线程启动失败,执行失败方法 + //移除Woker的HashSet中的w线程,如果有的话,数量-1,tryTerminate()方法等 + addWorkerFailed(w); + } + // 返回启动状态,true 成功,false 失败 + return workerStarted; +} + +/** + * Worker线程类,run()方法中调用此方法进行启动线程 + */ +final void runWorker(Worker w) { + Thread wt = Thread.currentThread();//当前线程 + Runnable task = w.firstTask;//缓存当前待运行线程 + w.firstTask = null;//GC + // 强制释放锁,(shutdown()里面有加锁) + // 相当于无视那边的中断标记 + w.unlock(); // allow interrupts + boolean completedAbruptly = true;// 是否突然完成了 + try { + // 如果参数的Woker线程中的任务线程是null,就会直接从队列中获取 + // 只要还能取到任务,这就是一个死循环 + while (task != null || (task = getTask()) != null) { + // 对当前Work加锁 + w.lock(); + // If pool is stopping, ensure thread is interrupted; + // if not, ensure thread is not interrupted. This + // requires a recheck in second case to deal with + // shutdownNow race while clearing interrupt + // 继续检查线程池状态 + if ((runStateAtLeast(ctl.get(), STOP) || + (Thread.interrupted() && + runStateAtLeast(ctl.get(), STOP))) && + !wt.isInterrupted()) + wt.interrupt(); + try { + // 钩子方法,方便子类在任务执行前做一些逻辑,由子类自己来实现 + beforeExecute(wt, task); + Throwable thrown = null; + try { + // 开始真正执行任务了。 + task.run(); + // 异常处理 + } catch (RuntimeException x) { + thrown = x; throw x; + } catch (Error x) { + thrown = x; throw x; + } catch (Throwable x) { + thrown = x; throw new Error(x); + } finally { + afterExecute(task, thrown); + } + } finally { + // 每次把task置为null,然后重新从队列中获取 + task = null; + // 任务执行完成,就completedTasks加1 + w.completedTasks++; + // 释放当前work线程锁 + w.unlock(); + } + } + completedAbruptly = false;// 没有突然完成,正常完成了任务 + } finally { + // 到这里要么是突然完成,要么是正常完成了队列的任务, + // 如果是突然完成的(也就是其他地方完成 或者是异常了等情况),就会直接减少ctl值 + // 如果是正常完成,就更新completedTaskCount值,并移除worker集合中当前的worker + // 等等其他逻辑 + processWorkerExit(w, completedAbruptly); + } +} + \ No newline at end of file