diff --git a/second/week_05/87/Executor.md b/second/week_05/87/Executor.md new file mode 100644 index 0000000000000000000000000000000000000000..97b114ee7f1382fedc248b6931a2db690ef09bd2 --- /dev/null +++ b/second/week_05/87/Executor.md @@ -0,0 +1,18 @@ +# Executor + +引入该接口的主要目的是**解耦任务本身和任务的执行**。我们之前通过线程执行一个任务时,往往需要先创建一个线程,然后调用线程的`start`方法来执行任务 + +```java +public interface Executor { + /** + * 执行给定的Runnable任务. + * 根据Executor的实现不同, 具体执行方式也不相同. + * + * @param command the runnable task + * @throws RejectedExecutionException if this task cannot be accepted for execution + * @throws NullPointerException if command is null + */ + void execute(Runnable command); +} +``` + diff --git a/second/week_05/87/Foo.java b/second/week_05/87/Foo.java new file mode 100644 index 0000000000000000000000000000000000000000..d60d65e49bfd83569325b0b922ab40efac7503d5 --- /dev/null +++ b/second/week_05/87/Foo.java @@ -0,0 +1,26 @@ +class Foo { + private Semaphore seam01 = new Semaphore(0); + private Semaphore seam02 = new Semaphore(0); + public Foo() { + + } + + public void first(Runnable printFirst) throws InterruptedException { + // printFirst.run() outputs "first". Do not change or remove this line. + printFirst.run(); + seam01.release(); + } + + public void second(Runnable printSecond) throws InterruptedException { + seam01.acquire(); + // printSecond.run() outputs "second". Do not change or remove this line. + printSecond.run(); + seam02.release(); + } + + public void third(Runnable printThird) throws InterruptedException { + seam02.acquire(); + // printThird.run() outputs "third". Do not change or remove this line. + printThird.run(); + } +} diff --git a/second/week_05/87/FooBar.java b/second/week_05/87/FooBar.java new file mode 100644 index 0000000000000000000000000000000000000000..4d0aa2b8cf7a08c348de2ad84e72c9dab9817fc9 --- /dev/null +++ b/second/week_05/87/FooBar.java @@ -0,0 +1,51 @@ + class FooBar { + private int n; + private Lock lock = new ReentrantLock(); + private Condition condition = lock.newCondition(); + private boolean isEmpty = true; + + public FooBar(int n) { + this.n = n; + } + + //类似生产者offer数据 + public void foo(Runnable printFoo) throws InterruptedException { + + lock.lock(); + + for (int i = 0; i < n; i++) { + + while (!isEmpty) { + condition.await(); + } + // printFoo.run() outputs "foo". Do not change or remove this line. + printFoo.run(); + isEmpty = false; + + condition.signal(); + + } + lock.unlock(); + + } + + //类似消费者take数据 + public void bar(Runnable printBar) throws InterruptedException { + lock.lock(); + + for (int i = 0; i < n; i++) { + + while (isEmpty) { + condition.await(); + } + // printBar.run() outputs "bar". Do not change or remove this line. + printBar.run(); + isEmpty = true; + + condition.signal(); + + } + lock.unlock(); + + } + } diff --git a/second/week_05/87/Thread.assets/image-20200405223906197.png b/second/week_05/87/Thread.assets/image-20200405223906197.png new file mode 100644 index 0000000000000000000000000000000000000000..0fb95f9a5171211210b52a5b41f3d10a4b71eb17 Binary files /dev/null and b/second/week_05/87/Thread.assets/image-20200405223906197.png differ diff --git a/second/week_05/87/Thread.md b/second/week_05/87/Thread.md new file mode 100644 index 0000000000000000000000000000000000000000..0540598bff8320f40b7667a7ad009a09033570e6 --- /dev/null +++ b/second/week_05/87/Thread.md @@ -0,0 +1,117 @@ +# Thread + +## 构造线程 + +```java +private void init(ThreadGroup g, Runnable target, String name, + long stackSize, AccessControlContext acc, + boolean inheritThreadLocals) { + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + // 当前线程就是该线程的父线程 + Thread parent = currentThread(); + SecurityManager security = System.getSecurityManager(); + if (g == null) { + /* Determine if it's an applet or not */ + + /* If there is a security manager, ask the security manager + what to do. */ + if (security != null) { + g = security.getThreadGroup(); + } + + /* If the security doesn't have a strong opinion of the matter + use the parent thread group. */ + if (g == null) { + // 如果ThreadGroup为空就设置父线程组 + g = parent.getThreadGroup(); + } + } + + /* checkAccess regardless of whether or not threadgroup is + explicitly passed in. */ + g.checkAccess(); + + /* + * Do we have the required permissions? + */ + if (security != null) { + if (isCCLOverridden(getClass())) { + security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); + } + } + + g.addUnstarted(); + + this.group = g; + // 将daemon、priority属性设置为父线程的对应的属性 + this.daemon = parent.isDaemon(); + this.priority = parent.getPriority(); + if (security == null || isCCLOverridden(parent.getClass())) + this.contextClassLoader = parent.getContextClassLoader(); + else + this.contextClassLoader = parent.contextClassLoader; + this.inheritedAccessControlContext = + acc != null ? acc : AccessController.getContext(); + this.target = target; + setPriority(priority); + // 将父线程的InheritableThreadLocal复制过来 + if (inheritThreadLocals && parent.inheritableThreadLocals != null) + this.inheritableThreadLocals = + ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); + /* Stash the specified stack size in case the VM cares */ + this.stackSize = stackSize; + + /* Set thread ID 分配一个线程Id */ + tid = nextThreadID(); +} +``` + +![image-20200405223906197](Thread.assets/image-20200405223906197.png) + +- New: 至今尚未启动的线程的状态。 +- Runnable :可运行线程的线程状态。 +- Blocked :受阻塞并且正在等待监视器锁的某一线程的线程状态。 +- Waiting :某一等待线程的线程状态。 +- Timed_waiting:具有指定等待时间的某一等待线程的线程状态。 +- Terminated:已终止线程的线程状态。线程已经结束执行。 + + + + + +```java +// 创建 +Thread thread = New Thread(); +Thread thread = New Thread(String name); +Thread thread = New Thread(Runnable target); +Thread thread = New Thread(Runnable target, String name); + +// 动态方法 +thread.isAlive();// 测试线程是否处于活动状态 +thread.start();// 开始执行线程,JVM 调用该线程的 run 方法 +thread.setName(String name);// 改变线程名称 +thread.getName();// 获取线程名称 +thread.setPriority(int priority);// 更改线程的优先级 +/* 具有较高优先级的线程应该在低优先级的线程之前分配处理器时间,然而,线程优先级不能保证线程执行的顺序,而且非常依赖于平台 + Thread.MIN_PRIORITY 1 + Thread.NORM_PRIORITY 5(默认) + Thread.MAX_PRIORITY 10 */ +thread.getPriority();// 获取线程的优先级 +thread.setDaemon(boolean on);// 将该线程标记为守护线程。守护线程是服务提供者线程,当 JVM 检测到应用程序中的所有线程都只是守护线程时,它将退出应用程序 +thread.isDaemon();// 测试是否为守护线程 +thread.join(long millisec);// 等待该线程终止的时间。假设有两个线程 t1 和 t2,如果线程 t1 调用 t2.join(),线程 t1 开始等待,直到线程 t2 终止,t1 才会继续执行 +thread.interrupt();// 尝试中断线程 + +// 静态方法 +Thread.currentThread();// 返回对当前正在执行的线程对象的引用 +Thread.yield();// 尝试暂停当前正在执行的线程对象,并执行其他线程。该线程放弃当前对处理器的使用 +Thread.sleep(long millisec);// 在指定的毫秒数内让当前正在执行的线程休眠(暂停执行)。如果线程在进入休眠之前拥有锁的所有权,则它在休眠期间继续保持这些锁 +Thread.interrupted();// 测试线程是否中断 +Thread.holdsLock(Object o);// 返回 boolean 类型,当且仅当当前线程在指定的对象上保持监视器锁时,才返回 true +Thread.dumpStack();// 将当前线程的堆栈跟踪打印至标准错误流 +``` + diff --git a/second/week_05/87/ThreadLocal.md b/second/week_05/87/ThreadLocal.md new file mode 100644 index 0000000000000000000000000000000000000000..8f39bc72ae125db97b00253d0857d0c1866ffd94 --- /dev/null +++ b/second/week_05/87/ThreadLocal.md @@ -0,0 +1,73 @@ +# ThreadLocal + +它提供线程本地变量,如果创建一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题。 + + + +## set + +```java +public void set(T value) { + //(1)获取当前线程(调用者线程) + Thread t = Thread.currentThread(); + //(2)以当前线程作为key值,去查找对应的线程变量,找到对应的map + ThreadLocalMap map = getMap(t); + //(3)如果map不为null,就直接添加本地变量,key为当前线程,值为添加的本地变量值 + if (map != null) + map.set(this, value); + //(4)如果map为null,说明首次添加,需要首先创建出对应的map + else + createMap(t, value); +} +``` + +## get + +```java +public T get() { + //(1)获取当前线程 + Thread t = Thread.currentThread(); + //(2)获取当前线程的threadLocals变量 + ThreadLocalMap map = getMap(t); + //(3)如果threadLocals变量不为null,就可以在map中查找到本地变量的值 + if (map != null) { + ThreadLocalMap.Entry e = map.getEntry(this); + if (e != null) { + @SuppressWarnings("unchecked") + T result = (T)e.value; + return result; + } + } + //(4)执行到此处,threadLocals为null,调用该更改初始化当前线程的threadLocals变量 + return setInitialValue(); +} + +private T setInitialValue() { + //protected T initialValue() {return null;} + T value = initialValue(); + //获取当前线程 + Thread t = Thread.currentThread(); + //以当前线程作为key值,去查找对应的线程变量,找到对应的map + ThreadLocalMap map = getMap(t); + //如果map不为null,就直接添加本地变量,key为当前线程,值为添加的本地变量值 + if (map != null) + map.set(this, value); + //如果map为null,说明首次添加,需要首先创建出对应的map + else + createMap(t, value); + return value; +} +``` + +## remove + +```java +public void remove() { + //获取当前线程绑定的threadLocals + ThreadLocalMap m = getMap(Thread.currentThread()); + //如果map不为null,就移除当前线程中指定ThreadLocal实例的本地变量 + if (m != null) + m.remove(this); + } +``` + diff --git a/second/week_05/87/ThreadPoolExecutor.assets/image-20200403165300672.png b/second/week_05/87/ThreadPoolExecutor.assets/image-20200403165300672.png new file mode 100644 index 0000000000000000000000000000000000000000..e2caff8fd4b76a59836b35a0488bbad924018e65 Binary files /dev/null and b/second/week_05/87/ThreadPoolExecutor.assets/image-20200403165300672.png differ diff --git a/second/week_05/87/ThreadPoolExecutor.md b/second/week_05/87/ThreadPoolExecutor.md new file mode 100644 index 0000000000000000000000000000000000000000..c96abe3bceba3e8cd37f1c1ee2b4674455b948d4 --- /dev/null +++ b/second/week_05/87/ThreadPoolExecutor.md @@ -0,0 +1,211 @@ +# ThreadPoolExecutor + +## 是什么?为什么? + +### 线程池是什么? + +> 线程池是一种基于**池化思想**管理线程的工具,经常出现在多线程服务器中,如MySQL。 +> +> 线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配并行执行任务。 +> +> 1. 避免了处理任务时创建销毁线程开销的代价 +> 2. 避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。 + +好处: + + 1. 降低资源消耗: 通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。 + 2. 提高响应速度: 任务到达时,无需等待线程创建即可立即执行。 + 3. 提高线程的可管理性: 线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程不合理分布到导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。 + 4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池SecheduleThreadPoolExecutor,就允许任务延期执行或定期执行。 + +### 为什么要用线程池? + +1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。 +2. 对资源无限申请缺少抑制手段,易发生系统资源耗尽的风险。 +3. 系统无法合理管理内部资源的分布,会降低系统的稳定性。 + +池化即将资源统一在一起管理的一种思想。 + +计算机领域表现为:统一管理IT资源,包括服务器、存储和网络资源等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较经典的几种使用策略: + +1. 内存池(Memory Pooling): 预先申请内存、提升申请内存速度、减少内存碎片。 +2. 连接池(Connection Pooling): 预先申请数据库连接,提升申请链接的速度、降低系统的开销。 +3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。 + +## 设计与实现 + +基于JDK1.8 + +### 总体设计 + +![image-20200403165300672](ThreadPoolExecutor.assets/image-20200403165300672.png) + +--- + + + +Executor: 将任务提交和任务执行进行解耦,用户无需关心如何创建线程,如何调度线程来执行任务,用户只需提供Runnable对象。将Runnable对象提交给Executor,由此框架来完成线程调配和任务的执行。 + +ExecutorService接口: + + 1. 扩充执行任务的能力,补充可以为一个或一批一步任务生成Future的方法 + 2. 提供了管控线程池的方法,比如停止线程池的运行。 + +AbstractExecutorService: + + 1. 将执行任务的流程串联起来,保证下层的实现只需关注一个执行任务的方法即可。 + +ThreadPoolExecutor: + + 1. 维护自身的生命周期 + 2. 管理线程和任务 + + + +线程池内部实际上构建了一个生产者消费者模型,将线程和任务两者解耦,并不知节关联,从而良好的缓冲任务,复用线程。 + +任务管理:(生产者) + + 1. 直接申请线程执行该任务 + 2. 缓冲到队列中等待线程执行 + 3. 拒绝该任务 + +线程管理:(消费者) + + 1. 维护在消费池内,根据任务请求进行线程分配 + 2. 线程执行完成后会去获取新的任务执行 + 3. 获取不到任务,线程就会被回收 + +--- + +### 生命周期管理 + +如何描述线程的生命周期: + +```java +private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNING,0)); +``` + +此变量维护了两个值:运行状态(高3位)和线程数量(低29位) + +避免了为了维护两者的一致,而占用锁资源。 + +```java +// 计算当前运行状态 +private static int runStateOf(int c) { return c & ~CAPACITY;} +// 计算当前线程数量 +private static int workerCountOf(int c) {return c & CAPACITY;} +// 通过状态和线程生成ctl +private static int ctlOf(int rs,int wc) {return rs | wc}; +``` + +运行状态 + +| 运行状态 | 状态描述 | +| ---------- | ------------------------------------------------------------ | +| RUNNING | 能接受新提交的任务,并且也能处理阻塞队列中的任务 | +| SHUTDOWN | 关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已经保存的任务。 | +| STOP | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。 | +| TIDYING | 所有的任务都已终止了,workerCount(有效线程)为0. | +| TERMINATED | 在terminated()方法执行完成后进入该状态 | + +生命周期转化如图所示: + + + +--- + +### 任务执行机制 + +#### 任务调度 + +任务调度时线程池的主要入口,当用户提交了一个任务,接下来这个任务将如何执行都是由这个阶段决定的。了解这部分就相当于了解了线程池的核心运行机制。 + +首先所有的任务调度都是由execute方法完成的,这部分的工作是:检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。其执行过程如下: + +1. 首先检测线程池运行状态,如果不是RUNNING,则直接拒绝,线程池要保证在RUNNING的状态下执行任务。 +2. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新的提交的任务。 +3. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。 +4. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行提交的任务。 +5. 如果workerCount >= maximumPoolSize,并且线程内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。 + +#### 任务缓冲 + +任务缓冲模块是线程池管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模型,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。 + +阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。 + +1. 队列为空时,获取元素的线程会等待队列变为非空。 +2. 队列满时,存储元素的线程会等待队列可用。 + +#### 任务申请 + +1. 任务直接由新创建的线程执行 +2. 线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去队列中申请任务再去执行。 + +#### 任务拒绝 + +线程池有一个最大的容量,当线程池的任务缓存队列已满,并且线程池中的线程数目已到达maximumPoolSize,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。 + +```java +public interface RejectedExecutionHandler { + void rejectedExecution(Runnable r,ThreadPoolExecutor executor); +} +``` + +| 序号 | 名称 | 描述 | +| ---- | -------------------------------------- | ------------------------------------------------------------ | +| 1 | ThreadPoolExecutor.AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常。这是线程池默认的拒绝策略 | +| 2 | ThreadPoolExecutor.DiscardPolicy | 丢弃任务,但是不抛出异常,使用此策略,可能会是我们无法发现系统的异常状态。建议是一些无关紧要的业务采用此策略 | +| 3 | ThreadPoolExecutor.DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新提交被拒绝的任务,是否采用此种拒绝策略,还得根据实际业务是否允许丢弃老任务来认真衡量 | +| 4 | ThreadPoolExecutor.CallerRunsPolicy | 由调用线程(提交任务的线程)处理该任务,这种情况是需要让所有任务都执行完毕,那么就适合大量计算的任务类型去执行,多线程仅仅是增大吞吐量的手段,最终必须要让每个任务都执行完毕。 | + + + +### Worker线程管理 + +#### Worker线程 + +线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程Worker + +```java +private final class Worker extends AbstractQueuedSynchronizer implements Runnable { + // Worker 持有线程 + final Thread thread; + // 初始化的任务,可以为空 + Runnable firstTask; +} +``` + + + +#### Worker线程增加 + +增加线程是通过线程池中的addWorker方法,该方法的功能就是增加一个线程,该方法不考虑线程池是在哪个阶段增加的该线程,这个分配线程的策略是在上个步骤完成的,该步骤仅仅完成增加线程,并使它运行,最后返回是否成功这个结果。addWorker方法有两个参数:firstTask、core。firstTask参数用于指定新增的线程执行的第一个任务,该参数可以为空;core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize, + +#### Worker线程回收 + +线程池中线程的销毁以来JVM自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的 线程引用,防止这部分线程被JVM回收,当线程池决定哪些线程需要回收时,只需要将其饮用消除即可。Worker被创建出来后,就会不断进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当Worker无法获取到任务,也就是获得任务为空时,循环会结束,worker会主动消除自身在线程池内的引用。 + +```java +try { + while (task != null || (task = getTask() != null)) { + // 执行任务 + } +}finally { + // 获取不到任务时,主动回收自己 + processWorkerExit(w, completedAbruptly); +} +``` + + + +#### Worker线程执行任务 + +1. while循环不断地通过getTask方法来获取任务 +2. getTask方法从阻塞队列中获取任务 +3. 如果线程正在停止,那么要保证当前线程的中断状态,否则要保证当前线程不是中断状态 +4. 执行任务 +5. 如果getTask结果为null跳出循环,执行processWorkerExit方法,销毁线程。 + +## 业务实践 \ No newline at end of file