From de611cfbbb8b30a2721db48c09ccd110340d6c01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8D=8E=E4=BB=94?= Date: Sun, 12 Jan 2020 22:09:00 +0800 Subject: [PATCH] =?UTF-8?q?028-Week=2005=20=E6=9C=AC=E5=91=A8=E5=B7=A5?= =?UTF-8?q?=E4=BD=9C=E8=BE=83=E5=BF=99=EF=BC=8C=E4=BE=9D=E7=84=B6=E6=B2=A1?= =?UTF-8?q?=E6=9C=89=E8=BE=BE=E5=88=B0=E4=B8=AA=E4=BA=BA=E9=A2=84=E6=9C=9F?= =?UTF-8?q?=E3=80=82=E4=B8=8B=E5=91=A8=E9=9C=80=E8=A6=81=E5=8A=AA=E5=8A=9B?= =?UTF-8?q?=E4=BA=86=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- week_05/28/Executors.md | 141 +++++ ...\345\272\217\346\211\223\345\215\260.java" | 34 ++ week_05/28/LeetCode_1117_H20.java | 47 ++ week_05/28/Thread.md | 528 ++++++++++++++++++ week_05/28/ThreadLocal.md | 425 ++++++++++++++ week_05/28/ThreadPoolExecutor.md | 244 ++++++++ 6 files changed, 1419 insertions(+) create mode 100644 week_05/28/Executors.md create mode 100644 "week_05/28/LeetCode_1114_\351\241\272\345\272\217\346\211\223\345\215\260.java" create mode 100644 week_05/28/LeetCode_1117_H20.java create mode 100644 week_05/28/Thread.md create mode 100644 week_05/28/ThreadLocal.md create mode 100644 week_05/28/ThreadPoolExecutor.md diff --git a/week_05/28/Executors.md b/week_05/28/Executors.md new file mode 100644 index 0000000..47b4732 --- /dev/null +++ b/week_05/28/Executors.md @@ -0,0 +1,141 @@ +# Executors 源码分析 +Executors为快速创建线程池的工具类,其可以创建基于ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool及包装的不可配置的线程池。 + +## newFixedThreadPool + +```java +//nThread为线程池中线程的数量 +public static ExecutorService newFixedThreadPool(int nThreads) { + //通过ThreadPoolExecutor实现类,创建核心线程数及最大线程数都为nThreads的线程池 + //空闲超时时间为0,且由于核心线程数和最大线程数一样,核心线程不会空闲超时后被回收 + //LinkedBlockingQueue为任务队列实现 + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); +} + +//nThread为线程池中线程的数量,线程创建的工厂类实现类为threadFactory +public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { + return new ThreadPoolExecutor(nThreads, nThreads, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory); +} +``` +## newWorkStealingPool +```java +//parallelism并行任务数 +public static ExecutorService newWorkStealingPool(int parallelism) { + //基于ForkJoinPool创建线程池; + //使用默认线程创建工厂类 + //执行任务异常处理类为null + //任务队列模型为FIFO,true为FIFI,false为FILO + return new ForkJoinPool + (parallelism, + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); +} + +public static ExecutorService newWorkStealingPool() { + //并行数为处理器的并行数 + return new ForkJoinPool + (Runtime.getRuntime().availableProcessors(), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true); +} + +``` + +## newSingleThreadExecutor +```java +//创建单个线程的线程池 +public static ExecutorService newSingleThreadExecutor() { + //用FinalizableDelegatedExecutorService进行封装,其会在 + //finalize()方法中调用线程池的shutdown()方法,对线程池进行关闭 + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); +} + + +public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + threadFactory)); +} + +``` + +## newCachedThreadPool +```java +//创建缓存的线程池,即线程池数量不限 +public static ExecutorService newCachedThreadPool() { + //核心线程数量为0,最大线程数量为Integer.MAX_VALUE + //阻塞队列为SynchronousQueue + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue()); +} + +public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, + 60L, TimeUnit.SECONDS, + new SynchronousQueue(), + threadFactory); +} + +``` + +## newSingleThreadScheduledExecutor +```java +//创建单个可进行时间调度的线程池 +public static ScheduledExecutorService newSingleThreadScheduledExecutor() { + //利用DelegatedScheduledExecutorService对线程池进行包装 + //DelegatedScheduledExecutorService利用门面模式包装了ScheduledExecutorService + //使得用户无法修改ScheduledExecutorService中的一些参数 + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1)); +} + +public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1, threadFactory)); +} +``` + +## newScheduledThreadPool +```java +//创建固定线程池的可基于时间调度的线程池,corePoolSize为核心线程池测数量 +public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { + return new ScheduledThreadPoolExecutor(corePoolSize); +} + +public static ScheduledExecutorService newScheduledThreadPool( + int corePoolSize, ThreadFactory threadFactory) { + return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); +} +``` + +## 创建不可变更的线程池 +```java +//对ExecutorService类型线程池进行包装,使用户无法对一些参数进行修改 +public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { + if (executor == null) + throw new NullPointerException(); + return new DelegatedExecutorService(executor); +} + +//对ScheduledExecutorService类型线程池进行包装,使用户无法对一些参数进行修改 +public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { + if (executor == null) + throw new NullPointerException(); + return new DelegatedScheduledExecutorService(executor); +} +``` +## 总结 +提供各类线程池创建的工具类,具体内部实现由其他对象,还需进一步分析。 + + diff --git "a/week_05/28/LeetCode_1114_\351\241\272\345\272\217\346\211\223\345\215\260.java" "b/week_05/28/LeetCode_1114_\351\241\272\345\272\217\346\211\223\345\215\260.java" new file mode 100644 index 0000000..06ead8c --- /dev/null +++ "b/week_05/28/LeetCode_1114_\351\241\272\345\272\217\346\211\223\345\215\260.java" @@ -0,0 +1,34 @@ +package com.ufo.java.week05; + +import java.util.concurrent.Semaphore; + +public class LeetCode_1114_顺序打印 { + class Foo { + + private Semaphore spa,spb; + public Foo() { + //初始化Semaphore为0的原因:如果这个Semaphore为零,如果另一线程调用(acquire)这个Semaphore就会产生阻塞,便可以控制second和third线程的执行 + spa = new Semaphore(0); + spb = new Semaphore(0); + } + public void first(Runnable printFirst) throws InterruptedException { + // printFirst.run() outputs "first". Do not change or remove this line. + printFirst.run(); + //只有等first线程释放Semaphore后使Semaphore值为1,另外一个线程才可以调用(acquire) + spa.release(); + } + public void second(Runnable printSecond) throws InterruptedException { + //只有spa为1才能执行acquire,如果为0就会产生阻塞 + spa.acquire(); + // printSecond.run() outputs "second". Do not change or remove this line. + printSecond.run(); + spb.release(); + } + public void third(Runnable printThird) throws InterruptedException { + //只有spb为1才能通过,如果为0就会阻塞 + spb.acquire(); + // printThird.run() outputs "third". Do not change or remove this line. + printThird.run(); + } + } +} diff --git a/week_05/28/LeetCode_1117_H20.java b/week_05/28/LeetCode_1117_H20.java new file mode 100644 index 0000000..28e012e --- /dev/null +++ b/week_05/28/LeetCode_1117_H20.java @@ -0,0 +1,47 @@ +package com.ufo.java.week05; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * 采用信号量方式实现线程生产 + * + */ +public class LeetCode_1117_H20 { + + class H2O { + private Semaphore semaH = new Semaphore(2); + private Semaphore semaO = new Semaphore(1); + + // Initialize group count. + private AtomicInteger groupCount = new AtomicInteger(0); + + private static final int GROUP_H_LIMIT = 2; + private static final int GROUP_O_LIMIT = 1; + private static final int GROUP_TOTAL_LIMIT = GROUP_H_LIMIT + GROUP_O_LIMIT; + + public H2O() { + } + + public void hydrogen(Runnable releaseHydrogen) throws InterruptedException { + this.semaH.acquire(1); + releaseHydrogen.run(); + this.groupCount.incrementAndGet(); + resetIfNeeded(); + } + + public void oxygen(Runnable releaseOxygen) throws InterruptedException { + this.semaO.acquire(1); + releaseOxygen.run(); + this.groupCount.incrementAndGet(); + resetIfNeeded(); + } + + private void resetIfNeeded() { + if (this.groupCount.compareAndSet(GROUP_TOTAL_LIMIT, 0)) { + this.semaH.release(GROUP_H_LIMIT); + this.semaO.release(GROUP_O_LIMIT); + } + } + } +} diff --git a/week_05/28/Thread.md b/week_05/28/Thread.md new file mode 100644 index 0000000..a41257f --- /dev/null +++ b/week_05/28/Thread.md @@ -0,0 +1,528 @@ +# Thread 源码分析 + +线程是java 里的核心,是执行任务的载体。 + +## 构造方法 +```java +/** + * Thread的所有public构造方法传入的inheritThreadLocals均为true + * 只有一个权限为default的构造方法传入为false + * 所有的构造方法实现均是调用了该方法 + */ +private void init(ThreadGroup g, Runnable target, String name, + long stackSize, AccessControlContext acc, + boolean inheritThreadLocals) { + // 构造方法中传入的线程名不能为null + // 默认线程名:"Thread-" + nextThreadNum() + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + this.name = name; + // 被创建出来的线程是创建线程的子线程 + Thread parent = currentThread(); + // 新建线程的线程组 + // 如果构造方法传入的线程组为null,则通过这个流程来决定其线程组,通常,新建线程的线程组为其创建线程的线程组 + SecurityManager security = System.getSecurityManager(); + if (g == null) { + /* Determine if it's an applet or not */ + /* If there is a security manager, ask the security manager + what to do. */ + if (security != null) { + g = security.getThreadGroup(); + } + /* If the security doesn't have a strong opinion of the matter + use the parent thread group. */ + if (g == null) { + g = parent.getThreadGroup(); + } + } + /* checkAccess regardless of whether or not threadgroup is + explicitly passed in. */ + g.checkAccess(); + /* + * Do we have the required permissions? + */ + if (security != null) { + if (isCCLOverridden(getClass())) { + security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); + } + } + g.addUnstarted(); + this.group = g; + // 子线程默认拥有父线程的优先级和daemon属性 + this.daemon = parent.isDaemon(); + this.priority = parent.getPriority(); + if (security == null || isCCLOverridden(parent.getClass())) + this.contextClassLoader = parent.getContextClassLoader(); + else + this.contextClassLoader = parent.contextClassLoader; + this.inheritedAccessControlContext = + acc != null ? acc : AccessController.getContext(); + this.target = target; + setPriority(priority); + if (inheritThreadLocals && parent.inheritableThreadLocals != null) + this.inheritableThreadLocals = + ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); + /* Stash the specified stack size in case the VM cares */ + this.stackSize = stackSize; + // 产生新的tid并设置 + tid = nextThreadID(); +} +public long getId() { + return tid; +} + +/* For autonumbering anonymous threads. */ +private static int threadInitNumber; +// 同步方法,保证tid的唯一性和连续性 +private static synchronized int nextThreadNum() { + return threadInitNumber++; +} +// 线程名可以被修改 +public final synchronized void setName(String name) { + checkAccess(); + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + this.name = name; + if (threadStatus != 0) { + setNativeName(name); + } +} + public final String getName() { + return name; +} +``` + +## 守护线程 + +线程设置为daemon,当jvm中没有正常线程[非daemon线程]就会退出。 +```java +/** + * Marks this thread as either a {@linkplain #isDaemon daemon} thread + * or a user thread. The Java Virtual Machine exits when the only + * threads running are all daemon threads. + *

This method must be invoked before the thread is started. + */ +public final void setDaemon(boolean on) { + checkAccess(); + // 只有在线程开始前设置才有效 + if (isAlive()) { + throw new IllegalThreadStateException(); + } + daemon = on; +} +public final boolean isDaemon() { + return daemon; +} + +``` + +## 线程优先级及状态 + +创建时不指定优先级时,默认设置为 NORM_PRIORITY +一个线程在被从Object.wait()中被唤醒时,会立即进入BLOCKED状态,这时其并没有获得锁,只是被唤醒了,再次开始对Object的监视器锁进行竞争;只有在其竞争获得锁之后才会进入RUNNABLE状态. + +```java +public final static int MIN_PRIORITY = 1; +public final static int NORM_PRIORITY = 5; +public final static int MAX_PRIORITY = 10; + +public final void setPriority(int newPriority) { + ThreadGroup g; + checkAccess(); + // 优先级范围 1~10 + if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) { + throw new IllegalArgumentException(); + } + if((g = getThreadGroup()) != null) { + if (newPriority > g.getMaxPriority()) { + newPriority = g.getMaxPriority(); + } + // 设置优先级 + setPriority0(priority = newPriority); + } +} + +public final int getPriority() { + return priority; +} +private native void setPriority0(int newPriority); +/** + * A thread state. A thread can be in one of the following states: + * NEW:A thread that has not yet started is in this state. + * RUNNABLE:A thread executing in the Java virtual machine is in this state. + * BLOCKED:A thread that is blocked waiting for a monitor lock is in this state. + * WAITING:A thread that is waiting indefinitely for another thread to + * perform a particular action is in this state. + * TIMED_WAITING:A thread that is waiting for another thread to perform an action + * for up to a specified waiting time is in this state. + * TERMINATED:A thread that has exited is in this state. + * A thread can be in only one state at a given point in time. + * These states are virtual machine states which do not reflect + * any operating system thread states. + * @since 1.5 + */ +public enum State { + // 创建后,但是没有start(),调用了start()后,线程才算准备就绪,可以运行(RUNNABLE) + NEW, + // 正在运行或正在等待操作系统调度 + RUNNABLE, + // 线程正在等待监视器锁 + // 正在synchronized块/方法上等待获取锁,或者调用了Object.wait(),等待重新获得锁进入同步块 + BLOCKED, + // 调用Object.wait(),Thread.join()或LockSupport.park()会进入该状态,注意这里的调用均为没有设置超时, + // 线程正在等待其他线程进行特定操作,比如,调用了Object.wait()的线程在另一个线程调用Object.notify()/Object.notifyAll() + // 调用了Thread.join()的线程在等待指定线程停止,join()的内部实现方式也是Object.wait(),只不过其Object就是线程对象本身 + WAITING, + // 调用Thread.sleep(),Object.wait(long),Thread.join(long), + // LockSupport.parkNanos(long),LockSupport.parkUntil(long)会进入该状态, + // 注意,这里的调用均设置了超时 + TIMED_WAITING, + // 线程执行完成,退出 + TERMINATED; +} +// @since 1.5 +public State getState() { + // get current thread state + return sun.misc.VM.toThreadState(threadStatus); +} +``` + +## 线程启动和run + +实现 Runnable 或者继承 Thread 复写 run 方法,再run 方法中编写线程要处理的任务。 +```java + +/* What will be run. */ +private Runnable target; +@Override +public void run() { + if (target != null) { + target.run(); + } +} + + +/* Java thread status for tools, +* initialized to indicate thread 'not yet started' +*/ +private volatile int threadStatus = 0; +// 启动线程,JVM会调用当前Thread对象的run() +// 同步方法 +public synchronized void start() { + // A zero status value corresponds to state "NEW". + // 如果调用时不是在线程状态不是NEW,则抛出IllegalThreadStateException + if (threadStatus != 0) + throw new IllegalThreadStateException(); + /* Notify the group that this thread is about to be started + * so that it can be added to the group's list of threads + * and the group's unstarted count can be decremented. */ + group.add(this); + boolean started = false; + try { + // 通过start0()来实现线程启动 + start0(); + started = true; + } finally { + try { + if (!started) { + group.threadStartFailed(this); + } + } catch (Throwable ignore) { + /* do nothing. If start0 threw a Throwable then + it will be passed up the call stack */ + } + } +} + +private native void start0(); +``` + +## 线程打断 + +interrupt()不能中断执行阻塞IO操作的线程. + +```java +/* The object in which this thread is blocked in an interruptible I/O + * operation, if any. The blocker's interrupt method should be invoked + * after setting this thread's interrupt status. + */ +private volatile Interruptible blocker; +private final Object blockerLock = new Object(); + +/* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code + */ +void blockedOn(Interruptible b) { + synchronized (blockerLock) { + blocker = b; + } +} +/** + * 打断当前执行线程 + * 如果当前线程阻塞在Object.wait(),Thread.join(),Thread.sleep()上, + * 那么该线程会收到InterruptedException,且线程的打断标志会被清除; + * 如果当前线程阻塞在InterruptibleChannel上,那么该InterruptibleChannel + * 会被关闭,线程的打断标志会被置位,且当前线程会收到ClosedByInterruptException; + * 如果当前线程阻塞在Selector上,那么该Selector的selection操作将会立即返回一个非0的结果, + * 且Selector.wakeup()会被调用,线程的打断标志会被置位, + * 如果上述情况均不存在,将当前线程的打断标志置位 + * 打断一个isAlive()返回false的线程没有效果,isInterrupted()仍然会返回false; + */ +public void interrupt() { + if (this != Thread.currentThread()) + checkAccess(); + synchronized (blockerLock) { + Interruptible b = blocker; + // 在Interruptible上阻塞 + if (b != null) { + interrupt0(); // Just to set the interrupt flag + b.interrupt(this); + return; + } + } + interrupt0(); +} +private native void interrupt0(); + +/** + * 返回线程是否被打断(打断标志是否被置位) + * 传入的参数决定是否该方法是否会清除终端标志位 + */ +private native boolean isInterrupted(boolean ClearInterrupted); + +public static boolean interrupted() { + return currentThread().isInterrupted(true); +} + +public boolean isInterrupted() { + return isInterrupted(false); +} +``` + +## 线程礼让控制 yield() sleep() join() +```java +/** + * 暗示调度器让出当前线程的执行时间片,调度器可以选择忽略该暗示; + * 该方法在用来调试和测试时可能很有用,可以用来重现需要特殊条件才能复现的bug; + * 也可以用来进行并发优化等; + */ +public static native void yield(); + +/** + * 当前执行线程休眠指定毫秒在休眠期间,不释放任何当前线程持有的锁; + */ +public static native void sleep(long millis) throws InterruptedException; + +/** + * 当前执行线程休眠指定毫秒在休眠期间,不释放任何当前线程持有的锁; + * 如果当前被打断(该方法调用前或该方法调用时),抛出InterruptedException,同时将打断标志清掉 + */ +public static void sleep(long millis, int nanos) throws InterruptedException { + // 取值范围检查 + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + if (nanos < 0 || nanos > 999999) { + throw new IllegalArgumentException( + "nanosecond timeout value out of range"); + } + // 纳秒最后还是转换成了毫秒233333 + // 可能是考虑都有些 + if (nanos >= 500000 || (nanos != 0 && millis == 0)) { + millis++; + } + sleep(millis); +} +/** + * 当前执行线程等待指定线程(也就是该调用发生的Thread对象)死后再继续执行; + * 可以设置超时,如果设置超时为0,则为不设置超时; + * 线程结束时(terminate),将会调用自身的notifyAll(),唤醒在该Thread对象上wait()的方法; + * 如果该线程被打断,该方法将抛出InterruptedException,并将打断标志位清除 + */ +// 同步方法,同步当前Thread对象,所以才能在其内部调用wait() +public final synchronized void join(long millis) +throws InterruptedException { + long base = System.currentTimeMillis(); + long now = 0; + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + // 使用isAlive()和wait()的循环实现 + if (millis == 0) { + while (isAlive()) { + wait(0); + } + } else { + while (isAlive()) { + long delay = millis - now; + if (delay <= 0) { + break; + } + wait(delay); + now = System.currentTimeMillis() - base; + } + } +} + +public final synchronized void join(long millis, int nanos) +throws InterruptedException { + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + if (nanos < 0 || nanos > 999999) { + throw new IllegalArgumentException( + "nanosecond timeout value out of range"); + } + if (nanos >= 500000 || (nanos != 0 && millis == 0)) { + millis++; + } + join(millis); +} +public final void join() throws InterruptedException { + join(0); +} +/** + * This method is called by the system to give a Thread + * a chance to clean up before it actually exits. + */ +private void exit() { + if (group != null) { + group.threadTerminated(this); + group = null; + } + /* Aggressively null out all reference fields: see bug 4006245 */ + target = null; + /* Speed the release of some of these resources */ + threadLocals = null; + inheritableThreadLocals = null; + inheritedAccessControlContext = null; + blocker = null; + uncaughtExceptionHandler = null; +} +``` + +## 废弃的方法 suspend() resume() stop() + +```java +/** + * 挂起当前线程 + * 弃用原因:容易导致死锁 + */ +@Deprecated +public final void suspend() { + checkAccess(); + suspend0(); +} +/** + * 从suspend()中恢复线程运行 + * 弃用原因:容易导致死锁 + */ +@Deprecated +public final void resume() { + checkAccess(); + resume0(); +} +/** + * 强制线程停止执行; + * 通过抛出一个ThreadDeath的方式来停止线程; + * 废弃原因:stop()会释放所有已持有的锁的监视器,如果存在之前被这些监视器保护的对象处于一个不连续 + * 的状态(inconsistent state),这些被损坏的对象将会对其他线程可见,出现不可预期的行为; + */ +@Deprecated +public final void stop() { + SecurityManager security = System.getSecurityManager(); + if (security != null) { + checkAccess(); + if (this != Thread.currentThread()) { + security.checkPermission(SecurityConstants.STOP_THREAD_PERMISSION); + } + } + // A zero status value corresponds to "NEW", it can't change to + // not-NEW because we hold the lock. + if (threadStatus != 0) { + resume(); // Wake up thread if it was suspended; no-op otherwise + } + // The VM can handle all thread states + stop0(new ThreadDeath()); +} +``` + +## 线程堆栈 + +```java +// 特殊编程技巧 +private static final StackTraceElement[] EMPTY_STACK_TRACE + = new StackTraceElement[0]; +private native static StackTraceElement[][] dumpThreads(Thread[] threads); +private native static Thread[] getThreads(); + +// 打印当前线程的堆栈信息(StackTrace),通过新建一个异常的方式实现 +// 注意:这是一个静态方法 +public static void dumpStack() { + new Exception("Stack trace").printStackTrace(); +} + +/** + * 获得堆栈信息,返回的是一个数组 + * 数组的第0个堆栈信息为最近调用的堆栈信息 + * @since 1.5 + */ +public StackTraceElement[] getStackTrace() { + if (this != Thread.currentThread()) { + // check for getStackTrace permission + SecurityManager security = System.getSecurityManager(); + if (security != null) { + security.checkPermission(SecurityConstants.GET_STACK_TRACE_PERMISSION); + } + // 不是活着的,返回的堆栈信息长度为0 + if (!isAlive()) { + return EMPTY_STACK_TRACE; + } + StackTraceElement[][] stackTraceArray = dumpThreads(new Thread[] {this}); + StackTraceElement[] stackTrace = stackTraceArray[0]; + // a thread that was alive during the previous isAlive call may have + // since terminated, therefore not having a stacktrace. + if (stackTrace == null) { + // 这样就不会返回null,调用者也无需判断null了 + stackTrace = EMPTY_STACK_TRACE; + } + return stackTrace; + } else { + // Don't need JVM help for current thread + return (new Exception()).getStackTrace(); + } +} +/** + * 返回所有线程的堆栈信息 + * @since 1.5 + */ +public static Map getAllStackTraces() { + // check for getStackTrace permission + SecurityManager security = System.getSecurityManager(); + if (security != null) { + security.checkPermission( + SecurityConstants.GET_STACK_TRACE_PERMISSION); + security.checkPermission( + SecurityConstants.MODIFY_THREADGROUP_PERMISSION); + } + + // Get a snapshot of the list of all threads + Thread[] threads = getThreads(); + StackTraceElement[][] traces = dumpThreads(threads); + Map m = new HashMap<>(threads.length); + for (int i = 0; i < threads.length; i++) { + StackTraceElement[] stackTrace = traces[i]; + if (stackTrace != null) { + m.put(threads[i], stackTrace); + } + // else terminated so we don't put it in the map + } + return m; +} +``` +## 总结 + +线程是java中的核心知识点,并发编程中都是多个线程并发处理任务或数据。 +上几周学习的并发集合及锁相关的控制都是解决并发问题而产生的。 + diff --git a/week_05/28/ThreadLocal.md b/week_05/28/ThreadLocal.md new file mode 100644 index 0000000..e7efcf9 --- /dev/null +++ b/week_05/28/ThreadLocal.md @@ -0,0 +1,425 @@ +# ThreadLocal 源码分析 + +ThreadLocal类可以理解为线程本地变量。也就是说如果定义了一个ThreadLocal,每个线程往这个ThreadLocal中读写是线程隔离,互相之间不会影响的。它提供了一种将可变数据通过每个线程有自己的独立副本从而实现线程封闭的机制。 + +## ThreadLocalMap +ThreadLocalMap 存储 threadLocal 弱引用作为key, 普通的key-value形式来定义存储结构会造成节点的生命周期与线程强绑定,只要线程没有销毁,那么节点在GC分析中一直处于可达状态,没办法被回收,而程序本身也无法判断是否可以清理节点。弱引用是Java中四档引用的第三档,比软引用更加弱一些,如果一个对象没有强引用链可达,那么一般活不过下一次GC。当某个ThreadLocal已经没有强引用可达,则随着它被垃圾回收,在ThreadLocalMap里对应的Entry的键值会失效,这为ThreadLocalMap本身的垃圾清理提供了便利 + +ThreadLocal需要维持一个最坏2/3的负载因子,对于负载因子相信应该不会陌生,在HashMap中就有这个概念。 +ThreadLocal有两个方法用于得到上一个/下一个索引,注意这里实际上是环形意义下的上一个与下一个。 + +由于ThreadLocalMap使用线性探测法来解决散列冲突,所以实际上Entry[]数组在程序逻辑上是作为一个环形存在的。 + +```java +/** + * 初始容量,必须为2的幂 + */ +private static final int INITIAL_CAPACITY = 16; + +/** + * Entry表,大小必须为2的幂 + */ +private Entry[] table; + +/** + * 表里entry的个数 + */ +private int size = 0; + +/** + * 重新分配表大小的阈值,默认为0 + */ +private int threshold; + +/** + * 设置resize阈值以维持最坏2/3的装载因子 + */ +private void setThreshold(int len) { + threshold = len * 2 / 3; +} + +/** + * 环形意义的下一个索引 + */ +private static int nextIndex(int i, int len) { + return ((i + 1 < len) ? i + 1 : 0); +} + +/** + * 环形意义的上一个索引 + */ +private static int prevIndex(int i, int len) { + return ((i - 1 >= 0) ? i - 1 : len - 1); +} + +static class Entry extends WeakReference> { + // 往ThreadLocal里实际塞入的值 + Object value; + + Entry(java.lang.ThreadLocal k, Object v) { + super(k); + value = v; + } +} +``` + +## 构造方法 +```java +/** + * 构造一个包含firstKey和firstValue的map。 + * ThreadLocalMap是惰性构造的,所以只有当至少要往里面放一个元素的时候才会构建它。 + */ +ThreadLocalMap(java.lang.ThreadLocal firstKey, Object firstValue) { + // 初始化table数组 + table = new Entry[INITIAL_CAPACITY]; + // 用firstKey的threadLocalHashCode与初始大小16取模得到哈希值 + int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); + // 初始化该节点 + table[i] = new Entry(firstKey, firstValue); + // 设置节点表大小为1 + size = 1; + // 设定扩容阈值 + setThreshold(INITIAL_CAPACITY); +} + +/* + * 生成hash code间隙为这个魔数,可以让生成出来的值或者说ThreadLocal的ID较为均匀地分布在2的幂大小的数组中。 + */ +private static final int HASH_INCREMENT = 0x61c88647; + +private static int nextHashCode() { + return nextHashCode.getAndAdd(HASH_INCREMENT); +} + +``` + + +## getEntry +```java +private Entry getEntry(ThreadLocal key) { + // 根据key这个ThreadLocal的ID来获取索引,也即哈希值 + int i = key.threadLocalHashCode & (table.length - 1); + Entry e = table[i]; + // 对应的entry存在且未失效且弱引用指向的ThreadLocal就是key,则命中返回 + if (e != null && e.get() == key) { + return e; + } else { + // 因为用的是线性探测,所以往后找还是有可能能够找到目标Entry的。 + return getEntryAfterMiss(key, i, e); + } +} + +/* + * 调用getEntry未直接命中的时候调用此方法 + */ +private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) { + Entry[] tab = table; + int len = tab.length; + + + // 基于线性探测法不断向后探测直到遇到空entry。 + while (e != null) { + ThreadLocal k = e.get(); + // 找到目标 + if (k == key) { + return e; + } + if (k == null) { + // 该entry对应的ThreadLocal已经被回收,调用expungeStaleEntry来清理无效的entry + expungeStaleEntry(i); + } else { + // 环形意义下往后面走 + i = nextIndex(i, len); + } + e = tab[i]; + } + return null; +} + + +/** + * 这个函数是ThreadLocal中核心清理函数,它做的事情很简单: + * 就是从staleSlot开始遍历,将无效(弱引用指向对象被回收)清理,即对应entry中的value置为null,将指向这个entry的table[i]置为null,直到扫到空entry。 + * 另外,在过程中还会对非空的entry作rehash。 + * 可以说这个函数的作用就是从staleSlot开始清理连续段中的slot(断开强引用,rehash slot等) + */ +private int expungeStaleEntry(int staleSlot) { + Entry[] tab = table; + int len = tab.length; + + // 因为entry对应的ThreadLocal已经被回收,value设为null,显式断开强引用 + tab[staleSlot].value = null; + // 显式设置该entry为null,以便垃圾回收 + tab[staleSlot] = null; + size--; + + Entry e; + int i; + for (i = nextIndex(staleSlot, len); (e = tab[i]) != null; i = nextIndex(i, len)) { + ThreadLocal k = e.get(); + // 清理对应ThreadLocal已经被回收的entry + if (k == null) { + e.value = null; + tab[i] = null; + size--; + } else { + /* + * 对于还没有被回收的情况,需要做一次rehash。 + * + * 如果对应的ThreadLocal的ID对len取模出来的索引h不为当前位置i, + * 则从h向后线性探测到第一个空的slot,把当前的entry给挪过去。 + */ + int h = k.threadLocalHashCode & (len - 1); + if (h != i) { + tab[i] = null; + + /* + * 在原代码的这里有句注释值得一提,原注释如下: + * + * Unlike Knuth 6.4 Algorithm R, we must scan until + * null because multiple entries could have been stale. + * + * 这段话提及了Knuth高德纳的著作TAOCP(《计算机程序设计艺术》)的6.4章节(散列) + * 中的R算法。R算法描述了如何从使用线性探测的散列表中删除一个元素。 + * R算法维护了一个上次删除元素的index,当在非空连续段中扫到某个entry的哈希值取模后的索引 + * 还没有遍历到时,会将该entry挪到index那个位置,并更新当前位置为新的index, + * 继续向后扫描直到遇到空的entry。 + * + * ThreadLocalMap因为使用了弱引用,所以其实每个slot的状态有三种也即 + * 有效(value未回收),无效(value已回收),空(entry==null)。 + * 正是因为ThreadLocalMap的entry有三种状态,所以不能完全套高德纳原书的R算法。 + * + * 因为expungeStaleEntry函数在扫描过程中还会对无效slot清理将之转为空slot, + * 如果直接套用R算法,可能会出现具有相同哈希值的entry之间断开(中间有空entry)。 + */ + while (tab[h] != null) { + h = nextIndex(h, len); + } + tab[h] = e; + } + } + } + // 返回staleSlot之后第一个空的slot索引 + return i; +} +``` + +## set + +探测过程中slot都不无效,并且顺利找到key所在的slot,直接替换即可 +探测过程中发现有无效slot,调用replaceStaleEntry,效果是最终一定会把key和value放在这个slot,并且会尽可能清理无效slot +在replaceStaleEntry过程中,如果找到了key,则做一个swap把它放到那个无效slot中,value置为新值 +在replaceStaleEntry过程中,没有找到key,直接在无效slot原地放entry +探测没有发现key,则在连续段末尾的后一个空位置放上entry,这也是线性探测法的一部分。放完后,做一次启发式清理,如果没清理出去key,并且当前table大小已经超过阈值了,则做一次rehash,rehash函数会调用一次全量清理slot方法也即expungeStaleEntries,如果完了之后table大小超过了threshold - threshold / 4,则进行扩容2倍 +```java +private void set(ThreadLocal key, Object value) { + + Entry[] tab = table; + int len = tab.length; + int i = key.threadLocalHashCode & (len - 1); + // 线性探测 + for (Entry e = tab[i]; e != null; e = tab[i = nextIndex(i, len)]) { + ThreadLocal k = e.get(); + // 找到对应的entry + if (k == key) { + e.value = value; + return; + } + // 替换失效的entry + if (k == null) { + replaceStaleEntry(key, value, i); + return; + } + } + + tab[i] = new Entry(key, value); + int sz = ++size; + if (!cleanSomeSlots(i, sz) && sz >= threshold) { + rehash(); + } +} + +private void replaceStaleEntry(ThreadLocal key, Object value, + int staleSlot) { + Entry[] tab = table; + int len = tab.length; + Entry e; + + // 向前扫描,查找最前的一个无效slot + int slotToExpunge = staleSlot; + for (int i = prevIndex(staleSlot, len); + (e = tab[i]) != null; + i = prevIndex(i, len)) { + if (e.get() == null) { + slotToExpunge = i; + } + } + + // 向后遍历table + for (int i = nextIndex(staleSlot, len); + (e = tab[i]) != null; + i = nextIndex(i, len)) { + ThreadLocal k = e.get(); + + // 找到了key,将其与无效的slot交换 + if (k == key) { + // 更新对应slot的value值 + e.value = value; + + tab[i] = tab[staleSlot]; + tab[staleSlot] = e; + + /* + * 如果在整个扫描过程中(包括函数一开始的向前扫描与i之前的向后扫描) + * 找到了之前的无效slot则以那个位置作为清理的起点, + * 否则则以当前的i作为清理起点 + */ + if (slotToExpunge == staleSlot) { + slotToExpunge = i; + } + // 从slotToExpunge开始做一次连续段的清理,再做一次启发式清理 + cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); + return; + } + + // 如果当前的slot已经无效,并且向前扫描过程中没有无效slot,则更新slotToExpunge为当前位置 + if (k == null && slotToExpunge == staleSlot) { + slotToExpunge = i; + } + } + + // 如果key在table中不存在,则在原地放一个即可 + tab[staleSlot].value = null; + tab[staleSlot] = new Entry(key, value); + + // 在探测过程中如果发现任何无效slot,则做一次清理(连续段清理+启发式清理) + if (slotToExpunge != staleSlot) { + cleanSomeSlots(expungeStaleEntry(slotToExpunge), len); + } +} + +/** + * 启发式地清理slot, + * i对应entry是非无效(指向的ThreadLocal没被回收,或者entry本身为空) + * n是用于控制控制扫描次数的 + * 正常情况下如果log n次扫描没有发现无效slot,函数就结束了 + * 但是如果发现了无效的slot,将n置为table的长度len,做一次连续段的清理 + * 再从下一个空的slot开始继续扫描 + * + * 这个函数有两处地方会被调用,一处是插入的时候可能会被调用,另外个是在替换无效slot的时候可能会被调用, + * 区别是前者传入的n为元素个数,后者为table的容量 + */ +private boolean cleanSomeSlots(int i, int n) { + boolean removed = false; + Entry[] tab = table; + int len = tab.length; + do { + // i在任何情况下自己都不会是一个无效slot,所以从下一个开始判断 + i = nextIndex(i, len); + Entry e = tab[i]; + if (e != null && e.get() == null) { + // 扩大扫描控制因子 + n = len; + removed = true; + // 清理一个连续段 + i = expungeStaleEntry(i); + } + } while ((n >>>= 1) != 0); + return removed; +} + + +private void rehash() { + // 做一次全量清理 + expungeStaleEntries(); + + /* + * 因为做了一次清理,所以size很可能会变小。 + * ThreadLocalMap这里的实现是调低阈值来判断是否需要扩容, + * threshold默认为len*2/3,所以这里的threshold - threshold / 4相当于len/2 + */ + if (size >= threshold - threshold / 4) { + resize(); + } +} + +/* + * 做一次全量清理 + */ +private void expungeStaleEntries() { + Entry[] tab = table; + int len = tab.length; + for (int j = 0; j < len; j++) { + Entry e = tab[j]; + if (e != null && e.get() == null) { + /* + * 个人觉得这里可以取返回值,如果大于j的话取了用,这样也是可行的。 + * 因为expungeStaleEntry执行过程中是把连续段内所有无效slot都清理了一遍了。 + */ + expungeStaleEntry(j); + } + } +} + +/** + * 扩容,因为需要保证table的容量len为2的幂,所以扩容即扩大2倍 + */ +private void resize() { + Entry[] oldTab = table; + int oldLen = oldTab.length; + int newLen = oldLen * 2; + Entry[] newTab = new Entry[newLen]; + int count = 0; + + for (int j = 0; j < oldLen; ++j) { + Entry e = oldTab[j]; + if (e != null) { + ThreadLocal k = e.get(); + if (k == null) { + e.value = null; + } else { + // 线性探测来存放Entry + int h = k.threadLocalHashCode & (newLen - 1); + while (newTab[h] != null) { + h = nextIndex(h, newLen); + } + newTab[h] = e; + count++; + } + } + } + + setThreshold(newLen); + size = count; + table = newTab; +} +``` + +## remove + +```java +/** + * 从map中删除ThreadLocal + */ +private void remove(ThreadLocal key) { + Entry[] tab = table; + int len = tab.length; + int i = key.threadLocalHashCode & (len - 1); + for (Entry e = tab[i]; + e != null; + e = tab[i = nextIndex(i, len)]) { + if (e.get() == key) { + // 显式断开弱引用 + e.clear(); + // 进行段清理 + expungeStaleEntry(i); + return; + } + } +} +``` + +## 总结 +ThreadLocal 在 springAop ,链路追踪,统一日志ID 等信息获取等方式比较常用。 +但是最好再使用完成后进行数据的手动清理。 + diff --git a/week_05/28/ThreadPoolExecutor.md b/week_05/28/ThreadPoolExecutor.md new file mode 100644 index 0000000..e3241a7 --- /dev/null +++ b/week_05/28/ThreadPoolExecutor.md @@ -0,0 +1,244 @@ +# ThreadPoolExecutor 源码分析 + +ThreadPoolExecutor 是 JDK 中线程池的实现类, 我们可以直接使用。 +corePoolSize:池里维持的最小线程数,即使它们是空闲线程,也不会进行销毁 +maximumPoolSize:最大线程数 +keepAliveTime:当池里的线程数量超过了corePoolSize时,如果额外线程在keepAliveTime时间段内都未执行新任务,将被销毁 + +## 构造方法 + +```java +public ThreadPoolExecutor(int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + RejectedExecutionHandler handler) { + if (corePoolSize < 0 || + maximumPoolSize <= 0 || + maximumPoolSize < corePoolSize || + keepAliveTime < 0) + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.acc = System.getSecurityManager() == null ? + null : + AccessController.getContext(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; +} +``` +## submit +```java +public Future submit(Runnable task) { + if (task == null) throw new NullPointerException(); + //将 Runnable 封装成一个 RunnableFuture,对于 Runnable 意义不大,其主要针对 Callable + RunnableFuture ftask = newTaskFor(task, null); + //执行任务,由子类 ThreadPoolExecutor 实现 + execute(ftask); + //返回 Future 接口,可返回最终计算结果,对于 Runnable 意义不大,其主要针对 Callable + return ftask; +} + +protected RunnableFuture newTaskFor(Runnable runnable, T value) { + return new FutureTask(runnable, value); +} +``` + +## execute +```java +public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + + //获取线程池状态值,其包含两个核心属性(方便同时对两个值的原子操作,底层采用了 CAS 而非锁) + //1.workerCount,有效线程数(代表那些已启动但不准停止的线程数量,与实际线程数可能不符) + //2.runState,主要用于线程池生命周期的控制 + int c = ctl.get(); + + //如果有效线程数少于corePoolSize,创建一个Worker(包含一个新的核心线程) + if (workerCountOf(c) < corePoolSize) { + //如果创建成功,将command作为首次任务,因此不需要任务入队,可立即返回 + if (addWorker(command, true)) + return; + c = ctl.get(); + } + //如果线程池可接收新的任务,尝试将任务入队 + if (isRunning(c) && workQueue.offer(command)) { + //任务入队成功,必须进行二次检查 + int recheck = ctl.get(); + //检查:如果线程池已停止接收,回滚已入队的任务并拒绝 + if (! isRunning(recheck) && remove(command)) + reject(command); + //如果有效线程数为0,创建一个Worker(包含一个新的额外线程),其不存在首次任务,故只能从工作队列中依次取任务 + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + //当然,如果有效线程数不为0,不用管它,等待现有的空闲线程从工作队列中依次取任务即可 + } + //(为了缓冲任务过多、队列已满)尝试创建一个Worker(包含一个新的额外线程),将command作为首次任务 + else if (!addWorker(command, false)) + //一旦创建失败了,意味着线程池已停工或饱和,直接拒绝 + reject(command); +} +``` + +## addWorker +```java +private boolean addWorker(Runnable firstTask, boolean core) { + //在环境允许时,将有效线程数+1(使用CAS原语保证同步,性能高于锁) + retry: + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + if (rs >= SHUTDOWN && + ! (rs == SHUTDOWN && + firstTask == null && + ! workQueue.isEmpty())) + return false; + + for (;;) { + int wc = workerCountOf(c); + if (wc >= CAPACITY || + wc >= (core ? corePoolSize : maximumPoolSize)) + return false; + if (compareAndIncrementWorkerCount(c)) + break retry; + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + + //一旦线程数成功+1,开始创建新线程 + + boolean workerStarted = false; + boolean workerAdded = false; + Worker w = null; + try { + //此处构造很关键,其内部创建了一个标准的工作线程:一旦启动,可以不断地从队列中取出任务 + w = new Worker(firstTask); + final Thread t = w.thread; + //检查线程是否被创建 + if (t != null) { + final ReentrantLock mainLock = this.mainLock; + //使用锁同步,将新线程加入池中(池的类型是HashSet,本身是非线程安全的) + mainLock.lock(); + try { + // Recheck while holding lock. + // Back out on ThreadFactory failure or if + // shut down before lock acquired. + int rs = runStateOf(ctl.get()); + + if (rs < SHUTDOWN || + (rs == SHUTDOWN && firstTask == null)) { + if (t.isAlive()) // precheck that t is startable + throw new IllegalThreadStateException(); + workers.add(w); + int s = workers.size(); + if (s > largestPoolSize) + largestPoolSize = s; + workerAdded = true; + } + } finally { + mainLock.unlock(); + } + //如果新线程加入池成功,就启动它 + if (workerAdded) { + t.start(); + workerStarted = true; + } + } + } finally { + //如果新线程产生各种失败,回滚整个流程:从池中删除它、线程数-1、尝试终止线程池 + if (! workerStarted) + addWorkerFailed(w); + } + return workerStarted; +} +``` + +## ThreadPoolExecutor.Worker +```java +//Worker构造函数 +Worker(Runnable firstTask) { + setState(-1); // inhibit interrupts until runWorker + this.firstTask = firstTask; + //使用工厂类来新建一个工作线程 + this.thread = getThreadFactory().newThread(this); +} + +//传递给工作线程的run()方法 +public void run() { + runWorker(this); +} + +final void runWorker(Worker w) { + //注意,当前线程为工作线程,而非主线程 + Thread wt = Thread.currentThread(); + //设置首次任务 + Runnable task = w.firstTask; + w.firstTask = null; + w.unlock(); // allow interrupts + boolean completedAbruptly = true; + try { + //获得首次任务,或从工作队列中获取最旧的任务(FIFO队列) + //由于是阻塞式的生产者/消费者队列,当缺少任务时,当前线程等待并挂起 + while (task != null || (task = getTask()) != null) { + w.lock(); + // If pool is stopping, ensure thread is interrupted; + // if not, ensure thread is not interrupted. This + // requires a recheck in second case to deal with + // shutdownNow race while clearing interrupt + if ((runStateAtLeast(ctl.get(), STOP) || + (Thread.interrupted() && + runStateAtLeast(ctl.get(), STOP))) && + !wt.isInterrupted()) + wt.interrupt(); + try { + //钩子方法,可由子类重写 + beforeExecute(wt, task); + Throwable thrown = null; + try { + //执行任务(队列只接受Runnable接口的任务) + task.run(); + } catch (RuntimeException x) { + thrown = x; throw x; + } catch (Error x) { + thrown = x; throw x; + } catch (Throwable x) { + thrown = x; throw new Error(x); + } finally { + //钩子方法,可由子类重写 + afterExecute(task, thrown); + } + } finally { + task = null; + w.completedTasks++; + w.unlock(); + } + } + //当线程数超过了最大值、或线程池停止、或线程超时,则getTask()返回null,代码到达此处 + completedAbruptly = false; + } finally { + //根据运行异常或线程池配置问题(例如最大数量、超时),选择性地进行线程销毁处理 + processWorkerExit(w, completedAbruptly); + } +} +``` + +## 总结 +java 线程池实现比较经典,可以参考实现其他池。其中一些细节还需进一步了解。 +创建及使用线程大概过程如下: +1.初始化一个容量为 corePoolSize 的池子; +2.刚开始,每来一个任务就在池中创建一个线程去执行该任务,直到池中的容量到达 corePoolSize; +3.此时若再来任务,则把这些任务放到 workQueue 中; +4.若 workQueue 也满了,则继续创建线程执行任务,直到线程数量达到 maximumPoolSize; +5.若 workQueue 已满,且线程数量达到 maximumPoolSize,此时若还有任务到来,则执行拒绝策略(handler)。 -- Gitee