diff --git a/week_05/12/Executors-012.md b/week_05/12/Executors-012.md new file mode 100644 index 0000000000000000000000000000000000000000..85318e4daae7a9823b27da0dbd61f6aa0927e333 --- /dev/null +++ b/week_05/12/Executors-012.md @@ -0,0 +1,129 @@ +#### 问题 +``` +提供哪些创建线程池的方法, 各有什么特点 +提供了哪些其他方法 +``` + +#### 简介 +``` +线程池工具类,提供了不同种创建线程池的方法 +``` + +#### 类结构说明 +DelegatedExecutorService +``` +// 线程池装饰类 +static class DelegatedExecutorService extends AbstractExecutorService { + private final ExecutorService e; + DelegatedExecutorService(ExecutorService executor) { e = executor; } + public void execute(Runnable command) { e.execute(command); } +} +``` + +FinalizableDelegatedExecutorService +``` +static class FinalizableDelegatedExecutorService extends DelegatedExecutorService { + FinalizableDelegatedExecutorService(ExecutorService executor) { + super(executor); + } + protected void finalize() { + super.shutdown(); + } +} +``` + +DefaultThreadFactory +``` +static class DefaultThreadFactory implements ThreadFactory { + private static final AtomicInteger poolNumber = new AtomicInteger(1); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger(1); + private final String namePrefix; + + DefaultThreadFactory() { + SecurityManager s = System.getSecurityManager(); + group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); + namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; + } + + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); + if (t.isDaemon()) + t.setDaemon(false); + if (t.getPriority() != Thread.NORM_PRIORITY) + t.setPriority(Thread.NORM_PRIORITY); + return t; + } +} +``` + +RunnableAdapter +``` +// runnable与callable适配器,与FutureTask结合使用,用于有返回值的线程 +static final class RunnableAdapter implements Callable { + final Runnable task; + final T result; + RunnableAdapter(Runnable task, T result) { + this.task = task; + this.result = result; + } + public T call() { + task.run(); + return result; + } +} +``` + +#### 源码解析 +##### 主要方法 +``` +public static ExecutorService newFixedThreadPool(int nThreads) { + return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); +} + +public static ExecutorService newWorkStealingPool() { + return new ForkJoinPool + ( Runtime.getRuntime().availableProcessors(), + ForkJoinPool.defaultForkJoinWorkerThreadFactory, + null, true ); +} + +public static ExecutorService newSingleThreadExecutor() { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); +} + +public static ExecutorService newCachedThreadPool() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); +} + +public static ScheduledExecutorService newSingleThreadScheduledExecutor() { + return new DelegatedScheduledExecutorService + (new ScheduledThreadPoolExecutor(1)); +} + +``` + +##### 其他方法 +``` +public static ThreadFactory defaultThreadFactory() { + return new DefaultThreadFactory(); +} + +public static ThreadFactory privilegedThreadFactory() { + return new PrivilegedThreadFactory(); +} + +// 用于有返回值的线程任务执行 +public static Callable callable(Runnable task, T result) { + if (task == null) + throw new NullPointerException(); + return new RunnableAdapter(task, result); +} +``` + +#### 总结 +``` +固定数目线程池、分而治之线程池、单例线程池、缓存型线程池、调度型线程池 +// todo 特点 +``` \ No newline at end of file diff --git a/week_05/12/Thread-012.md b/week_05/12/Thread-012.md new file mode 100644 index 0000000000000000000000000000000000000000..886ce0c763f59d27a965c5108684807d396e55fe --- /dev/null +++ b/week_05/12/Thread-012.md @@ -0,0 +1,406 @@ +#### 问题 +``` +线程类型、线程模型概念,以及各语言使用哪种线程模型 +创建线程有哪8种方式 +线程的生命周期是什么 +``` + +#### 简介 +``` +操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。 +一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。 +同一进程中的多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。 +``` + +#### 继承体系 +``` +public class Thread implements Runnable +``` + +#### 类结构说明 +``` +public enum State { + NEW, + RUNNABLE, + BLOCKED, + WAITING, + TIMED_WAITING, + TERMINATED; +} +``` + +#### 源码解析 +##### 属性 +``` +private volatile String name; +private int priority; +private Thread threadQ; +private long eetop; +private boolean single_step; +private boolean daemon = false; +/* JVM state */ +private boolean stillborn = false; +private Runnable target; +/* The group of this thread */ +private ThreadGroup group; +/* The context ClassLoader for this thread */ +private ClassLoader contextClassLoader; +/* The inherited AccessControlContext of this thread */ +private AccessControlContext inheritedAccessControlContext; +/* For autonumbering anonymous threads. */ +private static int threadInitNumber; +private static synchronized int nextThreadNum() { + return threadInitNumber++; +} + +/* ThreadLocal values pertaining to this thread. This map is maintained by the ThreadLocal class. */ +ThreadLocal.ThreadLocalMap threadLocals = null; +/* InheritableThreadLocal values pertaining to this thread */ +ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; + +/* + * The requested stack size for this thread, or 0 if the creator did not specify a stack size. + * It is up to the VM to do whatever it likes with this number; some VMs will ignore it. + */ +private long stackSize; + +private long nativeParkEventPointer; + +/* Thread ID */ +private long tid; + +/* For generating thread ID */ +private static long threadSeqNumber; + +/* Java thread status for tools, initialized to indicate thread 'not yet started' */ +private volatile int threadStatus = 0; + +private static synchronized long nextThreadID() { + return ++threadSeqNumber; +} + +volatile Object parkBlocker; +private volatile Interruptible blocker; +private final Object blockerLock = new Object(); +void blockedOn(Interruptible b) { + synchronized (blockerLock) { + blocker = b; + } +} + +public final static int MIN_PRIORITY = 1; +public final static int NORM_PRIORITY = 5; +public final static int MAX_PRIORITY = 10; +``` + +##### 构造方法 +``` +public Thread(Runnable target) { + init(null, target, "Thread-" + nextThreadNum(), 0); +} + +private void init(ThreadGroup g, Runnable target, String name, long stackSize) { + init(g, target, name, stackSize, null, true); +} + +private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + + Thread parent = currentThread(); + SecurityManager security = System.getSecurityManager(); + if (g == null) { + if (security != null) { + g = security.getThreadGroup(); + } + + if (g == null) { + g = parent.getThreadGroup(); + } + } + + /* checkAccess regardless of whether or not threadgroup is + explicitly passed in. */ + g.checkAccess(); + + if (security != null) { + if (isCCLOverridden(getClass())) { + security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); + } + } + + g.addUnstarted(); + + this.group = g; + this.daemon = parent.isDaemon(); + this.priority = parent.getPriority(); + if (security == null || isCCLOverridden(parent.getClass())) + this.contextClassLoader = parent.getContextClassLoader(); + else + this.contextClassLoader = parent.contextClassLoader; + this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext(); + this.target = target; + setPriority(priority); + if (inheritThreadLocals && parent.inheritableThreadLocals != null) + this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); + /* Stash the specified stack size in case the VM cares */ + this.stackSize = stackSize; + /* Set thread ID */ + tid = nextThreadID(); +} +``` + +##### 主要方法 +start +``` +public synchronized void start() { + // threadStatus=0与state "NEW"状态一致 + if (threadStatus != 0) + throw new IllegalThreadStateException(); + + group.add(this); + + boolean started = false; + try { + start0(); + started = true; + } finally { + try { + if (!started) { + group.threadStartFailed(this); + } + } catch (Throwable ignore) { + /* do nothing. If start0 threw a Throwable then + it will be passed up the call stack */ + } + } +} + +private native void start0(); +``` + +run +``` +public void run() { + if (target != null) { + target.run(); + } +} +``` + +yield、sleep、join +``` +// 暂停当前线程,依赖cpu时间片划分,若持有锁不会释放资源 +public static native void yield(); +// 暂停当前线程,依赖休眠时间,若持有锁不会释放资源 +public static native void sleep(long millis) throws InterruptedException; +// 执行当前线程,正在执行的线程阻塞 +public final void join() throws InterruptedException { + join(0); +} +public final synchronized void join(long millis) throws InterruptedException { + long base = System.currentTimeMillis(); + long now = 0; + + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + + if (millis == 0) { + while (isAlive()) { + wait(0); + } + } else { + while (isAlive()) { + long delay = millis - now; + if (delay <= 0) { + break; + } + wait(delay); + now = System.currentTimeMillis() - base; + } + } +} +``` + +interrupt +``` +public void interrupt() { + if (this != Thread.currentThread()) + checkAccess(); + + synchronized (blockerLock) { + Interruptible b = blocker; + if (b != null) { + interrupt0(); // Just to set the interrupt flag + b.interrupt(this); + return; + } + } + interrupt0(); +} + +public boolean isInterrupted() { + return isInterrupted(false); +} + +private native boolean isInterrupted(boolean ClearInterrupted); +``` + +Priority、Name、Daemon +``` +public final void setPriority(int newPriority) { + ThreadGroup g; + checkAccess(); + if (newPriority > MAX_PRIORITY || newPriority < MIN_PRIORITY) { + throw new IllegalArgumentException(); + } + if((g = getThreadGroup()) != null) { + if (newPriority > g.getMaxPriority()) { + newPriority = g.getMaxPriority(); + } + setPriority0(priority = newPriority); + } +} + +public final synchronized void setName(String name) { + checkAccess(); + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + if (threadStatus != 0) { + setNativeName(name); + } +} + +public final void setDaemon(boolean on) { + checkAccess(); + if (isAlive()) { + throw new IllegalThreadStateException(); + } + daemon = on; +} +``` +##### 其他方法 +``` +private static int threadInitNumber; +private static synchronized int nextThreadNum() { + return threadInitNumber++; +} + +private static long threadSeqNumber; +private static synchronized long nextThreadID() { + return ++threadSeqNumber; +} + +public static native Thread currentThread() + +public ThreadGroup getThreadGroup() { + return Thread.currentThread().getThreadGroup(); +} + +public final void checkAccess() { + SecurityManager security = System.getSecurityManager(); + if (security != null) { + security.checkAccess(this); + } +} + +@CallerSensitive +public ClassLoader getContextClassLoader() { + if (contextClassLoader == null) + return null; + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + ClassLoader.checkClassLoaderPermission(contextClassLoader, Reflection.getCallerClass()); + } + return contextClassLoader; +} + +// Returns an array of stack trace elements representing the stack dump of this thread +public StackTraceElement[] getStackTrace() { + if (this != Thread.currentThread()) { + // check for getStackTrace permission + SecurityManager security = System.getSecurityManager(); + if (security != null) { + security.checkPermission( + SecurityConstants.GET_STACK_TRACE_PERMISSION); + } + // optimization so we do not call into the vm for threads that + // have not yet started or have terminated + if (!isAlive()) { + return EMPTY_STACK_TRACE; + } + StackTraceElement[][] stackTraceArray = dumpThreads(new Thread[] {this}); + StackTraceElement[] stackTrace = stackTraceArray[0]; + // a thread that was alive during the previous isAlive call may have + // since terminated, therefore not having a stacktrace. + if (stackTrace == null) { + stackTrace = EMPTY_STACK_TRACE; + } + return stackTrace; + } else { + // Don't need JVM help for current thread + return (new Exception()).getStackTrace(); + } +} + +// Returns a map of stack traces for all live threads +public static Map getAllStackTraces() { + // check for getStackTrace permission + SecurityManager security = System.getSecurityManager(); + if (security != null) { + security.checkPermission( + SecurityConstants.GET_STACK_TRACE_PERMISSION); + security.checkPermission( + SecurityConstants.MODIFY_THREADGROUP_PERMISSION); + } + + // Get a snapshot of the list of all threads + Thread[] threads = getThreads(); + StackTraceElement[][] traces = dumpThreads(threads); + Map m = new HashMap<>(threads.length); + for (int i = 0; i < threads.length; i++) { + StackTraceElement[] stackTrace = traces[i]; + if (stackTrace != null) { + m.put(threads[i], stackTrace); + } + // else terminated so we don't put it in the map + } + return m; +} +``` + +#### 总结 +``` +线程分为用户线程和内核线程: +线程模型有多对一模型、一对一模型、多对多模型 +操作系统一般只实现到一对一模型 +Java使用的是一对一线程模型,所以它的一个线程对应于一个内核线程,调度完全交给操作系统来处理 +Go语言使用的是多对多线程模型,这也是其高并发的原因,它的线程模型与Java中的ForkJoinPool非常类似 +python的gevent使用的是多对一线程模型 + +创建线程的8种方式: +继承Thread类重新run方法 +实现Runnable接口 +匿名内部类 +实现Callable接口,通过FutureTask封装任务和获取结果 +定时器(java.util.Timer) +线程池 +并行计算(java8+) +Spring异步方法 + +线程生命周期: +NEW :未启动 +RUNNABLE:JVM在执行,OS可能在等待 +BLOCKED :等待获取锁 +WAITING :Object.wait、Thread.join、LockSupport.park -> + Object.notify()或Object.notifyAll()、等待指定线程终止、LockSupport.unpark +TIMED_WAITING :Object.wait(long)、Thread.join(long)、LockSupport.parkNanos或LockSupport.parkUntil +TERMINATED :终止,执行完成 +``` \ No newline at end of file diff --git a/week_05/12/ThreadLocal-012.md b/week_05/12/ThreadLocal-012.md new file mode 100644 index 0000000000000000000000000000000000000000..3635489908ad0a38447a2fda7adabf3f3df489a3 --- /dev/null +++ b/week_05/12/ThreadLocal-012.md @@ -0,0 +1,133 @@ +#### 问题 +``` +是什么,有什么作用 +线程如何获取自己的ThreadLocal +``` +#### 简介 +``` +ThreadLocal即ThreadLocalVariable,线程局部变量,它通过为每个线程提供一个独立的变量副本解决了变量并发访问的冲突问题 +``` + +#### 继承体系 +``` +public class ThreadLocal +``` + +#### 类结构说明 +ThreadLocalMap +``` +static class Entry extends WeakReference> { + /** The value associated with this ThreadLocal. */ + Object value; + + Entry(ThreadLocal k, Object v) { + super(k); + value = v; + } +} + +private static final int INITIAL_CAPACITY = 16; +private Entry[] table; +private int size = 0; +private int threshold; // Default to 0 +// Set the resize threshold to maintain at worst a 2/3 load factor. +private void setThreshold(int len) { + threshold = len * 2 / 3; +} +// Increment i modulo len. +private static int nextIndex(int i, int len) { + return ((i + 1 < len) ? i + 1 : 0); +} +// Decrement i modulo len. +private static int prevIndex(int i, int len) { + return ((i - 1 >= 0) ? i - 1 : len - 1); +} +``` + +#### 源码解析 +##### 主要方法 +###### 线程局部变量初始化过程 +``` +static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) { + return new ThreadLocalMap(parentMap); +} + +private ThreadLocalMap(ThreadLocalMap parentMap) { + Entry[] parentTable = parentMap.table; + int len = parentTable.length; + setThreshold(len); + table = new Entry[len]; + + for (int j = 0; j < len; j++) { + Entry e = parentTable[j]; + if (e != null) { + @SuppressWarnings("unchecked") + ThreadLocal key = (ThreadLocal) e.get(); + if (key != null) { + Object value = key.childValue(e.value); + Entry c = new Entry(key, value); + int h = key.threadLocalHashCode & (len - 1); + while (table[h] != null) + h = nextIndex(h, len); + table[h] = c; + size++; + } + } + } +} +``` + +###### 线程局部变量获取过程 +``` +public T get() { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) { + ThreadLocalMap.Entry e = map.getEntry(this); + if (e != null) { + @SuppressWarnings("unchecked") + T result = (T)e.value; + return result; + } + } + return setInitialValue(); +} + +private T setInitialValue() { + T value = initialValue(); + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) + map.set(this, value); + else + createMap(t, value); + return value; +} + +void createMap(Thread t, T firstValue) { + t.threadLocals = new ThreadLocalMap(this, firstValue); +} + +ThreadLocalMap(ThreadLocal firstKey, Object firstValue) { + table = new Entry[INITIAL_CAPACITY]; + int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); + table[i] = new Entry(firstKey, firstValue); + size = 1; + setThreshold(INITIAL_CAPACITY); +} + +private final int threadLocalHashCode = nextHashCode(); +private static AtomicInteger nextHashCode = new AtomicInteger(); +private static final int HASH_INCREMENT = 0x61c88647; +private static int nextHashCode() { + return nextHashCode.getAndAdd(HASH_INCREMENT); +} + +``` + +#### 总结 +``` +ThreadLocal即ThreadLocalVariable,线程局部变量,它通过为每个线程提供一个独立的变量副本解决了变量并发访问的冲突问题 +线程如何获取自己的ThreadLocal: +Thread定义了threadLocals和inheritableThreadLocals变量,初始化线程时,将守护线程的局部变量复制一份到新创建线程中,确保每个线程布局变量相同 +``` diff --git a/week_05/12/ThreadPoolExecutor-012.md b/week_05/12/ThreadPoolExecutor-012.md new file mode 100644 index 0000000000000000000000000000000000000000..8bf56c5561153cee16a67400946b65746f62fd79 --- /dev/null +++ b/week_05/12/ThreadPoolExecutor-012.md @@ -0,0 +1,403 @@ +#### 问题 +``` +线程池 +分类有哪些 +体系结构是怎么构成的 +构造函数各参数含义和意义是什么 +生命周期是什么 +如何执行任务的 +``` +#### 简介 +``` +线程池执行器,可以用于执行普通任务和未来任务 +``` + +#### 继承体系 +``` +public interface Executor +public interface ExecutorService extends Executor +public abstract class AbstractExecutorService implements ExecutorService +public class ThreadPoolExecutor extends AbstractExecutorService + +public interface ScheduledExecutorService extends ExecutorService +public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService + +public class ForkJoinPool extends AbstractExecutorService + +public class Executors +``` + +#### 类结构说明 +``` +public static class AbortPolicy implements RejectedExecutionHandler +public static class DiscardPolicy implements RejectedExecutionHandler +public static class DiscardOldestPolicy implements RejectedExecutionHandler +public static class CallerRunsPolicy implements RejectedExecutionHandler +private final class Worker extends AbstractQueuedSynchronizer implements Runnable { + final Thread thread; + Runnable firstTask; + volatile long completedTasks; + + Worker(Runnable firstTask) { + setState(-1); // inhibit interrupts until runWorker + this.firstTask = firstTask; + this.thread = getThreadFactory().newThread(this); + } + + public void run() { + runWorker(this); + } +} +``` + +#### 源码解析 +##### 属性 +``` +// 线程池状态和数量相关操作 +private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); +private static final int COUNT_BITS = Integer.SIZE - 3; +private static final int CAPACITY = (1 << COUNT_BITS) - 1; + +// runState is stored in the high-order bits +private static final int RUNNING = -1 << COUNT_BITS; +private static final int SHUTDOWN = 0 << COUNT_BITS; +private static final int STOP = 1 << COUNT_BITS; +private static final int TIDYING = 2 << COUNT_BITS; +private static final int TERMINATED = 3 << COUNT_BITS; + +// Packing and unpacking ctl +private static int runStateOf(int c) { return c & ~CAPACITY; } +private static int workerCountOf(int c) { return c & CAPACITY; } +private static int ctlOf(int rs, int wc) { return rs | wc; } + +// 构造函数的参数 +private volatile int corePoolSize; +private volatile int maximumPoolSize; +private volatile long keepAliveTime; +private final BlockingQueue workQueue; +private volatile ThreadFactory threadFactory; +private volatile RejectedExecutionHandler handler; + +// 其他数据 +private final HashSet workers = new HashSet(); +private final ReentrantLock mainLock = new ReentrantLock(); +private final Condition termination = mainLock.newCondition(); + +private int largestPoolSize; +private long completedTaskCount; +private volatile boolean allowCoreThreadTimeOut; +private final AccessControlContext acc; +``` + +##### 构造方法 +``` +public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); +} +public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, + ThreadFactory threadFactory) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); +} +public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, + RejectedExecutionHandler handler) { + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); +} +public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, + ThreadFactory threadFactory, RejectedExecutionHandler handler) { + if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) + throw new IllegalArgumentException(); + if (workQueue == null || threadFactory == null || handler == null) + throw new NullPointerException(); + this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); + this.corePoolSize = corePoolSize; + this.maximumPoolSize = maximumPoolSize; + this.workQueue = workQueue; + this.keepAliveTime = unit.toNanos(keepAliveTime); + this.threadFactory = threadFactory; + this.handler = handler; +} +``` + +##### 主要方法 +###### 任务执行流程 +execute +``` +public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + // 线程状态和有效数量标识符,添加工作线程 + int c = ctl.get(); + // 线程数小于coreSize + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + return; + c = ctl.get(); + } + // 线程数大于coreSize,添加阻塞队列 + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + // 任务已在阻塞队列中,线程池未运行,队列中移除任务,执行拒绝策略 + if (! isRunning(recheck) && remove(command)) + reject(command); + // 任务已在阻塞队列中,线程池有效线程数为0 + else if (workerCountOf(recheck) == 0) + // 开启新线程,任务已在队列中,所以任务为null,只需开启新线程即可 + addWorker(null, false); + } + // 阻塞队列满时且小于maxSize,添加工作线程 + else if (!addWorker(command, false)) + // 线程数大于maxSize,拒绝策略 + reject(command); +} +``` + +addWorker +``` +private boolean addWorker(Runnable firstTask, boolean core) { + // 校验 + retry: + for (;;) { + int c = ctl.get(); + + // 判断线程池状态 + int rs = runStateOf(c); + // 非运行状态 + if (rs >= SHUTDOWN && + // 针对线程池已关闭,无可用线程而言,任务队列若无空,直接返回 + ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) + return false; + + // 判断线程池数量 + for (;;) { + int wc = workerCountOf(c); + // 根据参数core定义最大线程数 + if (wc >= CAPACITY || + wc >= (core ? corePoolSize : maximumPoolSize)) + return false; + // 通过cas线程有效数+1 + if (compareAndIncrementWorkerCount(c)) + break retry; + // 如果线程有效数改变,跳出外部循环重试 + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + continue retry; + } + } + + boolean workerStarted = false; + boolean workerAdded = false; + Worker w = null; + try { + // 创建工作线程 + w = new Worker(firstTask); + final Thread t = w.thread; + if (t != null) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + // 加锁后,重复检查线程池状态 + int rs = runStateOf(ctl.get()); + if (rs < SHUTDOWN || + (rs == SHUTDOWN && firstTask == null)) { + // 做此判断,是保证线程池已关闭,无可用线程状况下,新创建的线程可以启动 + if (t.isAlive()) // precheck that t is startable + throw new IllegalThreadStateException(); + // 添加到工作线程集合中 + workers.add(w); + int s = workers.size(); + if (s > largestPoolSize) + largestPoolSize = s; + workerAdded = true; + } + } finally { + mainLock.unlock(); + } + // 开启工作线程 + if (workerAdded) { + t.start(); + workerStarted = true; + } + } + } finally { + if (! workerStarted) + addWorkerFailed(w); + } + return workerStarted; +} +``` + +run +``` +Worker内部类 +public void run() { + runWorker(this); +} +``` + +runWorker +``` +final void runWorker(Worker w) { + Thread wt = Thread.currentThread(); + Runnable task = w.firstTask; + w.firstTask = null; + w.unlock(); // allow interrupts + boolean completedAbruptly = true; + try { + while (task != null || (task = getTask()) != null) { + w.lock(); + // 线程池已stop,终止线程 + if ((runStateAtLeast(ctl.get(), STOP) || + (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) + wt.interrupt(); + try { + // 执行任务前的相关处理 + beforeExecute(wt, task); + Throwable thrown = null; + try { + task.run(); + } catch (RuntimeException x) { + thrown = x; throw x; + } catch (Error x) { + thrown = x; throw x; + } catch (Throwable x) { + thrown = x; throw new Error(x); + // 执行任务后的异常处理 + } finally { + afterExecute(task, thrown); + } + } finally { + task = null; + w.completedTasks++; + w.unlock(); + } + } + completedAbruptly = false; + } finally { + // 执行任务退出 + processWorkerExit(w, completedAbruptly); + } +} +``` + +getTask +``` +private Runnable getTask() { + boolean timedOut = false; // Did the last poll() time out? + + for (;;) { + int c = ctl.get(); + // 线程池状态检验 + int rs = runStateOf(c); + if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { + decrementWorkerCount(); + return null; + } + + // for循环线程池数量检验 + int wc = workerCountOf(c); + boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; + if ( (wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty()) ) { + // 减少线程池数 + if (compareAndDecrementWorkerCount(c)) + return null; + continue; + } + + try { + Runnable r = timed ? + // 非核心线程通过poll取任务 + // 队列为空时,非核心线程等待任务的时间 + // 也是非核心线程销毁的保持时间,上面decrement会将线程数减少 + // 哪里将改线成销毁了呢->processWorkerExit + workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : + // 核心线程通过take取线程 + workQueue.take(); + if (r != null) + return r; + timedOut = true; + } catch (InterruptedException retry) { + timedOut = false; + } + } +} +``` + +processWorkerExit +``` +private void processWorkerExit(Worker w, boolean completedAbruptly) { + if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted + decrementWorkerCount(); + + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + // 任务队列清除任务,任务完成数+1 + completedTaskCount += w.completedTasks; + workers.remove(w); + } finally { + mainLock.unlock(); + } + + tryTerminate(); + + int c = ctl.get(); + if (runStateLessThan(c, STOP)) { + if (!completedAbruptly) { + int min = allowCoreThreadTimeOut ? 0 : corePoolSize; + if (min == 0 && ! workQueue.isEmpty()) + min = 1; + if (workerCountOf(c) >= min) + return; // replacement not needed + } + addWorker(null, false); + } +} +``` + +如果keepAliveTime设置为0,当任务队列为空时,非核心线程取不出来任务,会立即结束其生命周期。
+默认情况下,是不允许核心线程超时的,但是可以通过下面这个方法设置使核心线程也可超时
+``` +public void allowCoreThreadTimeOut(boolean value) { + if (value && keepAliveTime <= 0) + throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); + if (value != allowCoreThreadTimeOut) { + allowCoreThreadTimeOut = value; + if (value) + interruptIdleWorkers(); + } +} +``` + +// todo 线程生命周期 + +#### 总结 +``` +线程池分类有普通任务、未来任务、定时任务、任务分而治之,实现类分别为 +ThreadPoolExecutor.execute、ThreadPoolExecutor.submit、ScheduledThreadPoolExecutor、ForkJoinPool + +线程池体系结构的构成有, +Executor、ExecutorService、ScheduledExecutorService +AbstractExecutorService +ThreadPoolExecutor、ScheduledThreadPoolExecutor、ForkJoinPool +Executors,线程工具类 + +构造函数参数有7个, +corePoolSize、maximumPoolSize、keepAliveTime + unit、BlockingQueue、threadFactory、RejectedExecutionHandler +keepAliveTime + unit: +两参数仅当正在运行的线程数大于核心线程数时才有效,即只针对非核心线程。 +当任务队列为空时,线程保持多久才会销毁,内部主要是通过阻塞队列带超时的poll(timeout, unit)方法实现的 + +线程池生命周期的状态有5中, +RUNNING,表示可接受新任务,且可执行队列中的任务; +SHUTDOWN,表示不接受新任务,但可执行队列中的任务; +STOP,表示不接受新任务,且不再执行队列中的任务,且中断正在执行的任务; +TIDYING,所有任务已经中止,且工作线程数量为0,最后变迁到这个状态的线程将要执行terminated()钩子方法,只会有一个线程执行这个方法; +TERMINATED,中止状态,已经执行完terminated()钩子方法 + +执行任务流程, +线程池有效线程数->countSize +countSize小于coreSize时,添加工作线程; +大于coreSize,任务添加任务阻塞队列; +阻塞队列满时且小于countmaxSize时,添加工作线程; +countSize大于maxSize,拒绝策略 +``` \ No newline at end of file