From a1aee1ef3ca2b7690af78f85131ef639996a4b6c Mon Sep 17 00:00:00 2001 From: weewwccw <5572917+shifm@user.noreply.gitee.com> Date: Sat, 11 Jan 2020 14:20:13 +0800 Subject: [PATCH] week05 --- week_05/11/Executors.md | 179 ++++++++++++++++++++++++++++ week_05/11/Thread.md | 117 ++++++++++++++++++ week_05/11/ThreadLocal.md | 125 +++++++++++++++++++ week_05/11/ThreadPoolExecutor.md | 198 +++++++++++++++++++++++++++++++ 4 files changed, 619 insertions(+) create mode 100644 week_05/11/Executors.md create mode 100644 week_05/11/Thread.md create mode 100644 week_05/11/ThreadLocal.md create mode 100644 week_05/11/ThreadPoolExecutor.md diff --git a/week_05/11/Executors.md b/week_05/11/Executors.md new file mode 100644 index 0000000..de0e137 --- /dev/null +++ b/week_05/11/Executors.md @@ -0,0 +1,179 @@ +Executors 在Java 5之后,并发编程引入了一堆新的启动、调度和管理线程的API。Executor框架便是Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。Eexecutor作为灵活且强大的异步执行框架,其支持多种不同类型的任务执行策略,提供了一种标准的方法将任务的提交过程和执行过程解耦开发,基于生产者-消费者模式,其提交任务的线程相当于生产者,执行任务的线程相当于消费者,并用Runnable来表示任务,Executor的实现还提供了对生命周期的支持,以及统计信息收集,应用程序管理机制和性能监视等机制。 一、Executor的UML图:(常用的几个接口和子类) + +Executor框架包括:线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。 + +二、Executor和ExecutorService Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),该方法接收一个Runable实例,它用来执行一个任务,任务即一个实现了Runnable接口的类,一般来说,Runnable任务开辟在新线程中的使用方法为:new Thread(new RunnableTask())).start(),但在Executor中,可以使用Executor而不用显示地创建线程:executor.execute(new RunnableTask()); // 异步执行 + +ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,返回 Future 对象,以及可跟踪一个或多个异步任务执行状况返回Future的方法;可以调用ExecutorService的shutdown()方法来平滑地关闭 ExecutorService,调用该方法后,将导致ExecutorService停止接受任何新的任务且等待已经提交的任务执行完成(已经提交的任务会分两类:一类是已经在执行的,另一类是还没有开始执行的),当所有已经提交的任务执行完毕后将会关闭ExecutorService。因此我们一般用该接口来实现和管理多线程。 + +通过 ExecutorService.submit() 方法返回的 Future 对象,可以调用isDone()方法查询Future是否已经完成。当任务完成时,它具有一个结果,你可以调用get()方法来获取该结果。你也可以不用isDone()进行检查就直接调用get()获取结果,在这种情况下,get()将阻塞,直至结果准备就绪,还可以取消任务的执行。Future 提供了 cancel() 方法用来取消执行 pending 中的任务。ExecutorService 部分代码如下: + +public interface ExecutorService extends Executor { void shutdown(); Future submit(Callable task); Future submit(Runnable task, T result); List invokeAll(Collection tasks, long timeout, TimeUnit unit) throws InterruptedException; } + +三、Executors类: 主要用于提供线程池相关的操作 Executors类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了ExecutorService接口。 + +1、public static ExecutorService newFiexedThreadPool(int Threads) 创建固定数目线程的线程池。 + +2、public static ExecutorService newCachedThreadPool():创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果没有可用的线程,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。 + +3、public static ExecutorService newSingleThreadExecutor():创建一个单线程化的Executor。 + +4、public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) + +创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。 + +newCachedThreadPool() + +-缓存型池子,先查看池中有没有以前建立的线程,如果有,就 reuse.如果没有,就建一个新的线程加入池中 -缓存型池子通常用于执行一些生存期很短的异步型任务 因此在一些面向连接的daemon型SERVER中用得不多。但对于生存期短的异步任务,它是Executor的首选。 -能reuse的线程,必须是timeout IDLE内的池中线程,缺省 timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。 注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。 + +newFixedThreadPool(int) + +-newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程 -其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子 -和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器 -从方法的源代码看,cache池和fixed 池调用的是同一个底层 池,只不过参数不同: fixed池线程数固定,并且是0秒IDLE(无IDLE) +cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE + +newScheduledThreadPool(int) + +-调度型线程池 -这个池子里的线程可以按schedule依次delay执行,或周期执行 + +SingleThreadExecutor() + +-单例线程,任意时间池中只能有一个线程 -用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE) + +四、Executor VS ExecutorService VS Executors 正如上面所说,这三者均是 Executor 框架中的一部分。Java 开发者很有必要学习和理解他们,以便更高效的使用 Java 提供的不同类型的线程池。总结一下这三者间的区别,以便大家更好的理解: + +Executor 和 ExecutorService 这两个接口主要的区别是:ExecutorService 接口继承了 Executor 接口,是 Executor 的子接口 Executor 和 ExecutorService 第二个区别是:Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。 Executor 和 ExecutorService 接口第三个区别是 Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。 Executor 和 ExecutorService 接口第四个区别是除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。可以通过 《Java Concurrency in Practice》 一书了解更多关于关闭线程池和如何处理 pending 的任务的知识。 Executors 类提供工厂方法用来创建不同类型的线程池。比如: newSingleThreadExecutor() 创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。 下面给出一个Executor执行Callable任务的示例代码: import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; + +public class CallableDemo{ +public static void main(String[] args){ +ExecutorService executorService = Executors.newCachedThreadPool(); +List resultList = new ArrayList(); + + //创建10个任务并执行 + for (int i = 0; i < 10; i++){ + //使用ExecutorService执行Callable类型的任务,并将结果保存在future变量中 + Future future = executorService.submit(new TaskWithResult(i)); + //将任务执行结果存储到List中 + resultList.add(future); + } + + //遍历任务的结果 + for (Future fs : resultList){ + try{ + while(!fs.isDone);//Future返回如果没有完成,则一直循环等待,直到Future返回完成 + System.out.println(fs.get()); //打印各个线程(任务)执行的结果 + }catch(InterruptedException e){ + e.printStackTrace(); + }catch(ExecutionException e){ + e.printStackTrace(); + }finally{ + //启动一次顺序关闭,执行以前提交的任务,但不接受新任务 + executorService.shutdown(); + } + } +} +} + +class TaskWithResult implements Callable{ private int id; + +public TaskWithResult(int id){ + this.id = id; +} + +/** + * 任务的具体过程,一旦任务传给ExecutorService的submit方法, + * 则该方法自动在一个线程上执行 + */ +public String call() throws Exception { + System.out.println("call()方法被自动调用!!! " + Thread.currentThread().getName()); + //该返回结果将被Future的get方法得到 + return "call()方法被自动调用,任务返回的结果是:" + id + " " + Thread.currentThread().getName(); +} +} 五、自定义线程池 自定义线程池,可以用ThreadPoolExecutor类创建,它有多个构造方法来创建线程池,用该类很容易实现自定义的线程池,这里先贴上示例程序: import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +public class ThreadPoolTest{ +public static void main(String[] args){ +//创建等待队列 +BlockingQueue bqueue = new ArrayBlockingQueue(20); +//创建线程池,池中保存的线程数为3,允许的最大线程数为5 +ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue); +//创建七个任务 +Runnable t1 = new MyThread(); +Runnable t2 = new MyThread(); +Runnable t3 = new MyThread(); +Runnable t4 = new MyThread(); +Runnable t5 = new MyThread(); +Runnable t6 = new MyThread(); +Runnable t7 = new MyThread(); +//每个任务会在一个线程上执行 +pool.execute(t1); +pool.execute(t2); +pool.execute(t3); +pool.execute(t4); +pool.execute(t5); +pool.execute(t6); +pool.execute(t7); +//关闭线程池 +pool.shutdown(); +} +} + +class MyThread implements Runnable{ +@Override +public void run(){ +System.out.println(Thread.currentThread().getName() + "正在执行。。。"); +try{ +Thread.sleep(100); +}catch(InterruptedException e){ +e.printStackTrace(); +} +} +} 运行结果如下: + +从结果中可以看出,七个任务是在线程池的三个线程上执行的。这里简要说明下用到的ThreadPoolExecuror类的构造方法中各个参数的含义。 public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue workQueue) + +corePoolSize:线程池中所保存的核心线程数,包括空闲线程。 + +maximumPoolSize:池中允许的最大线程数。 + +keepAliveTime:线程池中的空闲线程所能持续的最长时间。 + +unit:持续时间的单位。 + +workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。 + +根据ThreadPoolExecutor源码前面大段的注释,我们可以看出,当试图通过excute方法将一个Runnable任务添加到线程池中时,按照如下顺序来处理: 1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务; 2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行); + +3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务; + +4、如果线程池中的线程数量等于了maximumPoolSize,有4种处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。 + +总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。 + +另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。 +我们大致来看下Executors的源码,newCachedThreadPool的不带RejectedExecutionHandler参数(即第五个参数,线程数量超过maximumPoolSize时,指定处理方式)的构造方法如下: + +public static ExecutorService newCachedThreadPool() { +return new ThreadPoolExecutor(0, Integer.MAX_VALUE, +60L, TimeUnit.SECONDS, +new SynchronousQueue()); +} 它将corePoolSize设定为0,而将maximumPoolSize设定为了Integer的最大值,线程空闲超过60秒,将会从线程池中移除。由于核心线程数为0,因此每次添加任务,都会先从线程池中找空闲线程,如果没有就会创建一个线程(SynchronousQueue决定的,后面会说)来执行新的任务,并将该线程加入到线程池中,而最大允许的线程数为Integer的最大值,因此这个线程池理论上可以不断扩大。 再来看newFixedThreadPool的不带RejectedExecutionHandler参数的构造方法,如下: + +public static ExecutorService newFixedThreadPool(int nThreads) { +return new ThreadPoolExecutor(nThreads, nThreads, +0L, TimeUnit.MILLISECONDS, +new LinkedBlockingQueue()); +} 它将corePoolSize和maximumPoolSize都设定为了nThreads,这样便实现了线程池的大小的固定,不会动态地扩大,另外,keepAliveTime设定为了0,也就是说线程只要空闲下来,就会被移除线程池,敢于LinkedBlockingQueue下面会说。 下面说说几种排队的策略: + +1、直接提交。缓冲队列采用 SynchronousQueue,它将任务直接交给线程处理而不保持它们。如果不存在可用于立即运行任务的线程(即线程池中的线程都在工作),则试图把任务加入缓冲队列将会失败,因此会构造一个新的线程来处理新添加的任务,并将其加入到线程池中。直接提交通常要求无界 maximumPoolSizes(Integer.MAX_VALUE) 以避免拒绝新提交的任务。newCachedThreadPool采用的便是这种策略。 + +2、无界队列。使用无界队列(典型的便是采用预定义容量的 LinkedBlockingQueue,理论上是该缓冲队列可以对无限多的任务排队)将导致在所有 corePoolSize 线程都工作的情况下将新任务加入到缓冲队列中。这样,创建的线程就不会超过 corePoolSize,也因此,maximumPoolSize 的值也就无效了。当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列。newFixedThreadPool采用的便是这种策略。 + +3、有界队列。当使用有限的 maximumPoolSizes 时,有界队列(一般缓冲队列使用ArrayBlockingQueue,并制定队列的最大长度)有助于防止资源耗尽,但是可能较难调整和控制,队列大小和最大池大小需要相互折衷,需要设定合理的参数。 +六、比较Executor和new Thread() new Thread的弊端如下: + +a. 每次new Thread新建对象性能差。 b. 线程缺乏统一管理,可能无限制新建线程,相互之间竞争,及可能占用过多系统资源导致死机或oom。 c. 缺乏更多功能,如定时执行、定期执行、线程中断。 相比new Thread,Java提供的四种线程池的好处在于: a. 重用存在的线程,减少对象创建、消亡的开销,性能佳。 b. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。 c. 提供定时执行、定期执行、单线程、并发数控制等功能。 ———————————————— 版权声明:本文为CSDN博主「不断前行的菜鸟_」的原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/weixin_40304387/article/details/80508236 \ No newline at end of file diff --git a/week_05/11/Thread.md b/week_05/11/Thread.md new file mode 100644 index 0000000..d0e28a3 --- /dev/null +++ b/week_05/11/Thread.md @@ -0,0 +1,117 @@ + 写入时复制(CopyOnWrite,简称COW)思想是计算机程序设计领域中的一种优化策略。其核心思想是,如果有多个调用者(Callers)同时要求相同的资源(如内存或者是磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者视图修改资源内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的(transparently)。此做法主要的优点是如果调用者没有修改资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。 CopyOnWriteArrayList的实现原理   在使用CopyOnWriteArrayList之前,我们先阅读其源码了解下它是如何实现的。以下代码是向CopyOnWriteArrayList中add方法的实现(向CopyOnWriteArrayList里添加元素),可以发现在添加的时候是需要加锁的,否则多线程写的时候会Copy出N个副本出来。 复制代码 /** * Appends the specified element to the end of this list. * * @param e element to be appended to this list * @return true (as specified by {@link Collection#add}) */ public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; Object[] newElements = Arrays.copyOf(elements, len + 1); newElements[len] = e; setArray(newElements); return true; } finally { lock.unlock(); } } 复制代码   读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的CopyOnWriteArrayList。 + +public E get(int index) { return get(getArray(), index); }   JDK中并没有提供CopyOnWriteMap,我们可以参考CopyOnWriteArrayList来实现一个,基本代码如下: + +复制代码 import java.util.Collection; import java.util.Map; import java.util.Set; + +public class CopyOnWriteMap implements Map, Cloneable { private volatile Map internalMap; + +public CopyOnWriteMap() { + internalMap = new HashMap(); +} + +public V put(K key, V value) { + + synchronized (this) { + Map newMap = new HashMap(internalMap); + V val = newMap.put(key, value); + internalMap = newMap; + return val; + } +} + +public V get(Object key) { + return internalMap.get(key); +} + +public void putAll(Map newData) { + synchronized (this) { + Map newMap = new HashMap(internalMap); + newMap.putAll(newData); + internalMap = newMap; + } +} +} 复制代码   实现很简单,只要了解了CopyOnWrite机制,我们可以实现各种CopyOnWrite容器,并且在不同的应用场景中使用。 + + 几个要点 实现了List接口 内部持有一个ReentrantLock lock = new ReentrantLock(); 底层是用volatile transient声明的数组 array 读写分离,写时复制出一个新的数组,完成插入、修改或者移除操作后将新数组赋值给array 注: + +  volatile (挥发物、易变的):变量修饰符,只能用来修饰变量。volatile修饰的成员变量在每次被线程访问时,都强迫从共享内存中重读该成员变量的值。而且,当成员变量发生变 化时,强迫线程将变化值回写到共享内存。这样在任何时刻,两个不同的线程总是看到某个成员变量的同一个值。 + +  transient (暂短的、临时的):修饰符,只能用来修饰字段。在对象序列化的过程中,标记为transient的变量不会被序列化。 + + 增删改查   1)增 + +复制代码 public boolean add(E e) { final ReentrantLock lock = this.lock; //获得锁 lock.lock(); try { Object[] elements = getArray(); int len = elements.length; //复制一个新的数组 Object[] newElements = Arrays.copyOf(elements, len + 1); //插入新值 newElements[len] = e; //将新的数组指向原来的引用 setArray(newElements); return true; } finally { //释放锁 lock.unlock(); } } + +public void add(int index, E element) { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] elements = getArray(); int len = elements.length; if (index > len || index < 0) throw new IndexOutOfBoundsException("Index: "+index+ ", Size: "+len); Object[] newElements; int numMoved = len - index; if (numMoved == 0) newElements = Arrays.copyOf(elements, len + 1); else { newElements = new Object[len + 1]; System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index, newElements, index + 1, numMoved); } newElements[index] = element; setArray(newElements); } finally { lock.unlock(); } } 复制代码   2)删 + +复制代码 public E remove(int index) { final ReentrantLock lock = this.lock; //获得锁 lock.lock(); try { Object[] elements = getArray(); int len = elements.length; E oldValue = get(elements, index); int numMoved = len - index - 1; if (numMoved == 0) //如果删除的元素是最后一个,直接复制该元素前的所有元素到新的数组 setArray(Arrays.copyOf(elements, len - 1)); else { //创建新的数组 Object[] newElements = new Object[len - 1]; //将index+1至最后一个元素向前移动一格 System.arraycopy(elements, 0, newElements, 0, index); System.arraycopy(elements, index + 1, newElements, index, numMoved); setArray(newElements); } return oldValue; } finally { lock.unlock(); } } 复制代码 3)改 + +复制代码 public E set(int index, E element) { final ReentrantLock lock = this.lock; //获得锁 lock.lock(); try { Object[] elements = getArray(); E oldValue = get(elements, index); + + if (oldValue != element) { + int len = elements.length; + //创建新数组 + Object[] newElements = Arrays.copyOf(elements, len); + //替换元素 + newElements[index] = element; + //将新数组指向原来的引用 + setArray(newElements); + } else { + // Not quite a no-op; ensures volatile write semantics + setArray(elements); + } + return oldValue; +} finally { + //释放锁 + lock.unlock(); +} +} 复制代码 4)查 + +//直接获取index对应的元素 public E get(int index) {return get(getArray(), index);} private E get(Object[] a, int index) {return (E) a[index];} CopyOnWrite的应用场景   CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下: + +复制代码 import java.util.Map; + +import com.ifeve.book.forkjoin.CopyOnWriteMap; + +/** + +黑名单服务 + +@author fangtengfei + +/ public class BlackListServiceImpl { + +private static CopyOnWriteMap blackListMap = new CopyOnWriteMap( + + 1000); +public static boolean isBlackList(String id) { + + return blackListMap.get(id) == null ? false : true; +} + +public static void addBlackList(String id) { + + blackListMap.put(id, Boolean.TRUE); +} + +/** + +批量添加黑名单 +@param ids +/ public static void addBlackList(Map ids) { blackListMap.putAll(ids); } +} 复制代码 代码很简单,但是使用CopyOnWriteMap需要注意两件事情: + +  1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。 + +  2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。 + +CopyOnWrite的缺点  CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。 + +  内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。 + +  针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。 + +  数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。 + +CopyOnWriteArrayList为什么并发安全且性能比Vector好  我知道Vector是增删改查方法都加了synchronized,保证同步,但是每个方法执行的时候都要去获得锁,性能就会大大下降,而CopyOnWriteArrayList 只是在增删改上加锁,但是读不加锁,在读方面的性能就好于Vector,CopyOnWriteArrayList支持读多写少的并发情况。 \ No newline at end of file diff --git a/week_05/11/ThreadLocal.md b/week_05/11/ThreadLocal.md new file mode 100644 index 0000000..0c382c2 --- /dev/null +++ b/week_05/11/ThreadLocal.md @@ -0,0 +1,125 @@ +ThreadLocal threadlocal使用方法很简单 + +static final ThreadLocal sThreadLocal = new ThreadLocal(); sThreadLocal.set() sThreadLocal.get() threadlocal而是一个线程内部的存储类,可以在指定线程内存储数据,数据存储以后,只有指定线程可以得到存储数据,官方解释如下。 + +/** + +This class provides thread-local variables. These variables differ from +their normal counterparts in that each thread that accesses one (via its +{@code get} or {@code set} method) has its own, independently initialized +copy of the variable. {@code ThreadLocal} instances are typically private +static fields in classes that wish to associate state with a thread (e.g., +a user ID or Transaction ID). +/ 大致意思就是ThreadLocal提供了线程内存储变量的能力,这些变量不同之处在于每一个线程读取的变量是对应的互相独立的。通过get和set方法就可以得到当前线程对应的值。 +做个不恰当的比喻,从表面上看ThreadLocal相当于维护了一个map,key就是当前的线程,value就是需要存储的对象。 + +这里的这个比喻是不恰当的,实际上是ThreadLocal的静态内部类ThreadLocalMap为每个Thread都维护了一个数组table,ThreadLocal确定了一个数组下标,而这个下标就是value存储的对应位置。。 + +作为一个存储数据的类,关键点就在get和set方法。 + +//set 方法 public void set(T value) { //获取当前线程 Thread t = Thread.currentThread(); //实际存储的数据结构类型 ThreadLocalMap map = getMap(t); //如果存在map就直接set,没有则创建map并set if (map != null) map.set(this, value); else createMap(t, value); } + +//getMap方法 ThreadLocalMap getMap(Thread t) { //thred中维护了一个ThreadLocalMap return t.threadLocals; } + +//createMap void createMap(Thread t, T firstValue) { //实例化一个新的ThreadLocalMap,并赋值给线程的成员变量threadLocals t.threadLocals = new ThreadLocalMap(this, firstValue); } 从上面代码可以看出每个线程持有一个ThreadLocalMap对象。每一个新的线程Thread都会实例化一个ThreadLocalMap并赋值给成员变量threadLocals,使用时若已经存在threadLocals则直接使用已经存在的对象。 + +Thread /* ThreadLocal values pertaining to this thread. This map is maintained * by the ThreadLocal class. */ ThreadLocal.ThreadLocalMap threadLocals = null; Thread中关于ThreadLocalMap部分的相关声明,接下来看一下createMap方法中的实例化过程。 + +ThreadLocalMap set方法 //Entry为ThreadLocalMap静态内部类,对ThreadLocal的若引用 //同时让ThreadLocal和储值形成key-value的关系 static class Entry extends WeakReference> { /** The value associated with this ThreadLocal. */ Object value; + +Entry(ThreadLocal k, Object v) { + super(k); + value = v; +} +} + +//ThreadLocalMap构造方法 ThreadLocalMap(ThreadLocal firstKey, Object firstValue) { //内部成员数组,INITIAL_CAPACITY值为16的常量 table = new Entry[INITIAL_CAPACITY]; //位运算,结果与取模相同,计算出需要存放的位置 //threadLocalHashCode比较有趣 int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1); table[i] = new Entry(firstKey, firstValue); size = 1; setThreshold(INITIAL_CAPACITY); } 通过上面的代码不难看出在实例化ThreadLocalMap时创建了一个长度为16的Entry数组。通过hashCode与length位运算确定出一个索引值i,这个i就是被存储在table数组中的位置。 + +前面讲过每个线程Thread持有一个ThreadLocalMap类型的实例threadLocals,结合此处的构造方法可以理解成每个线程Thread都持有一个Entry型的数组table,而一切的读取过程都是通过操作这个数组table完成的。 + +显然table是set和get的焦点,在看具体的set和get方法前,先看下面这段代码。 + +//在某一线程声明了ABC三种类型的ThreadLocal ThreadLocal sThreadLocalA = new ThreadLocal(); ThreadLocal sThreadLocalB = new ThreadLocal(); ThreadLocal sThreadLocalC = new ThreadLocal(); 由前面我们知道对于一个Thread来说只有持有一个ThreadLocalMap,所以ABC对应同一个ThreadLocalMap对象。为了管理ABC,于是将他们存储在一个数组的不同位置,而这个数组就是上面提到的Entry型的数组table。 + +那么问题来了,ABC在table中的位置是如何确定的?为了能正常够正常的访问对应的值,肯定存在一种方法计算出确定的索引值i,show me code。 + +//ThreadLocalMap中set方法。 private void set(ThreadLocal key, Object value) { + + // We don't use a fast path as with get() because it is at + // least as common to use set() to create new entries as + // it is to replace existing ones, in which case, a fast + // path would fail more often than not. + + Entry[] tab = table; + int len = tab.length; + //获取索引值,这个地方是比较特别的地方 + int i = key.threadLocalHashCode & (len-1); + + //遍历tab如果已经存在则更新值 + for (Entry e = tab[i]; + e != null; + e = tab[i = nextIndex(i, len)]) { + ThreadLocal k = e.get(); + + if (k == key) { + e.value = value; + return; + } + + if (k == null) { + replaceStaleEntry(key, value, i); + return; + } + } + + //如果上面没有遍历成功则创建新值 + tab[i] = new Entry(key, value); + int sz = ++size; + //满足条件数组扩容x2 + if (!cleanSomeSlots(i, sz) && sz >= threshold) + rehash(); + } +在ThreadLocalMap中的set方法与构造方法能看到以下代码片段。 + +int i = key.threadLocalHashCode & (len-1) int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1) 简而言之就是将threadLocalHashCode进行一个位运算(取模)得到索引i,threadLocalHashCode代码如下。 //ThreadLocal中threadLocalHashCode相关代码. + +private final int threadLocalHashCode = nextHashCode(); + +/** + * The next hash code to be given out. Updated atomically. Starts at + * zero. + */ +private static AtomicInteger nextHashCode = + new AtomicInteger(); + +/** + * The difference between successively generated hash codes - turns + * implicit sequential thread-local IDs into near-optimally spread + * multiplicative hash values for power-of-two-sized tables. + */ +private static final int HASH_INCREMENT = 0x61c88647; + +/** + * Returns the next hash code. + */ +private static int nextHashCode() { + //自增 + return nextHashCode.getAndAdd(HASH_INCREMENT); +} +因为static的原因,在每次new ThreadLocal时因为threadLocalHashCode的初始化,会使threadLocalHashCode值自增一次,增量为0x61c88647。 + +0x61c88647是斐波那契散列乘数,它的优点是通过它散列(hash)出来的结果分布会比较均匀,可以很大程度上避免hash冲突,已初始容量16为例,hash并与15位运算计算数组下标结果如下: + +hashCode 数组下标 0x61c88647 7 0xc3910c8e 14 0x255992d5 5 0x8722191c 12 0xe8ea9f63 3 0x4ab325aa 10 0xac7babf1 1 0xe443238 8 0x700cb87f 15 总结如下: + +对于某一ThreadLocal来讲,他的索引值i是确定的,在不同线程之间访问时访问的是不同的table数组的同一位置即都为table[i],只不过这个不同线程之间的table是独立的。 对于同一线程的不同ThreadLocal来讲,这些ThreadLocal实例共享一个table数组,然后每个ThreadLocal实例在table中的索引i是不同的。 get()方法 //ThreadLocal中get方法 public T get() { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } return setInitialValue(); } + +//ThreadLocalMap中getEntry方法 private Entry getEntry(ThreadLocal key) { int i = key.threadLocalHashCode & (table.length - 1); Entry e = table[i]; if (e != null && e.get() == key) return e; else return getEntryAfterMiss(key, i, e); } 理解了set方法,get方法也就清楚明了,无非是通过计算出索引直接从数组对应位置读取即可。 + +ThreadLocal实现主要涉及Thread,ThreadLocal,ThreadLocalMap这三个类。关于ThreadLocal的实现流程正如上面写的那样,实际代码还有许多细节处理的部分并没有在这里写出来。 + +ThreadLocal特性 ThreadLocal和Synchronized都是为了解决多线程中相同变量的访问冲突问题,不同的点是 + +Synchronized是通过线程等待,牺牲时间来解决访问冲突 ThreadLocal是通过每个线程单独一份存储空间,牺牲空间来解决冲突,并且相比于Synchronized,ThreadLocal具有线程隔离的效果,只有在线程内才能获取到对应的值,线程外则不能访问到想要的值。 正因为ThreadLocal的线程隔离特性,使他的应用场景相对来说更为特殊一些。在android中Looper、ActivityThread以及AMS中都用到了ThreadLocal。当某些数据是以线程为作用域并且不同线程具有不同的数据副本的时候,就可以考虑采用ThreadLocal。 + +作者:ingxin 链接:https://www.jianshu.com/p/3c5d7f09dfbd 来源:简书 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 \ No newline at end of file diff --git a/week_05/11/ThreadPoolExecutor.md b/week_05/11/ThreadPoolExecutor.md new file mode 100644 index 0000000..96be761 --- /dev/null +++ b/week_05/11/ThreadPoolExecutor.md @@ -0,0 +1,198 @@ +首先看下ThreadPoolExecutor的构造函数 + +复制代码 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } 复制代码 构造函数的参数含义如下: + +corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去; + +maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量; + +keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁; + +unit:keepAliveTime的单位 + +workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列几种; + +threadFactory:线程工厂,用于创建线程,一般用默认即可; + +handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务; + +接下来我们对其中比较重要参数做进一步的了解: + +一、workQueue任务队列 + +上面我们已经介绍过了,它一般分为直接提交队列、有界任务队列、无界任务队列、优先任务队列; + +1、直接提交队列:设置为SynchronousQueue队列,SynchronousQueue是一个特殊的BlockingQueue,它没有容量,没执行一个插入操作就会阻塞,需要再执行一个删除操作才会被唤醒,反之每一个删除操作也都要等待对应的插入操作。 + +复制代码 public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //maximumPoolSize设置为2 ,拒绝策略为AbortPolic策略,直接抛出异常 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new SynchronousQueue(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); for(int i=0;i<3;i++) { pool.execute(new ThreadTask()); } +} } + +public class ThreadTask implements Runnable{ + +public ThreadTask() { + +} + +public void run() { + System.out.println(Thread.currentThread().getName()); +} +} 复制代码 输出结果为 + +复制代码 pool-1-thread-1 pool-1-thread-2 Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.hhxx.test.ThreadTask@55f96302 rejected from java.util.concurrent.ThreadPoolExecutor@3d4eac69[Running, pool size = 2, active threads = 0, queued tasks = 0, completed tasks = 2] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source) at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source) at com.hhxx.test.ThreadPool.main(ThreadPool.java:17) 复制代码 可以看到,当任务队列为SynchronousQueue,创建的线程数大于maximumPoolSize时,直接执行了拒绝策略抛出异常。 + +使用SynchronousQueue队列,提交的任务不会被保存,总是会马上提交执行。如果用于执行任务的线程数量小于maximumPoolSize,则尝试创建新的进程,如果达到maximumPoolSize设置的最大值,则根据你设置的handler执行拒绝策略。因此这种方式你提交的任务不会被缓存起来,而是会被马上执行,在这种情况下,你需要对你程序的并发量有个准确的评估,才能设置合适的maximumPoolSize数量,否则很容易就会执行拒绝策略; + +2、有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现,如下所示 + +pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(10),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); 使用ArrayBlockingQueue有界任务队列,若有新的任务需要执行时,线程池会创建新的线程,直到创建的线程数量达到corePoolSize时,则会将新的任务加入到等待队列中。若等待队列已满,即超过ArrayBlockingQueue初始化的容量,则继续创建线程,直到线程数量达到maximumPoolSize设置的最大线程数量,若大于maximumPoolSize,则执行拒绝策略。在这种情况下,线程数量的上限与有界任务队列的状态有直接关系,如果有界队列初始容量较大或者没有达到超负荷的状态,线程数将一直维持在corePoolSize以下,反之当任务队列已满时,则会以maximumPoolSize为最大线程数上限。 + +3、无界的任务队列:有界任务队列可以使用LinkedBlockingQueue实现,如下所示 + +pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); 使用无界任务队列,线程池的任务队列可以无限制的添加新的任务,而线程池创建的最大线程数量就是你corePoolSize设置的数量,也就是说在这种情况下maximumPoolSize这个参数是无效的,哪怕你的任务队列中缓存了很多未执行的任务,当线程池的线程数达到corePoolSize后,就不会再增加了;若后续有新的任务加入,则直接进入队列等待,当使用这种任务队列模式时,一定要注意你任务提交与处理之间的协调与控制,不然会出现队列中的任务由于无法及时处理导致一直增长,直到最后资源耗尽的问题。 + +4、优先任务队列:优先任务队列通过PriorityBlockingQueue实现,下面我们通过一个例子演示下 + +复制代码 public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //优先任务队列 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new PriorityBlockingQueue(),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy()); + + for(int i=0;i<20;i++) { + pool.execute(new ThreadTask(i)); + } +} +} + +public class ThreadTask implements Runnable,Comparable{ + +private int priority; + +public int getPriority() { + return priority; +} + +public void setPriority(int priority) { + this.priority = priority; +} + +public ThreadTask() { + +} + +public ThreadTask(int priority) { + this.priority = priority; +} + +//当前对象和其他对象做比较,当前优先级大就返回-1,优先级小就返回1,值越小优先级越高 +public int compareTo(ThreadTask o) { + return this.priority>o.priority?-1:1; +} + +public void run() { + try { + //让线程阻塞,使后续任务进入缓存队列 + Thread.sleep(1000); + System.out.println("priority:"+this.priority+",ThreadName:"+Thread.currentThread().getName()); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + +} +} 复制代码 我们来看下执行的结果情况 + +复制代码 priority:0,ThreadName:pool-1-thread-1 priority:9,ThreadName:pool-1-thread-1 priority:8,ThreadName:pool-1-thread-1 priority:7,ThreadName:pool-1-thread-1 priority:6,ThreadName:pool-1-thread-1 priority:5,ThreadName:pool-1-thread-1 priority:4,ThreadName:pool-1-thread-1 priority:3,ThreadName:pool-1-thread-1 priority:2,ThreadName:pool-1-thread-1 priority:1,ThreadName:pool-1-thread-1 复制代码 大家可以看到除了第一个任务直接创建线程执行外,其他的任务都被放入了优先任务队列,按优先级进行了重新排列执行,且线程池的线程数一直为corePoolSize,也就是只有一个。 + +通过运行的代码我们可以看出PriorityBlockingQueue它其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量,只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。 + +二、拒绝策略 + +一般我们创建线程池时,为防止资源被耗尽,任务队列都会选择创建有界任务队列,但种模式下如果出现任务队列已满且线程池创建的线程数达到你设置的最大线程数时,这时就需要你指定ThreadPoolExecutor的RejectedExecutionHandler参数即合理的拒绝策略,来处理线程池"超载"的情况。ThreadPoolExecutor自带的拒绝策略如下: + +1、AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作; + +2、CallerRunsPolicy策略:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行; + +3、DiscardOledestPolicy策略:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交; + +4、DiscardPolicy策略:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失; + +以上内置的策略均实现了RejectedExecutionHandler接口,当然你也可以自己扩展RejectedExecutionHandler接口,定义自己的拒绝策略,我们看下示例代码: + +复制代码 public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //自定义拒绝策略 pool = new ThreadPoolExecutor(1, 2, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5), Executors.defaultThreadFactory(), new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(r.toString()+"执行了拒绝策略"); + + } + }); + + for(int i=0;i<10;i++) { + pool.execute(new ThreadTask()); + } +} +} + +public class ThreadTask implements Runnable{ +public void run() { try { //让线程阻塞,使后续任务进入缓存队列 Thread.sleep(1000); System.out.println("ThreadName:"+Thread.currentThread().getName()); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } + +} +} 复制代码 输出结果: + +复制代码 com.hhxx.test.ThreadTask@33909752执行了拒绝策略 com.hhxx.test.ThreadTask@55f96302执行了拒绝策略 com.hhxx.test.ThreadTask@3d4eac69执行了拒绝策略 ThreadName:pool-1-thread-2 ThreadName:pool-1-thread-1 ThreadName:pool-1-thread-1 ThreadName:pool-1-thread-2 ThreadName:pool-1-thread-1 ThreadName:pool-1-thread-2 ThreadName:pool-1-thread-1 复制代码 可以看到由于任务加了休眠阻塞,执行需要花费一定时间,导致会有一定的任务被丢弃,从而执行自定义的拒绝策略; + +三、ThreadFactory自定义线程创建 + +线程池中线程就是通过ThreadPoolExecutor中的ThreadFactory,线程工厂创建的。那么通过自定义ThreadFactory,可以按需要对线程池中创建的线程进行一些特殊的设置,如命名、优先级等,下面代码我们通过ThreadFactory对线程池中创建的线程进行记录与命名 + +复制代码 public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) { //自定义线程工厂 pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5), new ThreadFactory() { public Thread newThread(Runnable r) { System.out.println("线程"+r.hashCode()+"创建"); //线程命名 Thread th = new Thread(r,"threadPool"+r.hashCode()); return th; } }, new ThreadPoolExecutor.CallerRunsPolicy()); + + for(int i=0;i<10;i++) { + pool.execute(new ThreadTask()); + } +} +} + +public class ThreadTask implements Runnable{ +public void run() { //输出执行线程的名称 System.out.println("ThreadName:"+Thread.currentThread().getName()); } } 复制代码 我们看下输出结果 + +复制代码 线程118352462创建 线程1550089733创建 线程865113938创建 ThreadName:threadPool1550089733 ThreadName:threadPool118352462 线程1442407170创建 ThreadName:threadPool1550089733 ThreadName:threadPool1550089733 ThreadName:threadPool1550089733 ThreadName:threadPool865113938 ThreadName:threadPool865113938 ThreadName:threadPool118352462 ThreadName:threadPool1550089733 ThreadName:threadPool1442407170 复制代码 可以看到线程池中,每个线程的创建我们都进行了记录输出与命名。 + +四、ThreadPoolExecutor扩展 + +ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的, + +1、beforeExecute:线程池中任务运行前执行 + +2、afterExecute:线程池中任务运行完毕后执行 + +3、terminated:线程池退出后执行 + +通过这三个接口我们可以监控每个任务的开始和结束时间,或者其他一些功能。下面我们可以通过代码实现一下 + +复制代码 public class ThreadPool { private static ExecutorService pool; public static void main( String[] args ) throws InterruptedException { //实现自定义接口 pool = new ThreadPoolExecutor(2, 4, 1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(5), new ThreadFactory() { public Thread newThread(Runnable r) { System.out.println("线程"+r.hashCode()+"创建"); //线程命名 Thread th = new Thread(r,"threadPool"+r.hashCode()); return th; } }, new ThreadPoolExecutor.CallerRunsPolicy()) { + + protected void beforeExecute(Thread t,Runnable r) { + System.out.println("准备执行:"+ ((ThreadTask)r).getTaskName()); + } + + protected void afterExecute(Runnable r,Throwable t) { + System.out.println("执行完毕:"+((ThreadTask)r).getTaskName()); + } + + protected void terminated() { + System.out.println("线程池退出"); + } + }; + + for(int i=0;i<10;i++) { + pool.execute(new ThreadTask("Task"+i)); + } + pool.shutdown(); +} +} + +public class ThreadTask implements Runnable{ +private String taskName; public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public ThreadTask(String name) { this.setTaskName(name); } public void run() { //输出执行线程的名称 System.out.println("TaskName"+this.getTaskName()+"---ThreadName:"+Thread.currentThread().getName()); } } 复制代码 我看下输出结果 + +复制代码 线程118352462创建 线程1550089733创建 准备执行:Task0 准备执行:Task1 TaskNameTask0---ThreadName:threadPool118352462 线程865113938创建 执行完毕:Task0 TaskNameTask1---ThreadName:threadPool1550089733 执行完毕:Task1 准备执行:Task3 TaskNameTask3---ThreadName:threadPool1550089733 执行完毕:Task3 准备执行:Task2 准备执行:Task4 TaskNameTask4---ThreadName:threadPool1550089733 执行完毕:Task4 准备执行:Task5 TaskNameTask5---ThreadName:threadPool1550089733 执行完毕:Task5 准备执行:Task6 TaskNameTask6---ThreadName:threadPool1550089733 执行完毕:Task6 准备执行:Task8 TaskNameTask8---ThreadName:threadPool1550089733 执行完毕:Task8 准备执行:Task9 TaskNameTask9---ThreadName:threadPool1550089733 准备执行:Task7 执行完毕:Task9 TaskNameTask2---ThreadName:threadPool118352462 TaskNameTask7---ThreadName:threadPool865113938 执行完毕:Task7 执行完毕:Task2 线程池退出 复制代码 可以看到通过对beforeExecute()、afterExecute()和terminated()的实现,我们对线程池中线程的运行状态进行了监控,在其执行前后输出了相关打印信息。另外使用shutdown方法可以比较安全的关闭线程池, 当线程池调用该方法后,线程池中不再接受后续添加的任务。但是,此时线程池不会立刻退出,直到添加到线程池中的任务都已经处理完成,才会退出。 + +五、线程池线程数量 + +线程吃线程数量的设置没有一个明确的指标,根据实际情况,只要不是设置的偏大和偏小都问题不大,结合下面这个公式即可 + +复制代码 /* * Nthreads=CPU数量 * Ucpu=目标CPU的使用率,0<=Ucpu<=1 * W/C=任务等待时间与任务计算时间的比率 / Nthreads = NcpuUcpu(1+W/C) \ No newline at end of file -- Gitee