# java-thread **Repository Path**: high_wind_yasuo/java-thread ## Basic Information - **Project Name**: java-thread - **Description**: this is my project to learn java thread - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 1 - **Forks**: 0 - **Created**: 2022-01-08 - **Last Updated**: 2022-12-24 ## Categories & Tags **Categories**: Uncategorized **Tags**: Thread ## README # Java并发编程实战 ## 第一部分:并发理论基础 ### 01|可见性、原子性和有序性问题,并发编程的源头 核心矛盾:就是cpu、内存和I/O设备的速度差异(内存和I/O设备的速度差异远远大于cpu和内存之间的速度差异)。为了平衡三者之间的速度差异,计算机体系结构、操作系统和编译程序都做出了改变,主要是: 1. cpu增加了缓存,以均衡和内存的速度差异 2. 操作系统增加了进程、线程,以便分时复用cpu,均衡cpu和I/O设备之间的速度差异 3. 编译程序优化指令的执行顺序,使得缓存能够得到更好的应用 CPU的时间局部性和空间局部性:**cpu认为当前加载的数据附近的数据被使用的概率很大,会一起加载** 同时这也引起了下面的问题: 1. 缓存导致的可见性问题: 一个线程对共享变量做的修改,可以被另一个线程看见,称为可见性问题 2. 线程切换带来的原子性问题: 任务切换(线程切换)是并发编程里面诡异Bug的源头之一。高级语言里面的一条指令往往需要多条cpu指令完成。一个或者多个操作在执行的过程中不被中断的特性称为原子性。 3. 编译优化带来的有序性问题: java领域经典的利用双重检查创建单例对象 ```java public class Singleton{ static Singleton instance; static Singleton getInstance(){ if(instance == null){ synchronized(Singleton.class){ if(instance == null){ instance = new Singleton(); } } return instance; } } } ``` ### 02 | Java内存模型:看Java如何解决可见性和有序性问题 什么是Java内存模型? java内存模型是个很复杂的规范,具体可以总结为以下几方面:volatile、synchrozied和final关键字以及六项Happens-Before规则 1. volitile原始语义:禁用cpu缓存--volitile能保可见性的原理在于底层使用了缓存一致性MESI。 2. Happens-Before原则:前面一个操作的结果对后续操作是可见的。准确的说:Happens-Before规则约束了编译器的优化行为,虽然允许编译器优化,但是编译器优化的过程一定要遵守Happens-Before原则 Heppens-Before约束: 1. 程序的顺序性规则 2. volitile变量规则:对一个volatile变量的写操作,Happens-Before于后续地这个volatile变量的读操作 volitile能够保证线程及时可见的问题,不加volitile不能保证及时 volatile解决不了原子性问题 原子性通过synchronized保证 **volitile的另一个作用是禁止指令重拍,也就是说volitile能同时解决可见性和有序性问题** volitile是如何保证有序性的呢? 3. 传递性:A Happens-Before B,B Happens-Before C,那么A Happens-Before C 3. 管程中的规则:对一个锁的解锁Happens-Before于后续对这个锁的加锁 6. 线程join()原则:主线程等到子线程完成后,子线程能够看到子线程的操作(针对于共享变量) 5. 线程start()原则:主线程启动子线程后,子线程能看到主线程在启动子线程之前的所有操作 final关键字:final修饰变量时,就是告诉编译器,该变量生而不变,可以随意优化 内存屏障: | 屏障类型 | 指令示例 | 说明 | | :--------: | :----------------------: | :----------------------------------------------------------: | | LoadLoad | Load1;LoadLoad;Load2 | 保证Load1的读取操作在Load2及后续读取操作之前执行 | | StoreStore | Store1;StoreStore;Store2 | 在store2及其后的写操作执行前,保证store1的写操作已经刷新到主内存 | | LoadStore | Load;LoadStore;Store2 | 在store2及其后的写操作执行前,保证load1的读操作已经读区结束 | | StoreLoad | Store1;StoreLoad;Load2 | 保证store1的写操作已经刷新到主内存之后,load2及其后的读操作才能执行 | 更多得关于内存屏障的内容: [https://blog.csdn.net/breakout_alex/article/details/94379895]: java内存屏障的原理与应用 ### 03| 互斥锁(上):解决原子性问题 原子性:一个或者多个操作在cpu执行的过程中不被中断的特性,称为原子性。原子性问题的源头是线程切换。如果禁止线程切换就能避免原子性问题。 互斥:同一时刻只有一个线程执行。 简易锁模型:枷锁-->临界区-->解锁。进入临界区的线程称为持有锁,执行完临界区的代码后,解锁。 简易锁模型中两个重要的问题: 1. 我们锁的是什么? 2. 我们保护的是什么? 简易锁改进后的模型:创建线程保护资源R的锁LR-->枷锁操作lock(LR)-->临界区(操作受保护的资源R)-->解锁unlock(LR) Java语言提供的锁技术:synchronized synchronized使用示例: ```java class X{ // 修饰非静态方法 synchronized void foo(){ // 临界区 } // 修饰静态方法 synchronized static void bar(){ // 临界区 } // 修饰代码块 Object object = new Object(); void baz(){ synchronized(obj){ // 临界区 } } } ``` Java synchronized的隐式规则: 1. 当修饰静态方法的时候,锁定的是当前类的class对象 2. 当修饰非静态方法的时候,锁定的是当前实例this ### 04|互斥锁(下):如何用一把锁保护多个资源? 当要保护多个资源时,首先要区分这些资源是否存在关联关系 1. 保护没有关联关系的多个资源 没有关联关系的资源比如银行业务中的账户余额和密码相关的操作。这种事比较简单的,各自管理各自的就行。示例代码如下: ```java class Account{ // 锁:保护账户余额 private final Object balLock =new Object(); // 账户余额 private Integer balance; // 锁:保护账户密码 private final Object pwLock = new Object(); // 账户密码 private String password; // 取款 void withdraw(Integer amt){ synchronized(ballLock){ if(this.balance > amt){ this.balance -= amt; } } } // 查看余额 Integer getBalance(){ synchronized(balLock){ return balance; } } // 更改密码 void updatePassword(String pw){ synchronized(pwLock){ this.password = pw; } } // 查看密码 String getPassword(){ synchronized(pwLock){ return password; } } } /* 示例中用两把锁锁住了两个不同的保护的对象。也可以用this锁来锁住上面的所有方法,但是程序的性能会很差。用不同的锁锁住不同的对象,对资源进行精细化管理,能够提升性能。这种锁就叫作细粒度锁 */ ``` 2. 保护有关联的多个资源 如果多个资源是有关联的,情况变得复杂一些。例如因为银行业务里面的转账操作,账户A减少100元,账户B增加100元。 ```java class Account{ private int balance; // 转账 void transfer(Account target, int amt){ if(this.balance > amt){ this.balance -= amt; target.balance += amt; } } } ``` 分析发现,锁能覆盖所有受保护的资源就行。但是上面的代码中的锁其实只能锁住this.balance。经过优化后的代码如下: ```java /* * 可以让所有的对象都持有一个唯一性的对象,这个对象在创建Account时传入。 */ class Account{ private Object lock; private int balance; private Account(); // 创建Account时传入同一个对象 public Account(Object lock){ this.lock = lock; } // 转账 void transfer(Account target, int amt){ // 此处检查所有对象共享的锁 synchronized(lock){ if(this.balance > amt){ this.balance -= amt; target.balance += amt; } } } } ``` 经过优化后的代码确实能解决问题,但是有点小瑕疵,他要求在创建Account对象的时候必须传入同一个对象,如果创建对象的时候传入的不是同一个对象,同样会导致问题。实际生产的时候,传入共享的lock很难。为了解决这个问题,一个更好的思路是使用Account.class作为共享的锁。 ```java class Account{ private int balance; // 转账 void transfer(Account target, int amt){ synchronized(Account.class){ if(this.balance > amt){ this.balance -= amt; target.balance += amt; } } } } ``` 锁示意图 ### 05|一不小心就死锁了,怎么办? 04中,用来解决银行业务的方法理论上可行,但是实际运用中,所有的转账操作都将变成串行,性能是完全不能接受的。 类比于古代的账户形式,一个账户对应一个账本,转账的时候会出现以下几种情况: 1. 文件架上恰好有转出账本和转入账本,那就同时拿走 2. 文件架上只有一个账本,先拿文件架上 有的,等待另一个送回来 3. 文件架子上无账本,等着被送回来 这个步骤可以图形化如下: 锁 ```java class Account{ private int balance; void transfer(Account target, int amt){ // 锁定转出账户 synchronized(this){ // 锁定转入账户 synchronized(target){ if(this.balance > amt){ this.balance -= amt; target.balance += amt; } } } } } /* * 上面代码中加的锁是细粒度锁 * 使用细粒度锁可以提高并行度 */ ``` 上面的代码使用了细粒度锁,但是细粒度锁在提升性能的同时,是有代价的,就是可能会出现的死锁问题。 死锁:一组互相竞争资源的线程因互相等待,导致一直阻塞的现象 死锁发生的必要条件: 1. 互斥,共享资源X和Y只能被一个线程占用; 2. 占有且等待,线程T1已经取得共享资源X,等待共享资源Y的时候,不释放共享资源X; 3. 不可强张,其他线程不鞥强行抢占T1占有的资源; 4. 循环等待,线程T1和等待线程T2占有的资源,线程T2等待线程T1占有的资源,就是循环等待; 根据死锁形成的条件,我们只破坏其中一个,就可以避免死锁的发生。 其中,互斥这个条件是没法破坏的,因为我们用锁的原因就是为了追求互斥。但是其他三个条件是可以破坏的。 1. 对于"占用且等待"这个条件,一次性申请所有资源,这样就不用等待了 2. 对于"不可抢占"这个条件,占用部分资源的线程进一步申请其他资源时,如果申请不到,可以主动释放它占有的资源,这样不可抢占这个条件就破坏了。 3. 对于"循环等待"这个条件,可以靠按序申请资源来预防。 其中,破坏不可抢占条件看上去很简单,核心就是能够主动释放占有的资源。这一点通过synchronized是做不到的。因为synchronized申请不到资源的时候,就会直接进入阻塞状态,啥都不干,也不释放已经占有的资源。 ### 06|用"等待-通知"机制优化循环等待 在循环等待的场景下,最好的方案是:如果线程要求的条件不满足,则线程阻塞自己,进入等待状态;如果线程要求的条件满足后,通知等待的线程重新执行。使用阻塞的方式能避免循环等待消耗cpu的问题。 java语言对等待-通知机制的支持: java语言中可以通过synchronized配合wait(),notify(),notifyAll()方法实现等待-通知机制。notify()通知等待线程的时候,告诉等待线程的是条件曾经满足过。实际上,notify只能保证在通知时间点上,条件是满足的。而被通知线程的执行时间点和通知的时间点基本上不会重合。 小试牛刀:一个更好的资源分配器:如何解决一次性申请转出账户和转入账户的问题。在这个等待-通知机制中,我们需要考虑以下四个因素。 1. 互斥锁:用this作为互斥锁 2. 线程要求的条件:转出账户和转入账户都没有被分配过 3. 何时等待:线程要求的条件不满足就等待 4. 何时通知:当有线程释放账户时就通知 ```java // 条件满足的范式(经典做法) // while(条件不满足){ // wait(); // } class Allocator{ private List als; // 一次性申请所有资源 synchronized void apply(Object from, Object to){ // 经典写法 while(als.contains(from) || als.contains(to)){ try{ wait(); } catch(Exception e){ // 异常处理逻辑 } } als.add(from); als.add(to); } // 归还资源 sychronized void free( Object from, Object to){ als.remove(form); als.remove(to); notifyAll(); } } ``` notify和notifyAll的区别:notify()会随机地通知等待队列中的一个线程,而notifyAll()会通知等待队列中所有的线程。notify()可能会导致有些线程永远都不会被通知到。 ```tex 课后思考:wait()和sleep()的区别是什么? Answer: wait()方法与sleep()方法的不同之处在于,wait()方法会释放对象的“锁标志”。当调用某一对象的wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了notify()方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的notifyAll()方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池。Object的方法。 sleep()方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入阻塞状态,该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。但是sleep()方法不会释放“锁标志”,也就是说如果有synchronized同步块,其他线程仍然不能访问共享数据。Thread中的方法。 ``` ### 07|安全性、活跃性以及性能问题 并发编程中需要注意的问题很多,总结起来有以下三个方面:安全性问题、活跃性问题和性能问题 安全性问题:我们经常说这个不是线程安全的,这个类是线程安全的。那么什么是线程安全呢?本质上就是正确性,就是**程序按照我们期望的方式执行。**其实只有一种情况会出现多线程问题:**存在共享数据并该数据会发生变化,通俗讲就是有多个线程会同时读写同一个数据。** 竞态条件:**程序的执行结果依赖程序执行的顺序** 活跃性问题:所谓的活跃性问题,指的是某个操作无法执行下去。常见的"死锁"就是一种典型的活跃性问题。除了死锁外,还有两种情况,分别是"活锁"和"饥饿"。 活锁:**线程没有发生阻塞,但是任然执行不下去**。 饥饿:**线程因为无法访问需要的资源而无法执行下去**。有过线程优先级"不均",cpu繁忙的情况下,优先级低的线程得到执行的机会很小,就可能发生线程"饥饿"。公平锁方案:先来后到的方案,线程的等待是有顺序的,排在等待队列前面的线程会优先获得资源 性能问题:锁的过度使用可能会导致性能问题。过多的锁导致串行化的范围变大。就不能发挥多线程的优势。假设串行化百分比是5%,使用多核多线程相比于单核单线程能提速多少呢? $$ 阿姆达尔定律: S = 1 / ((1 - p ) + p / n)\\ n-->cpu的核数,\\ p-->并行百分比, $$ 也就是说,如果串行百分比是5%,那无论增加多少个cpu,都不会超过20倍的性能提升。 所以使用锁的时候一定要注意对性能的影响那么如何才能避免锁带来的性能问题呢?java sdk包中之所以有那么多的东西,有很大一部分原因就是为了提升在某个特定领域的性能。 从方案层面来讲,可以如下解决问题: 1. 既然锁会带来性能问题,最好的方案就是使用无锁算法和无锁的数据结构。如:线程本地存储(Therad Local Storage, TLS)、写入时复制(Copy-on-write)、乐观锁等;java并发包中的原子类是一种无锁的数据结构; 2. 减少锁持有的时间 性能方面的并发指标: 1. 吞吐量:单位时间内能处理的请求数量。越大说明性能越好。 2. 延时:从发出请求到收请求的响应时间。越小越好。 3. 并发量:指的是能同时处理请求的数量。一般来说,随着的并发量的增加,延时也会增加。所以延时这个指标,一般是基于并发量来说的,比如并发量是1000的时候,延时是50ms。 ### 08|管程:并发编程的万能钥匙 管程技术是解决并发问题的万能钥匙,Java1.5之前提供的唯一的并发原语就是管程。 管程:**sychronized关键字和wait(),notify(),notifyAll()这三个方法都是管程的组成部分。管程和信号量是等价的,所谓等价指的是管城能够实现信号量,信号量也能实现管程**。不过管程更容易使用,所以Java选择了管程。**管程指的是管理共享变量以及对共享变量的操作过程,让它们支持并发**。 sychronized详解: 1. java的管程实现 2. 有三种使用方法,分别如下: 1. 修饰静态方法:锁的是class文件,在字节码上加了标识ACC_SYNCHRONIZED 2. 修饰普通方法:锁的是this对象,在字节码上加了ACC_SYNCHRONIZED标识 3. 修饰同步代码块:锁的是同步代码块,在开始的位置加上了monitorenter,结束的位置加了monitorexit。没遇到一次monitorenter,计数器加1。 3. synchronized是非公平锁,要想实现公平锁,可以使用ReentranLock。 MESA模型: 在并发编程领域,有两个核心问题:一个是互斥,同一个时刻只允许一个线程访问共享资源;另一个是同步,线程之间的通信和协作问题。MESA管程模型示意图如下: avatar 管程模型中,共享变量和对共享变量的操作都被封装起来。多个线程试图进入管程内部时,只允许一个线程进入,其他线程在入口处等待。同时,每一个条件变量都对应一个等待队列。条件变量和条件变量等待队列的作用是解决线程同步问题。 MESI模型(缓存一致性协议) volitile能保持即时可见性,就是通过缓存一致性。 缓存是一致性是一种规范,MESI是该规范的一种实现方式,流程如下。可以参考多核cpu的架构图。 多核cpu架构图 MESI协议:在早期的多核cpu中,为了保证缓存一致,采用的是总线锁的方案。这种方案的缺点也很明显,就是性能实在太差。只要涉及到变量的访问,就会加锁。为了提高cpu的使用率。引入了缓存一致性协议,缓存一致性协议的实现方式有很多,MESI是最常用的一种。 1. MESI是共享变量的四种不同的状态,分别是M(修改)、E(独占)、S(共享)和I(失效) 2. cpu通过监听总线中的消息来变更变量的状态 3. 一个cpu将内存中的变量读到缓存中时,如果其他cpu没有引用该变量的副本,则该变量在当前cpu中的状态被标记为E;此时如果另一个cpu读了该变量,则两个cpu中的变量状态都将被更新为S 4. 如果多个cpu核只是读变量的话,问题不大,但是如果是写,则会触发如下过程;两个cpu同时对各自的缓存行进行加锁并发出消息,其中只有一个能够加锁成功。加锁成功的cpu修改变量,并将其标记为M状态;未能加锁的则将自己的变量更新为I状态,并读取该变量在内存中的最新值。(这就是为什么volitile能够保证可见性) 5. 如果两个cpu同时加锁,此时将采用总线裁决 下面的代码用管程实现了一个线程安全的阻塞队列。阻塞队列有两个操作,分别是入队和出队操作,两个方法都是先获取互斥锁。 ```java /* * 1. 对于阻塞队列的入队操作,如果阻塞队列已满,就需要等待直到阻塞队列不满,所以代码中使用了notFull.await() * 2. 对于阻塞出队操作,如果阻塞队列为空,就需要等待直到阻塞队列不空,所以就用了notEmpty.await() * 3. 如果入队成功,那么阻塞队列就不空了,就需要通知条件变量:阻塞队列不空notEmpty()对应的等待队列 * 4. 如果出队成功,那么阻塞队列就不满了,就需要通知条件变量:阻塞队列不满notFull()对应的等待队列 */ public class BlockedQUeue{ final Lock lock = new ReentrantLock(); // 条件变量:队列不满 final Condition notFull = lock.newCondition(); // 条件变量:队列不空 final Condition notEmpty = lock.newCondition(); // 入队 void enq(T x){ lock.lock(); try{ while(队列已满){ // notFull.await(); } // 省略入队操作。。。 // 入队后,通知可以出队 notEmpty.signal(); }finally{ lock.unlock(); } } // 出队 void deq(){ lock.lock(); try{ while(队列已空){ // 等待队列不空 notEmpty.await(); } // 省略出队操作。。。 // 出队后,通知可入队 notFull.signal(); } } } ``` 对于MESA管程来说,有一个编程范式:**在一个while循环里面调用wait()**。 ````java while(条件不满足){ wait()l; } ```` notify()的使用条件: 1. 所有等待线程拥有相同的等待条件 2. 所有等待线程唤醒后,执行相同的操作 3. 只需要唤醒一个线程 ### 09|Java线程(上):Java线程的生命周期 在Java领域,实现并发编程的主要手段就是多线程。java语言的线程本质上就是操作系统的线程,它们是一一对应的。 通用性的线程生命周期: 通用的线程生命周期有五个状态,分别入下图所示:初始状态、可运行状态、运行状态、休眠状态和终止状态。 通用线程生命周期 java中线程的生命周期: 相比于通用线程模型的生命周期,java做了如下的改动。合并了可运行状态和运行状态、细化了休眠状态。最终,java的线程的生米给你周期有以下六个状态:NEW(初始化状态)、RUNNABLE(可运行/运行状态)、BLOCKED(阻塞状态)、WAITING(无时限等待)、TIMED-WAITING(有时限等待)TERMINATED(终止状态)。在操作系统层面,其实Java的BLOCKED、WAITING、TIMED-EAITING是一种状态。就是上面提到的休眠状态。也就是说,Java只要处于上述的三种状态之一,那么这个线程就永远没有CPU的使用权。 java线程生命周期 线程状态转换原因: 1. **RUNNABLE和BLOCKED的状态转换** 线程等待synchronized的隐式锁 2. **RUNNABLE与WAITING状态的切换** 有3种场景会触发这种转换: 1. 获得synchronized隐式锁的线程,调用无参数的Object.wait()方法 2. 调用无参数的Thread.join()方法 3. 调用LockSupport.park()方法。其中的LockSupport对象时Java并发包中锁的实现基础。LockThread.unpark(Thread thread)可唤醒目标线程 3. **RUNNABLE和TIMED-WAITING的状态转换** 有5种场景会触发这种转换: 1. 调用带超时参数的hread.join(long millis)方法 2. 获得synvhronized隐式锁的线程,调用带超时参数的Object.wait(long timeout)方法 3. 调用带超时参数的Thread.join(long millis)方法 4. 调用带超时参数的LockSupport.parkNanos(Object blocker, long deadline)方法 5. 调用带超时参数的LockSupport.parkUnitil(long deadline) 方法 TIMED-WAITING和WAITING的区别仅仅是触发条件多了超时参数 4. **NEW到RUNNABLE状态** java对象创建出阿里就是new状态。继承Thread和实现Runnable接口都可以创建线程。new状态的线程,不会被操作系统调度,因此不会执行。只要调用线程对象的start方法就行了。 5. **从RUNNABLE状态到TERMINATED状态** 1. 线程执行完run()方法 2. 执行run()方法的过程中出现异常 3. 强制中断run()方法-->stop()或者是interrupt()方法 ### 10|创建多少线程才是合适的? 对于I/O密集型计算的场景,最佳线程数的计算公式如下: $$ 最佳线程数 = CPU核数[1 + (I/O耗时 / CPU耗时)] $$ ### 11 | Java线程(下):为什么局部变量是线程安全的? 局部变量是不存在数据竞争的:在cpu的眼里,是没有方法的概念的,cpu眼里只有一条条的指令。局部变量是放在调用栈里面的。所以每一个线程的方法都有自己的栈帧,所以局部变量是线程安全的。 ### 12|如何用面向对象的思想写好并发程序? 可以从下面的三个策略入手: 1. 封装共享变量 2. 识别共享变量间的约束条件 3. 制定并发访问策略 13|理论基础模块热点问题答疑 ## 第二部分:并发工具类 ### 14|Lock和Condition(上):隐藏在并发包中的管程 Java SDK并发包的内容很丰富,最核心的可以认为是对管程的实现。Java SDK并发包通过Lock和Condition两个接口实现管程。Lock解决互斥问题,Condition解决同步问题。 对于死锁问题,破坏的方法之一是破坏不可抢占条件。重新设计一把锁能实现这个功能,可以有如下三种方案: 1. 能够响应中断 2. 支持超时 3. 非阻塞地获取锁 上面的三种方案能全面弥补synchronized的问题。对应于Lock中的三个方法: ```java void lockInterrupted() throws InterruptedException; boolean tryLock(long time, TimeUnit unit) throws InterruptedException; boolean tryLock(); ``` 可重入锁:线程可以重复获取的同一把锁 可重入函数:指的是多个线程可以同时调用该函数,且每一个线程都能获取到正确结果。可重入函数是线程安全的。 公平锁和非公平锁,**锁都对应着一个等待队列,如果一个线程没有获得锁,就会进入等待队列,当有线程释放锁的时候,就需要从等待队列中唤醒一个等待的线程。如果是公平锁,唤醒的策略就是谁等待的时间长,就唤醒谁,很公平;如果是非公平锁,则不提供这个公平保证,有可能等待时间短的线程反而先被唤醒**。ReentrantLock的构造函数有两种,分别如下: ```java // 默认构造函数,实现非公平锁 public ReentrantLock(){ sync = new NonfairSync(); } // 根据公平策略实现的锁 public ReentrantLock(boolean fair){ sync = fair ? new FairSync() : new NonFairSync(); } ``` 用锁的最佳实践: 1. 永远只在更新对象的成员变量时加锁 2. 永远只在访问可变的成员变量时加锁 3. 永远不在调用其他对象的方法时加锁 ### 15|Lock和Condition(下):Dubbo如何用管程实现异步转同步 Condition实现了管程模型里面的条件变量:Java语言内置的管程中只有一个条件变量,Lock&Condition实现的管程支持多个条件变量。很多并发场景下,实现多个条件变量能让程序的可读性更好。eg:实现一个阻塞队列,需要实现两个条件变量 ```java // 一个阻塞队列,需要满足两个条件,一个是队列不空,一个是队列不满 public class blockedQueue { final Lock lock = new ReentrantLock(); // 条件变量:队列不满 final Condition notFull = lock.newCondition(); // 条件变量:队列不空 final Condition notEmpty = lock.newCondition(); // 入队操作 void enq(T x){ lock.lock(); try{ while(队列已满){ // 等待队列不满 notFull.await(); } // ...入队操作 // 通知可以出队 notEmpty.singal; }final { lock.unlock(); } } // 出队操作 void deq(){ lock.lock(); try{ while(队列已空){ // 等待队列不空 notEmpty.await(); } // ...出队操作 // 通知可入队 notFull.singal(); }final{ lock.unlock(); } } } ``` Dubbo源码分析: TCP协议本身就是异步的。在RPC调用中,在TCP协议层面,发送完RPC请求后,线程是不会等待RPC的响应结果的。但是实际使用的的大多数的RPC都是同步的。其实就是框架给做了异步转同步的工作。对于一个简单的RPC调用,默认情况下,sayHello()方法是个同步方法,也就是说,执行service.sayHello("dubbo")的时候,线程会停下来等结果。 ```java DemoService service = new DemoService; String message = service.sayHello("dubbo"); System.out.println(message); ``` ### 16|Semaphore:如何快速实现一个限流器? 线程能不能执行,要看Semaphore是不是允许,Semaphore就是信号量。 信号量模型:一个计数器、一个等待队列、三个方法。其中,计数器和等待队列是对外透明的,只能通过信号量的三个方法来访问,三个方法分别是init(),down(),up()。其中: 1. init:设置信号量的初始值 2. down:计数器的值减1,如果此时计数器的值小于1, 则当前的线程将会被阻塞,否则当前线程继续执行 3. up:计数器的值加1, 如果此时的计数器的值等于或者大于1,则唤醒等待队列中的一个线程,并从等待队列中移除。 在java SDK中,信号量模型是由concurrent包中的Semaphore实现的。代码化的信号量的模型如下: ```java /** * 在java sdk包中,下面的down和up方法分别对应的是acquire和release */ class Semaphore{ // 计数器 int count; // 等待队列 Queue queue; // 初始化操作 Semaphore(int c){ this.count = c; } // void down(){ this.count--; if(this.count < 0){ // 将当前线程插入等待队列 // 阻塞当前线程 } } void up(){ this.count ++; if(this.count <= 0){ // 移除等待队列中的某个线程T // 唤醒线程T } } } ``` 如何使用信号量: 用累加器的例子来说明信号量的使用: ```java static int count; static final Semaphore s= new Semaphore(1); // 用信号量保证互斥 static void addOne(){ s.acquire(); try{ count += 1; }finally{ s.release; } } ``` 实现一个互斥锁(如上),其实仅仅是Semaphore的一个小功能,它有一个Lock很难实现的功能,就是-->Semaphore允许多个线程同时访问临界区。 ### 17|ReadWriteLock:如何快速实现一个完备的缓存? java包很庞一个原因是对不同的场景进行了优化,提升了易用性。其中,针对读多写少的场景,Java SDK并发包提供了读写锁--ReadWriteLock。读写锁的三个原则: 1. 允许多个线程同时读共享资源 2. 只有一个线程能够写共享资源 3. 如果一个线程正在写共享资源,其他线程不能进行读操作 使用读写锁快速实现一个缓存 ```java class Cache{ final Map m = new Hashmap<>(); final ReadWriteLock rwl = new ReentrantReadWriteLock(); // 读锁 final Lock r = rwl.readLock(); // 写锁 final Lock w = rwl.writeLock(); // 读缓存 V get(K key){ r.lock(); try{ return m.get(key); }finally{ r.unlock(); } } V put(K key, V value){ w.lock(); try{ return m.put(key, value); }finally{ w.unlock(); } } } ``` 使用缓存首先要解决的问题就是初始化。缓存的初始化可以时一次性加载,也可以使用按需加载(懒加载)的方式。按需加载的实现方式如下: ```java class Cache{ final Map m = new Hashmap<>(); final ReadWriteLock rwl = new ReentrantReadWriteLock(); final Lock r = rwl.readLock(); final Lock w = rwl.writeLock(); V get(K key){ V v = null; // 读缓存 r.lock(); try{ v = m.get(key); }finally{ r.unlock(); } // 缓存中存在,返回 if(null != v){ return v; } // 缓存中不存在,查询数据源 w.lock(); try{ // 再次验证 // 其他线程可能已经查询过数据库 v = m.get(key); if(null == v){ v = queryFromDB(); // 缓存到map中 m.put(key, v); } }finally{ w.unlock(); } }return v; } ``` **ReadWriteLock不支持读锁升级到写锁** ### 18|StampedLock:有没有比读写锁更快的锁 StampedLock的性能比读写锁还要高。StampedLock和ReadWriteLock的区别: **ReadWriteLock支持两种模式:一种是读锁,一种是写锁。StampedLock支持三种模式,分别是写锁、悲观读锁和乐观读锁。其中,写锁和悲观读锁和ReadWriteLock中的写锁和读锁和语义非常相似。允许多个线程同时获取悲观读锁,只允许一个线程获取写锁。写锁和悲观读锁是互斥的。不同的是,不同的是写锁和悲观读锁在加锁成功之后,都会返回一个stamp,解锁的时候需要传入这个stamp** ```java final StampedLock sl = new StampedLock(); // 获取/释放悲观读锁示意代码 long stamp = sl.readLock(); try{ // 业务相关 }finally{ sl.unlockRead(stamp); } // 获取/释放写锁示意代码 long stamp = sl.writeLock(); try{ // 业务代码 }finally{ sl.unlockWrite(stamp); } ``` StampedLock性能比ReadWriteLock 性能还要好的原因之一是**StampedLock支持乐观读的方式:ReadWriteLock支持多个线程同时读。但是当多个线程同时读的时候,不会允许写操作。但是StampedLock支持乐观读的方式-->允许一个线程获取写锁,也就是说不是所有的写操作都会被阻塞。** ```java class Point{ private int x,y; final StampedLock sl = new StampedLock(); // 计算到原点的距离 int distanceFromOrigin(){ // 乐观读 long stamp = sl.tryOprimisticRead(); // 读入局部变量 // 读的过程数据可能被修改 int curX = x, curY = y; // 判断执行操作期间 // 是否存在写操作,如果存在,则sl.validate返回false if(!sl.valudate(stamp)){ // 升级为悲观读锁 stamp = sl.readLock(); try{ curX = x; curY = y; }finally{ // 释放悲观锁 sl.unlockRead(stamp); } } return Math.sqrt( curX * curX + cury & curY ) } } ``` StampedLock使用的注意事项:对于读多写少的场景,StampedLock的性能很好,简单的应用场景基本上可以替代ReadWriteLock,但是StampedLock的功能仅仅是ReadWriteLock的子集,使用的时候需要注意: 1. StampedLock是不可重入锁 2. StampedLock的悲观读锁、写锁都是不支持条件变量的 3. 如果线程阻塞在StampedLock的readLock或者是writeLock中,,此时调用该阻塞线程的interrupt方法,会导致cpu飙升。**所以,使用StampedLock一定不要调用中断操作,如果需要支持中断功能,一定使用可以中断的悲观读锁readLockInterruptibly()和写锁writeLockInterruptibly()**代码如下: ```java final StampedLock lock = new StampedLock(); Thread T1 = new Thread(() -> { // 获取写锁 lock.writeLock(); // 永远阻塞在此处 不释放写锁 LockSupport.park(); }); T1.start(); // 保证T1获取写锁 Thread.sleep(100); Thread T2 = new Thread(() -> { // 阻塞在悲观读锁 lock.readLock() }); T2.start(); // 保证T2阻塞在读锁 Thread.sleep(100); // 中断线程T2 // 会导致T2所在的cpu飙升 T2.interrupt(); T2.join; ``` ### 19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致? 对于串行化的系统,优化性能首先想到的是利用多线程并行处理。以对账为例: ```java while(存在未对账订单){ // 查询未对账订单 Thread T1 = new Thread(() -> { pos = getPOrders(); }); T1.start(); // 查询派送单 Thread T2 = new Thread(() -> { dos = getDOrders(); }); T2.start(); T1.join(); T2.join(); // 执行对账操作 diff = check(pos, dos); // 保存差异 save(diff); } ``` 上面的代码中,while每次都会新建线程,是一个很耗费资源的过程,利用线程池进行如下优化: ```java // 创建两个线程对的线程池 Executor executor = Executors.newFixedThreadPool(2); while(存在未对账订单){ // 查询未对账订单 executor.execute(() -> { pos = getPOrders(); }); // 查询派送单 exevutor.execute(() -> { dos = getDOrders(); }); // 实现对两个线程对的等待 // 执行对账操作 diff = check(pos, dos); // 保存差异 save(diff); } ``` 问题就出在如何**实现对线程的等待**,一个最直接的办法是计数器,初始值设置为2(两个线程),执行完pos = getPOrders()时减1,执行完dos=getDOrders()之后再减1。主线程等到计数器为0,说明两个操作都结束了,继续执行下面的操作。基于这个思想,java并发包提供了CountDownLatch。 CountDownLatch代码实现 ```java // 创建2个线程的线程池 Executor executor = Executors.newThreadPool(2); while(存在未对账账单){ // 计数器初始化为2 CountDownLatch latch = new CountDownLatch(2); // 查询未对账订单 executor.execute(() -> { pos = getPOrders(); latch.countDown(); }); // 查询派送单 executor.execute(() -> { dos = getDOrders(); latch.countDown(); }); // 等待两个查询操作结束 latch.await(); // 执行对账操作 diff = check(pos, dos); // 差异写入差异库 save(diff); } ``` 进一步优化性能,上面的代码的优化余地-->两个查询操作和对账操作也是可以并行执行的,执行对账操作的同时,再去查询下一轮的查询操作。 用Cyclicbarrier实现线程同步 ```java /** * 在下面的代码中, 首先创建了一个计数器的初始值为2的CyclicBarrier,需要注意的是创建CyclicBarrier的时候,同时传入了一个回调函数,当计数器为0的时候,调* 用这个回调函数。 * 同时,CyclicBarrier有自动充值的功能。方便好用 */ // 订单队列 Vector

pos; Vector dos; // 执行回调的线程池 Executor executor = Executors.newFixedThreadPool(1); final CyclicBarrier barrier = new CyclicBarrier(2, () -> { executor.execute(() - > check()); }); void check(){ P p = pos.remove(0); D d = dos.remove(0); // 执行对账操作 diff = check(p, d); // 差异写入差异库 save(diff); } void checkAll(){ // 循环查询订单库 Thread T1 = new Thread(() -> { while(存在未对账订单){ // 查询订单库 pos.add(getPOrders()); // 等待 barrier.await(); } }); T1.start(); // 循环查询运单库 Thread T2 = new Thread(() -> { while(存在未对账订单){ // 查询运单库 dos.add(getDOorders()); // 等待 barrier.await(); } }); T2.await(); } ``` CountDownLatch和Cyclicbarrier的区别:**1. CountDownLatch主要用来解决一个线程等待多个线程的场景,CyclicBarrier是一组线程之间相互等待;2. CountDownLatch的计数器是不能循环利用的,也就是说一旦计数器减到0,再有现成调用await(),该线程会直接通过,CyclicBarrier的计数器是可以循环利用的,具备自动重置的功能;3. Cyclicbarrier还可以设置回调函数** ### 20|并发容器,都有哪些坑需要我们填? 同步容器及注意事项: java中的容器:List、Map、Se和Queue。如何将非线程安全的容器变成线程安全的容器?一个简单的思路是,把一个非线程安全的容器封装在一个对象内部,然后控制好访问路径就可以了。以ArrayList为例: ```java SafeArrayList { // 封装ArrayList List c = new ArrayList<>(); // 控制访问路径 synchronized T get(int inx){ return c.get(idx); } synchronized void add(int idx, T t){ c.add(idx, t); } synchronized boolean addIfNotExist(T t){ if(!c.contains(t)){ c.add(t); return true; } return false; } } ``` 注意:**组合操作需要注意竞态条件问题,即便每个操作都能保证原子性,也不能保证组合操作的原子性** 并发容器及其注意事项:Java1.5之前的线程安全的容器,主要指的就是同步容器。不过同步容器的问题就是性能太差,串行度高,1.5之后提供了性能更高的容器,我们一般称为并发容器。并发容器的类别如下: java并发容器 并发容器的关键点如下: 1. List:List里面只有一个实现类就是CopyOnWriteArrayList。写时复制的队列,好处是读的时候是完全无锁的。 2. Map:Map的两个实现分别是 1. ConcurrentHashMap:key无序 2. ConcurrentSkipListMap:key有序 它们的key和value都不能为空。否则会抛出异常,下表是Map的实现类对于key和value的要求 | 聚合类 | Key | Value | 是否线程安全 | | :------------------: | :--------: | :----------: | :----------: | | HashMap | Nullable | Nullable | 否 | | TreeMap | 不允许Null | Nullable | 否 | | HashTable | 不允许Null | 不允许为Null | 是 | | ConcurrntHashMap | 不允许Null | 不允许Null | 是 | | ConcurrntSkipListMap | 不允许Null | 不允许Null | 是 | 3. Set:Set的两个实现类是CopyOnWriteArrayListSet和ConcurrentSkipListSet。 4. Queue:并发包中Queue这类容器是最复杂的。从两个方面进行理解 1. 阻塞和非阻塞 2. 单端和双端 java并发包中阻塞队列使用Blocking关键字标识,单端队列使用Queue标识,双端使用Deque进行标识。 1. 单端堵塞队列:事项有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlokingQueue和DelayQueue。 2. 双端阻塞队列:LinkedBlockingDeque 3. 单端阻塞队列:ConcurrentLinkedQueue 4. 双端堵塞队列:ConcurrentLinkedDeque ### 21|原子类:无锁工具类的典范 无锁方案最大的好处就是性能,在解决问题的同时,又没有引入新的问题,nice。 无锁方案的实现原理:CPU为了解决并发问题,提供了CAS指令(Compare and swap)。cas指令包含3个参数,共享变量的内存地址A、用于比较的值B和共享变量的新值C,只有当A处的值等于B时,才会将A处的值更新为C。作为一条CPU指令,CAS本身就是能够保证原子性的。CAS模拟代码 ```java class SimulatedCAS{ int count; synchronized int cas( int expect, int new value){ // 读当前的count的值 int curvalue = count; // 比较目前count值是否==期望值 if(curValue == expect){ // 如果是,则更新count的值 count = newValue; } // 返回写入的新值 return curValue; } } ``` 使用CAS解决并发问题,一般都会伴随着自旋(循环尝试)。实现一个线程安全的count += 1操作,CAS自旋的实现方案如下设计 ```java class SimulatedCAS{ volatile int count; // 实现count += 1 adOne(){ do{ newValue = count + 1; }while(count != cas(count, newValue)) } // 模拟实现CAS,仅用来帮助理解 synchronized int cas(int expect, int newValue){ // 读当前的count的值 int curValue = count; // 比较目前count值是否==期望值 if(curValue == expect){ // 如果是,则更新count的值 count = newValue; } // 返回写入前的值 return curValue; } } ``` CAS这种无锁方案,完全没有加锁解锁操作性能好了很多,但是这个方案中会存在一个问题,那就是ABA问题。使用CAS方案的时候,一定要check一下。 ```java public final long getAndAddLong( Object o, long offset, long delta){ long v; do { // 读取内存中的值 v = getLongVolatile(o, offset); } while (!compareAndSwapLong( o, offset, v, v + delta)); return v; } //原子性地将变量更新为x //条件是内存中的值等于expected //更新成功则返回true native boolean compareAndSwapLong( Object o, long offset, long expected, long x); ``` 原子类概览: 原子类 ### 22 | Executor与线程池:如何创建正确的线程池? Java程序中,创建对象,仅仅是在JVM的堆里分配一块内容而已;而创建一个线程,需要调用操作系统的API,然后操作系统要为线程分配一系列的资源,成本极高。线程是一个重量级的对象,**应该避免频繁创建和销毁**。一般意义上的池化资源,当需要资源的时候就调用acquire()方法申请资源,用完之后就调用release()释放资源。但是Java的并发包没有申请和释放线程的方法。 线程池是一种生产者--消费者模式:线程池的使用方是生产者,线程池本身是消费者(消费的是任务)。下面的示例代码中,我们创建了一个非常简单的线程池MyThreadPool来理解线程池的工作原理。 ```java // 简化的线程池,仅用来说明工作原理 class MyThreadPool{ // 利用阻塞队列实现生产者-消费者模式 BlockingQueue workQueue; // 保存内部工作线程 List threads = new ArrayList<>(); // 构造方法 MyThreadPool(int poolSize, BlockingQueue workQueue){ this.workQueue = workQueue; // 创建工作线程 for(int idx = 0; idx < poolSize; idx ++){ WorkerThread work = new WorkerThread(); work.start(); threads.add(work); } } // 提交任务 void execute(Runnable command){ workQueue.put(command); } // 工作线程负责消费任务,并执行任务 class WorkerThread extends Thread{ public void run(){ // 循环取任务并执行 while(true){ Runnable task = WorkQueue.take(); task.run(); } } } } /**下面使用的示例**/ // 创建有界阻塞队列 BlockingQueue workQueue = new linkedBlockingQueue<>(2); // 创建线程池 MyThreadPool pool = new MyThreadPool(10, workQueue); // 提交任务 pool.execute(() > System.out.println("Hello")); ``` 如何使用java中的线程池? ThreadPoolExecutor的构造函数如下 ```java /** * 下面是一个完备的线程池的构造函 * 可以将线程池类比成一个项目组,线程就是项目组的成员 * corePoolSize:线程池保有的最小线程数。 * maximumPoolSize:线程池创建的最大线程数 * keepAliveTime & unit: 线程中线程存在的时间和时间单位 * workQueue: 工作队列 * threadFactory:定义如何创建线程,例如给线程指定一个有意义的名字 * handler:自定义任务的拒绝策略。如果任务数大于最大工作队列,此时提交任务,线程池就回拒绝 * 1. callerRunsPolicy:提交任务的线程自己去执行该任务 * 2. AbortPolicy:默认的拒绝策略,会throws RejetExecutionException * 3. DiscardPolicy:直接丢弃任务,没有任何异常抛出 * 4. DiscardOldestPolicy:丢弃最老的任务,把最早加入队列的任务丢弃,然后把新任务加入到工作队列 */ ThreadPoolExecutor{ int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectExecutionHandler handler } ``` 使用线程池的注意事项: 1. Executors是java提供的静态工厂类,可以快速创建线程池,但是已经不被推荐使用。因为Executors的很多方法使用的都是无界的linkdedBlockingQueue。很容易导致OOM 2. 默认拒绝策略要慎重使用,最好是自己定义任务拒绝策略 3. 注意异常处理问题 ### 23|如何用多线程实现最优的“烧水泡茶”程序? 很多场景下,我们需要获取任务的执行结果。ThreadPoolExecutor提供了相关的重要功能,如下: ```java // 提交Runnable任务 // 参数是Runnable接口,Runnable接口的run()方法是没有返回值的,所以此处的Future仅可以用来判断任务是否已经结束,类似于Thread.join() Future submit(Runnable task); // 提交Callable任务 // 提交的Callable任务的call()方法是有返回值的。可以通过Future接口的get()方法来获取任务的返回值 Future submit(Callable task); // 提交Runnable任务及结果引用 // 提交Runnable任务以及结果引用。假设这个方法的返回值是Future f,f.get()的返回值就是传给submit()方法的参数result。其经典用法如下: Future submit(Runnable task, T result); /*----------------------submit(Runnable task, T result)的用法--------------------*/ ExecutorService executor = Executors.newFixedThreadPool(1); // 创建Result对象r Result r = new Result(); r.setAAA(a); // 提交任务 Future future = executor.submit(new Task(r), r); Result fr = future.get(); // 下面等式成立 fr === r; fr.getAAA() === a; fr.getXXX() === x; class Task implements Runnable{ Restlt r; // 通过构造函数result Task(Result r){ this.r = r; } void run(){ // 可以操作result a = r.getAAA(); r.setXXX(x); } /* ---------------------Future接口的5个方法-----------------------*/ // 取消任务 boolean cache(boolean mayInterruptIfRunning); // 判断任务是否已经取消 boolean isCancelled(); // 判断任务是否已经结束 boolean isDone(); // 获取任务的执行结果 get(); // 获得任务执行结果,支持超时 get(long timeout, TimeUnit unit) ``` FutureTask工具类: FutureTask实现了Runnable接口和Future接口,由于实现了Runnable接口,所以可以将FutureTask对象作为任务提交给ThreadPoolExecutor去执行,也可以直接被Thread执行;又因为实现了Future接口,所以也能用来获取任务的执行结果。下面的代码示例是将FutureTask对象提交给ThreadPoolExecutor去执行。 ```java // 创建FutureTask FutureTask futureTask = new FutureTask<>(() -> 1 + 2); // 创建线程池 ExecutorService es = Executors.newCachedThreadPool(); // 提交FutureTask es.submit(futureTask); // 获取计算结果 Integer result = futureTask.get(); ``` FutureTask对象直接被Thread执行的示例代码如下: ```java // 创建FutureTask FutureTask futureTask = new FutureTask<>(() -> 1+ 2); // 创建并启动线程 Thread T1 = new Thread(futureTask); T1.start(); Integer result = futureTask.get(); ``` ### 24|CompletableFuture:异步编程没那么难 异步化,是并行方案得以实施的基础,更深入讲就是:**利用多线程优化性能这个核心方案得以实施的基础**。 CompletableFuture有可能是最复杂的工具类,有着让人震撼的功能。用CompletableFuture实现一个烧水泡茶的程序如下: ```java /** * 下面的代码中,任务1负责洗水壶,烧开水,任务2负责洗茶壶,洗茶杯和拿茶叶,任务3负责泡茶。 * 其中,任务3需要任务2和任务1都执行完成后才可以开始 * 从大局上看,可以发现 * 1. 不必手动维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要关注 * 2. 语义清晰,例如f3 = f1.thenCombine(f2,()-{})能够清晰地表达“任务3需要等待任务1和任务2都完成以后才能开始” * 3. 代码更加简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的 */ // 任务1:洗水壶->烧开水 CompletableFuture f1 = CompletableFuture.runAsync(()->{ System.out.println("T1:洗水壶。。。"); sleep(1, TimeUnit.SECONDS); System.out.println("T1:烧开水。。。"); sleep(15, TimeUnit.SECONDS); }); // 任务2:洗茶壶->洗茶杯->拿茶叶 CompletableFuture f2 = CompletableFuture.supplyAsync(() -> { System.out.println("T2:洗茶壶。。。"); sleep(1, TimeUnit.SECONDS); System.out.println("T2:洗茶杯。。。"); sleep(2, TimeUnit.SECONDS); System.out.println("T2:拿茶叶。。。"); sleep(1, TimeUnit.SECONDS); return "龙井"; }); // 任务3: 任务1和任务2完成后执行:泡茶 CompletableFuture f3 = f1.thenCombine(f2, (__, tf) -> { System.out.println("T1:拿到茶叶:" + tf); System.out.println("T1:泡茶..."); return "上茶:" + tf; }); // 等待任务3执行结果 System.out.println(f3.join()); void sleep(int t, TimeUnit u){ try{ u.sleep(t); }catch(IntertuptedException e){ } } ``` 创建CompletableFuture对象: 创建CompletableFuture对象的方法主要死4种静态方法,根据不同的业务创建不同的线程池,以避免互相打扰 ```java // 使用默认线程池(公共的ForkJoinPool线程池,默认创建线程数是CPU的核数) // 没有返回值 static CompletableFuture runAsync(Runnable runnable); // 有返回值 static CompletableFuture supplyAsync(Supplier supplier); // 可以指定线程池 // 没有返回值 static CompletableFuture runAsync(Runnable runnable, Executor executor); // 有返回值 static CompletableFuture supplier supplier, Executor executor; ``` 创建完CompletableFuture对象之后,会自动执行runnable.run()方法或者supplier.get()方法,对于一个异步操作,需要关注两个问题: 1. 异步操作的什么时候结束? 2. 如何获取异步操作的结果? 同时,CompletableFuture类实现了CompletableStage接口,里面包含了40个方法 CompletableStage接口:**如何理解** 对于CompletionStage接口,可以从分工的角度类比一下工作流。任务是有时序关系的,比如有并行关系、串行关系和汇聚关系等。CompletionStage接口能清晰描述任务之间的时序关系。ComplettionStage接口如何描述串行关系、AND汇聚关系、OR汇聚关系? 1. 描述串行关系:CompletionStage接口里面描述串行关系主要是thenApply、thenAccept、thenRun和thenCompose四个系列的接口。 1. thenApply系列函数里面的参数fn的类型是接口Function,这个接口里与CompletionStage相关的方法是R apply(T t),这个方法虽然支持参数,支持返回值,所以thenAccept系列方法返回的是CompletionStage<R> 2. thenAccept系列方法里参数consumer的类型是接口Consumer<T>, 这个接口里和CmpletionStage相关的方法是void accept(T t),这个方法虽然支持参数,但是不支持返回值,所以thenAccept系列方法的返回值是CompletionStage<Void> 3. thenRun系列方法里action的参数是Runnable,所以action既不能接受参数也不支持返回值,所以thenRun系列方法返回的也是CompletionStage<Void> 4. Async代表的是异步执行fn、consumer或者action。其中,需要注意的是thenCompose系列方法,这个系列的方法会创建出一个子流程,最终结果和thenApply系列是相同的 总结如下 ```java CompletionStage thenApply(fn); CompletionStage thenApplyAsync(fn); CompletionStage thenAccept(consumer); CompletionStage thenAcceptAsync(consumer); CompletionStage thenRun(action); CompletionStage thenRunAsync(acation); CompletionStage thenCompose(fn); CompletionStage thenComposeAsync(fn); ``` 示例代码如下 ```java CompletableFuture f0 = CompletableFuture.supplyAsync( () -> "Hello world") .thenApply(s -> s + "QQ") .thenApply(String::toUpperCase); System.out.println(f0.join()); ``` 2. 描述AND汇聚关系 CompletionStage接口里面描述AND汇聚关系,主要是then Combine、thenAcceptBoth和runAfterBoth系列的接口。这些接口的区别就是fn、consumer和action这三个核心参数的不同。 ```java CompletionStage thenCombine(other, fn); CompletionStage thenCombineAsync(other, fn); CompletionStage thenAcceptBoth(other, consumer); CompletionStage thenAcceptBothAsync(other, consumer); CompletionStage runAfterBoth(other, action); CompletionStage runAfterBothAsync(other, action); ``` 3. 描述OR汇聚关系 ```java CompletionStage applyToEither(other, fn); CompletionStage applyToEitherAsync(other, fn); CompletionStage acceptEither(other, consumer); CompletionStage acceptEitherAsync(other, consumer); CompletionStage runAfterEither(other,action); CompletionStage runAfterEitherAsync(other, aciotn); ``` 下面的代码展示了如何使用ApplyToEither()方法来描述一个汇聚关系 ```java CompletableFuture f1 = CompletableFuture.supplyAsync(()->{ int t = getRandom(5,10); sleep(t, TimeUnit.SECONDS); return String.valueOf(t); }); CompletableFuture f2 = CompletableFuture.supplyAsync(()->{ int t = getRandom(5,10); sleep(t, TimeUnit.SECONDS); return String.valueOf(t); }); COmpletableFuture f3 = f1.applyToEither(f2, s-> s); System.out.println(f3.join()); ``` 4. 异常处理 上面提到的fn、consumer、action核心方法中都不允许抛出可检查异常,但是无法限制跑出运行时异常。同步方法中可以try-catch异常,异步方法该怎么处理呢? ```java CompletableFuture f0 = CompletableFuture .supplyAsync(() -> (7/0)) .thenApply(r -> r*10); System.out.println(f0.join()); ``` CompetionStage中处理异常的方法很简单,如下 ```java CompletionStage exceptionally(fn); CompletionStage whenComplete(consumer); CompletionStage whenCompleteAsync(consumer); CompletionStage handle(fn); CompletionStage handleAsync(fn); // 使用示例 CompletableFuture f0 = CompletableFuture .supplyAsync(() -> (7/0)) .thenApply(r -> r * 10) .exceptionally(e -> 0); System.out.println(f0.join()); ``` ### 25|CompletionService:如何批量执行异步任务? CompletionService的内部实现原理也是维护了一个阻塞队列,任务执行结束就把任务的执行结果加入到阻塞队列中。 如何创建CompletionService?CompletionService接口的实现类是ExecutorCompletionService,这个实现类的构造方法是两个 1. ExecutorCompletionService(Executor executor); 2. ExecutorCompletionService(Executor exescutor, BolckingQueue<Future<V>> completionQueue); 示例代码如下 ```java // 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(3); // 创建CompletionService CompletionService cs = new ExecutorCompletionService<>(executor); // 异步向电商S1询价 cs.submit(() -> getProceByS1()); // 异步向电商S2询价 cs.submit(() -> getPriceByS2()); // 异步向电商S3询价 cs.submit(() -> getPriceByS3()); // 将询价结果异步保存到数据库 for (int i = 0; i < 3; i++){ Integer r = cs.take().get(); executor.execute(() -> save(r)); } ``` CompletionService接口说明: ```java // submit方法,参数是Callable task Future submit(Callable task); // submit方法,参数是Callable task和V result Future submit(Runnable task, V result); // 阻塞队列相关 // take()从阻塞队列中获取并返回一个元素,如果阻塞队列为空,阻塞 Future take() throws InterruptedException; // poll()从阻塞队列中获取并返回一个元素,如果阻塞队列为空,返回null Future poll(); Future poll(long timeout, TimeUnit unit) throws InterruptedException; ``` 利用CompletionService实现Dubbo中的Forking Cluster,Dubbo中有一种叫做Forking的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。 ### 26|Fork/join:单机版的MapReduce 并发编程,其实就是三个方面的问题,分别是分工、协作和互斥。 分治思想:把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到小问题能直接求解。Java并发包中提供了一种叫做Fork/Join的并行计算框架,就是用来支持分治的任务模型。用Fork/Join实现斐波那契数列的求和示例代码如下: ```java static void main(String[] args){ // 创建分治任务线程池 ForkJoinPool fjp = new ForkJoinPool(4); // 创建分治任务 Fibonacci fib = new Fibonacci(30); // 启动分治任务 Integer result = fjp.invoke(fib); // 输出结果 System.out.println(result); } // 递归任务 static class Fibonacci extends RecursiveTask{ final int n; Fibonacci(int n){ this.n = n; } protected Integer compute(){ if(n <= 1){ return n; } Fibonacci f1 = new Fibonacci(n - 1); // 创建子任务 f1.fork(); Fibonacci f2 = new Fibonacci(n - 2); // 等待子任务结果,并合并结果 return f2.compute() + f1.join(); } } ``` ForkJoinPool工作原理 Fork/Join并行计算的核心组件是ForkJoinPool。Fork/Join本质上是一个生产者-消费者的实现。