diff --git a/week_05/08/Executors-008.md b/week_05/08/Executors-008.md new file mode 100644 index 0000000000000000000000000000000000000000..f4fd95f17db4e31ee56b5820bb71de4ef86d8ea5 --- /dev/null +++ b/week_05/08/Executors-008.md @@ -0,0 +1,73 @@ +# Executors-008 + +它是一个工具类,用于创建各种不同的线程池。 + +### FixedThreadPool + +线程数量固定的线程池 + +```java + * + * @param nThreads the number of threads in the pool + * @return the newly created thread pool + * @throws IllegalArgumentException if {@code nThreads <= 0} + */ + public static ExecutorService newFixedThreadPool(int nThreads) { + + return new ThreadPoolExecutor(nThreads, nThreads,// 核心线程数量和最大线程数量均为nThreads + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue()); //无界阻塞队列 + } + + public ThreadPoolExecutor(int corePoolSize,// 核心线程数量 + int maximumPoolSize,// 最大线程数量 + long keepAliveTime,// 线程最大空闲时间 + TimeUnit unit,// 时间单位 + BlockingQueue workQueue) {// 阻塞队列 + this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, + Executors.defaultThreadFactory(), defaultHandler); + } +``` + +### CachedThreadPool + +创建线程数量不限 + +```java + public static ExecutorService newCachedThreadPool() { + return new ThreadPoolExecutor(0, Integer.MAX_VALUE,// 核心线程数量为0,最大线程几乎无限 + 60L, TimeUnit.SECONDS,// 线程最大空闲时间为60s + new SynchronousQueue()); //同步队列 + } +``` + +适用场景:快速处理大量耗时较小的任务 + +### SingleThreadExecutor + +只有一个线程的线程池,所有task都是串行执行的 + +```java + public static ExecutorService newSingleThreadExecutor() { + return new FinalizableDelegatedExecutorService + (new ThreadPoolExecutor(1, 1,// 核心线程数量和最大线程数量均为1 + 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue())); + } +``` + +### ScheduledThreadPool + +定时任务线程池 + +```java + public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { + return new ScheduledThreadPoolExecutor(corePoolSize); + } + + public ScheduledThreadPoolExecutor(int corePoolSize) { + super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, + new DelayedWorkQueue()); + } +``` + diff --git a/week_05/08/Executors-008.xmind b/week_05/08/Executors-008.xmind new file mode 100644 index 0000000000000000000000000000000000000000..7754d2af0b779134bed8b9b313db645288f5e61b Binary files /dev/null and b/week_05/08/Executors-008.xmind differ diff --git a/week_05/08/Thread-08.md b/week_05/08/Thread-08.md new file mode 100644 index 0000000000000000000000000000000000000000..c93a2fd0db732622033cf2b016c362d2104d73cd --- /dev/null +++ b/week_05/08/Thread-08.md @@ -0,0 +1,360 @@ +# 读源码--Thread + +## 继承体系 + +![Thread继承图](E:\BBBMyProject\JavaStudy\week_05\08\attachment\Thread继承图.jpg) + +## 源码阅读 + +### 构造器 + +无参构造器:可用于Thread子类的实例化 + +```java + + public Thread() { + init(null, null, "Thread-" + nextThreadNum(), 0); + } + // 初始化线程 + private void init(ThreadGroup g, Runnable target, String name, + long stackSize) { + init(g, target, name, stackSize, null, true); + } + + private void init(ThreadGroup g, Runnable target, String name, + long stackSize, AccessControlContext acc, + boolean inheritThreadLocals) { + // 线程名称不能为空 + if (name == null) { + throw new NullPointerException("name cannot be null"); + } + + this.name = name; + // 获取当前线程 + Thread parent = currentThread(); + SecurityManager security = System.getSecurityManager(); + if (g == null) { + /* Determine if it's an applet or not */ + + /* If there is a security manager, ask the security manager + what to do. */ + // 从SecurityManager里获取线程组 + if (security != null) { + g = security.getThreadGroup(); + } + + /* If the security doesn't have a strong opinion of the matter + use the parent thread group. */ + // 从当前线程里获取线程组 + if (g == null) { + g = parent.getThreadGroup(); + } + } + + /* checkAccess regardless of whether or not threadgroup is + explicitly passed in. */ + g.checkAccess(); + + /* + * Do we have the required permissions? + */ + // 权限验证 + if (security != null) { + if (isCCLOverridden(getClass())) { + security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION); + } + } + + g.addUnstarted(); + + this.group = g; + this.daemon = parent.isDaemon(); + this.priority = parent.getPriority(); + if (security == null || isCCLOverridden(parent.getClass())) + this.contextClassLoader = parent.getContextClassLoader(); + else + this.contextClassLoader = parent.contextClassLoader; + this.inheritedAccessControlContext = + acc != null ? acc : AccessController.getContext(); + this.target = target; + setPriority(priority); + if (inheritThreadLocals && parent.inheritableThreadLocals != null) + this.inheritableThreadLocals = + ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); + /* Stash the specified stack size in case the VM cares */ + this.stackSize = stackSize; + + /* Set thread ID */ + tid = nextThreadID(); + } +``` + +含参构造器:可用于Runnable实现类的实例化 + +```java + public Thread(Runnable target) { + init(null, target, "Thread-" + nextThreadNum(), 0); + } +``` + +### 主要属性 + +```java + // 线程名称 + private volatile String name; + // 线程优先级 + private int priority; + // + private Thread threadQ; + private long eetop; + // 执行对象 + private Runnable target; + +``` + +### 主要方法 + +currentThread():获取当前线程 + +```java + // 当前线程 + public static native Thread currentThread(); +``` + +start():线程启动,线程进入就绪状态,synchronized修饰 + +```java + public synchronized void start() { + /** + * This method is not invoked for the main method thread or "system" + * group threads created/set up by the VM. Any new functionality added + * to this method in the future may have to also be added to the VM. + * + * A zero status value corresponds to state "NEW". + */ + // 新创建的线程的状态为NEW,即为0 + if (threadStatus != 0) + throw new IllegalThreadStateException(); + + /* Notify the group that this thread is about to be started + * so that it can be added to the group's list of threads + * and the group's unstarted count can be decremented. */ + // 通知线程组,该线程即将启动,未启动线程减一 + group.add(this); + // 启动状态 + boolean started = false; + try { + start0(); + started = true; + } finally { + try { + if (!started) { + group.threadStartFailed(this); + } + } catch (Throwable ignore) { + /* do nothing. If start0 threw a Throwable then + it will be passed up the call stack */ + } + } + } +``` + +run():线程实际执行的方法,无论是通过继承Thread类还是实现接口Runnable来创建线程,都要重写该方法。重写的是Runnable接口中的方法。 + +若一个类既继承Thread类同时实现Runnable呢,由于重写了run,就与target无关了。所以执行的是继承Thread类的重写的run方法 + +```java + @Override + public void run() { + // + if (target != null) { + target.run(); + } + } +``` + +exit():退出。在退出之前将个变量置空,以便资源释放 + +```java + private void exit() { + if (group != null) { + group.threadTerminated(this); + group = null; + } + /* Aggressively null out all reference fields: see bug 4006245 */ + target = null; + /* Speed the release of some of these resources */ + // 置空以便GC回收 + threadLocals = null; + inheritableThreadLocals = null; + inheritedAccessControlContext = null; + blocker = null; + uncaughtExceptionHandler = null; + } +``` + +interupt:中断 + +```java + public void interrupt() { + if (this != Thread.currentThread()) + checkAccess(); + + synchronized (blockerLock) { + Interruptible b = blocker; + if (b != null) { + interrupt0(); // Just to set the interrupt flag + b.interrupt(this); + return; + } + } + interrupt0(); + } +``` + +sleep:休眠,不释放锁。native方法:只提供函数体,其实现由C/C++在其他文件中实现,编写规则遵循java本地接口的规范(JNI) + +```java + public static native void sleep(long millis) throws InterruptedException; + + public static void sleep(long millis, int nanos) + throws InterruptedException { + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + + if (nanos < 0 || nanos > 999999) { + throw new IllegalArgumentException( + "nanosecond timeout value out of range"); + } + + if (nanos >= 500000 || (nanos != 0 && millis == 0)) { + millis++; + } + + sleep(millis); + } +``` + +join:等待线程结束。A调用B的join,即A等待线程B结束 + +```java + public final synchronized void join(long millis) + throws InterruptedException { + long base = System.currentTimeMillis(); + long now = 0; + + if (millis < 0) { + throw new IllegalArgumentException("timeout value is negative"); + } + + if (millis == 0) { + // 如果是活跃的 + while (isAlive()) { + // 无限期等待 + wait(0); + } + } else { + while (isAlive()) { + long delay = millis - now; + if (delay <= 0) { + break; + } + // 有限期等待 + wait(delay); + now = System.currentTimeMillis() - base; + } + } + } +``` + +yield:让步,native方法。暂停当前正在执行的线程对象,并执行其他线程。 在多线程的情况下,由CPU决定执行哪一个线程,而yield()方法就是暂停当前的线程,让给其他线程(包括它自己)执行,具体由谁执行由CPU决定。 + +```java + public static native void yield(); +``` + +### 线程的状态 + +```java + public enum State { + // 新建状态,线程还未启动 + NEW, + + // 就绪状态,等待系统资源,例如CPU + RUNNABLE, + + // 阻塞状态,等待监视器锁,如Synchronized + // 或者在wait后被notify后也会进入该状态 + BLOCKED, + + // 等待状态, + // Object.wait()无超时方法被notify之前 + // Thread.join()无超时方法后 + // LockSupport.park()无超时的方法后 + WAITING, + + // 超时等待状态 + // Thread.sleep + // Object.wait(long)超时方法 + // Thread.join(long)方法后 + // LockSupport.parkNanos(nanos)方法后 + // LockSupport.parkUntil(deadline)方法后 + TIMED_WAITING, + + // 结束状态,线程执行完毕 + TERMINATED; + } +``` + +### 线程的生命周期 + +![线程生命周期](E:\BBBMyProject\JavaStudy\week_05\08\attachment\线程生命周期.png) + +(1)为了方便讲解,我们把锁分成两大类,一类是synchronized锁,一类是基于AQS的锁(我们拿重入锁举例),也就是内部使用了LockSupport.park()/parkNanos()/parkUntil()几个方法的锁; + +(2)不管是synchronized锁还是基于AQS的锁,内部都是分成两个队列,一个是同步队列(AQS的队列),一个是等待队列(Condition的队列); + +(3)对于内部调用了object.wait()/wait(timeout)或者condition.await()/await(timeout)方法,线程都是先进入等待队列,被notify()/signal()或者超时后,才会进入同步队列; + +(4)**明确声明,BLOCKED状态只有线程处于synchronized的同步队列的时候才会有这个状态,其它任何情况都跟这个状态无关;** + +(5)对于synchronized,线程执行synchronized的时候,如果立即获得了锁(没有进入同步队列),线程处于RUNNABLE状态; + +(6)对于synchronized,线程执行synchronized的时候,如果无法获得锁(直接进入同步队列),线程处于BLOCKED状态; + +(5)对于synchronized内部,调用了object.wait()之后线程处于WAITING状态(进入等待队列); + +(6)对于synchronized内部,调用了object.wait(timeout)之后线程处于TIMED_WAITING状态(进入等待队列); + +(7)对于synchronized内部,调用了object.wait()之后且被notify()了,如果线程立即获得了锁(也就是没有进入同步队列),线程处于RUNNABLE状态; + +(8)对于synchronized内部,调用了object.wait(timeout)之后且被notify()了,如果线程立即获得了锁(也就是没有进入同步队列),线程处于RUNNABLE状态; + +(9)对于synchronized内部,调用了object.wait(timeout)之后且超时了,这时如果线程正好立即获得了锁(也就是没有进入同步队列),线程处于RUNNABLE状态; + +(10)对于synchronized内部,调用了object.wait()之后且被notify()了,如果线程无法获得锁(也就是进入了同步队列),线程处于BLOCKED状态; + +(11)对于synchronized内部,调用了object.wait(timeout)之后且被notify()了或者超时了,如果线程无法获得锁(也就是进入了同步队列),线程处于BLOCKED状态; + +(12)对于重入锁,线程执行lock.lock()的时候,如果立即获得了锁(没有进入同步队列),线程处于RUNNABLE状态; + +(13)对于重入锁,线程执行lock.lock()的时候,如果无法获得锁(直接进入同步队列),线程处于WAITING状态; + +(14)对于重入锁内部,调用了condition.await()之后线程处于WAITING状态(进入等待队列); + +(15)对于重入锁内部,调用了condition.await(timeout)之后线程处于TIMED_WAITING状态(进入等待队列); + +(16)对于重入锁内部,调用了condition.await()之后且被signal()了,如果线程立即获得了锁(也就是没有进入同步队列),线程处于RUNNABLE状态; + +(17)对于重入锁内部,调用了condition.await(timeout)之后且被signal()了,如果线程立即获得了锁(也就是没有进入同步队列),线程处于RUNNABLE状态; + +(18)对于重入锁内部,调用了condition.await(timeout)之后且超时了,这时如果线程正好立即获得了锁(也就是没有进入同步队列),线程处于RUNNABLE状态; + +(19)对于重入锁内部,调用了condition.await()之后且被signal()了,如果线程无法获得锁(也就是进入了同步队列),线程处于WAITING状态; + +(20)对于重入锁内部,调用了condition.await(timeout)之后且被signal()了或者超时了,如果线程无法获得锁(也就是进入了同步队列),线程处于WAITING状态; + +(21)对于重入锁,如果内部调用了condition.await()之后且被signal()之后依然无法获取锁的,其实经历了两次WAITING状态的切换,一次是在等待队列,一次是在同步队列; + +(22)对于重入锁,如果内部调用了condition.await(timeout)之后且被signal()或超时了的,状态会有一个从TIMED_WAITING切换到WAITING的过程,也就是从等待队列进入到同步队列; \ No newline at end of file diff --git a/week_05/08/Thread-08.xmind b/week_05/08/Thread-08.xmind new file mode 100644 index 0000000000000000000000000000000000000000..f8197a543f0dc57ec3d4eba9096d144e3c42ebd8 Binary files /dev/null and b/week_05/08/Thread-08.xmind differ diff --git a/week_05/08/ThreadLocal-008.md b/week_05/08/ThreadLocal-008.md new file mode 100644 index 0000000000000000000000000000000000000000..323a15a2d03e8cef0f3bf50c4f2c8744bfe0ade3 --- /dev/null +++ b/week_05/08/ThreadLocal-008.md @@ -0,0 +1,215 @@ +# 读源码ThreadLocal + +## 简介 + +ThreadLocal类提供了线程局部 (thread-local) 变量。这些变量与普通变量不同,每个线程都可以通过其 get 或 set方法来访问自己的独立初始化的变量副本。 + +类结构图 + +![ThreadLocal类结构图](E:\Project\JavaStudy\week_05\08\attachment\ThreadLocal类结构图.png) + +## 源码分析 + +### 内部类ThreaLocalMap + +ThreadLocalMap时Thread的内部类,每个线程在向Thread中塞值的时候都会向自己所持有的ThreadLocalMap中设置数据;取值的时候同理。所以首先了解一下ThreaLocalMap + +#### 主要属性 + +```java + + /** + * Entry继承WeakReference,并且用ThreadLocal作为key.如果key为null + * (entry.get() == null)表示key不再被引用,表示ThreadLocal对象被回收 + * 因此这时候entry也可以从table从清除。 + */ + static class Entry extends WeakReference> { + /** The value associated with this ThreadLocal. */ + Object value; + Entry(ThreadLocal k, Object v) { + super(k); + value = v; + } + } + + // 初始化默认容量,table,size,threshold与Hashamap中基本一致 + private static final int INITIAL_CAPACITY = 16; + private Entry[] table; + private int size = 0; + private int threshold; // Default to 0 +``` + +#### 主要方法 + +索引相关方法 + +```java + // 获取下一个索引,可以看出该索引形成了环 + private static int nextIndex(int i, int len) { + return ((i + 1 < len) ? i + 1 : 0); + } + // 获取上一个索引 + private static int prevIndex(int i, int len) { + return ((i - 1 >= 0) ? i - 1 : len - 1); + } +``` + +set,get + +```java + private void set(ThreadLocal key, Object value) { + + Entry[] tab = table; + int len = tab.length; + int i = key.threadLocalHashCode & (len-1); + + for (Entry e = tab[i]; + e != null; + e = tab[i = nextIndex(i, len)]) { + ThreadLocal k = e.get(); + // table[i]找到相同的key,则覆盖value + if (k == key) { + e.value = value; + return; + } + // table[i]的key为null,说明被回收了(弱引用),说明该table[i]可重新使用,用key-value替换 + // 并删除其他无效Entry + if (k == null) { + replaceStaleEntry(key, value, i); + return; + } + } + // 不为空的位置遍历完成,找到为空的位置,插入值,size加1 + tab[i] = new Entry(key, value); + int sz = ++size; + /** + * cleanSomeSlots用于清除那些e.get()==null,也就是table[index] != null && * table[index].get()==null,之前提到过,这种数据key关联的对象已经被回收,所以这个 * Entry(table[index])可以被置null。 + * 如果没有清除任何entry,并且当前使用量达到了负载因子所定义(长度的2/3),那么进行rehash() + */ + if (!cleanSomeSlots(i, sz) && sz >= threshold) + rehash(); + } + + // 根据ThreadLocal获取值 + private Entry getEntry(ThreadLocal key) { + // 找到下标 + int i = key.threadLocalHashCode & (table.length - 1); + Entry e = table[i]; + if (e != null && e.get() == key) + return e; + // 通过i无法找到value + else + return getEntryAfterMiss(key, i, e); + } + // 通过i无法找到value,用这个方法 + private Entry getEntryAfterMiss(ThreadLocal key, int i, Entry e) { + Entry[] tab = table; + int len = tab.length; + + while (e != null) { + ThreadLocal k = e.get(); + if (k == key) + return e; + //清除无效的entry + if (k == null) + expungeStaleEntry(i); + else + //基于线性探测法向后扫描 + i = nextIndex(i, len); + e = tab[i]; + } + return null; + } +``` + + + +### ThreadLocal主要属性 + +```java + + private final int threadLocalHashCode = nextHashCode(); + + private static AtomicInteger nextHashCode = + new AtomicInteger(); + // 固定序列,生成方式很巧妙,通过它计算的HashCode能够有效避免哈希冲突 + private static final int HASH_INCREMENT = 0x61c88647; +``` + +### ThreadLocal主要方法 + +```java + // 获取下个HashCode + private static int nextHashCode() { + return nextHashCode.getAndAdd(HASH_INCREMENT); + } + + protected T initialValue() { + return null; + } +``` + +set,get + +```java + public void set(T value) { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) + // 用到map的set方法 + map.set(this, value); + else + createMap(t, value); + } + + ThreadLocalMap getMap(Thread t) { + return t.threadLocals; + } + + public T get() { + Thread t = Thread.currentThread(); + ThreadLocalMap map = getMap(t); + if (map != null) { + // 用到map的getEntry方法 + ThreadLocalMap.Entry e = map.getEntry(this); + if (e != null) { + @SuppressWarnings("unchecked") + T result = (T)e.value; + return result; + } + } + return setInitialValue(); + } +``` + +### 线性探测法 + +解决hash冲突,除了在HashMap中用到的链地址法,还可以用线性探测方法。 + +当我们要往哈希表中插入一个数据时,通过哈希函数计算该值的哈希地址,当我们找到哈希地址时却发现该位置已经被别的数据插入了,那么此时我们就找紧跟着这一位置的下一个位置,看是否能够插入,如果能则插入,不能则继续探测紧跟着当前位置的下一个位置。 + +![线性探测法](E:\Project\JavaStudy\week_05\08\attachment\线性探测法.jpg) + +### 强引用、弱引用、软引用和虚引用 + +先简单做个了解 + +#### 强引用(StrongReference) + +如果一个对象具有强引用,那**垃圾回收器**绝不会回收它 + +```java + Object strongReference = new Object(); +``` + +#### 软引用(SoftReference) + +如果一个对象只具有**软引用**,则**内存空间充足**时,**垃圾回收器**就**不会**回收它;如果**内存空间不足**了,就会**回收**这些对象的内存。只要垃圾回收器没有回收它,该对象就可以被程序使用。 + +#### 弱引用(WeakReference) + +弱引用与软引用的区别在于:只具有弱引用的对象拥有更短暂的生命周期。在垃圾回收器线程扫描它所管辖的内存区域的过程中,一旦发现了只具有弱引用的对象,不管当前内存空间足够与否,都会回收它的内存。不过,由于垃圾回收器是一个优先级很低的线程,因此不一定会很快发现那些只具有弱引用的对象。 + +#### 虚引用(PhantomReference) + +**虚引用**顾名思义,就是**形同虚设**。与其他几种引用都不同,**虚引用**并**不会**决定对象的**生命周期**。如果一个对象**仅持有虚引用**,那么它就和**没有任何引用**一样,在任何时候都可能被垃圾回收器回收。 \ No newline at end of file diff --git a/week_05/08/ThreadLocal-008.xmind b/week_05/08/ThreadLocal-008.xmind new file mode 100644 index 0000000000000000000000000000000000000000..947b22d5dd378d305dca15cee45b8f78dd023f3d Binary files /dev/null and b/week_05/08/ThreadLocal-008.xmind differ diff --git a/week_05/08/ThreadPoolExecutor-008.md b/week_05/08/ThreadPoolExecutor-008.md new file mode 100644 index 0000000000000000000000000000000000000000..dbe458ae7f911121ed962b4e75ee63a87a37be72 --- /dev/null +++ b/week_05/08/ThreadPoolExecutor-008.md @@ -0,0 +1,397 @@ +# 读源码--ThreadPoolExecutor + +## 继承体系 + +实现Executor接口 + +![ThreadPoolExecutor继承图](E:\Project\JavaStudy\week_05\08\attachment\ThreadPoolExecutor继承图.png) + +## 源码分析 + +### 主要属性 + +```java + private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); + private static final int COUNT_BITS = Integer.SIZE - 3; + private static final int CAPACITY = (1 << COUNT_BITS) - 1; + + // runState is stored in the high-order bits + // 接收新任务,且可执行队列中的任务 + private static final int RUNNING = -1 << COUNT_BITS; + // 不接收新任务,但可执行队列中的任务 + private static final int SHUTDOWN = 0 << COUNT_BITS; + // 不接收新任务,也不执行队列中的任务 + private static final int STOP = 1 << COUNT_BITS; + // 任务中止,工作线程为0,最后变到这个状态的线程执行钩子方法terminated() + private static final int TIDYING = 2 << COUNT_BITS; + // 钩子方法terminated()执行完毕 + private static final int TERMINATED = 3 << COUNT_BITS; +``` + +### ![线程状态切换图](E:\Project\JavaStudy\week_05\08\attachment\线程状态切换图.png)基础方法 + +在其他方法中用到的方法 + +```java + // 线程池的状态 + private static int runStateOf(int c) { return c & ~CAPACITY; } + // 工作线程的数量 + private static int workerCountOf(int c) { return c & CAPACITY; } + // ctl,运行状态(高三位)+线程数量(后面29位) + private static int ctlOf(int rs, int wc) { return rs | wc; } + +``` + +### 主要方法 + +状态切换相关方法 + +1)RUNNING:创建线程池,初始化ctl为RUNNING + +```java + private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); +``` + +2)SHUTDOWN + +```java + public void shutdown() { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + checkShutdownAccess(); + // 切换为SHUTDOWN状态 + advanceRunState(SHUTDOWN); + // 标记空闲线程为中断状态 + interruptIdleWorkers(); + onShutdown(); // hook for ScheduledThreadPoolExecutor + } finally { + mainLock.unlock(); + } + tryTerminate(); + } + + // 自旋切换状态 + private void advanceRunState(int targetState) { + for (;;) { + int c = ctl.get(); + if (runStateAtLeast(c, targetState) || + ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) + break; + } + } +``` + +3)STOP:调用shutdownNow()方法 + +```java + public List shutdownNow() { + List tasks; + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + checkShutdownAccess(); + // 切换为STOP状态 + advanceRunState(STOP); + interruptWorkers(); + tasks = drainQueue(); + } finally { + mainLock.unlock(); + } + tryTerminate(); + return tasks; + } + + private List drainQueue() { + BlockingQueue q = workQueue; + ArrayList taskList = new ArrayList(); + q.drainTo(taskList); + if (!q.isEmpty()) { + for (Runnable r : q.toArray(new Runnable[0])) { + if (q.remove(r)) + taskList.add(r); + } + } + return taskList; + } +``` + +4)TYDING:如上图所示,调用shutdown()或shutdownow()方法后,队列为空,工作线程数量为0的时候,会进入该状态。但只有最后一个变为该状态的才会调用tryTerminate()方法 + +```java + final void tryTerminate() { + for (;;) { + int c = ctl.get(); + // 下列状态不会调用terminated + // 1.正在运行 + // 2.状态大于TIDYNING(2),即为TEMINATED + // 3.SHUTDOWN状态,且队列不为空 + if (isRunning(c) || + runStateAtLeast(c, TIDYING) || + (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) + return; + // 工作线程的数量不为0 + if (workerCountOf(c) != 0) { // Eligible to terminate + interruptIdleWorkers(ONLY_ONE); + return; + } + + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { + try { + terminated(); + } finally { + ctl.set(ctlOf(TERMINATED, 0)); + termination.signalAll(); + } + return; + } + } finally { + mainLock.unlock(); + } + // else retry on failed CAS + } + } +``` + +5)TERMINATED,如上 + +### 重要方法 + +为线程任务执行相关 + +先来一张图,展示一下任务的大致执行流程 + +![任务执行流程](E:\Project\JavaStudy\week_05\08\attachment\任务执行流程.png) + +execute():任务提交 + +```java + public void execute(Runnable command) { + if (command == null) + throw new NullPointerException(); + int c = ctl.get(); + // 工作线程小于最大核心线程,则新建线程 + if (workerCountOf(c) < corePoolSize) { + if (addWorker(command, true)) + return; + c = ctl.get(); + } + // 工作线程大于核心线程,且线程池在允许,任务入队 + if (isRunning(c) && workQueue.offer(command)) { + int recheck = ctl.get(); + // 再次检查线程池状态,如果不是运行状态,就移除任务并执行拒绝策略 + if (! isRunning(recheck) && remove(command)) + reject(command); + // 容错检查工作线程数量是否为0,如果为0就创建一个 + else if (workerCountOf(recheck) == 0) + addWorker(null, false); + } + // 入队失败,尝试创建非核心线程 + else if (!addWorker(command, false)) + // 尝试创建非核心线程,执行拒绝策略 + reject(command); + } +``` + +addWorker:创建工作线程 + +```java + private boolean addWorker(Runnable firstTask, boolean core) { + // 检查 + retry: + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + if (rs >= SHUTDOWN && + ! (rs == SHUTDOWN && + firstTask == null && + ! workQueue.isEmpty())) + return false; + + for (;;) { + int wc = workerCountOf(c); + if (wc >= CAPACITY || + wc >= (core ? corePoolSize : maximumPoolSize)) + return false; + if (compareAndIncrementWorkerCount(c)) + break retry; + c = ctl.get(); // Re-read ctl + if (runStateOf(c) != rs) + continue retry; + // else CAS failed due to workerCount change; retry inner loop + } + } + + boolean workerStarted = false; + boolean workerAdded = false; + Worker w = null; + try { + w = new Worker(firstTask); + final Thread t = w.thread; + if (t != null) { + final ReentrantLock mainLock = this.mainLock; + mainLock.lock(); + try { + // Recheck while holding lock. + // Back out on ThreadFactory failure or if + // shut down before lock acquired. + int rs = runStateOf(ctl.get()); + // 再次线程池状态 + if (rs < SHUTDOWN || + (rs == SHUTDOWN && firstTask == null)) { + if (t.isAlive()) // precheck that t is startable + throw new IllegalThreadStateException(); + // 添加工作线程队列 + workers.add(w); + int s = workers.size(); + if (s > largestPoolSize) + largestPoolSize = s; + workerAdded = true; + } + } finally { + mainLock.unlock(); + } + // 线程添加成功后,启动线程 + if (workerAdded) { + t.start(); + workerStarted = true; + } + } + } finally { + // 启动失败执行失败方法 + if (! workerStarted) + addWorkerFailed(w); + } + return workerStarted; + } +``` + +Worker内部类 + +```java + private final class Worker + extends AbstractQueuedSynchronizer + implements Runnable + { + final Thread thread; + /** Initial task to run. Possibly null. */ + Runnable firstTask; + /** Per-thread task counter */ + volatile long completedTasks; + + Worker(Runnable firstTask) { + setState(-1); // inhibit interrupts until runWorker + this.firstTask = firstTask; + this.thread = getThreadFactory().newThread(this); + } + + /** Delegates main run loop to outer runWorker */ + public void run() { + runWorker(this); + } + + } +``` + +runWorker:真正执行任务的方法 + +```java + final void runWorker(Worker w) { + Thread wt = Thread.currentThread(); + Runnable task = w.firstTask; + w.firstTask = null; + w.unlock(); // allow interrupts + boolean completedAbruptly = true; + try { + // 取任务,直到任务为空 + while (task != null || (task = getTask()) != null) { + w.lock(); + // 线程池状态检查 + if ((runStateAtLeast(ctl.get(), STOP) || + (Thread.interrupted() && + runStateAtLeast(ctl.get(), STOP))) && + !wt.isInterrupted()) + wt.interrupt(); + try { + // 任务执行前处理 + beforeExecute(wt, task); + Throwable thrown = null; + try { + task.run(); + } catch (RuntimeException x) { + thrown = x; throw x; + } catch (Error x) { + thrown = x; throw x; + } catch (Throwable x) { + thrown = x; throw new Error(x); + } finally { + afterExecute(task, thrown); + } + } finally { + // 任务置空,重取 + task = null; + w.completedTasks++; + w.unlock(); + } + } + completedAbruptly = false; + } finally { + processWorkerExit(w, completedAbruptly); + } + } +``` + +getTask:获取任务 + +```java + private Runnable getTask() { + boolean timedOut = false; // Did the last poll() time out? + + for (;;) { + int c = ctl.get(); + int rs = runStateOf(c); + + // Check if queue empty only if necessary. + // 线程池状态为SHUTDOWN时,将队列中的任务运行完 + // 线程池状态为STOP或队列为空时推出 + if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { + decrementWorkerCount(); + return null; + } + + int wc = workerCountOf(c); + + // Are workers subject to culling? + // 允许超时(非核心线程一定允许超时) + // 1.允许核心超时,既所有线程都允许 + // 2.工作线程数量大于核心线程 + boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; + + if ((wc > maximumPoolSize || (timed && timedOut)) + && (wc > 1 || workQueue.isEmpty())) { + if (compareAndDecrementWorkerCount(c)) + return null; + continue; + } + + try { + Runnable r = timed ? + workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : + workQueue.take(); + if (r != null) + return r; + timedOut = true; + } catch (InterruptedException retry) { + timedOut = false; + } + } + } +``` + + + diff --git a/week_05/08/TrhreadPoolExecutor-008.xmind b/week_05/08/TrhreadPoolExecutor-008.xmind new file mode 100644 index 0000000000000000000000000000000000000000..1ea4f1ddf88e66a95867be4ecd6a442bd2f555e9 Binary files /dev/null and b/week_05/08/TrhreadPoolExecutor-008.xmind differ diff --git "a/week_05/08/attachment/ThreadLocal\347\244\272\346\204\217\345\233\276.png" "b/week_05/08/attachment/ThreadLocal\347\244\272\346\204\217\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..616c07825094b3473e3677a86720e6e81219d767 Binary files /dev/null and "b/week_05/08/attachment/ThreadLocal\347\244\272\346\204\217\345\233\276.png" differ diff --git "a/week_05/08/attachment/ThreadLocal\347\261\273\347\273\223\346\236\204\345\233\276.png" "b/week_05/08/attachment/ThreadLocal\347\261\273\347\273\223\346\236\204\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..23c1bfdc0f2fc2aa0cf7d32280df9604e163e56c Binary files /dev/null and "b/week_05/08/attachment/ThreadLocal\347\261\273\347\273\223\346\236\204\345\233\276.png" differ diff --git "a/week_05/08/attachment/ThreadPoolExecutor\347\273\247\346\211\277\345\233\276.png" "b/week_05/08/attachment/ThreadPoolExecutor\347\273\247\346\211\277\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..47eaa2a0944b5a02af867996d80b3ca73df6c711 Binary files /dev/null and "b/week_05/08/attachment/ThreadPoolExecutor\347\273\247\346\211\277\345\233\276.png" differ diff --git "a/week_05/08/attachment/Thread\347\273\247\346\211\277\345\233\276.jpg" "b/week_05/08/attachment/Thread\347\273\247\346\211\277\345\233\276.jpg" new file mode 100644 index 0000000000000000000000000000000000000000..7d73fdfcdfb3849343e5605364fc698542c2f5ae Binary files /dev/null and "b/week_05/08/attachment/Thread\347\273\247\346\211\277\345\233\276.jpg" differ diff --git "a/week_05/08/attachment/\344\273\273\345\212\241\346\211\247\350\241\214\346\265\201\347\250\213.png" "b/week_05/08/attachment/\344\273\273\345\212\241\346\211\247\350\241\214\346\265\201\347\250\213.png" new file mode 100644 index 0000000000000000000000000000000000000000..94ef9474e9106ac4d35340816eacbe60972c0b41 Binary files /dev/null and "b/week_05/08/attachment/\344\273\273\345\212\241\346\211\247\350\241\214\346\265\201\347\250\213.png" differ diff --git "a/week_05/08/attachment/\347\272\277\346\200\247\346\216\242\346\265\213\346\263\225.jpg" "b/week_05/08/attachment/\347\272\277\346\200\247\346\216\242\346\265\213\346\263\225.jpg" new file mode 100644 index 0000000000000000000000000000000000000000..b663c519c7ee718243ad79e09c195a19b930e2ce Binary files /dev/null and "b/week_05/08/attachment/\347\272\277\346\200\247\346\216\242\346\265\213\346\263\225.jpg" differ diff --git "a/week_05/08/attachment/\347\272\277\347\250\213\347\212\266\346\200\201\345\210\207\346\215\242\345\233\276.png" "b/week_05/08/attachment/\347\272\277\347\250\213\347\212\266\346\200\201\345\210\207\346\215\242\345\233\276.png" new file mode 100644 index 0000000000000000000000000000000000000000..af718b04e4942d8631b0b6925394dd2ae72d8bbc Binary files /dev/null and "b/week_05/08/attachment/\347\272\277\347\250\213\347\212\266\346\200\201\345\210\207\346\215\242\345\233\276.png" differ diff --git "a/week_05/08/attachment/\347\272\277\347\250\213\347\224\237\345\221\275\345\221\250\346\234\237.png" "b/week_05/08/attachment/\347\272\277\347\250\213\347\224\237\345\221\275\345\221\250\346\234\237.png" new file mode 100644 index 0000000000000000000000000000000000000000..b0d543d5a65e26ee6d94e5e04facbefb4c4b4bc6 Binary files /dev/null and "b/week_05/08/attachment/\347\272\277\347\250\213\347\224\237\345\221\275\345\221\250\346\234\237.png" differ diff --git "a/week_05/08/\347\272\277\347\250\213.xmind" "b/week_05/08/\347\272\277\347\250\213.xmind" new file mode 100644 index 0000000000000000000000000000000000000000..70661df60460eccaed18fea8ff7790f81487ce8f Binary files /dev/null and "b/week_05/08/\347\272\277\347\250\213.xmind" differ diff --git "a/week_05/08/\347\272\277\347\250\213\345\256\236\346\210\230(1).md" "b/week_05/08/\347\272\277\347\250\213\345\256\236\346\210\230(1).md" new file mode 100644 index 0000000000000000000000000000000000000000..2f09514affe1ad2b4694de4dfc93bba8cbc330c1 --- /dev/null +++ "b/week_05/08/\347\272\277\347\250\213\345\256\236\346\210\230(1).md" @@ -0,0 +1,74 @@ +## 线程实战(1)--按序打印 + +### 题目描述 + +我们提供了一个类: + +```java +public class Foo { + public void one() { print("one"); } + public void two() { print("two"); } + public void three() { print("three"); } +} +三个不同的线程将会共用一个 Foo 实例。 +``` + +线程 A 将会调用 one() 方法 +线程 B 将会调用 two() 方法 +线程 C 将会调用 three() 方法 +请设计修改程序,以确保 two() 方法在 one() 方法之后被执行,three() 方法在 two() 方法之后被执行。 + +示例 1: + +输入: [1,2,3] +输出: "onetwothree" +解释: +有三个线程会被异步启动。 +输入 [1,2,3] 表示线程 A 将会调用 one() 方法,线程 B 将会调用 two() 方法,线程 C 将会调用 three() 方法。 +正确的输出是 "onetwothree"。 +示例 2: + +输入: [1,3,2] +输出: "onetwothree" +解释: +输入 [1,3,2] 表示线程 A 将会调用 one() 方法,线程 B 将会调用 three() 方法,线程 C 将会调用 two() 方法。 +正确的输出是 "onetwothree"。 + + + +### 题解 + +第一次做线程相关的题,完全没有头绪,思考无果,就直接看了题解。点赞最高的是利用了first Finished和SecondFinished两个flag来控制线程执行顺序。但看了第二个,觉得更好,更简洁。 + +```java +class Foo { + + private volatile int count=1; + + public Foo() { + + } + + public void first(Runnable printFirst) throws InterruptedException { + printFirst.run(); + count++; + } + + public void second(Runnable printSecond) throws InterruptedException { + while(count!=2); + // printSecond.run() outputs "second". Do not change or remove this line. + printSecond.run(); + count++; + } + + public void third(Runnable printThird) throws InterruptedException { + while(count!=3); + // printThird.run() outputs "third". Do not change or remove this line. + printThird.run(); + } +} +``` + +### 总结 + +利用flag来控制线程的执行顺序。 \ No newline at end of file diff --git "a/week_05/08/\347\272\277\347\250\213\345\256\236\346\210\230(2).md" "b/week_05/08/\347\272\277\347\250\213\345\256\236\346\210\230(2).md" new file mode 100644 index 0000000000000000000000000000000000000000..ee61580d8af6219882b16c8fd56ed7442440172f --- /dev/null +++ "b/week_05/08/\347\272\277\347\250\213\345\256\236\346\210\230(2).md" @@ -0,0 +1,82 @@ +## 线程实战(2) + +### 题目描述 + +我们提供一个类: + +```java +class FooBar { + public void foo() { + for (int i = 0; i < n; i++) { + print("foo"); + } + } + + public void bar() { + for (int i = 0; i < n; i++) { + print("bar"); + } + } +}两个不同的线程将会共用一个 FooBar 实例。其中一个线程将会调用 foo() 方法,另一个线程将会调用 bar() 方法。 +``` + +请设计修改程序,以确保 "foobar" 被输出 n 次。 + + + +示例 1: + +输入: n = 1 +输出: "foobar" +解释: 这里有两个线程被异步启动。其中一个调用 foo() 方法, 另一个调用 bar() 方法,"foobar" 将被输出一次。 +示例 2: + +输入: n = 2 +输出: "foobarfoobar" +解释: "foobar" 将被输出两次。 + +### 题解 + +经过了题目一,这次就有感觉了,交替打印就是顺序问题,跟上次类似。用标志flag来控制顺序。需要注意的是flag须用volatile修饰,保证多线程中flag的可见性。直接加的if判断来执行,发现只能执行一次。看了正确题解才理解到可以用yield()方法让出线程,让当前线程暂停。 + +```java +class FooBar { + private int n; + private volatile boolean flag; + public FooBar(int n) { + this.n = n; + } + public void foo(Runnable printFoo) throws InterruptedException { + + for (int i = 0; i < n; i++) { + while(flag){ + Thread.yield(); + } + // printFoo.run() outputs "foo". Do not change or remove this line. + printFoo.run(); + flag=true; + + } + } + + public void bar(Runnable printBar) throws InterruptedException { + + for (int i = 0; i < n; i++) { + while(!flag){ + Thread.yield(); + } + // printBar.run() outputs "bar". Do not change or remove this line. + printBar.run(); + flag=false; + + } + } +} +``` + +### 总结 + +1. 共享变量flag +2. yield():让出线程 +3. 对线程方法的使用还不够熟悉,须增强 +