# concurrency_tools_practice **Repository Path**: adastevy/concurrency_tools_practice ## Basic Information - **Project Name**: concurrency_tools_practice - **Description**: 初始化提交 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-04-24 - **Last Updated**: 2021-07-07 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # 并发编程 - 线程想关的基本理论与工具 - 多线程程序下的性能调优 - 电商场景下多线程的使用 [并发编程大纲思维导图](https://www.processon.com/view/link/6083ce93e401fd45d7086cad) ![并发编程](https://gitee.com/adastevy/img/raw/master/img/20210424155553.png) [并发编程-工具类思维导图](https://www.processon.com/view/link/6083cf680791293ce812ffcf) ![image-20210424155828545](https://gitee.com/adastevy/img/raw/master/img/20210424155828.png) [并发工具分类思维导图](https://www.processon.com/view/link/6083d054e401fd45d7087194) [高性能缓存思维导图](https://www.processon.com/view/link/6083d0aef346fb54941b5375) [线程协作、控制并发流程](https://www.processon.com/view/link/6083d15c7d9c0811840505f9) # 1 多线程 J.U.C ## 1.1 线程 ### 1.1.1 概念 1)回顾线程创建方式 继承Thread 实现Runnable 2)线程的常用方法 ![](https://gitee.com/adastevy/img/raw/master/img/20201021155442.png) 1. sleep(): - Thread.sleep(1000); - 在指定时间内让当前执行的线程暂停执行一段时间,让其他线程有机会继续执行,但不会释放对象锁,也就是说如果有synchronized同步快,其他线程仍然不能访问共享数据,不推荐使用,sleep() 使当前线程进入阻塞状态,在指定时间不会执行。 2. wait(): - 对象的方法,会释放对象锁 - wait()和notify()、notifyAll(),这三个方法用于协调多个线程对共享数据的存取,所以必须在synchronized语句块内使用也就是说,调用wait(),notify()和notifyAll()的任务在调用这些犯法前必须拥有对象锁 - **wait()和notify()、notifyAll()它们都是Object类的方法,而不是Thread类的方法。**谁调用谁等待。 - 当调用某一对象的wait() 方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用 notify() 方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获得锁标志,他们随时准备争夺锁的拥有权,当调用了某个对象的notifyAll() 方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池 1. wait():调用该方法使持有该对象的线程把该对象的控制权交出去,然后处于等待状态 2. notify():调用该方法就会通知某个正在等待这个对象的控制权的线程可以继续运行 3. notifyAll():调用该方法就会通知所有等待这个对象控制权的线程继续运行 3. yield(): - Thread类的静态方法,不会释放对象锁,不抛异常 - yield() 方法和sleep() 方法类似,也不会释放对象锁,它是Thread类的静态方法,区别在于,它没有参数,即yield() 方法只是使当前线程让步,重新回到就绪状态,所以执行yield的线程,有可能在进入到就绪状态后马上又被执行,另外yield方法只能使同优先级或者高优先级的线程得到执行机会,这也和sleep方法不同 4. join(): - Thread 类的对象实例的方法 - Thread t1 = new Thread();t1.join(); - join() 方法会使当前线程等待调用join()**方法的线程结束后才能继续执行** ### 1.1.2 线程的状态 - NEW:刚刚创建,没做任何操作 ```properties Thread thread = new Thread(); System.out.println(thread.getState()); ``` - RUNNABLE:调用run,可以执行,但不代表一定在执行(RUNNING,READY) ```properties thread.start(); System.out.println(thread.getState()); ``` - BLOCKED:抢不到锁 ```java final byte[] lock = new byte[0]; new Thread(new Runnable() { public void run() { synchronized (lock){ try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); Thread thread2 = new Thread(new Runnable() { public void run() { synchronized (lock){ } } }); thread2.start(); Thread.sleep(1000); System.out.println(thread2.getState()); ``` - WAITING ```java Thread thread2 = new Thread(new Runnable() { public void run() { LockSupport.park(); } }); thread2.start(); Thread.sleep(500); System.out.println(thread2.getState()); LockSupport.unpark(thread2); Thread.sleep(500); System.out.println(thread2.getState()); ``` - TIMED_WAITING ```java Thread thread3 = new Thread(new Runnable() { public void run() { try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread3.start(); Thread.sleep(500); System.out.println(thread3.getState()); ``` - TERMINATED ```java //等待1s后再来看 Thread.sleep(1000); System.out.println(thread.getState()); ``` ### 1.1.3 线程池基本概念 根据上面的状态,普通线程执行完,就会进入TERMINATED销毁掉,而线程池就是创建一个缓冲池存放线程,执行结束以后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等候下次任务来临,这使得线程池比手动创建线程有着更多的优势: - 降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗; - 提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行; - 方便线程并发数的管控。因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM - 节省cpu切换线程的时间成本(需要保持当前执行线程的现场,并恢复要执行线程的现场)。 - 提供更强大的功能,延时定时线程池。(Timer vs ScheduledThreadPoolExecutor) 常用线程池类结构,可以通过idea查看到 (查看:ScheduledThreadPoolExecutor,ForkJoinPool类图) ![](https://gitee.com/adastevy/img/raw/master/img/20201021155104.png) 说明: - 最常用的是ThreadPoolExecutor - 调度用ScheduledThreadPoolExecutor - 任务拆分合并用ForkJoinPool - Executors是工具类,协助你创建线程池的 ### 1.1.4 工作机制 在线程池的编程模式下,任务是提交给整个线程池,而不是直接提交给某个线程,线程池在拿到任务后,就在内部协调空闲的线程,如果有,则将任务交给某个空闲的线程。一个线程同时只能执行一个任务,但可以同时向一个线程池提交多个任务。 1)线程池状态 ![](https://gitee.com/adastevy/img/raw/master/img/20201021155622.png) - RUNNING:初始化状态是RUNNING。线程池被一旦被创建,就处于RUNNING状态,并且线程池中的任务数为0。RUNNING状态下,能够接收新任务,以及对已添加的任务进行处理。 - SHUTDOWN:SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。 ```java //shutdown后不接受新任务,但是task1,仍然可以执行完成 ExecutorService poolExecutor = Executors.newFixedThreadPool(5); poolExecutor.execute(new Runnable() { public void run() { try { Thread.sleep(1000); System.out.println("finish task 1"); } catch (InterruptedException e) { e.printStackTrace(); } } }); poolExecutor.shutdown(); poolExecutor.execute(new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("ok"); ``` - STOP:不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。调用线程池的shutdownNow()接口时,线程池由(RUNNING 或 SHUTDOWN ) -> STOP 注意:运行中的任务还会打印,直到结束,因为调的是Thread.interrupt ```properties //改为shutdownNow后,任务立马终止,sleep被打断,新任务无法提交,task1停止 poolExecutor.shutdownNow(); ``` - TIDYING:所有的任务已终止,队列中的”任务数量”为0,线程池会变为TIDYING。线程池变为TIDYING状态时,会执行钩子函数terminated(),可以通过重载terminated()函数来实现自定义行为 ```java package com.itheima.thread.demo; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; //自定义类,重写terminated方法 public class MyExecutorService extends ThreadPoolExecutor { public MyExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void terminated() { super.terminated(); System.out.println("treminated"); } //调用 shutdownNow, ternimated方法被调用打印 public static void main(String[] args) throws InterruptedException { MyExecutorService service = new MyExecutorService(1,2,10000,TimeUnit.SECONDS,new LinkedBlockingQueue(5)); service.shutdownNow(); } } ``` - TERMINATED:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED 2)结构说明 (源码查看:两个集合,一个queue,一个hashset) ![](https://gitee.com/adastevy/img/raw/master/img/20201021170017.png) 3)任务的提交 - 添加任务,如果线程池中线程数没达到coreSize,直接创建新线程执行 - 达到core,放入queue - queue已满,未达到maxSize继续创建线程 - 达到maxSize,根据reject策略处理 - 超时后,线程被释放,下降到coreSize ### 1.1.5 创建和停止线程池 1. 线程池的构造函数的参数 ![线程池构造函数的参数 ](https://gitee.com/adastevy/img/raw/master/img/20210329163547.png) - corePoolSize: corePoolSize指的是核心线程数:线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待任务到来,再去创建新线程去执行任务。 - maxPoolSize: maxPoolSize最大线程数:线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是最大量`maxPoolSize` - keepAliveTime: 如果线程池当前的线程数多余corePoolSize,那么如果多余的线程`空闲时间超过`keepAliveTime,它们就会被终止。 - threadFactory 新的线程是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory(),创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程,如果自己指定ThreadFactory,那么就可以`改变线程名`、`线程组`、`优先级`、是否是守护线程等。 通常我们用默认的ThreadFactory就可以了。 - workQueue 工作队列 - 1)直接交换:SynchronousQueue SynchronousQueue内部并没有容量,只是把任务简单的中转,交到线程去处理,这个时候我们的maxPoolSize就要设置的大一点,因为没有队列作为缓冲。 - 2)无界队列:LinkedBlockingQueue LinkedBlockingQueue当coreSize线程数都在忙,新的任务就会放到无界队列中,这个时候maxPoolSize就会失去作用。这个时候确实可以防止我们流量突增,风险就是队列会越来越大,造成OOM异常。 - 3)有界队列:ArrayBlockingQueue 可以设置队列大小,maxPoolSize就会有作用。 ![corePoolSize和maxPoolSize ](https://gitee.com/adastevy/img/raw/master/img/20210330111338.png) - 添加线程的规则: 1. 如果线程数小于`corePoolSize`,即使其他工作线程处于空闲状态,也会创建一个新的线程来运行新任务。 2. 如果线程数等于(或大于)`corePoolSize`但少于maxPoolSize,则将任务放入队列。 3. 如果队列已满,并且线程数小于maxPoolSize,则创建一个新线程来运行任务。 ![线程池添加线程规则 ](https://gitee.com/adastevy/img/raw/master/img/20210330111443.png) 增减线程的特点: 1. 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池; 2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它; 3. 通过设置maximumPoolSize为很高的值,例如Integer.MAX_VALUE,可以允许线程池容纳任意数量的并发任务。 4. 是只有在队列填满时才创建多余corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。 1. 线程池应该手动创建还是自动创建 自动创建使用的时候非常方便,但是有一些弊端,一般手动创建 手动创建更好,因为这样可以让我们更加明确线程池的运行规则,避免资源耗尽的风险。 2. 线程池里的线程数量设定为多少比较合适? - CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的1-2倍 - 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考Brain Goetz推介的计算方法: 线程数=CPU核心数*(1+平均等等时间/平均工作时间)--压测 3. 停止线程池的正确方法 1. shutdown 不接受新的任务,存量的任务会执行 ```java //执行shutdown()方法该方法返回true 并不代表线程池停止 表示开始停止 executorService.isShutdown(); // 方法该方法返回true 代表线程池完全停止 executorService.isTerminated() ``` 2. awaitTermination() 等待一段时间返回线程的终止结果 ```java //代表七秒钟期间 线程完全停止 返回true boolean b = executorService.awaitTermination(7, TimeUnit.SECONDS); ``` 3. shutdownNow() ```java //立即终止线程池 并返回池中的任务 List runnables = executorService.shutdownNow(); ``` 4. 线程池任务太多,怎么拒绝? 拒绝时机: - 1.当Executor关闭,提交新任务会被拒绝 - 2.以及当Executor对最大线程和工作队列容量使用有界并以及饱和时。 5. 4种拒绝策略 - AbortPolicy 直接抛出一个异常 - DiscardPolicy 新的会被默默的丢弃 - DiscardOldestPolicy 丢弃最老的 - CallerRunnsPolicy 谁提交的谁去跑 让主线程去运行 6. 钩子方法,给线程池加点料 - 每个任务执行前后 日志,统计 ```java /** * 描述: 演示每个任务执行前后放钩子函数 */ public class PauseableThreadPool extends ThreadPoolExecutor { private final ReentrantLock lock = new ReentrantLock(); private Condition unpaused = lock.newCondition(); private boolean isPaused; @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); lock.lock(); try { while (isPaused) { unpaused.await(); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * *暂停线程池 */ private void pause() { lock.lock(); try { isPaused = true; } finally { lock.unlock(); } } /** * *唤醒线程池 */ public void resume() { lock.lock(); try { isPaused = false; unpaused.signalAll(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10l,TimeUnit.SECONDS, new LinkedBlockingQueue<>()); Runnable runnable = new Runnable() { @Override public void run() { System.out.println("我被执行"); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } }; for (int i = 0; i < 10000; i++) { pauseableThreadPool.execute(runnable); } Thread.sleep(1500); pauseableThreadPool.pause(); System.out.println("线程池被暂停了"); Thread.sleep(1500); pauseableThreadPool.resume(); System.out.println("线程池被恢复了"); } } ``` ### 1.1.6 源码剖析 ```JAVA //任务提交阶段:(4个if条件路线 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get();//工作数 一个状态。一个个数 //判断工作数,如果小于coreSize,addWork,注意第二个参数core=true if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } //否则,如果线程池还在运行,offer到队列 if (isRunning(c) && workQueue.offer(command)) { //再检查一下状态 int recheck = ctl.get(); //如果线程池已经终止,直接移除任务,不再响应 if (! isRunning(recheck) && remove(command)) reject(command); //否则,如果没有线程干活的话,创建一个空work,该work会从队列获取任务去执行 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //队列也满,继续调addWork,但是注意,core=false,开启到maxSize的大门 //超出max的话,addWork会返回false,进入reject else if (!addWorker(command, false)) reject(command); } ``` ```java //线程创建 private boolean addWorker(Runnable firstTask, boolean core) { //第一步,计数判断,不符合条件打回false retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); //判断线程数,注意这里! //也就说明线程池的线程数是不可能设置任意大的。 //最大29位(CAPACITY=29位二进制) if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //第二步,创建新work放入线程集合works(一个HashSet) boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //符合条件,创建新的work并包装task w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //在这里!!! workers 是一个HashSet workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //注意,只要是成功add了新的work,那么将该新work立即启动,任务得到执行 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } ``` 任务获取与执行 ```java final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //在worker执行runWorker()的时候,不停循环,先查看自己有没有携带Task,如果有,执行 while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } } ``` 从队列获取任务 ```java //如果没用,会调用getTask,从队列获取任务 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? //判断是不是要超时处理,重点!!!决定了当前线程要不要被释放 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //线程数超出max,并且上次循环中poll等待超时了,那么说明该线程已终止 //将线程队列数量原子性减 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //重点!!! //如果线程可被释放,那就poll,释放的时间为:keepAliveTime //否则,线程是不会被释放的,take一直被阻塞在这里,知道来了新任务继续工作 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //到这里说明可被释放的线程等待超时,已经销毁,设置该标记,下次循环将线程数减少 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } } ``` 完整流程回顾: ![](https://gitee.com/adastevy/img/raw/master/img/20201021180630.png) ### 1.1.7 注意点 1)线程池是如何保证线程不被销毁的呢? 答案:如果队列中没有任务时,核心线程会一直阻塞在获取任务的方法,直到返回任务。而任务执行完后,又会进 入下一轮 work.runWork()中循环 验证:秘密就藏在核心源码里 ThreadPoolExecutor.getTask() ```java //work.runWork(): while (task != null || (task = getTask()) != null) //work.getTask(): boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); ``` 2)那么线程池中的线程会处于什么状态? 答案:TIMED_WAITING,RUNNABLE,WAITING 验证:起一个线程池,放置一个任务sleep,debug查看结束前后的状态 ```java //debug add watcher: ((ThreadPoolExecutor) poolExecutor).workers.iterator().next().thread.getState() ``` ```java ThreadPoolExecutor poolExecutor = Executors.newFixedThreadPool(5); poolExecutor.execute(new Runnable() { public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("ok"); ``` 3)核心线程与非核心线程有区别吗? 答案:没有。被销毁的线程和创建的先后无关。即便是第一个被创建的核心线程,仍然有可能被销毁 验证:看源码,每个works在runWork的时候去getTask,在getTask内部,并没有针对性的区分当前work是否是核心线程或者类似的标记。只要判断works数量超出core,就会调用poll(),否则take() ### 1.1.8 Executors工具 以上构造函数比较多,为了方便使用,提供了一个Executors工具类 1)newCachedThreadPool() : 弹性线程数(有任务就开线程) - 可以缓存 - 特点:无界线程池,具有自动回收多余线程的功能。 ```java //一定时间后会回收线程,默认的时间是60m new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()) ``` 2)newFixedThreadPool(int nThreads) : 固定线程数 - 由于传进去的LinkedBlockingQueue是没有容量上限的,所以当请求数越来越多,并且无法即使处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM。 ```java new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) ``` 3)newSingleThreadExecutor() : 单一线程数 - 可以看出,这里和刚才的newFixedThreadPool的原理基本一样,只不过把线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存。 ```java new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) ``` 4)newScheduledThreadPool(int corePoolSize) : 可调度,常用于定时(支持定时及周期性任务执行的线程池) ```java // super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, // new DelayedWorkQueue()); // DelayedWorkQueue 延时队列 ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10); //5秒后执行 // threadPool.schedule(new Task(), 5, TimeUnit.SECONDS); //以一定频率重复运行 1秒后运行 3秒为一个周期 threadPool.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS); ``` 以上4种线程池的构造函数的参数: | Parameter | FixedThreadPool | CachedThreadPool | ScheduledThreadPool | SingleThreadExecutor | | ------------- | ---------------- | ----------------- | ------------------- | -------------------- | | coreSize | constructor-arg | 0 | constructor-arg | 1 | | maxPoolSize | same as coreSize | Integer.MAX_VALUE | Integer.MAX_VALUE | 1 | | KeepAliveTime | 0 seconds | 60 seconds | 0 seconds | 0 seconds | ### 1.1.9 线程池的状态 - RUNNING:接受新任务并处理排队任务。 - SHUTDOWN:不接受新任务,但处理排队任务。 - STOP:不接受新任务,也不处理排队任务,并中断正在进行的任务。 - TIDYING,中文是整洁,理解了中文的就容易理解这个状态了:所有任务都已经终止,workerCount为零时,线程会转换到TIDYING状态,并将运行terminate()钩子方法。 - TERMINATED:terminate()运行完成。 ```java private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; ``` ### 1.1.10 使用线程池的主意点 - 避免任务堆积 - 避免线程数过度增加 - 排查线程泄露 ## 1.2 锁 ### 1.2.1 概述 [java中的锁分类](https://www.processon.com/view/link/6083ce770791293ce812fd57) 锁是一种互斥的机制,在多线程环境中实现对资源的协调与控制,凡是有资源被多线程共享,涉及到你改我改的情况就要考虑锁的加持。从一个案例看起,在写代码的时候,不注意往往会遇到以下代码... 1) 糟糕的实现 ```java package com.itheima.thread.lock; import java.util.concurrent.locks.ReentrantLock; public class BadCounter { private static int i=0; //ReentrantLock lock = new ReentrantLock(); public int get(){ return i; } public void inc(){ //lock.lock(); int j=get(); try { Thread.sleep(100); j++; i=j; } catch (InterruptedException e) { e.printStackTrace(); } finally { //lock.unlock(); } } public static void main(String[] args) throws InterruptedException { final BadCounter counter = new BadCounter(); //不使用线程10次,对比使用线程10次,看结果 for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { counter.inc(); } }).start(); } Thread.sleep(3000); //理论上10才对。可是.... System.out.println(counter.i); } } ``` 出问题了.... 在遍地spring bean的年代,单例模式下的类变量尤其要注意! ### 1.2.2 实现方式 1)synchronized ```java //加synchronized,再测试 public synchronized void inc() ``` 2)Lock ```java //换lock方式测试 Lock lock = new ReentrantLock(); public void inc() { lock.lock(); //... lock.unlock(); } ``` 无论哪种方式加锁均能实现正确计数,但是这个性能实在是感人,后面调优还会提到。 #### 1.2.2.1 Lock简介 - Lock接口最常见的实现类是ReentrantLock - 通常情况下,Lock只允许一个线程来访问这个共享资源。不过有些时候,一些特殊的实现也允许并发范文,比如ReadWriteLock里面的ReadLock Lock中声明了四个方法来获取锁 - lock() - lock()就是最普遍的获取锁。如果锁已经被其他线程获取,则进行等待。 - lock()不会像synchronized一样在异常时自动释放锁。 - 最佳实现就是,在finally中释放锁,以保证发生异常时锁一定被释放。 - lock()方法不能被中断,这会带来很多的隐患:一旦陷入死锁,lock()就会陷入永久等待。 ```java /** * 描述: Lock不会像synchronized一样,异常的时候自动释放锁,所 * 以最佳实践是,finally中释放锁,以便保证发生异常的时候锁一定被释放 */ public class MustUnlock { private static Lock lock = new ReentrantLock(); public static Lock lock2 = new ReentrantLock(); public static void main(String[] args) { lock.lock(); try{ //获取本锁保护的资源 System.out.println(Thread.currentThread().getName()+"开始执行任务"); }finally { lock.unlock(); } } } ``` - tryLock() - tryLock()用来尝试获取锁,如果当前锁没有被其他线程占用,则获取成功,返回true,否则返回false,代表获取锁失败。 - 相比于lock,这样的显然功能更强大了,我们可以根据是否能获取到锁来决定程序的行为。 - 该方法会立即返回,即便在拿不到锁时不会一直等待。 - tryLock(Long time,TimeUnit unit) - tryLock(Long time,TimeUnit unit):超时就放弃 ```java package lock.lock; import java.util.Random; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 描述: 用tryLock来避免死锁 */ public class TryLockDeadlock implements Runnable { int flag = 1; static Lock lock1 = new ReentrantLock(); static Lock lock2 = new ReentrantLock(); public static void main(String[] args) { TryLockDeadlock r1 = new TryLockDeadlock(); TryLockDeadlock r2 = new TryLockDeadlock(); r1.flag = 1; r1.flag = 0; new Thread(r1).start(); new Thread(r2).start(); } @Override public void run() { for (int i = 0; i < 100; i++) { if (flag == 1) { try { //800秒 有可能被中断 被中断就不用执行下面的语句了,直接执行catch语句 if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) { //执行获取到锁的逻辑 try { System.out.println("线程1获取到了锁1"); Thread.sleep(new Random().nextInt(1000)); if (lock2.tryLock(800, TimeUnit.MILLISECONDS)) { try { System.out.println("线程1获取到了锁2"); System.out.println("线程1成功获取到了两把锁"); break; } finally { lock2.unlock(); } } else {// System.out.println("线程1获取锁2失败,已重试"); } } finally { lock1.unlock(); Thread.sleep(new Random().nextInt(1000)); } } else { System.out.println("线程1获取锁1失败,已重试"); } } catch (InterruptedException e) { e.printStackTrace(); } } if (flag == 0) { try { if (lock2.tryLock(3000, TimeUnit.MILLISECONDS)) { try { System.out.println("线程2获取到了锁2"); Thread.sleep(new Random().nextInt(1000)); if (lock1.tryLock(800, TimeUnit.MILLISECONDS)) { try { System.out.println("线程2获取到了锁1"); System.out.println("线程2成功获取到了两把锁"); break; } finally { lock1.unlock(); } } else { System.out.println("线程2获取锁1失败,已重试"); } } finally { lock2.unlock(); Thread.sleep(new Random().nextInt(1000)); } } else { System.out.println("线程2获取锁2失败,已重试"); } } catch (InterruptedException e) { e.printStackTrace(); } } } } } ``` - lockInterruptibly() - lockInterruptibly():相当于tryLock(Long time,TimeUnit unit)把超时时间设置为无限。在等待的过程中,线程可以被中断。 ```java /** * 描述: TODO */ public class LockInterruptibly implements Runnable { private Lock lock = new ReentrantLock(); public static void main(String[] args) { LockInterruptibly lockInterruptibly = new LockInterruptibly(); Thread thread0 = new Thread(lockInterruptibly); Thread thread1 = new Thread(lockInterruptibly); thread0.start(); thread1.start(); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } thread1.interrupt(); } @Override public void run() { System.out.println(Thread.currentThread().getName() + "尝试获取锁"); try { lock.lockInterruptibly(); try { System.out.println(Thread.currentThread().getName() + "获取到了锁"); Thread.sleep(5000); } catch (InterruptedException e) { //睡眠期间被打断 System.out.println(Thread.currentThread().getName() + "睡眠期间被中断了"); } finally { lock.unlock(); System.out.println(Thread.currentThread().getName() + "释放了锁"); } } catch (InterruptedException e) { //等待锁期间被打断 System.out.println(Thread.currentThread().getName() + "获得锁期间被中断了"); } } } ``` #### 1.2.2.2为什么要用Lock 为什么synchronized不够用? 1. 效率低:锁的释放情况少、试图获得锁时不能设定超时、不能中断一个正在试图获得锁的线程 - 两种情况释放锁 - 第一情况是正常执行完 - 第二种情况是发生异常,JVM自动将锁释放。 2. 不够灵活(读写锁更灵活):加锁和释放的时机单一,每个锁仅有单一的条件(某一个对象),可能不够用的。 3. 无法知道是否成功获取到锁。(我申请的时候无法知道能否获取到锁)。 #### 1.2.2.3 可见性的保证 - happens-before happens-before原则,如果我们这件事发生了,其他线程一定看到我做的修改,就代表他们有happens-before - lock的加解锁和synchronized有同样的内存语义,也就是说,下一个线程加锁后可以看到所有前一个线程解锁前发生的所有操作。 ![可见性](https://gitee.com/adastevy/img/raw/master/img/20210401100205.png) 线程A先于B获取到锁,A的修改的B可见 ### 1.2.3 锁的分类及详解 ReentrantLock即使互斥锁,又是可重入锁。 ![锁分类](https://gitee.com/adastevy/img/raw/master/img/20210401101803.png) #### 1)乐观锁/悲观锁 乐观锁也称为**非互斥同步锁** -->原子类、并发容器 悲观锁也称为**互斥同步锁** -->synchronized和Lock接口 互斥同步锁的劣势: - 阻塞和唤醒带来的性能劣势 - 永久阻塞:如果持有锁的线程被永久阻塞,比如遇到了无限循环、死锁等活跃性问题,那么等待该线程释放锁的线程将永远得不到执行。 - 优先级反转 乐观锁/悲观锁的适用场景: - 悲观锁:适合并发写入多的情况,适用于临界区持锁时间比较长的情况,悲观锁可以避免大量的无用自旋等消耗,典型的情况: - 临界区有IO操作 - 临界区代码复杂或者循环量大 - 临界区竞争非常激烈 - 乐观锁:适合并发写入少,大部分是读取的场景,不加锁的能让读取性能大幅提高。 乐观锁顾名思义,很乐观的认为每次读取数据的时候总是认为没人动过,所以不去加锁。但是在更新的时候回去对比一下原来的值,看有没有被别人更改过。适用于读多写少的场景。 mysql中类比version号更新 update xxx set a=aaa where id=xx and version=1 java中的atomic包属于乐观锁实现,即CAS(下节会详细介绍) 悲观锁在每次读取数据的时候都认为其他人会修改数据,所以读取数据的时候也加锁,这样别人想拿的时候就会阻塞,直到这个线程释放锁,这就影响了并发性能。适合写操作比较多的场景。 mysql中类比for select xxx for update; update update xx set a = aaa 案例中synchronized实现就是悲观锁(1.6之后优化为锁升级机制),悲观锁书写不当很容易影响性能(性能部分会讲到) 开销对比: - 悲观锁的原始开销要高于乐观锁,但特点是一劳永逸,临界区持锁时间就算越来越差,也不会对互斥锁的开销造成影响 - 相反,虽然乐观锁一开始的开销比悲观锁小,但是如果自旋时间很长或者不停重试,那么消耗的资源也会越来越多。 #### 2)独享锁/共享锁 - 排它锁,又称为独占锁、独享锁 - 共享锁,又称为读锁,获得共享锁之后,可以查看但无法修改和删除数据,其他线程此时也可获取到共享锁,也可以查看但无法修改和删除数据。 - 共享锁和排它锁的典型是读写锁ReentrantReadWriteLock,其中读锁是共享锁,写锁是独占锁。 很好理解,独享锁是指该锁一次只能被一个线程所持有,而共享锁是指该锁可被多个线程所持有。 读写锁的规则 - a) 多个线程只申请读锁,都可以申请到。 - b)如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁 - c)如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。 - d)一句话总结:要么是一个或多个线程同时有读锁,要么是一个线程有写锁,但是两者不会同时出现(要么多读,要么一写)。 读写锁只是一把锁,可以通过两种方式锁定:读锁定和写锁定,读写锁可以同时被一个或多个线程读锁定,也可以被单一线程写锁定,但是永远不能同时对这把锁进行读锁定和写锁定。(要么多读要么一写) ```java package lock.readwrite; import java.util.concurrent.locks.ReentrantReadWriteLock; /** * 描述: TODO */ public class CinemaReadWrite { private static ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); private static ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock(); private static ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock(); private static void read() { readLock.lock(); try { System.out.println(Thread.currentThread().getName() + "得到了读锁,正在读取"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(Thread.currentThread().getName() + "释放读锁"); readLock.unlock(); } } private static void write() { writeLock.lock(); try { System.out.println(Thread.currentThread().getName() + "得到了写锁,正在写入"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println(Thread.currentThread().getName() + "释放写锁"); writeLock.unlock(); } } public static void main(String[] args) { new Thread(() -> read(), "Thread1").start(); new Thread(() -> read(), "Thread2").start(); new Thread(() -> write(), "Thread3").start(); new Thread(() -> write(), "Thread4").start(); } } ``` ![读写锁](https://gitee.com/adastevy/img/raw/master/img/20210407201554.png) 案例一:ReentrantLock,独享锁 ```java package com.itheima.thread.lock; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class PrivateLock { Lock lock = new ReentrantLock(); //读写锁 ReentrantReadWriteLock lock2 = new ReentrantReadWriteLock(); long start = System.currentTimeMillis(); void read() { lock2.writeLock().lock(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }finally { lock2.writeLock().unlock(); } System.out.println("read time = "+(System.currentTimeMillis() - start)); } public static void main(String[] args) { final PrivateLock lock = new PrivateLock(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { lock.read(); } }).start(); } } } ``` 结果分析:每个线程结束的时间点逐个上升,锁被独享,一个用完下一个,依次获取锁 ![](https://gitee.com/adastevy/img/raw/master/img/20201021181311.png) 案例二:ReadWriteLock,read共享,write独享 ```java package com.jeaw.health.controller; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class SharedLock { ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); Lock lock = readWriteLock.readLock(); long start = System.currentTimeMillis(); void read() { lock.lock(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } System.out.println("end time = " + (System.currentTimeMillis()-start)); } public static void main(String[] args) { final SharedLock lock = new SharedLock(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { lock.read(); } }).start(); } } } ``` 结果分析:每个线程独自跑,各在100ms左右,证明是共享的 ![](https://gitee.com/adastevy/img/raw/master/img/20201022023406.png) 案例三:同样是上例,换成writeLock ```java Lock lock = readWriteLock.writeLock(); ``` 结果分析:恢复到了1s时长,变为独享 ![](https://gitee.com/adastevy/img/raw/master/img/20201022023453.png) 小节: - 读锁的共享锁可保证并发读是非常高效的,读写,写读 ,写写的过程是互斥的。 - 独享锁与共享锁也是通过AQS来实现的,通过实现不同的方法,来实现独享或者共享。 读写锁的插队策略: - 公平锁:不允许插队 - 非公平锁 - 写锁可以随时插队 - 读锁仅在等待队列节点不是想获取写锁的线程的时候可以插队。 #### 3)分段锁 从Map一家子说起.... HashMap是线程不安全的,在多线程环境下,使用HashMap进行put操作时,可能会引起死循环,导致CPU利用 率接近100%,所以在并发情况下不能使用HashMap。 于是有了HashTable,HashTable是线程安全的。但是HashTable线程安全的策略实在不怎么高明,将get/put等所有相关操作都整成了synchronized的。 ![](https://gitee.com/adastevy/img/raw/master/img/20201022023556.png) 那有没有办法做到线程安全,又不这么粗暴呢?基于分段锁的ConcurrentHashMap诞生... ConcurrentHashMap使用Segment(分段锁)技术,将数据分成一段一段的存储,Segment数组的意义就是将一 个大的table分割成多个小的table来进行加锁,Segment数组中每一个元素一把锁,每一个Segment元素存储的是 HashEntry数组+链表,这个和HashMap的数据存储结构一样。当访问其中一个段数据被某个线程加锁的时候,其 他段的数据也能被其他线程访问,这就使得ConcurrentHashMap不仅保证了线程安全,而且提高了性能。 但是这也引来一个负面影响:ConcurrentHashMap 定位一个元素的过程需要进行两次Hash操作,第一次 Hash 定位到 Segment,第二次 Hash 定位到元素所在的链表。所以 Hash 的过程比普通的 HashMap 要长。 ![](https://gitee.com/adastevy/img/raw/master/img/20201022023655.png) 备注:JDK1.8ConcurrentHashMap中抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。 #### 4)可重入锁 ![可重入锁和不可重入锁源码对比](https://gitee.com/adastevy/img/raw/master/img/20210401162919.png) ```java final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState();//维护了上锁的次数 if (c == 0) { //没有任何人持有它 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } ``` ```java protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } ``` 可重入锁指的获取到锁后,如果同步块内需要再次获取同一把锁的时候,直接放行,而不是等待。其意义在于防止 死锁。前面使用的synchronized 和ReentrantLock 都是可重入锁。 实现原理实现是通过为每个锁关联一个请求计数器和一个占有它的线程。如果同一个线程再次请求这个锁,计数器 将递增,线程退出同步块,计数器值将递减。直到计数器为0锁被释放。 场景见于父类和子类的锁的重入(调super方法),以及多个加锁方法的嵌套调用。 案例一:父子可重入 ```java package com.itheima; public class ParentLock { byte[] lock = new byte[0]; public void f1(){ synchronized (lock){ System.out.println("f1 from parent"); } } } ``` ```java package com.itheima; public class SonLock extends ParentLock { public void f1() { synchronized (super.lock){ super.f1(); System.out.println("f1 from son"); } } public static void main(String[] args) { SonLock lock = new SonLock(); lock.f1(); } } ``` 案例二:内嵌方法可重入 ```java package com.itheima; public class NestedLock { public synchronized void f1(){ System.out.println("f1"); } public synchronized void f2(){ f1(); System.out.println("f2"); } public static void main(String[] args) { NestedLock lock = new NestedLock(); //可以正常打印 f1,f2 lock.f2(); } } ``` 案例三:不可重入锁的典型错误,不要这么做!!! 先看代码,猜一猜结果? ```java package com.jeaw.health.controller; public class BadLock { Lock lock = new Lock(); public void f1() { System.out.println("f1"); lock.lock(); f2(); lock.unlock(); } public void f2() { lock.lock(); System.out.println("f2"); lock.unlock(); } public static void main(String[] args) { BadLock badLock = new BadLock(); //理论上,会打印 f1 和 f2 //实际上,这个错误的设计会导致卡死在f1 badLock.f1(); } //自定义的锁,现实中不要这么做!!! class Lock { private boolean isLocked = false; public synchronized void lock() { try { //想要拿锁,一直判断标记,如果被占就wait等待 while (isLocked) { wait(); } } catch (InterruptedException e) { e.printStackTrace(); } //一旦被唤醒,退出while了,自己拿到锁,将标记改为true(已占用) isLocked = true; } public synchronized void unlock() { //占用标记改成false isLocked = false; //同时唤醒等待锁的线程 notify(); } } } ``` ReentrantLock的方法介绍 - getHoldCount() 查询当前线程对此锁的持有数量。 - isHeldByCurrentThread 可以看出锁是否被当前线程持有。 - getQueueLength 可以返回当前正在等待这把锁的队列有多长,一般这两个方法是开发和 #### 5)公平锁/非公平锁 ![对比公平锁和非公平锁源码](https://gitee.com/adastevy/img/raw/master/img/20210401162938.png) **基本概念:** 常见于AQS,公平锁就是在并发环境中,每个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前 线程是等待队列的第一个,就占有锁,否则就会加入到等待队列中,直到按照FIFO的规则从队列中取到自己。 非公平锁与公平锁基本类似,只是在放入队列前先判断当前锁是否被线程持有。如果锁空闲,那么他可以直接抢 占,而不需要判断当前队列中是否有等待线程。只有锁被占用的话,才会进入排队。 在现实中想象一下游乐场旋转木马插队现象...... **优缺点:** 公平锁的优点是等待锁的线程不会饿死,进入队列规规矩矩的排队,迟早会轮到。缺点是整体吞吐效率相对非公平 锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。 非公平锁的性能要高于公平锁,因为线程有几率不阻塞直接获得锁。ReentrantLock默认使用非公平锁就是基于性 能考量。但是非公平锁的缺点是可能引发队列中的线程始终拿不到锁,一直排队被饿死。 **编码方式:** 很简单,ReentrantLock支持创建公平锁和非公平锁(默认),想要实现公平锁,使用new ReentrantLock(true)。 **背后原理:** AQS,后面还会详细讲到。AQS中有一个state标识锁的占用情况,一个队列存储等待线程。 state=0表示锁空闲。如果是公平锁,那就看看队列有没有线程在等,有的话不参与竞争乖乖追加到尾部。如果是 非公平锁,那就直接参与竞争,不管队列有没有等待者。 state>0表示有线程占着锁,这时候无论公平与非公平,都直接去排队(想抢也没有) 备注: 因为ReentrantLock是可以定义公平非公平锁,次数。所以是>0而不是简单的0和1而synchronized只能是非公平锁 共享锁和排它锁的总结: - ReentrantReadWriteLock实现了ReadWriteLock接口,最主要的有两个方法:readLock()和writeLock()来获取读锁和写锁。 - 所得申请和释放策略 - 多个线程只能申请读锁,都可以申请到。 - 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。 - 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。 - 要么是一个或者多个线程同时有读锁,要么是一个线程有写锁,但是两者不会同时出现。 **总结:要么多读,要么一写** #### 6)锁升级 java中每个对象都可作为锁,锁有四种级别,按照量级从轻到重分为:无锁、偏向锁、轻量级锁、重量级锁。 如何理解呢?A占了锁,B就要阻塞等。但是,在操作系统中,阻塞就要存储当前线程状态,唤醒就要再恢复,这 个过程是要消耗时间的... 如果A使用锁的时间远远小于B被阻塞和挂起的执行时间,那么我们将B挂起阻塞就相当的不合算。 于是出现自旋:自旋指的是锁已经被其他线程占用时,当前线程不会被挂起,而是在不停的试图获取锁(可以理解 为不停的循环),每循环一次表示一次自旋过程。显然这种操作会消耗CPU时间,但是相比线程下文切换时间要少 的时候,自旋划算。 而偏向锁、轻量锁、重量锁就是围绕如何使得cpu的占用更划算而展开的。 ```properties 举个生活的例子,假设公司只有一个会议室(共享资源) 偏向锁: 前期公司只有1个团队,那么什么时候开会都能满足,就不需要询问和查看会议室的占用情况,直接进入使用状态。会议 室门口挂了个牌子写着A使用,A默认不需要预约(ThreadID=A) 轻量级锁: 随着业务发展,扩充为2个团队,B团队肯定不会同意A无法无天,于是当AB同时需要开会时,两者竞争,谁抢到谁算谁 的。偏向锁升级为轻量级锁,但是未抢到者在门口会不停敲门询问(自旋,循环),开完没有?开完没有? 重量级锁: 后来发现,这种不停敲门的方式很烦,A可能不理不睬,但是B要不停的闹腾。于是锁再次升级。 如果会议室被A占用,那么B团队直接闭嘴,在门口安静的等待(wait进入阻塞),直到A用完后会通知B(notify)。 ``` 注意点: - 上面几种锁都是JVM自己内部实现,我们不需要干预,但是可以配置jvm参数开启/关闭自旋锁、偏向锁。 - 锁可以升级,但是不能反向降级:偏向锁→轻量级锁→重量级锁 - 无锁争用的时候使用偏向锁,第二个线程到了升级为轻量级锁进行竞争,更多线程时,进入重量级锁阻塞 偏重场景 | 锁 | 优点 | 缺点 | 适用场景 | | -------- | ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------------------------------------------------ | | 偏向锁 | 加锁和解锁不需要CAS操作,没有额外的性能消耗,和执行非同步方法相比仅存在纳秒级的差距 | 若线程间存在锁竞争,会带来额外的锁撤销的消耗 | 只有一个线程访问同步块或者同步方法 | | 轻量级锁 | 竞争的线程不会阻塞,提高了程序的响应速度 | 若线程长时间竞争不到锁, 自旋会消耗 CPU 性能 | 线程交替执行同步块或者同步方法,追求响应时间,锁占用时间很短,阻塞还不如自旋的场景 | | 重量级锁 | 线程竞争不使用自旋,不会消耗 CPU | 线程阻塞,响应时间缓慢, 在多线程下,频繁的获取释放锁,会带来巨大的性能消耗 | 追求吞吐量,锁占用时间较长 | 7)互斥锁/读写锁 - 典型的互斥锁:synchronized,ReentrantLock,读写锁:ReadWriteLock 前面都用过了 - 互斥锁属于独享锁,读写锁里的写锁属于独享锁,而读锁属于共享锁 案例:互斥锁用不好可能会失效,看一个典型的锁不住现象! ```java package com.jeaw.health.controller; public class ObjectLock { public static Integer i = 0; public void inc() { synchronized (this) { int j = i; try { Thread.sleep(100); j++; } catch (InterruptedException e) { e.printStackTrace(); } i = j; } } public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { //重点! new ObjectLock().inc(); } }).start(); } Thread.sleep(3000); //理论上10才对。可是.... System.out.println(ObjectLock.i); } } ``` 结果分析:每个线程内都是new对象,所以this不是同一把锁,结果锁不住,输出1 - this,换成static的 i 变量试试? - 换成ObjectLock.class 试试? - 换成String.class - 去掉synchronized块,外部方法上加 static synchronized ### 1.2.4 AQS #### 1)概念 [AQS思维导图](https://www.processon.com/view/link/6083c6a11e0853347fd9ab1e) 首先搞清楚,AbstractQuenedSynchronizer抽象的队列式同步器,是一个抽象类,这个类在java.util.concurrent.locks包。除了java自带的synchronized关键字之外,jdk提供的另外一种锁机制。如果需要自己实现锁的逻辑,可以考虑使用AQS,非常的便捷。 ```java public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable ``` jdk中使用AQS的线程工具类很多,自旋锁、互斥锁、读锁写锁、信号量、通过类继承关系可以轻松查看: ![](https://gitee.com/adastevy/img/raw/master/img/20201022025035.png) #### 2)原理 ![](https://gitee.com/adastevy/img/raw/master/img/20201022025049.png) - AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释 放了锁。它提供了三个方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))来对同步状态state进行操作 - AQS通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时, AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线 程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。 - CLH(Craig,Landin,and Hagersten)队列是一个虚拟的双向队列,即不存在队列实例,使用前后节点和 指针来实现关联。 拓展:AQS中有两个队列,这里的阻塞队列,还有多个ConditionObject维护的条件队列。和 condition的 await/signal有关 ![](https://gitee.com/adastevy/img/raw/master/img/20201022025126.png) #### 3)实现方式 AQS使用了模板设计模式。只需要实现指定的锁获取方法即可,内部的机制AQS已帮你封装好。(AQS源码idea中查看) 需要子类继承AQS,并实现的方法(protected): - tryAcquire(int arg):独占式获取同步状态,其他线程需要等待该线程释放同步状态 - tryRelease(int arg):独占式释放同步状态 - tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败 - tryReleaseShared(int arg):共享式释放同步状态 使用时,调用的是父类的方法(public) - acquire(int arg):独占式获取 - release(int arg):独占式释放 - acquireShared(int arg):共享式获取 - releaseShared(int arg):共享式释放 4)源码分析 ```java package com.jeaw.health.controller; import java.util.concurrent.locks.AbstractOwnableSynchronizer; public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //可共享式获取锁,外部调用,模板模式 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } //需要实现的部分,空protected方法,被上面的对外方法所调用 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //同理,锁的释放,模板模式 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //独占式获取 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //独占式释放 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } } ``` #### 4)场景案例 用AQS自己实现一个锁,最大允许指定数量的线程并行运作。其他排队等候(指定并行数量) ```java package com.jeaw.health.controller; import java.util.concurrent.locks.AbstractQueuedSynchronizer; public class MyLock extends AbstractQueuedSynchronizer { public MyLock(int count) { setState(count); } @Override protected int tryAcquireShared(int arg) { for (; ; ) { int current = getState(); int newCount = current - arg; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } @Override protected boolean tryReleaseShared(int arg) { for (; ; ) { int current = getState(); int newState = current + arg; if (compareAndSetState(current, newState)) { return true; } } } public static void main(String[] args) { final MyLock lock = new MyLock(3); for (int i = 0; i < 30; i++) { new Thread(new Runnable() { public void run() { lock.acquireShared(1); try { Thread.currentThread().sleep(1000); System.out.println("ok"); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.releaseShared(1); } } }).start(); } } } ``` 验证结果:虽然30个一次性start,但是会每1s输出3个ok,达到了并发控制 ### 1.2.4 final关键字 #### 1)是什么是不变性 如果对象在被创建后,状态就不能被修改,那么他就是不可变的 属性被声明为final后,该变量则只能被赋值一次。而且一旦被赋值,final的变量就不能在被改变 #### 2)final的作用 - 类防止被继承、方法防止被重写、变量防止被修改。 - 天生是线程安全的,而不需要额外的同步开销。 #### 3)3种修饰用法:修饰变量、方法、类 - 修饰变量 引用指向不能变,但是内容可以变 - final instance variable(类中的final属性) - 赋值时机 - 第一种是在声明变量的等号右边直接赋值 - 第二种是在构造函数中赋值 - 第三就是在类的初始化代码块中赋值(不常用) - final static variable(类中的 static final属性) - 赋值时机 - 第一种是在声明变量的等号右边直接赋值 - 第二种是static初始化代码块赋值 - final local variable(方法中的 final属性) - 和前两种不同,由于这里的变量是在方法里的,所有没有构造函数,也不存在初始化代码块 - final local variable不规定赋值时机,只要求在使用前必须赋值,这和方法中的非final变量的要求也是一样的 - 修饰方法 - 不允许修饰构造方法 - 不能被重写 - static方法不能被重写 - 修饰类 - 不可以被继承 final的注意点: - final修饰对象的时候,只是对象的引用不可变,而对象本身的属性是可以变的 - final的使用原则:良好的编程习惯(不可修改) 对象不可变的条件: - 对象创建后,其状态就不能修改。 - 所有属性都是final修饰的。 - 对象创建过程中没有发生溢出。 ```java package immutable; import java.util.HashSet; import java.util.Set; /** * 描述: 一个属性是对象,但是整体不可变,其他类无法修改set里面的数据 */ public class ImmutableDemo { private final Set students = new HashSet<>(); //初始化 public ImmutableDemo() { students.add("李小美"); students.add("王壮"); students.add("徐福记"); } //读操作 public boolean isStudent(String name) { return students.contains(name); } } ``` 把变量写在线程内部--栈封闭 - 在方法里新建的局部变量,实际上是存储在每个线程私有的栈空间,而每个栈的空间是不能被其他线程所访问到的,所以不会有线程安全问题,这就是著名的“栈封闭”技术,是“线程封闭”技术的一种情况。 - 方法内的局部变量是安全的,封闭的。 面试题: ```java /** * 描述: TODO */ public class FinalStringDemo1 { public static void main(String[] args) { String a = "wukong2";//引用常量池 final String b = "wukong"; String d = "wukong"; String c = b + 2; //引用常量池 b直接被替换成wukong 编译器优化 a、c指向同一个常量 String e = d + 2; //在堆上创建e System.out.println((a == c)); //ture System.out.println((a == e)); //false } } ``` ```java public class FinalStringDemo2 { public static void main(String[] args) { String a = "wukong2"; final String b = getDashixiong(); String c = b + 2; System.out.println(a == c); //返回false } private static String getDashixiong() { return "wukong"; } } ``` ## 1.3 原子操作(atomic) ### 1.3.1 概念 原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为"不可被中断的一个或一系列操作" 。类比于数据库事务,redis的multi。 - 不可分割 - 一个操作是不可中断的,即便是在多线程的情况下也可以保证。 - java.util.concurrent.atomic - 原子类的作用和锁类似,是为了保证并发情况下线程安全,不过原子类相比于锁,有一定的优势。 - 粒度更细:原子变量可以把竞争范围缩小到变量级别,这是我们可以获得的最细粒度的情况下,通常锁的粒度都要大于原子变量的粒度。 - 效率更高:通常,使用原子类的效率会比使用锁的效率更高,除了高度竞争的情况。 ![6类原子类纵览](https://gitee.com/adastevy/img/raw/master/img/20210408121549.png) ### 1.3.2 CAS Compare And Set(或Compare And Swap),翻译过来就是比较并替换,CAS操作包含三个操作数——内存位置(V)、预期原值(A)、新值(B)。从第一视角来看,理解为:我认为位置 V 应该是 A,如果是A,则将 B 放到这个位置;否则,不要更改,只告诉我这个位置现在的值即可。 计数器问题发生归根结底是取值和运算后的赋值中间,发生了插队现象,他们不是原子的操作。前面的计数器使用加锁方式实现了正确计数,下面,基于CAS的原子类上场.... ![CAS 1](https://gitee.com/adastevy/img/raw/master/img/20210409093957.png) ```java package com.itheima.thread.lock; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; public class AtomicCounter { // private static int i=0; private AtomicInteger i = new AtomicInteger(0); public int get(){ return i.get(); } public void inc(){ // int j=get(); try { Thread.sleep(100); // j++; // i=j; // i.incrementAndGet(); i.getAndIncrement(); } catch (InterruptedException e) { e.printStackTrace(); } finally { } } public static void main(String[] args) throws InterruptedException { final AtomicCounter counter = new AtomicCounter(); //不使用线程10次,对比使用线程10次,看结果 for (int i = 0; i < 10; i++) { new Thread(new Runnable() { public void run() { counter.inc(); } }).start(); } Thread.sleep(3000); //理论上10才对。可是.... System.out.println(counter.i); } } ``` 应用场景: - 乐观锁 - 并发容器 - 原子类 分析Java中是如何利用CAS实现原子操作的 - AtomicInteger加载Unsafe工具,用来直接操作内存数据。 - 用Unsafe来实现底层操作。 - 用volatile修饰value字段,保证可见性。 Unsafe类 - Unsafe是CAS的核心类。Java无法直接访问底层操作系统,而是通过本地(native)方法来访问,不过尽管如此,JVM还是开了一个后门,JDK中有一个类Unsafe,他提供了硬件级别的原则操作。 - valueOffset表示的是变量值在内存中的偏移地址,因为Unsafe就是根据内存偏移地址获取数据的原值的,这样我们就能通过unsafe来实现CAS了。 CAS的缺点 - ABA问题(用乐观锁的时候添加一个版本号,去对比版本号比对比值靠谱一些) - 自旋时间过长 ### 1.3.3 atomic 上面展示了AtomicInteger,关于atomic包,还有很多其他类型: - 基本类型 - AtomicBoolean:以原子更新的方式更新boolean; - AtomicInteger:以原子更新的方式更新Integer; ```java //常用方法 public final int get() // 获取当前的值 public final int getAndSet(int newValue) //获取当前的值,并设置新的值 public final int getAndIncrement() //获取当前的值,并自增 public final int getAndDecrement() //获取当前的值,并自减 public final int getAndAdd(int delta) //获取当前的值,并加上预期的值 ``` - AtomicLong:以原子更新的方式更新Long; - 引用类型 - AtomicReference : 原子更新引用类型。 - AtomicReferenceFieldUpdater :原子更新引用类型的字段。 - AtomicMarkableReference : 原子更新带有标志位的引用类型。 - 数组 - AtomicIntegerArray:原子更新整型数组里的元素。 - AtomicLongArray:原子更新长整型数组里的元素。 - AtomicReferenceArray:原子更新引用类型数组里的元素。 - 字段 - AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。(对普通变量进行升级) - AtomicIntegerFieldUpdater使用的注意点: - 可见范围 (不能是private) - 不支持static(变量加了static修饰会报错) - AtomicLongFieldUpdater:原子更新长整型字段的更新器。 - AtomicStampedReference:原子更新带有版本号的引用类型。 Adder累加器 - java 8引入的,相比是比较新的一个类 - 高并发下LongAdder比AtomicLong效率高,不过本质是空间换时间。 - 竞争激烈的时候,LongAdder把不同线程对应到不同的Cell上进行修改,降低了冲突的概率,是多段锁的理念,提高了并发性。 ```java package atomic; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; /** * 描述: 演示高并发场景下,LongAdder比AtomicLong性能好 */ public class LongAdderDemo { public static void main(String[] args) throws InterruptedException { LongAdder counter = new LongAdder(); ExecutorService service = Executors.newFixedThreadPool(20); long start = System.currentTimeMillis(); for (int i = 0; i < 10000; i++) { service.submit(new Task(counter)); } service.shutdown(); while (!service.isTerminated()) { } long end = System.currentTimeMillis(); System.out.println(counter.sum()); //结果汇总 System.out.println("LongAdder耗时:" + (end - start)); } private static class Task implements Runnable { private LongAdder counter; public Task(LongAdder counter) { this.counter = counter; } @Override public void run() { for (int i = 0; i < 10000; i++) { counter.increment(); } } } } ``` AtomicLong的弊端:线程间的同步 ![AtomicLong的弊端](https://gitee.com/adastevy/img/raw/master/img/20210408154523.png) LongAdder带来的改进和原理 ![LongAdder带来的改进和原理 3](https://gitee.com/adastevy/img/raw/master/img/20210408154624.png) - 在内部,这个LongAdder的实现原理和刚才的AtomicLong是不同的,刚才的AtomicLong的实现原理是,每一次加法都需要做同步,所以在高并发的时候会导致冲突比较多,也就降低了效率。 - 而此时的LongAdder,每个线程会有自己的一个计数器,仅用来自己线程内计数,这样一来就不会和其他线程的计数器干扰 - 如图所示,第一线程的计数器数值,也就是ctr‘,为1的时候,可能线程2的计数器ctr’‘的数值已经是3了,他们之间并不存在竞争关系,所有再加和的过程中,根本不需要同步机制,也不需要刚才的flush和refresh。这里也没有一个公共的counter来给所有的线程统一计数。 LongAdder的原理: - LongAdder引入了分段累加的概念,内部有一个base变量和一个Cell[]数组共同参与计数: - base变量:竞争不激烈,直接累加到该变量上。 - Cell[]数组:竞争激烈,各个线程分散累加到自己的槽Cell[i]中。 AtomicLong和LongAdder的使用场景: - 在低争用下,AtomicLong和LongAdder这两个类具有相似的特性,但是竞争激烈的情况下,LongAdder的预期吞吐量要高得多,但是消耗更多的空间。 - LongAdder适合的场景是统计求和和技术的场景,而且LongAdder基本只提供了add方法,而AtomicLong还具有cas方法。 Accumulator累加器 - accumulator和Adder非常相似,Accumulator就是一个更通用版本的Adder ```java package atomic; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.LongAccumulator; import java.util.stream.IntStream; /** * 描述: 演示LongAccumulator的用法 */ public class LongAccumulatorDemo { public static void main(String[] args) { LongAccumulator accumulator = new LongAccumulator((x, y) -> 2 + x * y, 1); ExecutorService executor = Executors.newFixedThreadPool(8); IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i))); executor.shutdown(); while (!executor.isTerminated()) { } System.out.println(accumulator.getThenReset()); } } ``` ### 1.3.4 注意! 使用atomic要注意原子性的边界,把握不好会起不到应有的效果,原子性被破坏。 案例:原子性被破坏现象 ```java package com.itheima.thread.atomic; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; public class BadAtomic { AtomicInteger i = new AtomicInteger(0); static int j=0; public synchronized void badInc(){ int k = i.incrementAndGet(); try { Thread.sleep(new Random().nextInt(100)); } catch (InterruptedException e) { e.printStackTrace(); } j=k; } public static void main(String[] args) throws InterruptedException { BadAtomic atomic = new BadAtomic(); for (int i = 0; i < 10; i++) { new Thread(()->{ atomic.badInc(); }).start(); } Thread.sleep(3000); System.out.println(atomic.j); System.out.println(atomic.i.get()); } } ``` 结果分析: - 每次都不一样,总之不是10 - 在badInc上加synchronized,问题解决 ## 1.4 ThreadLocal ### 1.4.1 概念 ThreadLocal类并不是用来解决多线程环境下的共享变量问题,而是用来提供线程内部的共享变量。在多线程环境下,可以保证各个线程之间的变量互相隔离、相互独立。 ThreadLocal两大使用场景: - 典型场景一:每个线程需要一个独享的对象(通常是工具类,典型需要使用的类有SimpleDateFormate和Random)。(工具类线程不安全) ```java package threadlocal; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 描述:利用ThreadLocal,给每个线程分配自己的dateFormat对象,保证了线程安全,高效利用内存 * 不共享 dateFormat对象 ,但一个线程会绑定一个ThreadLocal对象 从而可以复用dateFormat */ public class ThreadLocalNormalUsage05 { public static ExecutorService threadPool = Executors.newFixedThreadPool(10); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 1000; i++) { int finalI = i; threadPool.submit(new Runnable() { @Override public void run() { String date = new ThreadLocalNormalUsage05().date(finalI); System.out.println(date); } }); } threadPool.shutdown(); } public String date(int seconds) { //参数的单位是毫秒,从1970.1.1 00:00:00 GMT计时 Date date = new Date(1000 * seconds); //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat dateFormat =ThreadSafeFormatter.dateFormatThreadLocal2.get(); return dateFormat.format(date); } } class ThreadSafeFormatter { //重写initialValue方法 public static ThreadLocal dateFormatThreadLocal = new ThreadLocal() { @Override protected SimpleDateFormat initialValue() { return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); } }; //初始化的时候设置SimpleDateFormat对象到ThreadLocal中 public static ThreadLocal dateFormatThreadLocal2 = ThreadLocal .withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); } ``` - 典型场景二:每个线程内需要保存全局变量(例如在拦截器中获取用户信息),可以让不同方法直接使用,避免参数传递的麻烦。 ```java package threadlocal; /** * 描述: 演示ThreadLocal用法2:避免传递参数的麻烦 */ public class ThreadLocalNormalUsage06 { public static void main(String[] args) { new Service1().process(""); } } class Service1 { public void process(String name) { User user = new User("超哥"); UserContextHolder.holder.set(user); new Service2().process(); } } class Service2 { public void process() { User user = UserContextHolder.holder.get(); ThreadSafeFormatter.dateFormatThreadLocal.get(); System.out.println("Service2拿到用户名:" + user.name); new Service3().process(); } } class Service3 { public void process() { User user = UserContextHolder.holder.get(); System.out.println("Service3拿到用户名:" + user.name); UserContextHolder.holder.remove(); } } class UserContextHolder { public static ThreadLocal holder = new ThreadLocal<>(); } class User { String name; public User(String name) { this.name = name; } } ``` ThreadLocal的两个作用: 1. 让某个需要用到的对象在线程间隔离(每个线程都有自己的独立的对象) 2. 在任何方法中都可以轻松获取到该对象。 总结: 场景一:initialValue - 在ThreadLocal第一次get的时候把对象给初始化出来,对象的初始化时机可以由我们控制。 场景二:set - 如果需要保存到ThreadLocal里的对象的生成时机不由我们随意控制,例如拦截器生成的用户信息,用ThreadLocal.set直接放到我们的ThreadLocal中去,以便后续使用。 ThreadLocal带来的好处: - 达到线程安全 - 不需要加锁,提高执行效率 - 更高效的利用内存,节省开销:相比于每个任务都新建一个SimpleDateFormate,显然用ThreadLocal可以节省内存和开销。 - 避免传参的繁琐:无论是场景一的工具类,还是场景二的用户名,都可以在任何地方直接通过ThreadLocal拿到,再也不需要每次都传同样的参数,ThreadLocal使得代码耦合度更低,更优雅。 ThreadLocalMap类 - 与HashMap大致相同 - 但是解决冲突不相同 - HashMap是拉链法 - ThreadLocalMap是直接追加到数组的后面,不会有红黑树、链表 内存泄露:某个对象不再有用,但是占用的内存却不能被回收。 ​ 弱引用的特点是,如果这个对象只被弱引用关联(没有任何强引用关联),那么这个对象就可以被回收 所有弱引用不会阻止GC,因此这个弱引用的机制。 Value的泄露: - ThreadLocalMap的每一个Entry都是一个对key的弱引用,同时,每一个Entry都包含一个对value的强引用 - 正常情况下,当线程终止,保留在ThreadLocal里的value会被垃圾回收,因为没有任何强引用了 - 但是,如果线程不终止(比如线程需要保持很久),那么key对应的value就不能被回收,因为有以下的调用链:==Thread-->ThreadLocalMap-->Entry(key为null)-->Value== 如何避免内存泄露: - 调用remove方法,就会删除对应的Entry对象,可以避免内存泄露,所有使用完ThreadLocal之后,应该调用remove方法,将value至为空。 ThreadLocal带来的空指针异常: - 在进行get之前,必须先set,否则可能会报空指针异常(装箱拆箱造成--initialValue返回null) - 共享对象 - 如果在每个线程中Thread.set()进去的东西本来就是多线程共享的同一个对象,比如static对象,那么多个线程的ThreadLocal.get()取得的还是这个共享对象本身,还是有并发访问问题。 ThreadLocal注意点: - 如果可以不使用ThreadLocal就解决问题,那么不要强行使用 - 例如在任务数很少的时候,在局部变量中可以新建对象就可以解决问题,那么久不需要使用到ThreadLocal - 优先使用框架的支持,而不是自己创造 - 例如在spring中,如果可以使用RequestContextHolder,那么就不用自己维护ThreadLocal,因为自己可能会忘记调用remove()方法,造成内存泄露。 ```java public class ThreadLocalNPE { ThreadLocal longThreadLocal = new ThreadLocal(); public void set() { longThreadLocal.set(Thread.currentThread().getId()); } public long get() { return longThreadLocal.get(); } public static void main(String[] args) { ThreadLocalNPE threadLocalNPE = new ThreadLocalNPE(); System.out.println(threadLocalNPE.get()); Thread thread1 = new Thread(new Runnable() { @Override public void run() { threadLocalNPE.set(); System.out.println(threadLocalNPE.get()); } }); thread1.start(); } } ``` ### 1.4.2 使用 ThreadLocal实例一般定义为private static类型的,在一个线程内,该变量共享一份,类似上下文作用,可以用来上下传递信息。 ```java package com.itheima.thread.atomic; public class ThreadLocalDemo implements Runnable{ private static ThreadLocal threadLocal = new ThreadLocal<>(); public void run(){ for (int i = 0; i < 3; i++) { threadLocal.set(i); System.out.println(Thread.currentThread().getName()+",value="+threadLocal.get()); } } public static void main(String[] args) { ThreadLocalDemo demo = new ThreadLocalDemo(); new Thread(demo).start(); new Thread(demo).start(); } } ``` 结果分析: - 同一个demo实例,不同的thread嵌套 - 结果打印了各自的变量值,线程内上下文被传递,不同线程间被隔离 ![](https://gitee.com/adastevy/img/raw/master/img/20201023120813.png) ### 1.4.3 应用场景 - 数据库连接,session管理 - 下面的基于日志平台的访问链路追踪中,会用到 一个失败的案例: 遇到过一个项目,电商商铺详情页凌晨调度生成。需要上下传递shopid,为每个商铺重新生成一下。在商铺详情页里因为是按面包屑分片生成,比如商铺信息、热卖商品、最多好评、店主推荐、最新上架等。 其他信息全部生成ok,唯独商品列表多个列表出现问题。经查,在商品部分的查询中用到了ThreadLocal,造成当前商铺id丢失。 ### 1.4.4 实现原理 ThreadLocalMap是ThreadLocal内部类,由ThreadLocal创建,每个Thread里维护一个ThreadLocal.ThreadLocalMap类型的属性threadLocals。所有的value值其实是存储在ThreadLocalMap中。 这个存储结构的思路是反转的.... ![](https://gitee.com/adastevy/img/raw/master/img/20201023120900.png) 主要方法介绍 1)T initialValue() : 初始化 1. 该方法会返回当前线程对应的“初始值”,这是延迟加载的方法,只有在调用get的时候,才会触发。 ```java get() -->setInitialValue() private T setInitialValue() { T value = initialValue(); Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) map.set(this, value); else createMap(t, value); return value; } ``` 2. 当线程第一次使用get方法访问变量时,将调用此方法,除非线程先前调用了set方法,在这种情况下,不会为线程调用本initialValue方法。 3. 通常,每个线程最多调用一次此方法,但如果已经调用了remove()后,在调用get(),则可以再次调用此方法。 4. 如果不重写本方法,这个方法会返回null,一般使用匿名内部类方法来重写initialValue()方法,以便在后续使用中可以初始化副本对象。 2)set方法源码 ```java ThreadLocal.ThreadLocalMap threadLocals = null; public void set(T value) { //取到当前线程 Thread t = Thread.currentThread(); //从当前线程中拿出Map ThreadLocalMap map = getMap(t); if (map != null) //如果非空,说明之前创建过了 //以当前创建的ThreadLocal对象为key,需要存储的值为value,写入Map //因为每个线程Thread里有自己独自的Map,所以起到了隔离作用 map.set(this, value); else //如果没有,那就创建 createMap(t, value); } void createMap(Thread t, T firstValue) { t.threadLocals = new ThreadLocalMap(this, firstValue); } ``` 3)get方法源码 先获取当前线程的ThreadLocalMap,然后调用map.getEntry方法,把本ThreadLocal的引用作为参数传入,取出map中属于本ThreadLocal的value ,这个Map以及map中的key和value都是保存在线程中的,而不是保存在ThreadLocal中。 ```java public T get() { Thread t = Thread.currentThread(); //获取到当前线程下的Map ThreadLocalMap map = getMap(t); if (map != null) { //如果非空,根据当前ThreadLocal为key,取出对应的value即可 ThreadLocalMap.Entry e = map.getEntry(this); if (e != null) { @SuppressWarnings("unchecked") T result = (T)e.value; return result; } } //如果map是空的,往往返回一个初始值,这是一个protect方法 //这就是为什么创建ThreadLocal的时候往往要求实现这个方法 return setInitialValue(); } ThreadLocalMap getMap(Thread t) { return t.threadLocals; } ``` 4)remove方法 ```java public void remove() { ThreadLocalMap m = getMap(Thread.currentThread()); //很简单,获取到map后,调用remove移除掉 if (m != null) m.remove(this); } ``` 4)内存泄露问题如何解决 在上述的get方法中,Entry类继承了WeakReference,即每个Entry对象都有一个ThreadLocal的弱引用,GC对于弱引用的对象采取积极的内存回收策略,避免无人搭理时发生内存泄露。 ``` ThreadLocalMap.Entry e = map.getEntry(this); ``` ![](https://gitee.com/adastevy/img/raw/master/img/20201023121057.png) 验证代码: ```java ThreadLocal local = new ThreadLocal(); local.set(100); System.out.println(local.get()); System.gc(); //不会回收,因为local被强引用 System.out.println(local.get()); local = null; //debug,查看currentThread里面的localMaps //注意table里的reference Thread currentThread = Thread.currentThread(); //断点1:虽然local被赋值null,但是ThreadLocal内部依然存在引用(内存泄露风险!) System.out.println(1); System.gc(); //断点2:gc后,引用消失 System.out.println(2); ``` ThreadLocal对象只是作为ThreadLocalMap的一个key而存在的,现在它被回收了,那么value呢?针对这一问题,ThreadLocalMap类在每次get(),set(),remove() ThreadLocalMap中的值的时候,会自动清理key为null的value。如此一来,value也能被回收了。 用完ThreadLocal后,手动remove是一个好习惯! ### 1.4.5 注意! ThreadLocal如果指向了同一个引用,会打破隔离而失效。 案例:隔离失败了! ```java package com.itheima.thread.local; import java.util.HashMap; import java.util.Map; public class BadLocal{ public static void main(String[] args) { ThreadLocal local = new ThreadLocal(); Map map = new HashMap(); new Thread(()->{ //在线程设置后,过段时间取name //猜一猜结果? map.put("name","i am "+Thread.currentThread().getName()); local.set(map); System.out.println(Thread.currentThread().getName()+":" +local.get().get("name")); //do something... try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+":" +local.get().get("name")); }).start(); new Thread(()->{ //在线程中赋值name map.put("name","i am "+Thread.currentThread().getName()); local.set(map); }).start(); } } ``` ## 1.5 Fork/Join ### 1.5.1 概念 ForkJoin是由JDK1.7后提供多线并发处理框架。ForkJoinPool由Java大师Doug Lea主持编写,处理逻辑大概分为两步。 1.任务分割:Fork(分岔),先把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割。 2.合并结果:join,分割后的子任务被多个线程执行后,再合并结果,得到最终的完整输出。 ### 1.5.2 组成 - ForkJoinTask:主要提供fork和join两个方法用于任务拆分与合并;多数使用RecursiveAction(无返回值的任务)和RecursiveTask(需要返回值)来实现compute方法。 ![](https://gitee.com/adastevy/img/raw/master/img/20201023150540.png) - ForkJoinPool:调度ForkJoinTask的线程池; ![](https://gitee.com/adastevy/img/raw/master/img/20201023150615.png) - ForkJoinWorkerThread:Thread的子类,存放于线程池中的工作线程(Worker); ![](https://gitee.com/adastevy/img/raw/master/img/20201023150638.png) - WorkQueue:任务队列,用于保存任务; ### 1.5.3 基本使用 一个典型的例子:计算1-1000的和 ```java package com.itheima.thread; import java.util.concurrent.*; public class SumTask { private static final Integer MAX = 100; static class SubTask extends RecursiveTask { // 子任务开始计算的值 private Integer start; // 子任务结束计算的值 private Integer end; public SubTask(Integer start, Integer end) { this.start = start; this.end = end; } @Override protected Integer compute() { if (end-start #### 1)HashMap 为什么HashMap是线程不安全的? - 同时put碰撞导致数据丢失 - 同时put扩容导致数据丢失 - 死循环造成的CPU100% - 多线程下扩容导致循环链表 ![HashMap分析 1.7 2](https://gitee.com/adastevy/img/raw/master/img/20210409094020.png) ![HashMap分析 1.8 2](https://gitee.com/adastevy/img/raw/master/img/20210409094124.png) #### 2)红黑树 - 对二叉查找树BST的一种平衡策略,O(logN) vs O(N)。 - 会自动平衡,防止极端不平衡从而影响查找效率的情况发送。 - 每个节点要么是==红色==,要么是==黑色==,但是节点永远是黑色的。 - ==红色节点不能连续==(也即是,红色节点的孩子和父亲都不能是红色)。 - 从任一节点到其子树中每个叶子节点的路劲都包含==相同数量的黑色节点==。 - 所有的叶子节点都是==黑色==的。 #### 3)JDK 1.7 ConcurrentHashMap ![JDK1.7的ConcurrentHashMap实现和分析 2](https://gitee.com/adastevy/img/raw/master/img/20210409095358.png) - java 7中的ConcurrentHashMap最外层是多个segment,每一个segment的底层数据结构与HashMap类似,任然是数组和链表组成的拉链法。 - 每个segment独立上了ReentrantLock,每个segment之间互不影响,提高了并发效率。 - ConcurrentHashMap默认有16个Segment,所有最多可以同时支持16个线程并发写(操作分别分布在不同的Segment上)。这个默认值可以在初始化的时候设置为其他值,但是一旦初始化以后,是不可以扩容的 #### 4)JDK 1.8 ConcurrentHashMap JDK 1.8 ConcurrentHashMap借鉴了HashMap的思想实现 ![JDK1.8的ConcurrentHashMap实现和分析 1](https://gitee.com/adastevy/img/raw/master/img/20210409100707.png) putValue流程: - 判定key value不为空 - 计算hash值 - 根据对应位置节点的类型,来赋值,或者helpTransfer,或者增长链表,或者给红黑树加节点 - 检查满足阀值就“红黑树化” - 返回oldVal #### 5)1.7的结构和1.8的不同点? - 数据结构 - 1.7采用了分段锁Segment+HashEntity实现 - 1.8采用了基于HashMap的思想实现,每个node独立,提高了并发度 - Hash碰撞 - 1.7 拉链法 - 1.8 是先用拉链法,然后红黑树 - 保证并发安全 - 1.7是用Segment实现线程安全,而Segment继承ReentrantLock - 1.8 CAS+synchronize - 查询复杂度 - 1.7是O(N) - 1.8是O(logN) #### 6)为什么超过8要转为红黑树? - 默认不是红黑树节点,默认是链表的形式,它所占用的内存更少(红黑树是链表的2倍) - 想要达到冲突为8实际上是很难的 概率只有千万分之几 #### 7)为什么ConcurrentHashMap是线程不安全的? ConcurrentHashMap只能保证同时get和put是线程安全的 ```java public class OptionsNotSafe implements Runnable { private static ConcurrentHashMap scores = new ConcurrentHashMap(); public static void main(String[] args) throws InterruptedException { scores.put("小明", 0); Thread t1 = new Thread(new OptionsNotSafe()); Thread t2 = new Thread(new OptionsNotSafe()); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println(scores); } @Override public void run() { for (int i = 0; i < 1000; i++) { while (true) { Integer score = scores.get("小明"); Integer newScore = score + 1; boolean b = scores.replace("小明", score, newScore); if (b) { break; } } } } } ``` 解决方式是:使用replace()方法 ### 1.7.2 基本使用 很简单,new创建即可: ```java public static void main(String[] args) throws InterruptedException { //定义ConcurrentHashMap Map map = new ConcurrentHashMap(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { //多线程下的put可以放心使用 map.put(UUID.randomUUID().toString(), "1"); } }).start(); } Thread.sleep(3000); System.out.println(map); } ``` ### 1.7.3 实现原理 1.7是分段锁,上面阐述过,1.8采用的是cas + synchronized 操作,具体看代码: ```java /** Implementation for put and putIfAbsent */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); //计算hash int binCount = 0; for (Node[] tab = table;;) {//自旋,确保插入成功 Node f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable();//表为空的话,初始化表 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //否则,插入元素,看下面的 casTabAt 方法 //cas 在这里!比较是否为null,如果null才会设置并break,否则到else if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; //其他情况下,加锁保持 //synchronized 在这里! //加锁,锁的是当前插槽上的头节点f(类似分段锁) synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { //当前插槽上的节点数量 binCount = 1; //沿着Node链往后找 for (Node e = f;; ++binCount) { K ek; //如果找到相同key,说明之前put过,覆盖 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; //put方法上,onlyIfAbsent=false //即要不要覆盖? if (!onlyIfAbsent) e.val = value; break; } Node pred = e; //否则,新key,新Node插入到最后 if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } } //如果是红黑树,说明已经转化过,按树的规则放入Node else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果节点数达到临界值,链表转成树 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //计数 addCount(1L, binCount); return null; } //compareAndSetObject,比较并插入,典型CAS操作 static final boolean casTabAt(Node[] tab, int i,Node c, Node v) { return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //get取值 public V get(Object key) { Node[] tab; Node e, p; int n, eh; K ek; int h = spread(key.hashCode()); //判断table是不是空的,当前桶上是不是空的 //如果为空,返回null if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) //找到对应hash槽的第一个node,如果key相等,返回value if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } //如果正在扩容,不影响,继续顺着node找即可 else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; //其他情况,逐个遍历,比对key,找到后返回value while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; } ``` 总结: ```properties put过程: 1.根据key的hash值定位到桶位置 2.如果table为空if,先初始化table。 3.如果table当前桶里没有node,cas添加元素。成功则跳出循环,失败则进入下一轮for循环。 4.判断是否有其他线程在扩容,有则帮忙扩容,扩容完成再添加元素。 5.如果桶的位置不为空,遍历该桶的链表或者红黑树,若key已存在,则覆盖,不存在则将key插入到链表或红黑树 的尾部。 get过程: 1.根据key的hash值定位到桶位置。 2.map是否初始化,没有初始化则返回null 3.定位的桶是否有头结点,没有返回null 4.是否有其他线程在扩容,有的话调用find方法沿node指针往后查找。扩容与find可以并行,因为node的next指 针不会变 5.若没有其他线程在扩容,则遍历桶对应的链表或者红黑树,使用equals方法进行比较。key相同则返回value,不存 在则返回null ``` ### 1.7.4 注意! 注意正确理解ConcurrentHashMap线程安全这个问题。看一个典型案例: ```java package com.itheima.thread.demo; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class BadConcurrent { public static void main(String[] args) throws InterruptedException { Map map = new ConcurrentHashMap(); map.put("val",0); for (int i = 0; i < 10; i++) { new Thread(()->{ int v = map.get("val"); v++; try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } map.put("val",v); }).start(); } Thread.sleep(3000); System.out.println(map); } } ``` 猜一猜结果? ## 1.8 并发容器 除了上面提到的ConcurrentHashMap,还有很多其他的并发容器,本节统一汇总。 [并发队列思维导图](https://www.processon.com/view/link/6083cff7f346fb54941b514b) ### 1.8.1 背景 java中的集合类非常丰富(ArrayList,HashMap之类),在单线程下用的顺风顺水,但这些集合类都是非线程安全的,即在多线程的环境下,都需要其他额外的手段来保证数据的正确性。常见手段有两种: - 自己通过synchronized关键字将所有使用到非线程安全的容器代码全部同步执行 - Vector、Stack、HashTable、Collections.synchronized等同步容器法,在早期的jdk中用的比较多,实现方式和上面几乎一样,而且多步操作时如果外面不额外加一层synchronized,依然锁不住。实际效果还不如上面 于是,并发容器诞生…… Collections.synchronized 性能不是太好(使用了同步代码块) ```java /** * 描述: 演示Collections.synchronizedList(new ArrayList()) */ public class SynList { public static void main(String[] args) { List list = Collections.synchronizedList(new ArrayList()); list.add(5); System.out.println(list.get(0)); Map objectObjectMap = Collections.synchronizedMap( new HashMap<>()); } } ``` ### 1.8.2 清单 #### 1.ConcurrentHashMap 对应:HashMap 目标:代替Hashtable、synchronizedMap,使用最多,前面详细介绍过 原理:JDK7中采用Segment分段锁,JDK8中采用CAS+synchronized #### 2.CopyOnWriteArrayList [CopyOnWriteArrayList思维导图](https://www.processon.com/view/link/6083ccc3e0b34d0360d09ce9) 对应:ArrayList 目标:代替Vector、synchronizedList;Vector和synchronizedList的锁的粒度太大,并发效率相对比较低,并且迭代时无法编辑。 原理:高并发往往是读多写少的特性,读操作不加锁,而对写操作加Lock独享锁,先复制一份新的集合,在新的集 合上面修改,然后将新集合赋值给旧的引用,并通过volatile 保证其可见性。 查看源码:volatile array,lock加锁,数组复制 CopyOnWriteArrayList适用场景: - 读操作可以尽可能地快,而写即使慢一些也没有太大关系 - 读多写少:黑名单,每日更新;监听器:迭代操作远多余修改操作。 CopyOnWriteArrayList的读写规则: - 读写锁规则的升级:读取是完全不用加锁的,并且更厉害的是,写入也不会阻塞读取操作。只有写入和写入之间需要进行同步等待。 ```java /** * 描述:演示CopyOnWriteArrayList可以在迭代的过程中修改数组内容, * 但是ArrayList不行,对比 */ public class CopyOnWriteArrayListDemo1 { public static void main(String[] args) { ArrayList list = new ArrayList<>(); // CopyOnWriteArrayList list = new CopyOnWriteArrayList<>(); list.add("1"); list.add("2"); list.add("3"); list.add("4"); list.add("5"); Iterator iterator = list.iterator(); while (iterator.hasNext()) { System.out.println("list is" + list); String next = iterator.next(); System.out.println(next); if (next.equals("2")) { list.remove("5"); } if (next.equals("3")) { list.add("3 found"); } } } } ``` CopyOnWriteArrayList实现原理 - CopyOnWrite的含义 - 修改的时候,先拷贝一份;然后修改,修改完了之后再替换原来的指针 - 读的时候不受限制,写的是新的,可以同步操作。 - 创建新的副本、读写分离 - 读和写完全使用不同的容器 - “不可变”原理 - 旧的容器完全是不变的 - 迭代的时候 - 使用旧数组,不会像ArrayList一样报错 CopyOnWriteArrayList的缺点: - 数据一致性问题:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的数据,马上能读到,请不要使用CopyOnWrite容器 - 内存占用问题:因为CopyOnWrite的写实复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象。这是所以CopyOnWrite的问题! #### 3.CopyOnWriteArraySet 对应:HashSet 目标:代替synchronizedSet 原理:与CopyOnWriteArrayList实现原理类似。 #### 4.ConcurrentSkipListMap 对应:TreeMap 目标:代替synchronizedSortedMap(TreeMap) 原理:基于Skip list(跳表)来代替平衡树,按照分层key上下链接指针来实现。 *附加:跳表* #### 5.ConcurrentSkipListSet 对应:TreeSet 目标:代替synchronizedSortedSet(TreeSet) 原理:内部基于ConcurrentSkipListMap实现,原理一致 #### 6.ConcurrentLinkedQueue 高效的非阻塞并发队列,使用链表实现,可以看做一个线程安全的LinkedList 对应:LinkedList 对应:无界线程安全队列 原理:通过队首队尾指针,以及Node类元素的next实现FIFO队列 #### 7.BlockingQueue 对应:Queue 特点:拓展了Queue,增加了可阻塞的插入和获取等操作 原理:通过ReentrantLock实现线程安全,通过Condition实现阻塞和唤醒 实现类: - LinkedBlockingQueue:基于链表实现的可阻塞的FIFO队列 - ArrayBlockingQueue:基于数组实现的可阻塞的FIFO队列 - PriorityBlockingQueue:按优先级排序的队列 ## 1.9并发控制 ![并发工具类概览](https://gitee.com/adastevy/img/raw/master/img/20210411161609.png) ### 1.9.1 countdownlatch ```java package flowcontrol.countdownlatch; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 描述: 工厂中,质检,5个工人检查,所有人都认为通过,才通过 * 一等多 main 等 线程池中的线程 */ public class CountDownLatchDemo1 { private static CountDownLatch latch = new CountDownLatch(5); public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int no = i + 1; Runnable runnable = new Runnable() { @Override public void run() { try { Thread.sleep((long) (Math.random() * 10000)); System.out.println("No." + no + "完成了检查。"); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } }; service.submit(runnable); } System.out.println("等待5个人检查完....."); latch.await(); System.out.println("所有人都完成了工作,进入下一个环节。"); } } ``` ### 1.9.2 Condition ```java package flowcontrol.condition; import java.util.ArrayList; import java.util.List; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; /** * 描述: 演示Condition的基本用法 * Condition 是绑定在锁上的 */ public class ConditionDemo1 { private ReentrantLock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); void method1() throws InterruptedException { List list = new ArrayList(); try { System.out.println("条件不满足,开始await"); condition.await(); System.out.println("条件满足了,开始执行后续的任务"); } finally { lock.unlock(); } } void method2() { lock.lock(); try { System.out.println("准备工作完成,唤醒其他的线程"); condition.signal(); } finally { lock.unlock(); } } public static void main(String[] args) throws InterruptedException { ConditionDemo1 conditionDemo1 = new ConditionDemo1(); new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(1000); conditionDemo1.method2(); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); conditionDemo1.method1(); } } ``` 实现生产者消费者: ```java package flowcontrol.condition; import java.util.PriorityQueue; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 描述: 演示用Condition实现生产者消费者模式 */ public class ConditionDemo2 { private int queueSize = 10; private PriorityQueue queue = new PriorityQueue(queueSize); private Lock lock = new ReentrantLock(); private Condition notFull = lock.newCondition(); private Condition notEmpty = lock.newCondition(); public static void main(String[] args) { ConditionDemo2 conditionDemo2 = new ConditionDemo2(); Producer producer = conditionDemo2.new Producer(); Consumer consumer = conditionDemo2.new Consumer(); producer.start(); consumer.start(); } class Consumer extends Thread { @Override public void run() { consume(); } private void consume() { while (true) { lock.lock(); try { while (queue.size() == 0) { System.out.println("队列空,等待数据"); try { notEmpty.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.poll(); notFull.signalAll(); System.out.println("从队列里取走了一个数据,队列剩余" + queue.size() + "个元素"); } finally { lock.unlock(); } } } } class Producer extends Thread { @Override public void run() { produce(); } private void produce() { while (true) { lock.lock(); try { while (queue.size() == queueSize) { System.out.println("队列满,等待有空余"); try { notFull.await(); } catch (InterruptedException e) { e.printStackTrace(); } } queue.offer(1); notEmpty.signalAll(); System.out.println("向队列插入了一个元素,队列剩余空间" + (queueSize - queue.size())); } finally { lock.unlock(); } } } } } ``` # 2 性能调优 ## 2.1 锁优化 Java虚拟机对锁的优化: - 自旋锁优化和自适应(JVM参数设置) - 锁消除(方法内部私有的,无须加锁) - 锁粗化(反复获取对一个对象加锁解锁) ### 2.1.1 Synchronized优化 synchronized使用起来非常简单,但是需要注意的是synchronized加锁的是什么维度 - 缩小同步代码块 - 尽力不要锁住方法 - 减少请求锁的次数 - 避免认为制造“热点” - 锁中尽量不要再包含锁 - 选择合适的锁类型或合适的工具类 对象级别: ```java public synchronized void test(){ // TODO } public void test(){ synchronized (this) { // TODO } } ``` 类级别: ```java public static synchronized void test(){ // TODO } public void test(){ synchronized (TestSynchronized.class) { // TODO } } ``` 案例:看一个加锁粒度的案例 ```java package com.itheima.thread.opt; import java.util.concurrent.atomic.AtomicLong; public class BadSync implements Runnable{ long start = System.currentTimeMillis(); AtomicLong atomicLong = new AtomicLong(0); volatile int i=0; public void inc(){ i++; } @Override public synchronized void run() { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } inc(); atomicLong.getAndAdd(System.currentTimeMillis() - start); } // public void run2() { // try { // Thread.sleep(100); // } catch (InterruptedException e) { // e.printStackTrace(); // } // synchronized(this){ // // inc(); // totalTime += (System.currentTimeMillis() - start); // } // // } public static void main(String[] args) throws InterruptedException { BadSync sync = new BadSync(); for (int i = 0; i < 5; i++) { new Thread(sync).start(); } Thread.sleep(3000); System.out.println("最终计数:i="+ sync.i); System.out.println("最终耗时:time="+sync.atomicLong.get()); } } ``` - 看一下最后的结果和耗时 - 将synchronized换到inc方法上,再试试最后的结果和耗时 结论是什么? ### 2.1.2 Lock锁优化 看一个小需求:电商系统中记录首页被用户浏览的次数,以及最后一次操作的时间(含读或写)。 ```java package com.itheima.thread.opt; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; public class TotalLock { //类创建的时间 final long start = System.currentTimeMillis(); //总耗时 AtomicLong totalTime = new AtomicLong(0); //缓存变量 private Map map = new HashMap(){{put("count",0L);}}; ReentrantLock lock = new ReentrantLock(); //查看map被写入了多少次 public Map read(){ lock.lock(); try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); //最后操作完成的时间 map.put("time",end); lock.unlock(); System.out.println(Thread.currentThread().getName()+",read="+(end-start)); totalTime.addAndGet(end - start); return map; } //写入 public Map write(){ lock.lock(); try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //写入计数 map.put("count",map.get("count")+1); long end = System.currentTimeMillis(); map.put("time",end); lock.unlock(); System.out.println(Thread.currentThread().getName()+",write="+(end-start)); totalTime.addAndGet(end - start); return map; } public static void main(String[] args) throws InterruptedException { TotalLock count = new TotalLock(); //读 for (int i = 0; i < 9; i++) { new Thread(()->{ count.read(); }).start(); } //写 for (int i = 0; i < 1; i++) { new Thread(()->{ count.write(); }).start(); } Thread.sleep(3000); System.out.println(count.map); System.out.println("读写总共耗时:"+count.totalTime.get()); } } ``` 仔细看读的时间变化和执行的总时间,思考一下,从业务和技术角度有没有可优化空间? 仔细分析业务:查看次数这里其实是可以并行读取的,我们关注的业务是写入次数,也就是count,至于读取发生的时间time的写入操作,只是一个单步put,每次覆盖,不需要原子性保障,对这个加互斥锁没有必要。 改成读写锁试试…… ```java package com.itheima.thread.opt; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class ReadAndWrite { //类创建的时间 final long start = System.currentTimeMillis(); //总耗时 AtomicLong totalTime = new AtomicLong(0); //缓存变量 private Map map = new ConcurrentHashMap(){{put("count",0L);}}; ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //查看map被写入了多少次 public Map read(){ lock.readLock().lock(); try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); //最后操作完成的时间 map.put("time",end); lock.readLock().unlock(); System.out.println(Thread.currentThread().getName()+",read="+(end-start)); totalTime.addAndGet(end - start); return map; } //写入 public Map write(){ lock.writeLock().lock(); try { Thread.currentThread().sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //写入计数 map.put("count",map.get("count")+1); long end = System.currentTimeMillis(); map.put("time",end); lock.writeLock().unlock(); System.out.println(Thread.currentThread().getName()+",write="+(end-start)); totalTime.addAndGet(end - start); return map; } public static void main(String[] args) throws InterruptedException { ReadAndWrite count = new ReadAndWrite(); //读 for (int i = 0; i < 9; i++) { new Thread(()->{ count.read(); }).start(); } //写 for (int i = 0; i < 1; i++) { new Thread(()->{ count.write(); }).start(); } Thread.sleep(3000); System.out.println(count.map); System.out.println("读写总共耗时:"+count.totalTime.get()); } } ``` 再来看读的时间变化和总执行时间。 当read远大于write时,这个差距会更明显 (改成9:1试试……) ### 2.1.3 CAS乐观锁优化 回顾上面的计数器,我们用synchronized实现了准确计数,本节我们看执行时间,追究性能问题。 案例一:直接加synchronized锁 ```java package com.itheima.thread.opt; public class NormalSync implements Runnable{ Long start = System.currentTimeMillis(); int i=0; public synchronized void run() { int j = i; //实际业务中可能会有一堆的耗时操作,这里等待100ms模拟 try { //做一系列操作 Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //业务结束后,增加计数 i = j+1; System.out.println(Thread.currentThread().getId()+ " ok,time="+(System.currentTimeMillis() - start)); } public static void main(String[] args) throws InterruptedException { NormalSync test = new NormalSync(); new Thread(test).start(); new Thread(test).start(); Thread.currentThread().sleep(1000); System.out.println("last value="+test.i); } } ``` 线程二最终耗时会在200ms+,总耗时300ms,原因是悲观锁卡在了read后的耗时操作上,但是保证了最终结果是2 案例二:基于CAS思想,compare再set ```java package com.itheima.thread.opt; import sun.misc.Unsafe; import java.lang.reflect.Field; public class CasSync implements Runnable{ long start = System.currentTimeMillis(); int i=0; public void run() { int j = i; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } //CAS处理,在这里理解思想,实际中不推荐大家使用! try { // Field f = Unsafe.class.getDeclaredField("theUnsafe"); // f.setAccessible(true); // Unsafe unsafe = (Unsafe) f.get(null); // long offset = unsafe.objectFieldOffset(CasSync.class.getDeclaredField("i")); // while (!unsafe.compareAndSwapInt(this,offset,j,j+1)){ // j = i; // } //优化后 //synchronized (this){ // while (j != i){ // j = i; // } // j++; // i = j; //} } catch (Exception e) { e.printStackTrace(); } //实际开发中,要用atomic包,或者while+synchronized自旋 //synchronized (this){ // //注意这里! // while (j != i){ // j = i; // } // i = j+1; //} System.out.println(Thread.currentThread().getName()+ " ok,time="+(System.currentTimeMillis() - start)); } public static void main(String[] args) throws InterruptedException { CasSync test = new CasSync(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); new Thread(test).start(); Thread.currentThread().sleep(1000); System.out.println("last value="+test.i); } } ``` 线程一、二均在100ms+,总耗时200ms,最终结果还是2 ### 2.1.4 一些经验 - 减少锁的时间 不需要同步执行的代码,能不放在同步快里面执行就不要放在同步快内,可以让锁尽快释放 - 减少锁的粒度 将物理上的一个锁,拆成逻辑上的多个锁,增加并行度,从而降低锁竞争,典型如分段锁 - 锁的粒度 拆锁的粒度不能无限拆,最多可以将一个锁拆为当前cup数量相等 - 减少加减锁的次数 假如有一个循环,循环内的操作需要加锁,我们应该把锁放到循环外面,否则每次进出循环,都要加锁 - 使用读写锁 业务细分,读操作加读锁,可以并发读,写操作使用写锁,只能单线程写,参考计数器案例 - 善用volatile volatile的控制比synchronized更轻量化,在某些变量上可以加以运用,如单例模式中 ## 2.2 线程池参数调优 ### 2.2.1 代码调试 - 创建线程池,无限循环添加task,debug看works和queue数量增长规律 - 等待一段时间后,查看works数量是否回落到core ```java package com.itheima.thread.demo; import java.util.concurrent.*; public class ExecutorTest { public static void main(String[] args) { BlockingQueue queue = new ArrayBlockingQueue(2); ThreadPoolExecutor executor = new ThreadPoolExecutor( 3, 5, 20, TimeUnit.SECONDS, queue, new RejectedExecutionHandler() { public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println("reject"); } } ); for (; ; ) { executor.execute(new Runnable() { public void run() { try { Thread.currentThread().sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } } } ``` ### 2.2.2 Executors剖析 Executors只是一个工具类,协助你创建线程池。Executors对特定场景下做了参数调优。 1)newCachedThreadPool ```java //core=0 //max=Integer //timeout=60s //queue=1 //也就是只要线程不够用,就一直开,不用就全部释放。线程数0‐max之间弹性伸缩 //注意:任务并发太高且耗时较长时,造成cpu高消耗,同时要警惕OOM return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue()); ``` 2)newFixedThreadPool ```java //core=max=指定数量 //timeout=0 //queue=无界链表 //也就是说,线程数一直保持制定数量,不增不减,永不超时 //如果不够用,就沿着队列一直追加上去,排队等候 //注意:并发太高时,容易造成长时间等待无响应,如果任务临时变量数据过多,容易OOM return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); ``` 3)newSingleThreadExecutor ```java //core=max=1 //timeout=0 //queue=无界链表 //只有一个线程在慢吞吞的干活,可以认为是fix的特例 //适用于任务零散提交,不紧急的情况 new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); ``` 4)newScheduledThreadPool ```java //core=制定数 //max=Integer //timeout=0 //queue=DelayedWorkQueue(重点!) //用于任务调度,DelayedWorkQueue限制住了任务可被获取的时机(getTask方法),也就实现了时间控制 super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); ``` ### 2.2.3 一些经验 1)corePoolSize 基本线程数,一旦有任务进来,在core范围内会立刻创建线程进入工作。所以这个值应该参考业务并发量在绝大多数时间内的并发情况。同时分析任务的特性。 高并发,执行时间短的,要尽可能小的线程数,如配置CPU个数+1,减少线程上下文的切换。因为它不怎么占时间,让少量线程快跑干活。 并发不高、任务执行时间长的要分开看:如果时间都花在了IO上,那就调大CPU,如配置两倍CPU个数+1。不能让CPU闲下来,线程多了并行处理更快。如果时间都花在了运算上,运算的任务还很重,本身就很占cpu,那尽量减少cpu,减少切换时间。参考第一条 如果高并发,执行时间还很长…… 2) workQueue 任务队列,用于传输和保存等待执行任务的阻塞队列。这个需要根据你的业务可接受的等待时间。是一个需要权衡时间还是空间的地方,如果你的机器cpu资源紧张,jvm内存够大,同时任务又不是那么紧迫,减少coresize,加大这里。如果你的cpu不是问题,对内存比较敏感比较害怕内存溢出,同时任务又要求快点响应。那么减少这里。 3) maximumPoolSize 线程池最大数量,这个值和队列要搭配使用,如果你采用了无界队列,那很大程度上,这个参数没有意义。同时要注意,队列盛满,同时达到max的时候,再来的任务可能会丢失(下面的handler会讲)。 如果你的任务波动较大,同时对任务波峰来的时候,实时性要求比较高。也就是来的很突然并且都是着急的。那么调小队列,加大这里。如果你的任务不那么着急,可以慢慢做,那就扔队列吧。 队列与max是一个权衡。队列空间换时间,多花内存少占cpu,轻视任务紧迫度。max舍得cpu线程开销,少占内存,给任务最快的响应。 4) keepaliveTime 线程存活保持时间,超出该时间后,线程会从max下降到core,很明显,这个决定了你养闲人所花的代价。如果你不缺cpu,同时任务来的时间没法琢磨,波峰波谷的间隔比较短。经常性的来一波。那么实当的延长销毁时间,避免频繁创建和销毁线程带来的开销。如果你的任务波峰出现后,很长一段时间不再出现,间隔比较久,那么要适当调小该值,让闲着不干活的线程尽快销毁,不要占据资源。 5) threadFactory(自定义展示实例) 线程工厂,用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。如果需要自己定义线程的某些属性,如个性化的线程名,可以在这里动手。一般不需要折腾它。 6)handler 线程饱和策略,当线程池和队列都满了,再加入线程会执行此策略。默认不处理的话会扔出异常,打进日志。这个与任务处理的数据重要程度有关。如果数据是可丢弃的,那不需要额外处理。如果数据极其重要,那需要在这里采取措施防止数据丢失,如扔消息队列或者至少详细打入日志文件可追踪。 ## 2.3 协程 ### 2.3.1 概念 ``` 面试官:你知道协程吗? 你:知道,出差或旅游经常用。 面试官:…… ``` 很大一部分的程序员不知道协程是啥,项目中也没用到协程。 先看概念:计算机有进程,线程和协程。前两者大家都很熟。而协程,则是基于线程之上自主开辟的异步任务。 - 线程的切换由操作系统负责调度,协程由用户自己进行调度 - 线程的默认Stack大小是1M,而协程更轻量,接近1K。因此可以在相同的内存中开启更多的协程。 - 多个协程在同一个线程上,因此不必使用锁,也减少了上下文切换。 再说结论: - 一般需要使用第三方框架来实现 - java里相对非主流,go和python相对用的多 - 实际web开发中用的较少,还是要把主要精力放到线程上 ### 2.3.2 使用方式 使用Kilim框架看协程与线程的编码 解压kilim.zip包,导入工程,按说明执行 ## 2.4 并发容器选择 1)案例一:电商网站中记录一次活动下各个商品售卖的数量。 - 场景分析:需要频繁按商品id做get和set,但是商品id(key)的数量相对稳定不会频繁增删- - 初级方案:选用HashMap,key为商品id,value为商品购买的次数。每次下单取出次数,增加后再写入 - 问题:HashMap线程不安全!在多次商品id写入后,如果发生扩容,在 JDK1.7 之前,在并发场景下HashMap 会出现死循环,从而导致 CPU 使用率居高不下。JDK1.8 中修复了 HashMap 扩容导致的死循环问题,但在高并发场景下,依然会有数据丢失以及不准确的情况出现。 - 选型:Hashtable 不推荐,锁太重,选 ConcurrentHashMap 确保高并发下多线程的安全性 2)案例二:在一次活动下,为每个用户记录浏览商品的历史和次数。 - 场景分析:每个用户各自浏览的商品量级非常大,并且每次访问都要更新次数,频繁读写 - 初级方案:为确保线程安全,采用上面的思路,ConcurrentHashMap - 问题:ConcurrentHashMap 内部机制在数据量大时,会把链表转换为红黑树。而红黑树在高并发情况下,删除和插入过程中有个平衡的过程,会牵涉到大量节点,因此竞争锁资源的代价相对比较高 - 选型:用跳表,ConcurrentSkipListMap将key值分层,逐个切段,增删效率高于ConcurrentHashMap ![](https://gitee.com/adastevy/img/raw/master/img/20201023154304.png) 结论:如果对数据有强一致要求,则需使用 Hashtable;在大部分场景通常都是弱一致性的情况下,使用ConcurrentHashMap 即可;如果数据量级很高,且存在大量增删改操作,则可以考虑使用ConcurrentSkipListMap。 3)案例三:在活动中,创建一个用户列表,记录冻结的用户。一旦冻结,不允许再下单抢购,但是可以浏览。 - 场景分析:违规被冻结的用户不会太多,但是绝大多数非冻结用户每次抢单都要去查一下这个列表。低频写,高频读。 - 初级方案:ArrayList记录要冻结的用户id - 问题:ArrayList对冻结用户id的插入和读取操作在高并发时,线程不安全。Vector可以做到线程安全,但并发性能差,锁太重。 - 选型:综合业务场景,选CopyOnWriteArrayList,会占空间,但是也仅仅发生在添加新冻结用户的时候。绝大多数的访问在非冻结用户的读取和比对上,不会阻塞。 ## 2.5 上下文切换优化 ### 2.5.1 基本操作 CPU通过时间片分配算法来循环执行任务,时间片一般是几十毫秒(ms)。切换就要保存旧状态,完成恢复时就要读取存储的内容。这个操作过程就是上下文的切换。 *(现实例子:参考日常需求开发与应急bug处理开发任务被挂起的场景 -_-! )* ### 2.5.2 竞争锁 1)锁的持有时间越长,就意味着有越多的线程在等待该竞争资源释放。上下文的切换代价就越多。 2)将锁贴近需要加锁的地方,越近越好!在synchronized性能优化中有案例 ```java public void f(){ synchronized (this){ f1(); f2(); f3(); } } ``` ```java public void f(){ f1(); synchronized (this){ f2(); } f3(); } ``` 3)结论:类锁 < 静态锁 < 方法锁 < 代码块锁 ,能共享锁的地方尽量不要用独享锁 ### 2.5.3 wait/notify 1)过时通知 看一个典型错误,猜一猜结果…… ```java package com.itheima.thread.opt; public class WaitInvalid { volatile int total = 0; byte[] lock = new byte[0]; //计算1-100的和,算完后通知print public void count(){ synchronized (lock){ for (int i = 1; i < 101; i++) { total += i; } lock.notify(); } System.out.println("count finish"); } //打印,等候count的通知 public void print(){ synchronized (lock){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(total); } public static void main(String[] args) throws InterruptedException { WaitInvalid waitInvalid = new WaitInvalid(); new Thread(()->{ waitInvalid.count(); }).start(); Thread.sleep(1000); new Thread(()->{ waitInvalid.print(); }).start(); } } ``` 将count和wait的顺序交换,再看一下结果,思考一下为什么?? 分析: count先执行时,提前释放了notify通知,这时候,print还没进入wait,收不到这个信号。 等print去wait的时候,再等通知等不到了,典型的通知过时现象。 仅仅因为一行代码的顺序问题,如果不注意,造成整个程序卡死 2)额外唤醒 跑一下看看结果…… ```java package com.itheima.thread.opt; import java.util.ArrayList; import java.util.List; public class NotifyInvalid { List list = new ArrayList(); byte[] lock = new byte[0]; public void del() { synchronized (lock){ //没值就等,有值就删 if (list.isEmpty()){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } if (list.isEmpty()){ list.remove(0); } } } public void add(){ synchronized (lock){ //加个值后唤醒 list.add(0,0); lock.notifyAll(); } } public static void main(String[] args) throws InterruptedException { NotifyInvalid notifyInvalid = new NotifyInvalid(); //启动两个线程等候删除 for (int i = 0; i < 2; i++) { new Thread(()->{ notifyInvalid.del(); }).start(); } //新线程添加一个 new Thread(()->{ notifyInvalid.add(); }).start(); Thread.sleep(1000); System.out.println(notifyInvalid.list.size()); } } ``` 分析: 出异常了!因为等候的两个线程第一个删除后,第二个唤醒时,等待前的状态已失效。 方案: 线程唤醒后,要警惕睡眠前后状态不一致,要二次判断 ### 2.5.4 线程池 1)线程池的线程数量设置不宜过大,因为一旦线程池的工作线程总数超过系统所拥有的处理器数量,就会导致过多的上下文切换。 2)慎用Executors,尤其如newCachedThreadPool。这个方法前面分析过。如果任务过多会无休止创建过多线程,增加了上下文的切换。最好根据业务情况,自己创建线程池参数。 ### 2.5.5 虚拟机 1)很多 JVM 垃圾回收器(serial 收集器、ParNew 收集器)在回收旧对象时,会产生内存碎片 2)碎片内存整理中就需要移动存活的对象。而移动内存对象就意味着这些对象所在的内存地址会发生变化 3)内存地址变化就要去移动对象前暂停线程,在移动完成后需要再次唤醒。无形中增加了上下文的切换 4)结论:合理搭配JVM内存调优,减少 JVM 垃圾回收的频率可以有效地减少上下文切换 ### 2.5.6 协程 1)协程不需要切换上下文,更轻量化。 2)平时用的相对较少。用不好会出问题。 # 3 电商实际应用 ### 3.1 常见问题 ### 3.1.1 线程协作 先搞懂线程协作的一些基本操作,面试经常要用到! 1)Object中 - wait:让出锁,阻塞等待 - notify/notifyAll:唤醒wait的进程,注意,具体唤醒哪一个要看优先级,同优先级的看运气 notifyAll优先级测试,猜一下输出? ```java package com.itheima.thread.buis; public class NotifyTest { public static void main(String[] args) { byte[] lock = new byte[0]; Thread t1 = new Thread(()->{ synchronized (lock){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("t1"); } }); Thread t2 = new Thread(()->{ synchronized (lock){ try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("t2"); } }); Thread t3 = new Thread(()->{ synchronized (lock){ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("t3"); lock.notifyAll(); } }); t2.setPriority(2); t1.setPriority(3); t3.setPriority(2); t1.start(); t2.start(); t3.start(); } } ``` 结果分析:wait让出锁,t3得到执行,t3唤醒后,虽然t1先start,但是优先级低,所以t2优先执行 2)Thread中 - sleep:暂停一下,只是让出CPU的执行权,并不释放锁。 猜一下结果…… ```java package com.itheima.thread.buis; public class SleepTest { public static void main(String[] args) throws InterruptedException { byte[] lock = new byte[0]; Thread t1 = new Thread(()->{ synchronized (lock){ System.out.println("start"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("end"); } }); Thread t2 = new Thread(()->{ synchronized (lock){ System.out.println("t2"); } }); t1.start(); Thread.sleep(100); t2.start(); } } ``` 分析: 新的thread无法异步执行,被迫等待锁,跟着sleep - yield:不释放锁,运行中转为就绪,让出cpu给大家去竞争。当然有可能自己又抢了回来 想一下,以下代码有可能是什么结果…… ```java package com.itheima.thread.buis; public class YieldTest { public static void main(String[] args) { byte[] lock = new byte[0]; Thread t1 = new Thread(()->{ synchronized (lock){ System.out.println("start"); Thread.yield(); System.out.println("end"); } }); Thread t2 = new Thread(()->{ synchronized (lock){ System.out.println("t2"); } }); Thread t3 = new Thread(()->{ System.out.println("t3"); }); t1.start(); t2.start(); t3.start(); } } ``` 分析: t3会插队抢到执行权,但是t2不会,因为t2和t1共用一把锁而yield不会释放 t3不见得每次都能抢到。可能t1让出又抢了回去 - join:父线程等待子线程执行完成后再执行,将异步转为同步。注意调的是子线程,阻断的是父线程 一个典型的join案例,打开和关闭join看下结果: ```java package com.itheima.thread.buis; public class JoinTest implements Runnable{ int i = 0; @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } i=1; System.out.println("sub"); } public static void main(String[] args) throws InterruptedException { JoinTest test = new JoinTest(); System.out.println("start"); Thread t = new Thread(test); t.start(); t.join(); System.out.println(test.i); System.out.println("end"); } } ``` 分析: 如果不join,main先跑完 如果join,main必须等待sub之后才输出 扩展:concurrent.lock中, Condition.await(),signal/signalAll 与 wait/notify效果一样 ### 3.1.1 死锁 1) 现象 很简单,先看一个案例。双锁互相等待。 ```java package com.itheima.thread.buis; public class DeadLock { byte[] lock1 = new byte[0]; byte[] lock2 = new byte[0]; public void f1() throws InterruptedException { synchronized (lock1){ System.out.println("f1.lock1"); Thread.sleep(1000); synchronized (lock2){ System.out.println("f1.lock2"); } } } public void f2() throws InterruptedException { synchronized (lock2){ System.out.println("f2.lock2"); Thread.sleep(1000); synchronized (lock1){ System.out.println("f2.lock1"); } } } public static void main(String[] args) { DeadLock deadLock = new DeadLock(); Thread t1 = new Thread(()->{ try { deadLock.f1(); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread t2 = new Thread(()->{ try { deadLock.f2(); } catch (InterruptedException e) { e.printStackTrace(); } }); t1.start(); t2.start(); } } ``` 2)死锁的条件 - 互斥使用,即资源只能独享,一个占了其他都必须等。- - 不可抢占,资源一旦被占,就只能等待占有者主动释放,其他线程抢不走。 - 贪婪占有,占着一把锁不释放,同时又需要申请另一把。 - 循环等待,即存在等待环路,A → B → C → A。 3)排查 jdk自带工具 - jps + jstack pid 通过jps找到线程号,再执行jstack pid,找到 Found one Java-level deadlock:xxx - jconsole 执行jconsole,打开窗口,找到 线程 → 检测死锁 - jvisualvm 执行jvisualvm,打开窗口,双击线程pid,打开线程,会提示死锁,dump查看线程信息 4)如何避免 - 合理搭配锁顺序,如果必须获取多个锁,我们就要考虑不同线程获取锁的次序搭配 - 少用synchronized,多用Lock.tryLock方法并配置超时时间 - 对多线程保持谨慎。拿不准的场景宁可不用。线上一旦死锁往往正是高访问时间段。代价巨大 ### 3.1.2 饥饿线程 1)概念 如果一个线程因为 CPU 时间全部被其他线程抢走而始终得不到 CPU 运行时间,看一个案例: 读代码,猜一猜结果? ```java package com.itheima.thread.buis; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; public class HungryThread{ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); void write(){ readWriteLock.writeLock().lock(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } readWriteLock.writeLock().unlock(); } void read(){ readWriteLock.readLock().lock(); System.out.println("read"); readWriteLock.readLock().unlock(); } public static void main(String[] args) { HungryThread hungryThread = new HungryThread(); Thread t1 = new Thread(()->{ //不停去拿写锁,拿到后sleep一段时间,释放 while (true) { hungryThread.write(); } }); Thread t2 = new Thread(()->{ //不停去拿读锁,虽然是读锁,但是...看下面! while (true){ hungryThread.read(); } }); t1.setPriority(9); //优先级低! t2.setPriority(1); t1.start(); t2.start(); } } ``` 结果分析: read几乎不会出现,甚至一直都拿不到锁。处于饥饿状态 StampedLock ```java package com.itheima.thread.buis; import java.util.concurrent.locks.StampedLock; public class StampedThread { StampedLock lock = new StampedLock(); void write(){ long stamp = lock.writeLock(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } lock.unlock(stamp); } void read(){ //乐观读 long stamp = lock.tryOptimisticRead(); //判断是否有写在进行,没占用的话,得到执行,打印read if (lock.validate(stamp)){ System.out.println("read"); } } public static void main(String[] args) { StampedThread stampedThread = new StampedThread(); Thread t1 = new Thread(()->{ while (true) { stampedThread.write(); } }); Thread t2 = new Thread(()->{ while (true){ stampedThread.read(); } }); t1.setPriority(9); t2.setPriority(1); t1.start(); t2.start(); } } ``` 结果分析: read间隔性打出,提升了读操作的并发性 注意,StampedLock的使用有局限性! - 对于读多写少的场景 StampedLock 性能很好,简单的应用场景基本上可以替代 ReadWriteLock - StampedLock 在命名上并没有 Reentrant,StampedLock 是不可重入的! - StampedLock 的悲观读锁、写锁都不支持条件变量(无法使用Condition) 案例:StampedLock是不可重入锁! ```java package com.itheima.thread.buis; import java.util.concurrent.locks.StampedLock; public class StampedReetrant { public static void main(String[] args) { StampedLock lock = new StampedLock(); long stamp1 = lock.writeLock(); System.out.println(1); long stamp2 = lock.writeLock(); System.out.println(2); lock.unlock(stamp2); lock.unlock(stamp1); } } ``` 2) 饥饿线程产生原因 - 高优先级线程吞噬所有的低优先级线程的 CPU 时间。 - 锁始终被别的线程抢占。 3) 解决饥饿问题的方案 - 保证资源充足 - 避免持有锁的线程长时间执行,设置一定的退出机制 - 在高风险地方使用公平锁 ## 3.2 解决方案 ### 3.2.1 demo准备 1)boot项目 搭建springboot web项目,集成mybatis,druid连接池,rabbitmq(抢单用),swagger mysql,rabbitmq使用docker启动,操作参考如下 docker安装说明: https://www.runoob.com/docker/windows-docker-install.html 启动: ```shell #mysql #注意,D:/data/mysql是机器上要挂载的数据库存放目录 #需要在自己机器上创建这个路径 #同时要注意,windows下安装docker后,必须选中磁盘share,参考下面的截图 docker run ‐‐name mysql ‐v D:/data/mysql:/var/lib/mysql ‐p3306:3306 ‐e MYSQL_ROOT_PASSWORD=root ‐d daocloud.io/mysql:5.7.4 #rabbitmq #这个不需要挂盘 docker run ‐d ‐‐hostname my‐rabbit ‐‐name rabbit ‐p 15672:15672 ‐p5672:5672 daocloud.io/library/rabbitmq:3.6.10‐management ``` ![](https://gitee.com/adastevy/img/raw/master/img/20201023160328.png) 2)建表 - orders表,超时订单案例会用到 ```sql CREATE TABLE `orders` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL COMMENT '商品名称(demo使用,现实中为外键id)', `createtime` datetime DEFAULT NULL COMMENT '创建时间', `updatetime` datetime DEFAULT NULL COMMENT '更新时间', `invalid` int(11) DEFAULT NULL COMMENT '失效时间(单位秒)', `status` tinyint(4) DEFAULT NULL COMMENT '状态(0=新增,‐1=失效)', PRIMARY KEY (`id`) ); ``` - product表,库存和排序会用到 ```sql CREATE TABLE `product` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL COMMENT '商品名称', `num` int(11) DEFAULT '0' COMMENT '库存数', `price` float DEFAULT '0' COMMENT '价格', PRIMARY KEY (`id`) ); ``` - flashorder表,记录抢购后的单子 ```java CREATE TABLE `flashorder` ( `id` int(11) NOT NULL AUTO_INCREMENT, `productid` int(11) NOT NULL COMMENT '抢到的商品id', `userid` int(11) NOT NULL COMMENT '用户id,这里用抢购线程的id模拟', PRIMARY KEY (`id`) ); ``` 3)调试 启动springboot项目后,访问 http://localhost:8080/doc.html 进入swagger可以调试所有demo接口 ### 3.2.2 超时订单 #### 3.2.2.1 设计方案 1) 定时扫表:写定时任务轮询扫订单表,挨个比对时间,超时的更新掉 - 数据量小时,一般万级以内可以。几万到上亿的数据,显然不可取。 - 当前项目多处于分库分表模式,扫描需要扫多个表甚至跨库 2) 延迟消费:在下订单时,同时投放一个队列用于延迟操作,常见队列有三种 - DelayQueue,简单,不借助任何外部中间件,可借助db事务,down机丢失,同时注意内存占用 - 消息队列,设置延迟并监听死信队列,注意消息堆积产生的监控报警 - redis过期回调,redis对内存空间的占用 具体采取哪种延迟手段,根据企业实际情况,临时性的场合(比如某个抢购活动),可以采用方案一,系统化的订单取消,比如电商系统默认30分钟不支付取消规则,2号方案居多。为加深线程相关内容,本章节采用方案一 #### 3.2.2.2 实现 定义delay的对象,实现Delay接口 ```java package com.itheima.thread.order; import com.itheima.thread.mapper.Orders; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class OrderDto implements Delayed { private int id; private long invalid; public OrderDto(Orders o){ this.id = o.getId(); this.invalid = o.getInvalid()*1000 + System.currentTimeMillis(); } //倒计时,降到0时队列会吐出该任务 @Override public long getDelay(TimeUnit unit) { return invalid - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { OrderDto o1 = (OrderDto) o; return this.invalid - o1.invalid <= 0 ? -1 : 1; } public int getId() { return id; } public long getInvalid() { return invalid; } } ``` - 定义监控类,启动守护进程,如果有超时任务,提交进线程池 ```java package com.itheima.thread.order; import com.itheima.thread.mapper.Orders; import com.itheima.thread.mapper.OrdersMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; import java.util.concurrent.DelayQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Component public class OrderMonitor { @Autowired OrdersMapper mapper ; //延时队列 final DelayQueue queue = new DelayQueue(); //任务池 ExecutorService service = Executors.newFixedThreadPool(3); //投放延迟订单 public void put(OrderDto dto){ this.queue.put(dto); System.out.println("put task:"+dto.getId()); } //在构造函数中启动守护线程 public OrderMonitor(){ this.execute(); System.out.println("order monitor started"); } //守护线程 public void execute(){ new Thread(()->{ while (true){ try { OrderDto dto = queue.take(); System.out.println("take task:"+dto.getId()); service.execute(new Task(dto)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } //任务类 class Task implements Runnable{ OrderDto dto; Task(OrderDto dto){ this.dto = dto; } @Override public void run() { // 先从数据库中取出order来判断是否已经支付 Orders orders = new Orders(); orders.setId(dto.getId()); orders.setUpdatetime(new Date()); orders.setStatus(-1); System.out.println("cancel order:"+orders.getId()); mapper.updateByPrimaryKeySelective(orders); } } } ``` - 在add订单业务中,同时扔一份到queue,注意事务性 ```java @PostMapping("/add") @Transactional public int add(@RequestParam String name){ Orders order = new Orders(); order.setName(name); order.setCreatetime(new Date()); order.setUpdatetime(new Date()); //超时时间,取10‐20之间的随机数(秒) order.setInvalid(new Random().nextInt(10)+10); order.setStatus(0); mapper.insert(order); //事务性验证 // int i = 1/0; monitor.put(new OrderDto(order)); return order.getId(); } ``` ### 3.2.3 加/减库存 #### 3.2.3.1 设计方案 1)rabbitmq异步排队:使用rabbitmq先排队,请求到来时之间入队,界面显示排队中,消费端逐个消费,同时扣减库存,界面轮询查询结果。可能会出现排队半天抢完的情况。 2)库存预热:使用缓存或内存变量,活动开始前从db中提取库存值初始化,请求到来时直接扣减,及时提醒。可能出现一种感觉,活动刚开始就抢没了…… 实际企业秒杀场景下,方案1居多,为讲解多线程,本课程采用2 #### 3.2.3.2 实现 初始化库存缓存 ```java public Map load(){ products.clear(); List list = productMapper.selectByExample(null); list.forEach(p ‐> { products.put(p.getId(),new AtomicInteger(p.getNum())); }); return products; } ``` 抢购代码,开启10个线程,不停去抢,减库存,如果抢到,异步刷库。 ```java //抢购 @GetMapping("/go") @ApiOperation(value = "抢购") public void go(int productId){ for (int i = 0; i < 10; i++) { new Thread(()->{ int count = 0; long userId = Thread.currentThread().getId(); while (products.get(productId).getAndDecrement() > 0){ count++; //扔消息队列,异步处理 template.convertAndSend("promotion.order",productId+","+userId); } System.out.println(Thread.currentThread().getName()+"抢到:"+count); }).start(); } } ``` 注意分析控制台结果: - 前端线程立刻抢购得到结果,给出每个线程抢到的商品数 - 后面异步处理缓慢得到结果,操作db ### 3.2.4 价格排序 #### 3.2.4.1 设计方案 1)直接数据库sort,这种最典型 2)redis缓存zset获取,在商品列表缓存,web网站排序场景中常见 3)内存排序,有时候,需要复杂的运算和比较逻辑,sql sort操作表达不出来时,必须进入内存运算 本课程使用方案3,规则模拟按价格排序 #### 3.2.4.2 实现 1)针对内存排序,首先想到的是实现Comparable接口,在多线程知识背景下,可以运用所学的ForkJoin实现归并排序。 2)算法回顾 ![](https://gitee.com/adastevy/img/raw/master/img/20201023161504.png) 3)ForkJoinTask,任务实现算法 ```java package com.itheima.thread.order; import com.itheima.thread.mapper.Product; import java.util.ArrayList; import java.util.List; import java.util.concurrent.RecursiveTask; public class SortTask extends RecursiveTask> { private List list; public SortTask(List list){ this.list = list; } @Override //分拆与合并 protected List compute() { if (list.size() > 2){ //如果拆分的长度大于2,继续拆 int middle = list.size() / 2 ; //拆成两个 List left = list.subList(0,middle); List right = list.subList(middle+1,list.size()); //子任务fork出来 SortTask task1 = new SortTask(left); task1.fork(); SortTask task2 = new SortTask(right); task2.fork(); //join并返回 return mergeList(task1.join(),task2.join()); }else if (list.size() == 2 && list.get(0).getPrice() > list.get(1).getPrice()){ //如果长度达到2个了,但是顺序不对,交换一下 //其他如果2个且有序,或者1个元素的情况,不需要管他 Product p = list.get(0); list.set(0,list.get(1)); list.set(1,p); } //交换后的返回,这个list已经是每个拆分任务里的有序值了 return list; } //归并排序的合并操作,目的是将两个有序的子list合并成一个整体有序的集合 //遍历两个子list,依次取值,两边比较,从小到大放入新list //注意,left和right是两个有序的list,已经从小到大排好序了 private List mergeList(List left,List right){ if (left == null || right == null) return null; //合并后的list List total = new ArrayList<>(left.size()+right.size()); //list1的下标 int index1 = 0; //list2的下标 int index2 = 0; //逐个放入total,所以需要遍历两个size的次数之和 for (int i = 0; i < left.size()+right.size(); i++) { //如果list1的下标达到最大,说明list1已经都全部放入total if (index1 == left.size()){ //那就从list2挨个取值,不需要比较直接放入total total.add(i,right.get(index2++)); continue; }else if (index2 == right.size()){ //如果list2已经全部放入,那规律一样,取list1 total.add(i,left.get(index1++)); continue; } //到这里说明,1和2中还都有元素,那就需要比较,把小的放入total //list1当前商品的价格 Float p1 = left.get(index1).getPrice(); //list2当前商品的价格 Float p2 = right.get(index2).getPrice(); Product min = null; //取里面价格小的,取完后,将它的下标增加 if (p1 <= p2){ min = left.get(index1++); }else{ min = right.get(index2++); } //放入total total.add(min); } //这样处理后,total就变为两个子list的所有元素,并且从小到大排好序 System.out.println(total); System.out.println("------------------"); return total; } } ``` 4)调用过程 ```java package com.itheima.thread.order; import com.itheima.thread.mapper.Product; import com.itheima.thread.mapper.ProductMapper; import io.swagger.annotations.Api; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; @RestController @RequestMapping("/sort") @Api(value = "多线程排序测试demo") public class SortController { @Autowired ProductMapper mapper; @GetMapping("/list") List sort() throws ExecutionException, InterruptedException { //查商品列表 List list = mapper.selectByExample(null); //线程池 ForkJoinPool pool = new ForkJoinPool(2); //开始运算,拆分与合并 Future> future = pool.submit(new SortTask(list)); return future.get(); } } ```