m = new Hashmap<>();
final ReadWriteLock rwl = new ReentrantReadWriteLock();
final Lock r = rwl.readLock();
final Lock w = rwl.writeLock();
V get(K key){
V v = null;
// 读缓存
r.lock();
try{
v = m.get(key);
}finally{
r.unlock();
}
// 缓存中存在,返回
if(null != v){
return v;
}
// 缓存中不存在,查询数据源
w.lock();
try{
// 再次验证
// 其他线程可能已经查询过数据库
v = m.get(key);
if(null == v){
v = queryFromDB();
// 缓存到map中
m.put(key, v);
}
}finally{
w.unlock();
}
}return v;
}
```
**ReadWriteLock不支持读锁升级到写锁**
### 18|StampedLock:有没有比读写锁更快的锁
StampedLock的性能比读写锁还要高。StampedLock和ReadWriteLock的区别:
**ReadWriteLock支持两种模式:一种是读锁,一种是写锁。StampedLock支持三种模式,分别是写锁、悲观读锁和乐观读锁。其中,写锁和悲观读锁和ReadWriteLock中的写锁和读锁和语义非常相似。允许多个线程同时获取悲观读锁,只允许一个线程获取写锁。写锁和悲观读锁是互斥的。不同的是,不同的是写锁和悲观读锁在加锁成功之后,都会返回一个stamp,解锁的时候需要传入这个stamp**
```java
final StampedLock sl = new StampedLock();
// 获取/释放悲观读锁示意代码
long stamp = sl.readLock();
try{
// 业务相关
}finally{
sl.unlockRead(stamp);
}
// 获取/释放写锁示意代码
long stamp = sl.writeLock();
try{
// 业务代码
}finally{
sl.unlockWrite(stamp);
}
```
StampedLock性能比ReadWriteLock 性能还要好的原因之一是**StampedLock支持乐观读的方式:ReadWriteLock支持多个线程同时读。但是当多个线程同时读的时候,不会允许写操作。但是StampedLock支持乐观读的方式-->允许一个线程获取写锁,也就是说不是所有的写操作都会被阻塞。**
```java
class Point{
private int x,y;
final StampedLock sl = new StampedLock();
// 计算到原点的距离
int distanceFromOrigin(){
// 乐观读
long stamp = sl.tryOprimisticRead();
// 读入局部变量
// 读的过程数据可能被修改
int curX = x, curY = y;
// 判断执行操作期间
// 是否存在写操作,如果存在,则sl.validate返回false
if(!sl.valudate(stamp)){
// 升级为悲观读锁
stamp = sl.readLock();
try{
curX = x;
curY = y;
}finally{
// 释放悲观锁
sl.unlockRead(stamp);
}
}
return Math.sqrt(
curX * curX + cury & curY
)
}
}
```
StampedLock使用的注意事项:对于读多写少的场景,StampedLock的性能很好,简单的应用场景基本上可以替代ReadWriteLock,但是StampedLock的功能仅仅是ReadWriteLock的子集,使用的时候需要注意:
1. StampedLock是不可重入锁
2. StampedLock的悲观读锁、写锁都是不支持条件变量的
3. 如果线程阻塞在StampedLock的readLock或者是writeLock中,,此时调用该阻塞线程的interrupt方法,会导致cpu飙升。**所以,使用StampedLock一定不要调用中断操作,如果需要支持中断功能,一定使用可以中断的悲观读锁readLockInterruptibly()和写锁writeLockInterruptibly()**代码如下:
```java
final StampedLock lock = new StampedLock();
Thread T1 = new Thread(() -> {
// 获取写锁
lock.writeLock();
// 永远阻塞在此处 不释放写锁
LockSupport.park();
});
T1.start();
// 保证T1获取写锁
Thread.sleep(100);
Thread T2 = new Thread(() -> {
// 阻塞在悲观读锁
lock.readLock()
});
T2.start();
// 保证T2阻塞在读锁
Thread.sleep(100);
// 中断线程T2
// 会导致T2所在的cpu飙升
T2.interrupt();
T2.join;
```
### 19 | CountDownLatch和CyclicBarrier:如何让多线程步调一致?
对于串行化的系统,优化性能首先想到的是利用多线程并行处理。以对账为例:
```java
while(存在未对账订单){
// 查询未对账订单
Thread T1 = new Thread(() -> {
pos = getPOrders();
});
T1.start();
// 查询派送单
Thread T2 = new Thread(() -> {
dos = getDOrders();
});
T2.start();
T1.join();
T2.join();
// 执行对账操作
diff = check(pos, dos);
// 保存差异
save(diff);
}
```
上面的代码中,while每次都会新建线程,是一个很耗费资源的过程,利用线程池进行如下优化:
```java
// 创建两个线程对的线程池
Executor executor = Executors.newFixedThreadPool(2);
while(存在未对账订单){
// 查询未对账订单
executor.execute(() -> {
pos = getPOrders();
});
// 查询派送单
exevutor.execute(() -> {
dos = getDOrders();
});
// 实现对两个线程对的等待
// 执行对账操作
diff = check(pos, dos);
// 保存差异
save(diff);
}
```
问题就出在如何**实现对线程的等待**,一个最直接的办法是计数器,初始值设置为2(两个线程),执行完pos = getPOrders()时减1,执行完dos=getDOrders()之后再减1。主线程等到计数器为0,说明两个操作都结束了,继续执行下面的操作。基于这个思想,java并发包提供了CountDownLatch。
CountDownLatch代码实现
```java
// 创建2个线程的线程池
Executor executor = Executors.newThreadPool(2);
while(存在未对账账单){
// 计数器初始化为2
CountDownLatch latch = new CountDownLatch(2);
// 查询未对账订单
executor.execute(() -> {
pos = getPOrders();
latch.countDown();
});
// 查询派送单
executor.execute(() -> {
dos = getDOrders();
latch.countDown();
});
// 等待两个查询操作结束
latch.await();
// 执行对账操作
diff = check(pos, dos);
// 差异写入差异库
save(diff);
}
```
进一步优化性能,上面的代码的优化余地-->两个查询操作和对账操作也是可以并行执行的,执行对账操作的同时,再去查询下一轮的查询操作。
用Cyclicbarrier实现线程同步
```java
/**
* 在下面的代码中, 首先创建了一个计数器的初始值为2的CyclicBarrier,需要注意的是创建CyclicBarrier的时候,同时传入了一个回调函数,当计数器为0的时候,调* 用这个回调函数。
* 同时,CyclicBarrier有自动充值的功能。方便好用
*/
// 订单队列
Vector pos;
Vector dos;
// 执行回调的线程池
Executor executor = Executors.newFixedThreadPool(1);
final CyclicBarrier barrier = new CyclicBarrier(2, () -> {
executor.execute(() - > check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 执行对账操作
diff = check(p, d);
// 差异写入差异库
save(diff);
}
void checkAll(){
// 循环查询订单库
Thread T1 = new Thread(() -> {
while(存在未对账订单){
// 查询订单库
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循环查询运单库
Thread T2 = new Thread(() -> {
while(存在未对账订单){
// 查询运单库
dos.add(getDOorders());
// 等待
barrier.await();
}
});
T2.await();
}
```
CountDownLatch和Cyclicbarrier的区别:**1. CountDownLatch主要用来解决一个线程等待多个线程的场景,CyclicBarrier是一组线程之间相互等待;2. CountDownLatch的计数器是不能循环利用的,也就是说一旦计数器减到0,再有现成调用await(),该线程会直接通过,CyclicBarrier的计数器是可以循环利用的,具备自动重置的功能;3. Cyclicbarrier还可以设置回调函数**
### 20|并发容器,都有哪些坑需要我们填?
同步容器及注意事项:
java中的容器:List、Map、Se和Queue。如何将非线程安全的容器变成线程安全的容器?一个简单的思路是,把一个非线程安全的容器封装在一个对象内部,然后控制好访问路径就可以了。以ArrayList为例:
```java
SafeArrayList {
// 封装ArrayList
List c = new ArrayList<>();
// 控制访问路径
synchronized T get(int inx){
return c.get(idx);
}
synchronized void add(int idx, T t){
c.add(idx, t);
}
synchronized boolean addIfNotExist(T t){
if(!c.contains(t)){
c.add(t);
return true;
}
return false;
}
}
```
注意:**组合操作需要注意竞态条件问题,即便每个操作都能保证原子性,也不能保证组合操作的原子性**
并发容器及其注意事项:Java1.5之前的线程安全的容器,主要指的就是同步容器。不过同步容器的问题就是性能太差,串行度高,1.5之后提供了性能更高的容器,我们一般称为并发容器。并发容器的类别如下:
并发容器的关键点如下:
1. List:List里面只有一个实现类就是CopyOnWriteArrayList。写时复制的队列,好处是读的时候是完全无锁的。
2. Map:Map的两个实现分别是
1. ConcurrentHashMap:key无序
2. ConcurrentSkipListMap:key有序
它们的key和value都不能为空。否则会抛出异常,下表是Map的实现类对于key和value的要求
| 聚合类 | Key | Value | 是否线程安全 |
| :------------------: | :--------: | :----------: | :----------: |
| HashMap | Nullable | Nullable | 否 |
| TreeMap | 不允许Null | Nullable | 否 |
| HashTable | 不允许Null | 不允许为Null | 是 |
| ConcurrntHashMap | 不允许Null | 不允许Null | 是 |
| ConcurrntSkipListMap | 不允许Null | 不允许Null | 是 |
3. Set:Set的两个实现类是CopyOnWriteArrayListSet和ConcurrentSkipListSet。
4. Queue:并发包中Queue这类容器是最复杂的。从两个方面进行理解
1. 阻塞和非阻塞
2. 单端和双端
java并发包中阻塞队列使用Blocking关键字标识,单端队列使用Queue标识,双端使用Deque进行标识。
1. 单端堵塞队列:事项有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、LinkedTransferQueue、PriorityBlokingQueue和DelayQueue。
2. 双端阻塞队列:LinkedBlockingDeque
3. 单端阻塞队列:ConcurrentLinkedQueue
4. 双端堵塞队列:ConcurrentLinkedDeque
### 21|原子类:无锁工具类的典范
无锁方案最大的好处就是性能,在解决问题的同时,又没有引入新的问题,nice。
无锁方案的实现原理:CPU为了解决并发问题,提供了CAS指令(Compare and swap)。cas指令包含3个参数,共享变量的内存地址A、用于比较的值B和共享变量的新值C,只有当A处的值等于B时,才会将A处的值更新为C。作为一条CPU指令,CAS本身就是能够保证原子性的。CAS模拟代码
```java
class SimulatedCAS{
int count;
synchronized int cas(
int expect, int new value){
// 读当前的count的值
int curvalue = count;
// 比较目前count值是否==期望值
if(curValue == expect){
// 如果是,则更新count的值
count = newValue;
}
// 返回写入的新值
return curValue;
}
}
```
使用CAS解决并发问题,一般都会伴随着自旋(循环尝试)。实现一个线程安全的count += 1操作,CAS自旋的实现方案如下设计
```java
class SimulatedCAS{
volatile int count;
// 实现count += 1
adOne(){
do{
newValue = count + 1;
}while(count != cas(count, newValue))
}
// 模拟实现CAS,仅用来帮助理解
synchronized int cas(int expect, int newValue){
// 读当前的count的值
int curValue = count;
// 比较目前count值是否==期望值
if(curValue == expect){
// 如果是,则更新count的值
count = newValue;
}
// 返回写入前的值
return curValue;
}
}
```
CAS这种无锁方案,完全没有加锁解锁操作性能好了很多,但是这个方案中会存在一个问题,那就是ABA问题。使用CAS方案的时候,一定要check一下。
```java
public final long getAndAddLong(
Object o, long offset, long delta){
long v;
do {
// 读取内存中的值
v = getLongVolatile(o, offset);
} while (!compareAndSwapLong(
o, offset, v, v + delta));
return v;
}
//原子性地将变量更新为x
//条件是内存中的值等于expected
//更新成功则返回true
native boolean compareAndSwapLong(
Object o, long offset,
long expected,
long x);
```
原子类概览:
### 22 | Executor与线程池:如何创建正确的线程池?
Java程序中,创建对象,仅仅是在JVM的堆里分配一块内容而已;而创建一个线程,需要调用操作系统的API,然后操作系统要为线程分配一系列的资源,成本极高。线程是一个重量级的对象,**应该避免频繁创建和销毁**。一般意义上的池化资源,当需要资源的时候就调用acquire()方法申请资源,用完之后就调用release()释放资源。但是Java的并发包没有申请和释放线程的方法。
线程池是一种生产者--消费者模式:线程池的使用方是生产者,线程池本身是消费者(消费的是任务)。下面的示例代码中,我们创建了一个非常简单的线程池MyThreadPool来理解线程池的工作原理。
```java
// 简化的线程池,仅用来说明工作原理
class MyThreadPool{
// 利用阻塞队列实现生产者-消费者模式
BlockingQueue workQueue;
// 保存内部工作线程
List threads = new ArrayList<>();
// 构造方法
MyThreadPool(int poolSize, BlockingQueue workQueue){
this.workQueue = workQueue;
// 创建工作线程
for(int idx = 0; idx < poolSize; idx ++){
WorkerThread work = new WorkerThread();
work.start();
threads.add(work);
}
}
// 提交任务
void execute(Runnable command){
workQueue.put(command);
}
// 工作线程负责消费任务,并执行任务
class WorkerThread extends Thread{
public void run(){
// 循环取任务并执行
while(true){
Runnable task = WorkQueue.take();
task.run();
}
}
}
}
/**下面使用的示例**/
// 创建有界阻塞队列
BlockingQueue workQueue = new linkedBlockingQueue<>(2);
// 创建线程池
MyThreadPool pool = new MyThreadPool(10, workQueue);
// 提交任务
pool.execute(() > System.out.println("Hello"));
```
如何使用java中的线程池?
ThreadPoolExecutor的构造函数如下
```java
/**
* 下面是一个完备的线程池的构造函
* 可以将线程池类比成一个项目组,线程就是项目组的成员
* corePoolSize:线程池保有的最小线程数。
* maximumPoolSize:线程池创建的最大线程数
* keepAliveTime & unit: 线程中线程存在的时间和时间单位
* workQueue: 工作队列
* threadFactory:定义如何创建线程,例如给线程指定一个有意义的名字
* handler:自定义任务的拒绝策略。如果任务数大于最大工作队列,此时提交任务,线程池就回拒绝
* 1. callerRunsPolicy:提交任务的线程自己去执行该任务
* 2. AbortPolicy:默认的拒绝策略,会throws RejetExecutionException
* 3. DiscardPolicy:直接丢弃任务,没有任何异常抛出
* 4. DiscardOldestPolicy:丢弃最老的任务,把最早加入队列的任务丢弃,然后把新任务加入到工作队列
*/
ThreadPoolExecutor{
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectExecutionHandler handler
}
```
使用线程池的注意事项:
1. Executors是java提供的静态工厂类,可以快速创建线程池,但是已经不被推荐使用。因为Executors的很多方法使用的都是无界的linkdedBlockingQueue。很容易导致OOM
2. 默认拒绝策略要慎重使用,最好是自己定义任务拒绝策略
3. 注意异常处理问题
### 23|如何用多线程实现最优的“烧水泡茶”程序?
很多场景下,我们需要获取任务的执行结果。ThreadPoolExecutor提供了相关的重要功能,如下:
```java
// 提交Runnable任务
// 参数是Runnable接口,Runnable接口的run()方法是没有返回值的,所以此处的Future仅可以用来判断任务是否已经结束,类似于Thread.join()
Future> submit(Runnable task);
// 提交Callable任务
// 提交的Callable任务的call()方法是有返回值的。可以通过Future接口的get()方法来获取任务的返回值
Future submit(Callable task);
// 提交Runnable任务及结果引用
// 提交Runnable任务以及结果引用。假设这个方法的返回值是Future f,f.get()的返回值就是传给submit()方法的参数result。其经典用法如下:
Future submit(Runnable task, T result);
/*----------------------submit(Runnable task, T result)的用法--------------------*/
ExecutorService executor = Executors.newFixedThreadPool(1);
// 创建Result对象r
Result r = new Result();
r.setAAA(a);
// 提交任务
Future future = executor.submit(new Task(r), r);
Result fr = future.get();
// 下面等式成立
fr === r;
fr.getAAA() === a;
fr.getXXX() === x;
class Task implements Runnable{
Restlt r;
// 通过构造函数result
Task(Result r){
this.r = r;
}
void run(){
// 可以操作result
a = r.getAAA();
r.setXXX(x);
}
/* ---------------------Future接口的5个方法-----------------------*/
// 取消任务
boolean cache(boolean mayInterruptIfRunning);
// 判断任务是否已经取消
boolean isCancelled();
// 判断任务是否已经结束
boolean isDone();
// 获取任务的执行结果
get();
// 获得任务执行结果,支持超时
get(long timeout, TimeUnit unit)
```
FutureTask工具类:
FutureTask实现了Runnable接口和Future接口,由于实现了Runnable接口,所以可以将FutureTask对象作为任务提交给ThreadPoolExecutor去执行,也可以直接被Thread执行;又因为实现了Future接口,所以也能用来获取任务的执行结果。下面的代码示例是将FutureTask对象提交给ThreadPoolExecutor去执行。
```java
// 创建FutureTask
FutureTask futureTask = new FutureTask<>(() -> 1 + 2);
// 创建线程池
ExecutorService es = Executors.newCachedThreadPool();
// 提交FutureTask
es.submit(futureTask);
// 获取计算结果
Integer result = futureTask.get();
```
FutureTask对象直接被Thread执行的示例代码如下:
```java
// 创建FutureTask
FutureTask futureTask = new FutureTask<>(() -> 1+ 2);
// 创建并启动线程
Thread T1 = new Thread(futureTask);
T1.start();
Integer result = futureTask.get();
```
### 24|CompletableFuture:异步编程没那么难
异步化,是并行方案得以实施的基础,更深入讲就是:**利用多线程优化性能这个核心方案得以实施的基础**。
CompletableFuture有可能是最复杂的工具类,有着让人震撼的功能。用CompletableFuture实现一个烧水泡茶的程序如下:
```java
/**
* 下面的代码中,任务1负责洗水壶,烧开水,任务2负责洗茶壶,洗茶杯和拿茶叶,任务3负责泡茶。
* 其中,任务3需要任务2和任务1都执行完成后才可以开始
* 从大局上看,可以发现
* 1. 不必手动维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要关注
* 2. 语义清晰,例如f3 = f1.thenCombine(f2,()-{})能够清晰地表达“任务3需要等待任务1和任务2都完成以后才能开始”
* 3. 代码更加简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的
*/
// 任务1:洗水壶->烧开水
CompletableFuture f1 = CompletableFuture.runAsync(()->{
System.out.println("T1:洗水壶。。。");
sleep(1, TimeUnit.SECONDS);
System.out.println("T1:烧开水。。。");
sleep(15, TimeUnit.SECONDS);
});
// 任务2:洗茶壶->洗茶杯->拿茶叶
CompletableFuture f2 = CompletableFuture.supplyAsync(() -> {
System.out.println("T2:洗茶壶。。。");
sleep(1, TimeUnit.SECONDS);
System.out.println("T2:洗茶杯。。。");
sleep(2, TimeUnit.SECONDS);
System.out.println("T2:拿茶叶。。。");
sleep(1, TimeUnit.SECONDS);
return "龙井";
});
// 任务3: 任务1和任务2完成后执行:泡茶
CompletableFuture f3 = f1.thenCombine(f2, (__, tf) -> {
System.out.println("T1:拿到茶叶:" + tf);
System.out.println("T1:泡茶...");
return "上茶:" + tf;
});
// 等待任务3执行结果
System.out.println(f3.join());
void sleep(int t, TimeUnit u){
try{
u.sleep(t);
}catch(IntertuptedException e){
}
}
```
创建CompletableFuture对象:
创建CompletableFuture对象的方法主要死4种静态方法,根据不同的业务创建不同的线程池,以避免互相打扰
```java
// 使用默认线程池(公共的ForkJoinPool线程池,默认创建线程数是CPU的核数)
// 没有返回值
static CompletableFuture runAsync(Runnable runnable);
// 有返回值
static CompletableFuture supplyAsync(Supplier supplier);
// 可以指定线程池
// 没有返回值
static CompletableFuture runAsync(Runnable runnable, Executor executor);
// 有返回值
static CompletableFuture supplier supplier, Executor executor;
```
创建完CompletableFuture对象之后,会自动执行runnable.run()方法或者supplier.get()方法,对于一个异步操作,需要关注两个问题:
1. 异步操作的什么时候结束?
2. 如何获取异步操作的结果?
同时,CompletableFuture类实现了CompletableStage接口,里面包含了40个方法
CompletableStage接口:**如何理解**
对于CompletionStage接口,可以从分工的角度类比一下工作流。任务是有时序关系的,比如有并行关系、串行关系和汇聚关系等。CompletionStage接口能清晰描述任务之间的时序关系。ComplettionStage接口如何描述串行关系、AND汇聚关系、OR汇聚关系?
1. 描述串行关系:CompletionStage接口里面描述串行关系主要是thenApply、thenAccept、thenRun和thenCompose四个系列的接口。
1. thenApply系列函数里面的参数fn的类型是接口Function,这个接口里与CompletionStage相关的方法是R apply(T t),这个方法虽然支持参数,支持返回值,所以thenAccept系列方法返回的是CompletionStage<R>
2. thenAccept系列方法里参数consumer的类型是接口Consumer<T>, 这个接口里和CmpletionStage相关的方法是void accept(T t),这个方法虽然支持参数,但是不支持返回值,所以thenAccept系列方法的返回值是CompletionStage<Void>
3. thenRun系列方法里action的参数是Runnable,所以action既不能接受参数也不支持返回值,所以thenRun系列方法返回的也是CompletionStage<Void>
4. Async代表的是异步执行fn、consumer或者action。其中,需要注意的是thenCompose系列方法,这个系列的方法会创建出一个子流程,最终结果和thenApply系列是相同的
总结如下
```java
CompletionStage thenApply(fn);
CompletionStage thenApplyAsync(fn);
CompletionStage thenAccept(consumer);
CompletionStage thenAcceptAsync(consumer);
CompletionStage thenRun(action);
CompletionStage thenRunAsync(acation);
CompletionStage thenCompose(fn);
CompletionStage thenComposeAsync(fn);
```
示例代码如下
```java
CompletableFuture f0 = CompletableFuture.supplyAsync(
() -> "Hello world")
.thenApply(s -> s + "QQ")
.thenApply(String::toUpperCase);
System.out.println(f0.join());
```
2. 描述AND汇聚关系
CompletionStage接口里面描述AND汇聚关系,主要是then Combine、thenAcceptBoth和runAfterBoth系列的接口。这些接口的区别就是fn、consumer和action这三个核心参数的不同。
```java
CompletionStage thenCombine(other, fn);
CompletionStage thenCombineAsync(other, fn);
CompletionStage thenAcceptBoth(other, consumer);
CompletionStage thenAcceptBothAsync(other, consumer);
CompletionStage runAfterBoth(other, action);
CompletionStage runAfterBothAsync(other, action);
```
3. 描述OR汇聚关系
```java
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage runAfterEither(other,action);
CompletionStage runAfterEitherAsync(other, aciotn);
```
下面的代码展示了如何使用ApplyToEither()方法来描述一个汇聚关系
```java
CompletableFuture f1 = CompletableFuture.supplyAsync(()->{
int t = getRandom(5,10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture f2 = CompletableFuture.supplyAsync(()->{
int t = getRandom(5,10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
COmpletableFuture f3 = f1.applyToEither(f2, s-> s);
System.out.println(f3.join());
```
4. 异常处理
上面提到的fn、consumer、action核心方法中都不允许抛出可检查异常,但是无法限制跑出运行时异常。同步方法中可以try-catch异常,异步方法该怎么处理呢?
```java
CompletableFuture f0 = CompletableFuture
.supplyAsync(() -> (7/0))
.thenApply(r -> r*10);
System.out.println(f0.join());
```
CompetionStage中处理异常的方法很简单,如下
```java
CompletionStage exceptionally(fn);
CompletionStage whenComplete(consumer);
CompletionStage whenCompleteAsync(consumer);
CompletionStage handle(fn);
CompletionStage handleAsync(fn);
// 使用示例
CompletableFuture f0 = CompletableFuture
.supplyAsync(() -> (7/0))
.thenApply(r -> r * 10)
.exceptionally(e -> 0);
System.out.println(f0.join());
```
### 25|CompletionService:如何批量执行异步任务?
CompletionService的内部实现原理也是维护了一个阻塞队列,任务执行结束就把任务的执行结果加入到阻塞队列中。
如何创建CompletionService?CompletionService接口的实现类是ExecutorCompletionService,这个实现类的构造方法是两个
1. ExecutorCompletionService(Executor executor);
2. ExecutorCompletionService(Executor exescutor, BolckingQueue<Future<V>> completionQueue);
示例代码如下
```java
// 创建线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建CompletionService
CompletionService cs = new ExecutorCompletionService<>(executor);
// 异步向电商S1询价
cs.submit(() -> getProceByS1());
// 异步向电商S2询价
cs.submit(() -> getPriceByS2());
// 异步向电商S3询价
cs.submit(() -> getPriceByS3());
// 将询价结果异步保存到数据库
for (int i = 0; i < 3; i++){
Integer r = cs.take().get();
executor.execute(() -> save(r));
}
```
CompletionService接口说明:
```java
// submit方法,参数是Callable task
Future submit(Callable task);
// submit方法,参数是Callable task和V result
Future submit(Runnable task, V result);
// 阻塞队列相关
// take()从阻塞队列中获取并返回一个元素,如果阻塞队列为空,阻塞
Future take() throws InterruptedException;
// poll()从阻塞队列中获取并返回一个元素,如果阻塞队列为空,返回null
Future poll();
Future poll(long timeout, TimeUnit unit) throws InterruptedException;
```
利用CompletionService实现Dubbo中的Forking Cluster,Dubbo中有一种叫做Forking的集群模式,这种集群模式下,支持并行地调用多个查询服务,只要有一个成功返回结果,整个服务就可以返回了。
### 26|Fork/join:单机版的MapReduce
并发编程,其实就是三个方面的问题,分别是分工、协作和互斥。
分治思想:把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到小问题能直接求解。Java并发包中提供了一种叫做Fork/Join的并行计算框架,就是用来支持分治的任务模型。用Fork/Join实现斐波那契数列的求和示例代码如下:
```java
static void main(String[] args){
// 创建分治任务线程池
ForkJoinPool fjp = new ForkJoinPool(4);
// 创建分治任务
Fibonacci fib = new Fibonacci(30);
// 启动分治任务
Integer result = fjp.invoke(fib);
// 输出结果
System.out.println(result);
}
// 递归任务
static class Fibonacci extends RecursiveTask{
final int n;
Fibonacci(int n){
this.n = n;
}
protected Integer compute(){
if(n <= 1){
return n;
}
Fibonacci f1 = new Fibonacci(n - 1);
// 创建子任务
f1.fork();
Fibonacci f2 = new Fibonacci(n - 2);
// 等待子任务结果,并合并结果
return f2.compute() + f1.join();
}
}
```
ForkJoinPool工作原理
Fork/Join并行计算的核心组件是ForkJoinPool。Fork/Join本质上是一个生产者-消费者的实现。