From cabad68a4ce5c6677d19c428e16e48a5f37a45ff Mon Sep 17 00:00:00 2001 From: webflyer1981 <723726786@qq.com> Date: Sun, 12 Jan 2020 21:03:55 +0800 Subject: [PATCH] week_05-055 --- week_05/55/Executors-55.md | 128 +++++ week_05/55/Thread-55.md | 254 +++++++++ week_05/55/ThreadLocal -55.md | 250 +++++++++ week_05/55/ThreadPoolExecutor-55.md | 790 ++++++++++++++++++++++++++++ 4 files changed, 1422 insertions(+) create mode 100644 week_05/55/Executors-55.md create mode 100644 week_05/55/Thread-55.md create mode 100644 week_05/55/ThreadLocal -55.md create mode 100644 week_05/55/ThreadPoolExecutor-55.md diff --git a/week_05/55/Executors-55.md b/week_05/55/Executors-55.md new file mode 100644 index 0000000..a0f667a --- /dev/null +++ b/week_05/55/Executors-55.md @@ -0,0 +1,128 @@ +Executors +1、newFixedThreadPool()创建固定数量的普通线程池 +有如下实现: + +//nThread为线程池中线程的数量 +public static ExecutorService newFixedThreadPool(int nThreads) { + //通过ThreadPoolExecutor实现类,创建核心线程数及最大线程数都为nThreads的线程池 + //空闲超时时间为0,且由于核心线程数和最大线程数一样,核心线程不会空闲超时后被回收 + //LinkedBlockingQueue为任务队列实现 + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); +} + +//nThread为线程池中线程的数量,线程创建的工厂类实现类为threadFactory +public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); +} +2、newWorkStealingPool()创建可任务窃取的线程池 +有如下实现: + +//parallelism并行任务数 +public static ExecutorService newWorkStealingPool(int parallelism) { + //基于ForkJoinPool创建线程池; + //使用默认线程创建工厂类 + //执行任务异常处理类为null + //任务队列模型为FIFO,true为FIFI,false为FILO + return new ForkJoinPool + (parallelism, + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); +} + +public static ExecutorService newWorkStealingPool() { + //并行数为处理器的并行数 + return new ForkJoinPool + (Runtime.getRuntime().availableProcessors(), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); +} +3、newSingleThreadExecutor()创建单个普通线程池 +有如下实现: + +//创建单个线程的线程池 +public static ExecutorService newSingleThreadExecutor() { + //用FinalizableDelegatedExecutorService进行封装,其会在 + //finalize()方法中调用线程池的shutdown()方法,对线程池进行关闭 + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); +} + + +public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory)); +} +4、newCachedThreadPool()创建线程数量不限的普通线程池 +有如下实现: + +//创建缓存的线程池,即线程池数量不限 +public static ExecutorService newCachedThreadPool() { + //核心线程数量为0,最大线程数量为Integer.MAX_VALUE + //阻塞队列为SynchronousQueue + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); +} + +public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); +} +5、newSingleThreadScheduledExecutor()创建单个基于时间调度的线程池 +有如下实现: + +//创建单个可进行时间调度的线程池 +public static ScheduledExecutorService newSingleThreadScheduledExecutor() { + //利用DelegatedScheduledExecutorService对线程池进行包装 + //DelegatedScheduledExecutorService利用门面模式包装了ScheduledExecutorService + //使得用户无法修改ScheduledExecutorService中的一些参数 + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1)); +} + +public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1, threadFactory)); +} + +6、newScheduledThreadPool()创建多个基于时间调度的线程池 +有如下实现: + +//创建固定线程池的可基于时间调度的线程池,corePoolSize为核心线程池测数量 +public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { + return new ScheduledThreadPoolExecutor(corePoolSize); +} + +public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize, ThreadFactory threadFactory) { + return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); +} +7、包装配置不可更改的线程池 +有如下实现: + + +//对ExecutorService类型线程池进行包装,使用户无法对一些参数进行修改 +public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { + if (executor == null) + throw new NullPointerException(); + return new DelegatedExecutorService(executor); +} + +//对ScheduledExecutorService类型线程池进行包装,使用户无法对一些参数进行修改 +public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { + if (executor == null) + throw new NullPointerException(); + return new DelegatedScheduledExecutorService(executor); +} diff --git a/week_05/55/Thread-55.md b/week_05/55/Thread-55.md new file mode 100644 index 0000000..b2358d6 --- /dev/null +++ b/week_05/55/Thread-55.md @@ -0,0 +1,254 @@ +Thread +1 JVM允许一个应用同时并发运行多个线程 + +2 线程都有优先级,高优先级被执行的概率大于低优先级的线程;每个线程也会被标记为是否是守护线程;被某线程创建的子线程,与父线程有同样的优先级;守护线程只能由守护线程创建; + +3 JVM启动时,通常会有一个非守护线程【main】(主线程)随之启动 + +4 两种方式去创建一个新的线程:1) 继承Thread 实现run + +* class PrimeThread extends Thread { + * long minPrime; + * PrimeThread(long minPrime) { + * this.minPrime = minPrime; + * } + * + * public void run() { + * // compute primes larger than minPrime + * } + * } + * PrimeThread p = new PrimeThread(143); + * p.start(); +2)实现Runnable接口 实现run + +class PrimeRun implements Runnable { + * long minPrime; + * PrimeRun(long minPrime) { + * this.minPrime = minPrime; + * } + * + * public void run() { + * // compute primes larger than minPrime + * } + * } +PrimeRun p = new PrimeRun(143); + * new Thread(p).start(); +核心数据结构: + private volatile char name[]; //线程名称 + private int priority; //线程优先级 + private boolean single_step; //是否单步执行 + private boolean stillborn = false; //虚拟机状态 + private Runnable target; //将会被执行目标对象 + private ThreadGroup group; //当前线程组对象,这里会用到 SecurityManger 如 + // SecurityManager security = System.getSecurityManager(); security.getThreadGroup(); + private ClassLoader contextClassLoader; //上下文类加载器 + private static int threadInitNumber; //线程编号,线程编号每次++实现,++不是线程安全的, + //private static synchronized int nextThreadNum() {return threadInitNumber++; } + // ThreadLocal 用于实现线程内的数据共享,这里得单独讲 ThreadLocal 时候讲 + ThreadLocal.ThreadLocalMap threadLocals = null; + ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; + private long stackSize; //线程操作时所需堆栈大小 + private long tid; //线程id + //线程优先级 + public final static int MIN_PRIORITY = 1; + public final static int NORM_PRIORITY = 5; + public final static int MAX_PRIORITY = 10; + //中断阻塞 + volatile Object parkBlocker; + private volatile Interruptible blocker; + public static native Thread currentThread();//当前线程 + private long nativeParkEventPointer; //本地阻塞事件指针 +核心方法: +1 yield:本质上 这个方法不建议在生产环境中使用,可以在测试benchmark时适度使用;因为线程主动让出cpu不一定能够能够明确的提升CPU的吞吐量 + +//线程让出当前调度 主动让步 和release属于执行完的被动让步 + public static native void yield(); +2 init类簇方法: + +private void init(ThreadGroup g, Runnable target, String name, + long stackSize, AccessControlContext acc, + boolean inheritThreadLocals) { + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + + Thread parent = currentThread(); + SecurityManager security = System.getSecurityManager(); + if (g == null) { + /* Determine if it's an applet or not */ + + /* If there is a security manager, ask the security manager + what to do. */ + if (security != null) { + g = security.getThreadGroup(); + } + + /* If the security doesn't have a strong opinion of the matter + use the parent thread group. */ + if (g == null) { + g = parent.getThreadGroup(); + } + } + + /* checkAccess regardless of whether or not threadgroup is + explicitly passed in. */ + g.checkAccess(); + + /* + * Do we have the required permissions? + */ + if (security != null) { + if (isCCLOverridden(getClass())) { + security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); + } + } + + g.addUnstarted(); + + this.group = g; + this.daemon = parent.isDaemon(); //判断父线程是否是守护线程 + this.priority = parent.getPriority(); //优先级继承 + if (security == null || isCCLOverridden(parent.getClass())) + this.contextClassLoader = parent.getContextClassLoader(); + else + this.contextClassLoader = parent.contextClassLoader; + this.inheritedAccessControlContext = + acc != null ? acc : AccessController.getContext(); + this.target = target; + setPriority(priority); + if (inheritThreadLocals && parent.inheritableThreadLocals != null) + // 设置子线程的继承信息,给孙子线程(新创建的)使用 + this.inheritableThreadLocals = + ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); + /* Stash the specified stack size in case the VM cares */ + this.stackSize = stackSize; + + /* Set thread ID */ + tid = nextThreadID(); + } +3 start 和 run 所以 根据源码可以明显看出 线程启动的方法是start + +public synchronized void start() { + /** + * This method is not invoked for the main method thread or "system" + * group threads created/set up by the VM. Any new functionality added + * to this method in the future may have to also be added to the VM. + * + * A zero status value corresponds to state "NEW". + */ + if (threadStatus != 0) + throw new IllegalThreadStateException(); + + /* Notify the group that this thread is about to be started + * so that it can be added to the group's list of threads + * and the group's unstarted count can be decremented. */ + group.add(this); + + boolean started = false; + try { + start0(); + started = true; + } finally { + try { + if (!started) { + group.threadStartFailed(this); + } + } catch (Throwable ignore) { + /* do nothing. If start0 threw a Throwable then + it will be passed up the call stack */ + } + } + } + + private native void start0(); + + /** + * If this thread was constructed using a separate + * Runnable run object, then that + * Runnable object's run method is called; + * otherwise, this method does nothing and returns. + *

+ * Subclasses of Thread should override this method. + * + * @see #start() + * @see #stop() + * @see #Thread(ThreadGroup, Runnable, String) + */ + @Override + public void run() { + if (target != null) { + target.run(); + } + } +4 interrupt类簇函数: + +public void interrupt() { + if (this != Thread.currentThread()) + checkAccess(); + + synchronized (blockerLock) { + Interruptible b = blocker; + if (b != null) { + interrupt0(); // Just to set the interrupt flag + b.interrupt(this); + return; + } + } + interrupt0(); + } +public static boolean interrupted() { + return currentThread().isInterrupted(true); + } + + /** + * Tests whether this thread has been interrupted. The interrupted + * status of the thread is unaffected by this method. + * + *

A thread interruption ignored because a thread was not alive + * at the time of the interrupt will be reflected by this method + * returning false. + * + * @return true if this thread has been interrupted; + * false otherwise. + * @see #interrupted() + * @revised 6.0 + */ + public boolean isInterrupted() { + return isInterrupted(false); + } + + /** + * Tests if some Thread has been interrupted. The interrupted state + * is reset or not based on the value of ClearInterrupted that is + * passed. + */ + private native boolean isInterrupted(boolean ClearInterrupted); +通过注释我们能发现 类静态方法是抹掉当前的中断标志位,成员方法仅是获取 + +5 join + + public final synchronized void join(long millis) + throws InterruptedException { + long base = System.currentTimeMillis(); + long now = 0; + if (millis == 0) { + while (isAlive()) { + wait(0); + } + } else { + while (isAlive()) {//这里主要通过isAlive() 来判断目标线程是否已经执行完来决定 join + long delay = millis - now; + if (delay <= 0) { + break; + } + wait(delay); + now = System.currentTimeMillis() - base; + } + } + } + public State getState() { + // 通过这里判断线程状态 + return sun.misc.VM.toThreadState(threadStatus); + } diff --git a/week_05/55/ThreadLocal -55.md b/week_05/55/ThreadLocal -55.md new file mode 100644 index 0000000..14714fe --- /dev/null +++ b/week_05/55/ThreadLocal -55.md @@ -0,0 +1,250 @@ +ThreadLocal +注释: +ThreadLocal的静态内部类ThreadLocalMap为每个Thread都维护了一个数组table,ThreadLocal确定了一个数组下标,而这个下标就是value存储的对应位置 + +核心方法 +get + +public T get() { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) { + ThreadLocalMap.Entry e = map.getEntry(this); + if (e != null) { + @SuppressWarnings("unchecked") + T result = (T)e.value; + return result; + } + } + return setInitialValue(); +} +getMap实现: + + ThreadLocalMap getMap(Thread t) { + //thred中维护了一个ThreadLocalMap + return t.threadLocals; + } +可以看出,其实ThreadLocalMap作为线程Thread的属性而存在: + +ThreadLocal.ThreadLocalMap threadLocals = null; +Map创建 +先来看一下线程的ThreadLocalMap属性不存在的情况,setInitialValue方法: + +private T setInitialValue() { + T value = initialValue(); + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) + map.set(this, value); + else + createMap(t, value); + return value; +} +initialValue方法便是我们第一次访问用以获得初始化值的方法: + +protected T initialValue() { + return null; +} +所以,这便解释了我们在使用ThreadLocal时为什么要创建一个ThreadLocal的子类并覆盖此方法。 + +void createMap(Thread t, T firstValue) { + t.threadLocals = new ThreadLocalMap(this, firstValue); +} +构造参数为初始增加的一个键值对,从这里可以看出,ThreadLocalMap以ThreadLocal对象为键。 + +ThreadLocalMap +其声明如下: + +static class ThreadLocalMap {} +那么问题来了,这里为什么要重新实现一个Map,而不用已有的HashMap等类呢?基于以下几点考虑: + +所有方法均为private。 +内部类Entry继承自WeakReference,当内存紧张时可以对ThreadLocal变量进行回收,注意这里并没有结合ReferenceQueue使用。 +构造器源码: + +ThreadLocalMap(ThreadLocal firstKey, Object firstValue) { + table = new Entry[INITIAL_CAPACITY]; + /** + * 与hashmap的套路一致 + */ + int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); + table[i] = new Entry(firstKey, firstValue); + size = 1; + setThreshold(INITIAL_CAPACITY); +} +setThreshold: + +private void setThreshold(int len) { + threshold = len * 2 / 3; +} +和HashMap的套路一样,只不过这里负载因子写死了,2 / 3,强调一下,不是3 / 4 !!! + +set +public void set(T value) { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) + map.set(this, value); + else + createMap(t, value); +} +正如注释中所说,我们在使用ThreadLocal时应该去覆盖initialValue方法,而不是set。显然这里的核心便是ThreadLocalMap的set方法: + +private void set(ThreadLocal key, Object value) { + Entry[] tab = table; + int len = tab.length; + int i = key.threadLocalHashCode & (len-1); + for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { + ThreadLocal k = e.get(); + //bin里的第一个节点即为所需key,更新value + if (k == key) { + e.value = value; + return; + } + if (k == null) { + replaceStaleEntry(key, value, i); + return; + } + } + tab[i] = new Entry(key, value); + int sz = ++size; + if (!cleanSomeSlots(i, sz) && sz >= threshold) + rehash(); +} +注意 +ThreadLocalMap的底层实现貌似是基于一个叫做"Knuth Algorithm"的算法,在这里不再细究其实现细节,但有几个地方值得注意。 + + +扩容 +不同于Map接口的实现,ThreadLocalMap的扩容似乎没有上限限制,resize方法部分源码可以证明: + +private void resize() { + Entry[] oldTab = table; + int oldLen = oldTab.length; + int newLen = oldLen * 2; + Entry[] newTab = new Entry[newLen]; + //... +} +哈希冲突 +不同于喜闻乐见的HashMap用链表 + 红黑树的方式解决哈希冲突,这里用的应该是线性探查法,即如果根据哈希值计算得来的位置不为空,那么将继续尝试下一个位置。 + +这一点可以从resize方法的下列源码得到证明: + +int h = k.threadLocalHashCode & (newLen - 1); +while (newTab[h] != null) + h = nextIndex(h, newLen); +newTab[h] = e; +哈希值 +ThreadLocalMap使用的哈希值源自ThreadLocal的下列属性: + +private final int threadLocalHashCode = nextHashCode(); +private static int nextHashCode() { + return nextHashCode.getAndAdd(HASH_INCREMENT); +} +而nextHashCode属性则是AtomicInteger类型,HASH_INCREMENT定义: + +private static final int HASH_INCREMENT = 0x61c88647; +清除 +由于ThreadLocalMap的key(即ThreadLocal)为弱引用,所以当其被回收时,势必需要将value置为null以便于进行垃圾回收。那么这个清除的时机又是什么呢? + +答案是get, set, remove都有可能。 + +Lambda支持 +jdk8支持使用以下方式进行初始化: + +ThreadLocal local = ThreadLocal.withInitial(() -> "hello"); +withInitial源码: + +public static ThreadLocal withInitial(Supplier supplier) { + return new SuppliedThreadLocal<>(supplier); +} +SuppliedThreadLocal是ThreadLocal的内部类,也是其子类: + +static final class SuppliedThreadLocal extends ThreadLocal { + private final Supplier supplier; + SuppliedThreadLocal(Supplier supplier) { + this.supplier = Objects.requireNonNull(supplier); + } + @Override + protected T initialValue() { + return supplier.get(); + } +} +一目了然。 + +内存泄漏 + + +继承性问题 +子线程中是否可以获得父线程设置的ThreadLocal变量? 答案是不可以,如以下测试代码: + +public class Test { + + private ThreadLocal threadLocal = new InheritableThreadLocal<>(); + + public void test() throws InterruptedException { + threadLocal.set("parent"); + + Thread thread = new Thread(() -> { + System.out.println(threadLocal.get()); + threadLocal.set("child"); + System.out.println(threadLocal.get()); + }); + + thread.start(); + + thread.join(); + + System.out.println(threadLocal.get()); + } + + public static void main(String[] args) throws InterruptedException { + new Test().test(); + } + +} +执行结果是: + +null +child +parent +从中可以得出两个结论: + +子线程无法获得父线程设置的ThreadLocal。 +父子线程的ThreadLocal是相互独立的。 +解决方法是使用java.lang.InheritableThreadLocal类。原因其实很容易理解: ThreadLocal数据以线程为单位进行保存,InheritableThreadLocal的原理是在子线程创建的时候 将父线程的变量(浅)拷贝到自身中。 + +从源码的角度进行原理的说明,Thread中其实有两个ThreadLocalMap: + +ThreadLocal.ThreadLocalMap threadLocals = null; +ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; +普通的ThreadLocal被保存在threadLocals中,InheritableThreadLocal被保存在inheritableThreadLocal中,注意这里是并列的关系,即两者可以同时存在且不为空。另外一个关键的问题便是 父线程的变量是何时被复制到子线程中的,答案是在子线程创建时,init方法: + +private void init(ThreadGroup g, Runnable target, String name, + long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { + if (inheritThreadLocals && parent.inheritableThreadLocals != null) + this.inheritableThreadLocals = + ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); +} +inheritThreadLocals除非我们使用了带AccessControlContext参数的构造器,默认都是true。 + +然而到了这里仍有问题存在:那就是线程池场景。一个线程只会在创建时从其父线程中拷贝一次属性,而线程池中的线程需要动态地执行从不同的上级线程提交地任务,在此种情形下逻辑上的 父线程也就不再存在了,阿里巴巴的transmittable-thread-local解决了这一问题,核心原理其实是实现了一个Runnable的包装, 伪代码如下: + +public class Wrapper implements Runnable { + + private final Runnable target; + + @Override + public final void run() { + //1.拷贝父变量 + try { + target.run(); + } finally { + //2.还原... + } + } +} +总结: + +1 对于某一ThreadLocal来讲,他的索引值i是确定的,在不同线程之间访问时访问的是不同的table数组的同一位置即都为table[i],只不过这个不同线程之间的table是独立的。 +2 对于同一线程的不同ThreadLocal来讲,这些ThreadLocal实例共享一个table数组,然后每个ThreadLocal实例在table中的索引i是不同的。 diff --git a/week_05/55/ThreadPoolExecutor-55.md b/week_05/55/ThreadPoolExecutor-55.md new file mode 100644 index 0000000..3685bc8 --- /dev/null +++ b/week_05/55/ThreadPoolExecutor-55.md @@ -0,0 +1,790 @@ +ThreadPoolExecutor +注释: +1 线程创建:按照ThreadFactory类中接口被实现的机制进行创建 + +2 线程维护 :如果没有超过核心线程时,倾向于新创建而不是排队;如果超过核心线程,那么倾向排队而不是新建线程;如果长时间得不到线程轮转,在没有超越最大线程数的时候,倾向创建线程,否则只能拒绝任务; + +3 线程超时KeepAlive策略 : 如果当前线程池中有超过corePoolSize的线程,那么如果空闲时间超过keepAliveTime,则多余的线程将被终止。这提供了一种在没有被积极使用时减少资源消耗的方法。如果池变得更活跃以后,新的线程将被构建。还可以使用方法setKeepAliveTime动态更改此参数。有效地禁止空闲线程在关闭之前终止。默认情况下,keep-alive策略仅适用于拥有多于corePoolSize线程的情况。但是方法{#allowCoreThreadTimeOut(boolean)}也可以用于将这个超时策略应用到核心线程,只要keepAliveTime值不为0。 + +4 排队三大策略: + +4.1 直接传递。工作队列的一个很好的默认选择是同步队列,它将任务交给线程,而不需要其他方法来持有它们。在这里,如果没有线程可以立即运行任务,则尝试对任务进行排队将失败,因此将构造一个新线程。当处理可能具有内部依赖项的请求集时,此策略避免了锁定。直接移交通常需要无界的maxPoolSize,这样可以避免拒绝新提交的任务。反过来,当命令到达的平均速度比它们被处理的速度还要快时,就有可能出现无限的线程增长。 + +4.2 无界队列。使用一个无界队列,例如一个LinkedBlockingQueue,当所有corePoolSize线程都忙的时候,会导致新的任务在队列中等待。因此,创建的线程不会超过corePoolSize 。(因此,maximumPoolSize 的值没有任何影响。当每个任务完全独立于其他任务时,这可能是合适的,因此任务不能影响其他任务的执行;例如,在web页面服务器中。虽然这种类型的排队在平滑短暂的请求爆发方面很有用,但它也承认,当命令以平均速度到达时,可能会出现无限的工作队列增长。 + +4.3 有界的队列。一个有限的队列(例如,一个* {@link ArrayBlockingQueue})有助于防止在与有限的maximumpoolsize一起使用时资源耗尽,但是调优和控制可能更困难。队列大小和最大池大小可以相互交换:使用大队列和小池可以最小化 CPU使用、OS资源和上下文切换开销,但是会导致人为的低吞吐量。如果任务经常阻塞(例如,它们是I/O绑定的),系统可能会为更多的线程安排时间。使用小队列通常需要更大的池大小,这会使cpu更忙,但是可能会遇到不可接受的调度开销,这也会降低吞吐量。 + +5 4种拒绝策略: + +5.1 在默认的{@link ThreadPoolExecutor中。处理程序拒绝时抛出一个运行时{@link RejectedExecutionException}。 + +5.2 {@link ThreadPoolExecutor。调用{@code execute}的线程本身运行任务。这提供了一个简单的反馈控制机制,可以降低提交新任务的速度。 + +5.3 直接抛弃 + +5.4 队列头的先被抛弃 FIFO + +6 队列维护:当有大量的队列里的任务被取消时,队列访问方法getQueue()可以控制队列:其中提供了两个方法,remove与purge + +核心方法: +创建 +我们以下列代码为例: + +public static ExecutorService newFixedThreadPool(int nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); +} +可见默认使用LinkedBlockingQueue作为工作队列,其构造器: + +public LinkedBlockingQueue() { + this(Integer.MAX_VALUE); +} +可见,这其实是一个有界队列,虽然大小为int最大值。 + +ThreadPoolExecutor便是JDK线程池的核心了,类图: + + + + +ThreadPoolExecutor构造器: + +public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), defaultHandler); +} +线程工厂 + + + + + +默认的线程工厂是Executors的内部类,核心的newThread方法: + +public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; +} +namePrefix定义: + +namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; +这便是为线程池默认创建的线程起名的地方了,Thread构造器的最后一个0为stackSize,0表示忽略此参数。 + +拒绝策略 +从上面可以看出,线程池默认使用有界队列,所以当队列满的时候就需要考虑如何处理这种情况。 + + + + +线程池默认采用的是: + +private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); +即抛出异常,线程池退出: + +public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { + throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); +} +execute +public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + int c = ctl.get(); + //corePoolSize为volatile,下面会提到为什么 + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + //创建新线程成功,交由其执行 + return; + c = ctl.get(); + } + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + if (! isRunning(recheck) && remove(command)) + reject(command); + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + else if (!addWorker(command, false)) + reject(command); +} +控制变量 +ctl是线程池的核心控制变量: + +private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); +有以下两个用途: + +高3位标志线程池的运行状态,比如运行、关闭。 +低29位存储当前工作线程的个数,所以一个线程池最多可以创建2 ^ 29 - 1(约为5亿)个线程。 +线程创建 +当我们调用execute方法时,线程池将首先检查当前线程数是否已达到上限,如果没有创建新的工作线程,而不是入队。 + +private boolean addWorker(Runnable firstTask, boolean core) { + retry: + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + // 检查线程池状态,如果已关闭,返回false + if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) + return false; + for (;;) { + int wc = workerCountOf(c); + //检查是否达到上限 + if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) + return false; + //如果CAS增加线程数成功,中断循环 ,进行线程创建 + if (compareAndIncrementWorkerCount(c)) + break retry; + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + + boolean workerStarted = false; + boolean workerAdded = false; + Worker w = null; + try { + w = new Worker(firstTask); + final Thread t = w.thread; + if (t != null) { + final ReentrantLock mainLock = this.mainLock; + //shutdown等方法也需要加锁,所以可以保证线程安全 + mainLock.lock(); + try { + //再次检查状态 + int rs = runStateOf(ctl.get()); + if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { + if (t.isAlive()) // precheck that t is startable + throw new IllegalThreadStateException(); + //workers是一个HashSet + workers.add(w); + int s = workers.size(); + //用于记录出现过的最大线程数 + if (s > largestPoolSize) + largestPoolSize = s; + workerAdded = true; + } + } finally { + mainLock.unlock(); + } + if (workerAdded) { + t.start(); + workerStarted = true; + } + } + } finally { + if (! workerStarted) + addWorkerFailed(w); + } + return workerStarted; +} +核心 vs 最大线程数 +注意execute方法中的细节,第一次addWorker调用的core参数为true,即表示已corePoolSize为上限,后两次为false。这就说明了execute方法执行时遵从一下顺序进行尝试: + +如果当前线程数小于corePoolSize,那么增加线程。 +尝试加入队列。 +如果入队失败那么尝试将线程数增加至maximumPoolSize。 +如果还是失败,那么交给RejectedExecutionHandler。 +Worker +这里的"线程(即Worker)"其实是ThreadPoolExecutor的内部类。 + + + + +又见AQS。构造器: + +Worker(Runnable firstTask) { + setState(-1); // inhibit interrupts until runWorker + this.firstTask = firstTask; + this.thread = getThreadFactory().newThread(this); +} +其run方法的真正逻辑由ThreadPoolExecutor.runWorker实现: + +final void runWorker(Worker w) { + Thread wt = Thread.currentThread(); + Runnable task = w.firstTask; + w.firstTask = null; + boolean completedAbruptly = true; + try { + while (task != null || (task = getTask()) != null) { + w.lock(); + //中断状态 + if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && + runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) + wt.interrupt(); + try { + task.run(); + } finally { + task = null; + w.completedTasks++; + w.unlock(); + } + } + completedAbruptly = false; + } finally { + processWorkerExit(w, completedAbruptly); + } +} +锁 +可以看出,一次任务的执行是在所在Worker的锁的保护下进行的,结合后面shutdownNow的源码可以发现,shutdownNow中断Worker的前提是获得锁,这就很好的体现了shutdownNow的语义: 阻止新任务的提交,等待所有已有任务执行完毕。 + +中断状态 +这里有两种意义 : + +如果线程池处于STOP(或之后)的状态,即shutdownNow方法已被调用,那么此处代码将确保线程的中断标志位一定被设置。 +如果线程池处于STOP之前的状态,比如SHUTDOWN或RUNNING,那么Worker不应响应中断,即应当清除中断标志,但是暂时没有想到谁会设置Worker线程的中断标志位,难道是我们的业务代码? +在这里扒一扒到底什么是线程中断: + +public void interrupt() { + synchronized (blockerLock) { + Interruptible b = blocker; + if (b != null) { + interrupt0(); // Just to set the interrupt flag + b.interrupt(this); + return; + } + } + interrupt0(); +} +blocker在nio部分已经见过了,interrupt0的最终native实现位于openjdk\hotspot\src\os\solaris\vm\os_solaris.cpp(Linux): + +void os::interrupt(Thread* thread) { + OSThread* osthread = thread->osthread(); + int isInterrupted = osthread->interrupted(); + if (!isInterrupted) { + //设置标志位 + osthread->set_interrupted(true); + OrderAccess::fence(); + //唤醒sleep()? + ParkEvent * const slp = thread->_SleepEvent ; + if (slp != NULL) slp->unpark() ; + } + //唤醒LockSupport.park()? + if (thread->is_Java_thread()) { + ((JavaThread*)thread)->parker()->unpark(); + } + //唤醒Object.wait()? + ParkEvent * const ev = thread->_ParkEvent ; + if (ev != NULL) ev->unpark() ; + // When events are used everywhere for os::sleep, then this thr_kill + // will only be needed if UseVMInterruptibleIO is true. + if (!isInterrupted) { + int status = thr_kill(osthread->thread_id(), os::Solaris::SIGinterrupt()); + assert_status(status == 0, status, "thr_kill"); + // Bump thread interruption counter + RuntimeService::record_thread_interrupt_signaled_count(); + } +} +与java里已知的可被中断的阻塞大体可以找到对应关系。 + +任务获取 +private Runnable getTask() { + boolean timedOut = false; // Did the last poll() time out? + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + // 线程池已经关闭且队列中没有剩余的任务,退出 + if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { + decrementWorkerCount(); + return null; + } + int wc = workerCountOf(c); + // 如果启用了超时并且已经超时且队列中没有任务,线程退出 + boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; + if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { + if (compareAndDecrementWorkerCount(c)) + return null; + continue; + } + try { + Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); + if (r != null) + return r; + timedOut = true; + } catch (InterruptedException retry) { + //如果被中断不是马上退出,而是在下一次循环中检查线程池状态 + timedOut = false; + } + } +} +结合runWorker方法可以发现,如果getTask返回null,那么即说明当前Worker线程应该退出。 + +超时 +allowCoreThreadTimeOut定义如下: + +private volatile boolean allowCoreThreadTimeOut; +默认为false,如果开启,Worker不会无限期等待任务,而是超时之后便退出。我们可以通过allowCoreThreadTimeOut方法进行设置: + +public void allowCoreThreadTimeOut(boolean value) { + if (value && keepAliveTime <= 0) + throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); + if (value != allowCoreThreadTimeOut) { + allowCoreThreadTimeOut = value; + if (value) + interruptIdleWorkers(); + } +} +注意同时需传入一个大于零的keepAliveTime。所以受这两个参数的影响,当没有任务执行时线程数并不一定等于corePoolSize。 + +超额线程回收 +这里的超额指超出corePoolSize的线程,源码中有一处隐蔽的细节: + +boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; +当线程数大于corePoolSize时timed也为true,再结合下面的条件判断可以得出结论: 当线程池当前的线程数超过corePoolSize且队列为空且corePoolSize不为0(0是被允许的),超出的线程会退出。 + +这一点可以使用测试代码test.Test的maxPoolSize方法进行验证。 + +另一种减少线程数的方法就是调用setCorePoolSize或setMaximumPoolSize重设线程池相关参数。 + +退出 +Worker在退出时将触发processWorkerExit方法: + +private void processWorkerExit(Worker w, boolean completedAbruptly) { + if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted + decrementWorkerCount(); + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + completedTaskCount += w.completedTasks; + workers.remove(w); + } finally { + mainLock.unlock(); + } + + tryTerminate(); + + int c = ctl.get(); + if (runStateLessThan(c, STOP)) { + if (!completedAbruptly) { + int min = allowCoreThreadTimeOut ? 0 : corePoolSize; + if (min == 0 && ! workQueue.isEmpty()) + min = 1; + if (workerCountOf(c) >= min) + return; // replacement not needed + } + addWorker(null, false); + } +} +其逻辑可以分为3个部分。 + +状态修改 +线程池内部使用如下变量统计总共完成的任务数: + +private long completedTaskCount; +在退出时Worker线程将自己完成的数量加至以上变量中。并且将自身从Worker Set中移除。 + +关闭线程池 +tryTerminate方法将会尝试关闭线程池。 + +final void tryTerminate() { + for (;;) { + int c = ctl.get(); + if (isRunning(c) || runStateAtLeast(c, TIDYING) || + (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) + return; + if (workerCountOf(c) != 0) { // Eligible to terminate + interruptIdleWorkers(ONLY_ONE); + return; + } + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { + try { + //空实现 + terminated(); + } finally { + ctl.set(ctlOf(TERMINATED, 0)); + termination.signalAll(); + } + return; + } + } finally { + mainLock.unlock(); + } + // else retry on failed CAS + } +} +什么情况下才会尝试调用interruptIdleWorkers呢? + +当前状态为STOP,即执行了shutdownNow()方法。 +当前状态为SHUTDOWN且任务队列为null,这正对应shutdown()方法被调用且所有任务已执行完毕。 +那么为什么只中断一个Worker线程而不是全部呢?猜测是这相当于链式唤醒,一个唤醒另一个直到最后一个将状态最终修改为TERMINATED。 + +termination.signalAll(); +用于唤醒正在等待线程终结的线程,termination定义如下: + +private final Condition termination = mainLock.newCondition(); +awaitTermination方法部分源码: + +nanos = termination.awaitNanos(nanos); +线程重生 +为什么叫重生呢?首先回顾一下runWorker方法任务执行的相关源码: + +try { + task.run(); +} catch (RuntimeException x) { + thrown = x; throw x; +} catch (Error x) { + thrown = x; throw x; +} catch (Throwable x) { + thrown = x; throw new Error(x); +} finally { + afterExecute(task, thrown); +} +可以看到,异常又被重新抛了出去,也就是说如果我们任务出现了未检查异常就会导致Worker线程的退出,而processWorkerExit方法将会检测当前线程池是否还需要再增加Worker,如果是由于任务逻辑异常导致的退出势必是需要增加的,这便是"重生"。 + +submit & FutureTask +我们以单参数Callable task方法为例,AbstractExecutorService.submit: + +public Future submit(Callable task) { + RunnableFuture ftask = newTaskFor(task); + execute(ftask); + return ftask; +} +AbstractExecutorService.newTaskFor: + +protected RunnableFuture newTaskFor(Callable callable) { + return new FutureTask(callable); +} +被包装成了一个FutureTask对象: + + + + +FutureTask组合了Runnable和Future两个接口。下面我们来看一下其主要方法的实现。 + +get +public V get() { + int s = state; + if (s <= COMPLETING) + s = awaitDone(false, 0L); + return report(s); +} +state为状态标识,其声明(和可取的值)如下: + +private volatile int state; +private static final int NEW = 0; +private static final int COMPLETING = 1; +private static final int NORMAL = 2; +private static final int EXCEPTIONAL = 3; +private static final int CANCELLED = 4; +private static final int INTERRUPTING = 5; +private static final int INTERRUPTED = 6; +显然get方法的核心便是用于进行等待的awaitDone方法: + +private int awaitDone(boolean timed, long nanos) { + final long deadline = timed ? System.nanoTime() + nanos : 0L; + WaitNode q = null; + boolean queued = false; + for (;;) { + if (Thread.interrupted()) { + removeWaiter(q); + throw new InterruptedException(); + } + int s = state; + if (s > COMPLETING) { + //已经完成 + if (q != null) + q.thread = null; + return s; + } + else if (s == COMPLETING) + //正在完成,只需要让CPU空转进行等待即可 + Thread.yield(); + else if (q == null) + q = new WaitNode(); + else if (!queued) + //CAS将新节点q设为等待链表的头结点 + queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); + else if (timed) { + nanos = deadline - System.nanoTime(); + if (nanos <= 0L) { + removeWaiter(q); + return state; + } + LockSupport.parkNanos(this, nanos); + } + else + LockSupport.park(this); + } +} +这里并没有使用Lock或是Condition,而是直接使用了类似AQS等待队列的思想。我们来看一下WaitNode的类图: + + + + +属性thread取自构造器: + +WaitNode() { thread = Thread.currentThread(); } +而report方法用于根据最后的状态采取对应的动作,比如抛出异常或者是返回结果: + +private V report(int s) throws ExecutionException { + Object x = outcome; + if (s == NORMAL) + return (V)x; + if (s >= CANCELLED) + throw new CancellationException(); + throw new ExecutionException((Throwable)x); +} +run +很自然的想到一个问题: 是在哪里将状态设为已完成的呢? + +public void run() { + //runner屏障,防止任务重复执行 + if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) + return; + try { + Callable c = callable; + //volatile读 + if (c != null && state == NEW) { + //窗口开始 + V result; + boolean ran; + try { + result = c.call(); + ran = true; + //窗口结束 + } catch (Throwable ex) { + result = null; + ran = false; + setException(ex); + } + if (ran) + set(result); + } + } finally { + // 解除runner屏障 + runner = null; + // state must be re-read after nulling runner to prevent + // leaked interrupts + int s = state; + if (s >= INTERRUPTING) + handlePossibleCancellationInterrupt(s); + } +} +set和setException方法便是用于改变任务的状态,通知我们的等待线程,将在后面进行说明。 + +这里最有意思的是handlePossibleCancellationInterrupt方法的调用,注释中提到的"泄漏的中断"指的是什么呢?其实在任务call方法调用前后存在一个状态被其它线程修改的时间窗口,窗口的起止位置见上面源码。 + +在这个窗口时间内,另外一个线程完全可能通过对cancel方法的调用将状态改为INTERRUPTING或CANCELLED。cancel方法的说明见下面,注意,一旦当前状态不再是NEW,那么set和setException方法便不会执行,因为其前提条件是状态为NEW,set方法部分源码: + +protected void set(V v) { + if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { + //... + } +} +所以handlePossibleCancellationInterrupt被执行的条件是: + +在业务逻辑(call方法)执行期间发生了cancell调用。 + +private void handlePossibleCancellationInterrupt(int s) { + if (s == INTERRUPTING) + while (state == INTERRUPTING) + Thread.yield(); // wait out pending interrupt +} +线程到这里便会空转等待,直到cancel线程将状态最终修改为INTERRUPTED。为什么要这么做呢? + +猜测Doug Lea大神是为了保证被取消的线程晚于取消线程退出。 + +这里还有一个很有意思的问题,这里能不能清除中断标志呢?答案是不能。因为cancel靠中断取消任务的执行,同时我们也有可能利用中断语义自主结束任务的执行,FutureTask在这里不能分辨出是取消还是用户中断。那么问题来了,额外再引入一个标志变量可否? + +cancel +public boolean cancel(boolean mayInterruptIfRunning) { + if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, + mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) + return false; + try { // in case call to interrupt throws exception + if (mayInterruptIfRunning) { + try { + Thread t = runner; + if (t != null) + t.interrupt(); + } finally { // final state + UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); + } + } + } finally { + finishCompletion(); + } + return true; +} +可以看出,只有任务尚处于NEW状态时此方法才会返回true。这里有一个有意思的问题,为什么对于INTERRUPTED状态的设置使用putOrderedInt方法呢? + +putOrderedInt方法是一种底层的优化手段,效果就是对volatile变量进行普通写操作,也就是说并不保证可见性,可以参考: + +AtomicXXX.lazySet(…) in terms of happens before edges + +可以进行此处优化的原因是执行到这里时状态(state)必定为INTERRUPTING或CANCELLED,而对于get/run等方法其实并不关心状态具体是哪一种,get方法源码回顾: + +public V get() throws InterruptedException, ExecutionException { + int s = state; + if (s <= COMPLETING) + s = awaitDone(false, 0L); + return report(s); +} +只要state大于COMPLETING便会直接report,既然对其它线程没有影响也就没必要保证可见性(再加一次内存屏障了)。 + +finishCompletion方法用以最后执行唤醒等待线程等操作: + +private void finishCompletion() { + // assert state > COMPLETING; + for (WaitNode q; (q = waiters) != null;) { + if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { + for (;;) { + Thread t = q.thread; + if (t != null) { + q.thread = null; + LockSupport.unpark(t); + } + WaitNode next = q.next; + if (next == null) + break; + q.next = null; // unlink to help gc + q = next; + } + break; + } + } + //模板方法,空实现 + done(); + callable = null; // to reduce footprint +} +其实就是一个遍历等待链表并逐个unpark的过程。 + +set +protected void set(V v) { + if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { + outcome = v; + UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state + finishCompletion(); + } +} +一目了然。 + +shutdown +public void shutdown() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + //设置状态 + advanceRunState(SHUTDOWN); + interruptIdleWorkers(); + } finally { + mainLock.unlock(); + } + tryTerminate(); +} +这里加了锁,是时候总结一下这个mainLock用在哪些地方了: + +shutdown & shutdownNow +awaitTermination +getPoolSize +getActiveCount +getLargestPoolSize +getTaskCount +getCompletedTaskCount +toString +可见,锁用在对Worker集合的操作以及线程池的关闭、线程数量获取上。tryTerminate方法已经见识过了,这里重点在于interruptIdleWorkers: + +private void interruptIdleWorkers(boolean onlyOne) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (Worker w : workers) { + Thread t = w.thread; + if (!t.isInterrupted() && w.tryLock()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } finally { + w.unlock(); + } + } + if (onlyOne) + break; + } + } finally { + mainLock.unlock(); + } +} +onlyOne参数为false,这里最有意思的便是w.tryLock()。回顾之前Worker部分,Worker继承自AbstractQueuedSynchronizer,而Worker对业务逻辑的执行处于其自身锁的保护之下,也就是说,如果Worker当前正在由任务执行,根本不可能被中断,这就符合了线程池shutdown不会中断正在执行的任务的语义。由于interruptIdleWorkers执行时线程池的状态已被修改为SHUTDOWN,所以在下一次进行任务获取的时候Worker线程自然会感知到shutdown调用,等到将队列中所有任务执行完毕时自然也就退出了,参考上面任务获取一节。 + +shutdownNow +public List shutdownNow() { + List tasks; + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + advanceRunState(STOP); + interruptWorkers(); + tasks = drainQueue(); + } finally { + mainLock.unlock(); + } + tryTerminate(); + return tasks; +} +这里会将所有尚未来得及执行的任务一并返回。 + +private void interruptWorkers() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + for (Worker w : workers) + w.interruptIfStarted(); + } finally { + mainLock.unlock(); + } +} +Worker.interruptIfStarted: + +void interruptIfStarted() { + Thread t; + if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { + try { + t.interrupt(); + } catch (SecurityException ignore) { + } + } +} +getState() >= 0表示当前Worker已启动。没有获取锁直接中断,这便是和shutdown的区别了。drainQueue其实是对BlockingQueue接口drainTo方法的调用,因为线程池的队列必须是一个BlockingQueue。 + +这里有一个很有意思的细节: + +如果我们submit的任务尚未被执行,shutdownNow就被调用了,同时有一个线程正在阻塞在future上,那么此线程会被唤醒吗? + +答案是不会,源码中没有看到相关唤醒的代码,测试方法test.Test.canWakeUp可以证明这一现象。 + +getActiveCount +此方法用以获取线程池中当前正在执行任务的线程数,其实现很有趣: + +public int getActiveCount() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + int n = 0; + for (Worker w : workers) + if (w.isLocked()) + ++n; + return n; + } finally { + mainLock.unlock(); + } +} +是通过判断Worker是否持有锁完成的,新技能get。 + +finalize +ThreadPoolExecutor覆盖了此方法: + +protected void finalize() { + shutdown(); +} -- Gitee