# netty_redis_zookeeper高并发实战源码 **Repository Path**: it_architecture/netty_redis_zookeeper_source_code ## Basic Information - **Project Name**: netty_redis_zookeeper高并发实战源码 - **Description**: No description available - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 467 - **Created**: 2022-11-14 - **Last Updated**: 2023-02-13 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README ## 学习说明 本软件作为分布式Java开发的练习和实战入手项目,实际上,和 生产项目在架构上差不多。 只是在细节上处理的内容,生产项目更加丰富和复杂。 本项目的配套材料,请参考: - 配套图书 [《Java高并发核心编程(卷1)》一书 ](https://www.cnblogs.com/crazymakercircle/p/9904544.html) - 配套视频:[《从菜鸟大神-Java高并发核心编程》视频](https://www.cnblogs.com/crazymakercircle/p/14434174.html) 部分视频 ## 实践计划 ### Java NIO 实战(1) ##### 使用FileChannel复制文件 通过使用Channel通道,完成复制文件。 本环节的目标是掌握以下知识:Java NIO中ByteBuffer、Channel两个重要组件的使用。 接着是升级实战的案例,使用文件Channel通道的transferFrom方法,完成高效率的文件复制。 ##### 使用SocketChannel传输文件 本环节的目标是掌握以下知识: - 非阻塞客户端在发起连接后,需要不断的自旋,检测连接是否完成的。 - SocketChannel传输通道的read读取方法、write写入方法。 - 在SocketChannel传输通道关闭前,尽量发送一个输出结束标志到对方端。 ##### 使用DatagramChannel传输数据 客户端使用DatagramChannel发送数据,服务器端使用DatagramChannel接收数据。 本环节的目标是掌握以下知识: - 使用接收数据方法receive,使用发送数据方法send。 - DatagramChannel和SocketChannel两种通道,在发送、接收数据上的不同。 ##### 使用NIO实现Discard服务器 客户端功能:发送一个数据包到服务器端,然后关闭连接。服务器端也很简单,收到客户端的数据,直接丢弃。 本环节的目标是掌握以下知识: - Selector选择器的注册,以及选择器的查询。 - SelectionKey选择键方法的使用。 - 根据SelectionKey方法的四种IO事件类型,完成对应的IO处理。 ### Reactor反应器模式实践(2) ##### 单线程Reactor反应器模式的实现 使用单线程Reactor反应器模式,设计和实现一个EchoServer回显服务器。功能很简单:服务器端读取客户端的输入,然后回显到客户端。 本环节的目标是掌握以下知识: - 单线程Reactor反应器模式两个重要角色——Reactor反应器、Handler处理器的编写。 - SelectionKey选择键两个重要的方法——attach和attachment方法的使用。 ##### 多线程Reactor反应器模式 使用多线程Reactor反应器模式,设计一个EchoServer回显服务器,主要的升级方式为: - 引入ThreadPool线程池,将负责IOHandler输入输出处理器的执行,放入独立的线程池中,与负责服务监听和IO事件查询的反应器线程相隔离。 - 将反应器线程拆分为多个SubReactor子反应器线程,同时,引入多个Selector选择器,每一个子反应器线程负责一个选择器读取客户端的输入,回显到客户端。 本环节的目标是掌握以下知识: - 线城池的使用 - 多线程反应器模式实现 ### 异步回调模式实践(3) ##### 使用线程join方式,通过阻塞式异步调用的方式,实现泡茶喝的实例 为了在计算机中,实现华罗庚的课文《统筹方法》泡茶喝的流程,可以设计三条线程:主线程、清洗线程、烧水线程,主要介绍如下: - 主线程(MainThread)的工作是:启动清洗线程、启动烧水线程,等清洗、烧水的工作完成后,泡茶喝。 - 清洗线程(WashThread)的工作是:洗茶壶、洗茶杯。 - 烧水线程(HotWarterThread)的工作是:洗好水壶,灌上凉水,放在火上,一直等水烧开。 本环节的目标是掌握以下知识: - ​ 不同版本的join方法的使用 ##### 使用FutureTask类和Callable接口,启动阻塞式的异步调用,并且获取异步线程的结果。 还是实现华罗庚的课文《统筹方法》泡茶喝的流程,可以设计三条线程:主线程、清洗线程、烧水线程,主要改进如下: - 主线程(MainThread)的工作是:启动清洗线程、启动烧水线程,然后阻塞,等待异步线程的返回值,根据异步线程的返回值,决定后续的动作。 - 清洗线程(WashThread)在异步执行完成之后,有返回值。 - 烧水线程(HotWarterThread)在异步执行完成之后,有返回值。 本环节的目标是掌握以下知识: - Callable(可调用)接口的使用;Callable接口和Runnable(可执行)接口的不同。 - FutureTask异步任务类的使用。 ##### 使用ListenableFuture类和FutureCallback接口,启动非阻塞异步调用,并且完成异步回调。 还是实现华罗庚的课文《统筹方法》泡茶喝的流程,可以设计三条线程:主线程、清洗线程、烧水线程,主要改进如下: - 主线程(MainThread)的工作是:启动清洗线程、启动烧水线程,并且设置异步完成后的回调方法,这里主线程不阻塞等待,而是去干其他事情,例如读报纸。 - 清洗线程(WashThread)在异步执行完成之后,执行回调方法。 - 烧水线程(HotWarterThread)在异步执行完成之后,执行回调方法 本环节的目标是掌握以下知识: - FutureCallback接口的使用;FutureCallback接口和Callable接口的区别和联系。 - ListenableFuture异步任务类的使用,以及为异步任务设置回调方法。 ### Netty基础实践(4) ##### Netty中Handler处理器的生命周期 操作步骤如下: **步骤01** 定义一个非常简单的入站处理器——InHandlerDemo。这个类继承于ChannelInboundHandlerAdapter适配器,它实现了基类的所有的入站处理方法,并在每一个方法的实现中,都加上了必要的输出信息。 **步骤02** 编写一个单元测试代码:将这个处理器加入到一个EmbeddedChannel嵌入式通道的流水线中。 **步骤03** 通过writeInbound方法,向EmbeddedChannel写一个模拟的入站ByteBuf数据包。InHandlerDemo作为一个入站处理器,就会处理到该ByteBuf数据包。 **步骤04** 通过输出,可以观测到处理器的生命周期。 本环节的目标是掌握以下知识: - Netty中Handler处理器的生命周期。 - EmbeddedChannel嵌入式通道的使用。 ##### ByteBuf的基本使用 操作步骤如下: **步骤01** 使用Netty的默认分配器,分配了一个初始容量为9个字节,最大上限为100个字节的ByteBuf缓冲区。 **步骤02** 向ByteBuf写数据,观测ByteBuf的属性变化。 **步骤03** 从ByteBuf读数据,观测ByteBuf的属性变化。 本环节的目标是掌握以下知识: - ByteBuf三个重要属性:readerIndex(读指针)、writerIndex(写指针)、maxCapacity(最大容量)。 - ByteBuf读写过程中,以上三个重要属性的变化规律。 ##### 使用Netty,实现EchoServer回显服务器 前面实现过Java NIO版本的EchoServer回显服务器,学习了Netty后,设计和实现一个Netty版本的EchoServer回显服务器。功能很简单:服务器端读取客户端的输入,然后将数据包直接回显到客户端。 本环节的目标是掌握以下知识: - 服务器端ServerBootstrap的装配和使用。 - 服务器端NettyEchoServerHandler入站处理器的channelRead入站处理方法的编写。 - 服务器端实现Netty的ByteBuf缓冲区的读取、回显。 - 客户端Bootstrap的装配和使用。 - 客户端NettyEchoClientHandler入站处理器中接受回显的数据,并且释放内存。 - 客户端实现多种方式释放ByteBuf,包括:自动释放、手动释放。 ### 解码器(Decoder)与编码器(Encoder)实践(5) ##### 整数解码实践 具体步骤如下: **步骤01** 定义一个非常简单的整数解码器——Byte2IntegerDecoder。这个类继承于ByteToMessageDecoder字节码解码抽象类,并实现基类的decode抽象方法,将ByteBuf缓冲区中的数据,解码成以一个一个的Integer对象。 **步骤02** 定义一个非常简单的整数处理器——IntegerProcessHandler。读取上一站的入站数据,把它转换成整数,并且显示在Console控制台。 **步骤03** 编写一个整数解码实战的测试用例。在测试用例中,新建了一个EmbeddedChannel嵌入式的通道实例,将两个自己的入站处理器Byte2IntegerDecoder、IntegerProcessHandler加入到通道的流水线上。通过writeInbound方法,向EmbeddedChannel写入一个模拟的入站ByteBuf数据包。 **步骤04** 通过输出,可以观察整数解码器的解码结果。 本环节的目标是掌握以下知识: - 如何基于Netty的ByteToMessageDecoder字节码解码抽象类,实现自己的ByteBuf二进制字节到POJO对象的解码。 - 使用ByteToMessageDecoder,如何管理ByteBuf的应用计数。 ##### 整数相加的解码器实践 具体步骤如下: **步骤01** 继承ReplayingDecoder基础解码器,编写一个整数相加的解码器:一次解码两个整数,并把这两个数据相加之和,作为解码的结果。 **步骤02** 使用前面定义的整数处理器——IntegerProcessHandler。读取上一站的入站数据,把它转换成整数,并且显示在Console控制台。 **步骤03** 使用前面定义的测试类,测试整数相加的解码器,并且查看结果是否正确。 本环节的目标是掌握以下知识: - 如何基于ReplayingDecoder解码器抽象类,实现自己的ByteBuf二进制字节到POJO对象的解码。 - ReplayingDecoder的成员属性——state阶段属性的使用。 - ReplayingDecoder的重要方法——checkpoint(IntegerAddDecoder.Status)方法的使用 ##### 基于Head-Content协议的字符串分包解码器 具体步骤如下: **步骤01** 继承ReplayingDecoder基础解码器,编写一个字符串分包解码器StringReplayDecoder。 在StringReplayDecoder的decode方法中,分两步: 第1步,解码出字符串的长度; 第2步,按照第一个阶段的字符串长度,解码出字符串的内容。 **步骤02** 编写一个简单的业务处理器StringProcessHandler。其功能是:读取上一站的入站数据,把它转换成字符串,并且显示在Console控制台。 **步骤03** 新建了一个EmbeddedChannel嵌入式的通道实例,将两个自己的入站处理器StringReplayDecoder、StringProcessHandler加入到通道的流水线上。为了测试入站处理器,使用writeInbound方法,向嵌入式通道EmbeddedChannel写入了100个ByteBuf入站缓冲;每一个ByteBuf缓冲,仅仅包含一个字符串。EmbeddedChannel通道接收到入站数据后,pipeline流水线上的两个入站处理器,就能不断地处理这些入站数据:将接收到的二进制字节,解码成一个一个的字符串,然后逐个地显示在Console控制台上 本环节的目标是掌握以下知识: - 如何基于ReplayingDecoder解码器抽象类,实现自己的ByteBuf二进制字节到字符串的解码。 - 巩固ReplayingDecoder的成员属性——state阶段属性的使用。 - 巩固ReplayingDecoder的重要方法——checkpoint(IntegerAddDecoder.Status)方法的使用 ##### 多字段Head-Content协议数据包解析实践 具体步骤如下: **步骤01** 使用LengthFieldBasedFrameDecoder解码器,解码复杂的Head-Content协议。例如协议中包含版本号、魔数等多个其他的数据字段。 **步骤02** 使用前面所编写那一个简单的业务处理器StringProcessHandler。其功能是:读取上一站的入站数据,把它转换成字符串,并且显示在Console控制台上。 **步骤03** 新建一个EmbeddedChannel嵌入式的通道实例,将第一步和第二步的两个入站处理器LengthFieldBasedFrameDecoder、StringProcessHandler加入到通道的流水线上。为了测试入站处理器,使用writeInbound方法,向嵌入式通道EmbeddedChannel写入100个ByteBuf入站缓冲;每一个ByteBuf缓冲,仅仅包含一个字符串。EmbeddedChannel通道接收到入站数据后,pipeline流水线上的两个入站处理器,就能不断地处理到这些入站数据:将接到的二进制字节,解码成一个一个的字符串,然后逐个地显示在控制台上。 本环节的目标是掌握以下知识: - LengthFieldBasedFrameDecoder解码器的使用。 - LengthFieldBasedFrameDecoder解码器的长度的矫正公式,计算公式为:内容字段的偏移-长度字段的偏移-长度字段的长度。 ### JSON和ProtoBuf序列化实践(6) ##### JSON通信实践 客户端将POJO转成JSON字符串,编码后发送到服务器端。服务器端接收客户端的数据包,并解码成JSON,转成POJO。 具体步骤如下: **步骤01** 客户端的编码过程: 先通过谷歌的Gson框架,将POJO序列化成JSON字符串;然后使用Netty内置的StringEncoder编码器,将JSON字符串编码成二进制字节数组;最后,使用LengthFieldPrepender编码器(Netty内置),将二进制字节数组编码成Head-Content格式的二进制数据包。 **步骤02** 服务器端的解码过程: 先使用LengthFieldBasedFrameDecoder(Netty内置的自定义长度数据包解码器),解码Head-Content二进制数据包,解码出Content字段的二进制内容。然后,使用StringDecoder字符串解码器(Netty内置的解码器),将二进制内容解码成JSON字符串。最后,使用自定义的JsonMsgDecoder解码器,将JSON字符串解码成POJO对象。 **步骤03** 编写一个JsonMsgDecoder自定义的JSON解码器。将JSON字符串,解码成特定的POJO对象。 **步骤04** 分别组装好服务器端、客户端的流水线,运行程序,查看两端的通信结果。 本环节的目标是掌握以下知识: - LengthFieldPrepender编码器的使用:在发送端使用它加上Head-Content的头部长度。 - JsonMsgDecoder的编写。 - JSON传输时,客户端流水线编码器的组装,服务器端流水线解码器的组装。 ##### ProtoBuf通信实践 设计一个简单的客户端/服务器端传输程序:客户端将ProtoBuf的POJO编码成二进制数据包,发送到服务器端;服务器端接收客户端的数据包,并解码成ProtoBuf的POJO。 具体步骤如下: **步骤01** 设计好需要传输的ProtoBuf的“.proto”协议文件,并且生成ProtoBuf的POJO和Builder:在“.proto”协议文件中,仅仅定义了一个消息结构体,并且该消息结构体也非常简单,只包含两个字段:消息ID、消息内容。使用protobuf-maven-plugin插件,生成message的POJO类和Builder(构造者)类的Java代码。 **步骤02** 客户端的编码过程:先使用Netty内置的ProtobufEncoder,将ProtobufPOJO对象编码成二进制的字节数组。然后,使用Netty内置的ProtobufVarint32LengthFieldPrepender编码器,加上varint32格式的可变长度。Netty会将完成了编码后的Length+Content格式的二进制字节码,发送到服务器端。 **步骤03** 服务器端的解码过程:先使用Netty内置的ProtobufVarint32FrameDecoder,根据varint32格式的可变长度值,从入站数据包中,解码出二进制Protobuf字节码。然后,可以使用Netty内置的ProtobufDecoder解码器,将Protobuf字节码解码成Protobuf POJO对象。最后,自定义一个ProtobufBussinessDecoder解码器,处理ProtobufPOJO对象。 本环节的目标是掌握以下知识: - “.proto”基础协议。 - Netty内置的ProtobufEncoder、ProtobufDecoder两个专用的传输Protobuf序列化数据的编码器/解码器的使用。 - Netty内置的两个ProtoBuf专用的可变长度Head-Content协议的半包编码、解码处理器:ProtobufVarint32LengthFieldPrepender编码器、ProtobufVarint32FrameDecoderProtobufEncoder解码器的使用。 ### 基于Netty的单聊实战(7~10) ##### 自定义ProtoBuf编码器/解码器 具体步骤如下: **步骤01** 为单聊系统,设计一套自定义的“.proto”协议文件;然后通过maven插件生成Protobuf Builder和POJO。 **步骤02** 继承Netty提供的MessageToByteEncoder编码器,编写一个自定义的Protobuf编码器,完成Head-Content协议的复杂数据包的编码。通过自定义编码器,最终将ProtobufPOJO编码成Head-Content协议的二进制ByteBuf帧。 **步骤03** 继承Netty提供的ByteToMessageDecoder解码器,编写一个自定义的Protobuf解码器,完成Head-Content协议的复杂数据包的解码。通过自定义的解码器,最终将Head-Content协议的复杂数据包,解码出Protobuf POJO。 **步骤04** 分别组装好服务器端、客户端的流水线,运行程序,查看两端的通信结果。 本环节的目标是掌握以下知识: - 设计复杂的“.proto”协议文件。 - 自定义的Protobuf编码器的编写。 - 自定义的Protobuf编码器的编写。 ##### 登录实践 业务逻辑: - 客户端发送登录数据包。 - 服务器端进行用户信息验证。 - 服务器端创建Session会话。 - 服务器端将登录结果信息返回给客户端,包括成功标志、Session ID等。 具体步骤如下: 从客户端到服务器端再到客户端,大致有以下几个步骤。 **步骤01** 编写LoginConsoleCommand控制台命令类,从客户端收集用户ID和密码。 **步骤02** 编写客户端LoginSender发送器类,组装Protobuf数据包,通过客户端通道发送到服务器端。 **步骤03** 编写服务器端UserLoginHandler入站处理器,处理收到的登录消息,完成数据的校验后,将数据包交给业务处理器LoginMsgProcesser,进行异步的处理。 **步骤04** 编写服务器端LoginMsgProcesser(业务处理器),将处理结果,写入用户绑定的子通道。 **步骤05** 编写客户端业务处理器LoginResponceHandler,处理登录响应,例如设置登录的状态,保存会话的SessionID等。 本环节的目标是掌握以下知识: - Netty知识的综合运用。 - Channel通道容器功能的使用。 ##### 单聊实践 单聊的业务逻辑: - 当用户A登录成功之后,按照单聊的消息格式,发送所要的消息。 - 这里的消息格式设定为——userId:content。其中的userId,就是消息接收方目标用户B的userId;其中的content,表示聊天的内容。 - 服务器端收到消息后,根据目标userId,进行消息帧的转发,发送到用户B所在的客户端。 - 客户端用户B收到用户A发来的消息,显示在自己的控制台上。 具体步骤如下: 从客户端到服务器端再到客户端,大致有5个步骤。 **步骤01** 当用户A登录成功之后,按照单聊的消息格式,发送所要的消息。这里的消息格式设定为——userId:content,其中的userId,就是消息接收方目标用户B的userId,其中的content,表示聊天的内容。 **步骤02** CommandController在完成聊天内容和目标用户的收集后,调用chatSender发送器实例,将聊天消息组装成Protobuf消息帧,通过客户端channel通道发往服务器端。 **步骤03** 编写服务器端的消息转发处理器ChatRedirectHandler类,其功能是,对用户登录进行判断:如果没有登录,则不能发送消息;开启异步的消息转发,由负责转发的chatRedirectProcesser实例,完成消息转发。 **步骤04** 编写负责异步消息转发的ChatRedirectProcesser类,功能如下:根据目标用户ID,找出所有的服务器端的会话列表(Session List),然后对应每一个会话,发送一份消息。 **步骤05** 编写客户端ChatMsgHandler处理器,主要的工作如下:对消息类型进行判断,判断是否为聊天请求Protobuf数据包。如果不是,直接将消息交给流水线的下一站;如果是聊天消息,则将聊天消息显示在控制台上。 本环节目标是掌握以下知识: - Netty知识的综合运用。 - 服务器端会话(Session)的管理。 ### ZooKeeper实践计划(11) ##### 分布式ID生成器 具体步骤如下: **步骤01** 自定义一个分布式ID生成器——IDMaker类,通过创建ZK的临时顺序节点的方法,生成全局唯一的ID。 **步骤02** 基于自定义的IDMaker,编写单元测试的用例,生成ID。 本环节的目标是掌握以下知识: - 分布式ID生成器原理。 - ZooKeeper客户端的使用。 ##### 使用ZK实现SnowFlake ID算法 具体步骤如下: **步骤01** 编写一个实现SnowFlake ID算法的ID生成器——SnowflakeIdGenerator类,生成全局唯一的ID。 **步骤02** 基于自定义的SnowflakeIdGenerator,编写单元测试的用例,生成ID。 本环节的目标是掌握以下知识: - SnowFlake ID算法原理。 - ZooKeeper客户端的使用。 ### Redis实践计划 ##### 使用RedisTemplate模板类完成Redis的缓存CRUD操作 具体步骤如下: **步骤01** 将RedisTemplate模板类的大部分缓存操作,封装成一个自己的缓存操作Service服务,称之为CacheOperationService类。 **步骤02** 编写业务类UserServiceImplWithTemplate类,使用CacheOperationService类,完成User对象的缓存CRUD。 **步骤03** 编写测试用例,访问UserServiceImplWithTemplate类。观察在进行User对象的查询时,能优先使用缓存数据,是否省去数据库访问的时间。 本环节目标是掌握以下知识: - RedisTemplate模板类的使用。 - Jedis的客户端API。 - RedisTemplate模板API。 ##### 使用RedisTemplate模板类完成Redis的缓存CRUD操作 具体步骤如下: **步骤01** 使用RedisCallback的doInRedis回调方法,在doInRedis回调方法中,直接使用实参RedisConnection连接类实例,完成Redis的操作。 **步骤02** 编写业务类UserServiceImplInTemplate类,使用RedisTemplate模板实例去执行RedisCallback回调实例,完成User对象的缓存CRUD。 **步骤03** 编写测试用例,访问UserServiceImplInTemplate类。观察在进行User对象的查询时,优先使用缓存数据,看看是否省去了数据库访问的时间。 本环节的目标是掌握以下知识: - RedisCallback回调接口的使用。 - Jedis的客户端API。 - RedisTemplate模板API。 ##### 使用Spring缓存注解,完成Redis的缓存CRUD操作 具体步骤如下: **步骤01** 编写业务类UserServiceImplWithAnn类,使用缓存注解,完成User对象的缓存CRUD。 **步骤02** 编写测试用例,访问UserServiceImplWithAnn类。观察在进行User对象的查询时,优先使用缓存数据,看看是否省去了数据库访问的时间。 本环节目标是掌握以下知识: - Spring缓存注解的使用。 - Jedis的客户端API。 - Spring缓存注解的配置。 ```text 文件句柄,也叫文件描述符。在Linux系统中,文件可分为:普通文件、目录文件、链接文件和设备文件。文件描述符(File Descriptor)是内核为了高效管理已被打开的文件所创建的索引,它是一个非负整数(通常是小整数),用于指代被打开的文件。所有的IO系统调用,包括socket的读写调用,都是通过文件描述符完成的。 在Linux下,通过调用ulimit命令,可以看到单个进程能够打开的最大文件句柄数量,这个命令的具体使用方法是: ulimit -n ulimit -n 10000000 在上面的命令中,n的设置值越大,可以打开的文件句柄数量就越大。Linux ulimit只能作为临时修改,系统重启后,句柄数量又会恢复为默认值1024 如果想永久地把设置值保存下来,可以编辑/etc/rc.local开机启动文件,在文件中添加如下内容: ulimit -SHn 10000000 增加-S和-H两个命令选项。选项-S表示软性极限值,-H表示硬性极限值。硬性极限是实际的限制,就是最大可以是100万,不能再多了。软性极限是系统警告(Warning)的极限值,超过这个极限值,内核会发出警告 终极解除Linux系统的最大文件打开数量的限制,可以通过编辑Linux的极限配置文件/etc/security/limits.conf来解决,修改此文件,加入如下内容: soft nofile 1000000 hard nofile 1000000 soft nofile表示软性极限,hard nofile表示硬性极限 在使用和安装目前非常火的分布式搜索引擎——ElasticSearch,就必须去修改这个文件,增加最大的文件句柄数的极限值。 在服务器运行Netty时,也需要去解除文件句柄数量的限制,修改/etc/security/limits.conf文件即可。 ``` ### Java NIO通信基础详解 Java NIO由以下三个核心组件组成: - Channel(通道) - Buffer(缓冲区) - Selector(选择器) 需要强调的是:Buffer类是一个非线程安全类,在NIO中有8种缓冲区类,分别如下: **ByteBuffer、CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer、ShortBuffer、MappedByteBuffer**。 Buffer类提供了一些重要的属性。其中,有三个重要的成员属性:capacity(容量)、position(读写位置)、limit(读写的限制)。 除此之外,还有一个标记属性:mark(标记),可以将当前的position临时存入mark中;需要的时候,可以再从mark标记恢复到position位置。 ##### Buffer类的重要属性 ###### capacity属性 Buffer类的capacity属性一旦初始化,就不能再改变。原因是什么呢?Buffer类的对象在初始化时,会按照capacity分配内部的内存。在内存分配好之后,它的大小当然就不能改变了。 ###### position属性 在写入模式下,position的值变化规则如下: ​ (1)在刚进入到写模式时,position值为0,表示当前的写入位置为从头开始。 ​ (2)每当一个数据写到缓冲区之后,position会向后移动到下一个可写的位置。 ​ (3)初始的position值为0,最大可写值position为limit-1。当position值达到limit时,缓冲区就已经无空间可写了。 在读模式下,position的值变化规则如下: ​ (1)当缓冲区刚开始进入到读模式时,position会被重置为0。 ​ (2)当从缓冲区读取时,也是从position位置开始读。读取数据后,position向前移动到下一个可读的位置。 ​ (3)position最大的值为最大可读上限limit,当position达到limit时,表明缓冲区已经无数据可读。 ###### limit属性 在写模式下,limit属性值的含义为可以写入的数据最大上限。在刚进入到写模式时,limit的值会被设置成缓冲区的capacity容量值,表示可以一直将缓冲区的容量写满。 在读模式下,limit的值含义为最多能从缓冲区中读取到多少数据。一般来说,是先写入再读取。当缓冲区写入完成后,就可以开始从Buffer读取数据,可以使用flip翻转方法,这时,limit的值也会进行非常大的调整。 ![img](https://res.weread.qq.com/wrepub/epub_26174369_8) ##### 详解NIO Buffer类的重要方法 ###### allocate()创建缓冲区 在使用Buffer(缓冲区)之前,我们首先需要获取Buffer子类的实例对象,并且分配内存空间。 为了获取一个Buffer实例对象,这里并不是使用子类的构造器new来创建一个实例对象,而是调用子类的allocate()方法。 ###### put()写入到缓冲区 在调用allocate方法分配内存、返回了实例对象后,缓冲区实例对象处于写模式,可以写入对象。要写入缓冲区,需要调用put方法。put方法很简单,只有一个参数,即为所需要写入的对象。不过,写入的数据类型要求与缓冲区的类型保持一致。 ###### flip()翻转 向缓冲区写入数据之后,是否可以直接从缓冲区中读取数据呢?呵呵,不能。 这时缓冲区还处于写模式,如果需要读取数据,还需要将缓冲区转换成读模式。flip()翻转方法是Buffer类提供的一个模式转变的重要方法,它的作用就是将写入模式翻转成读取模式。 对flip()方法的从写入到读取转换的规则,详细的介绍如下: 首先,设置可读的长度上限limit。将写模式下的缓冲区中内容的最后写入位置position值,作为读模式下的limit上限值。 其次,把读的起始位置position的值设为0,表示从头开始读。 最后,清除之前的mark标记,因为mark保存的是写模式下的临时位置。在读模式下,如果继续使用旧的mark标记,会造成位置混乱。 Buffer.flip()方法的源代码如下: ```java public final Buffer flip() { // 设置可读的长度的上线limit,为写入的position limit = position; // 把读的起始位置的position的位置设置为0,表示从头开始读取 position = 0; // 清除之前的mark标识 mark = UNSET_MARK; return this; } ``` 在读取完成后,如何再一次将缓冲区切换成写入模式呢? 可以调用Buffer.clear()清空或者Buffer.compact()压缩方法,它们可以将缓冲区转换为写模式。 ​ **缓冲区读写模式的转换** img ###### get()从缓冲区读取 如果position值和limit的值相等,表示所有数据读取完成,position指向了一个没有数据的元素位置,已经不能再读了。此时再读,会抛出BufferUnderflowException异常。 在读完之后,是否可以立即进行写入模式呢?不能。 现在还处于读取模式,我们必须调用Buffer.clear()或Buffer.compact(),即清空或者压缩缓冲区,才能变成写入模式,让其重新可写。 缓冲区是不是可以重复读呢?可以的。 ###### rewind()倒带 已经读完的数据,如果需要再读一遍,可以调用rewind()方法。rewind()也叫倒带,就像播放磁带一样倒回去,再重新播放。 rewind ()方法,主要是调整了缓冲区的position属性,具体的调整规则如下: ​ (1)position重置为0,所以可以重读缓冲区中的所有数据。 ​ (2)limit保持不变,数据量还是一样的,仍然表示能从缓冲区中读取多少个元素。 ​ (3)mark标记被清理,表示之前的临时位置不能再用了。 Buffer.rewind()方法的源代码如下: ```java public final Buffer rewind() { // 重置为0,所以可以重读缓冲区的数据 position = 0; // mark 标记被清理,标识之前的临时位置不在用了 mark = -1; return this; } ``` ###### mark( )和reset( ) Buffer.mark()方法的作用是将当前position的值保存起来,放在mark属性中,让mark属性记住这个临时位置;之后,可以调用Buffer.reset()方法将mark的值恢复到position中。也就是说,Buffer.mark()和Buffer.reset()方法是配套使用的。 ###### clear( )清空缓冲区 在读取模式下,调用clear()方法将缓冲区切换为写入模式。此方法会将position清零,limit设置为capacity最大容量值,可以一直写入,直到缓冲区写满。 ###### 使用Buffer类的基本步骤 总体来说,使用Java NIO Buffer类的基本步骤如下: (1)使用创建子类实例对象的allocate()方法,创建一个Buffer类的实例对象。 (2)调用put方法,将数据写入到缓冲区中。 (3)写入完成后,在开始读取数据前,调用Buffer.flip()方法,将缓冲区转换为读模式。 (4)调用get方法,从缓冲区中读取数据。 (5)读取完成后,调用Buffer.clear() 或Buffer.compact()方法,将缓冲区转换为写入模式。 ##### 详解NIO Channel(通道)类 ###### Channel(通道)的主要类型 (1)FileChannel文件通道,用于文件的数据读写。 (2)SocketChannel套接字通道,用于Socket套接字TCP连接的数据读写。 (3)ServerSocketChannel服务器嵌套字通道(或服务器监听通道),允许我们监听TCP连接请求,为每个监听到的请求,创建一个 SocketChannel套接字通道。 (4)DatagramChannel数据报通道,用于UDP协议的数据读写。 这个四种通道,涵盖了文件IO、TCP网络、UDP IO基础IO。下面从Channel(通道)的获取、读取、写入、关闭四个重要的操作,来对四种通道进行简单的介绍 ###### FileChannel文件通道 FileChannel为阻塞模式,不能设置为非阻塞模式。 1. 获取FileChannel通道 可以通过文件的输入流、输出流获取FileChannel文件通道,示例如下: ```java // 创建一条文件流输入 FileInputStream fis = new FileInputStream(srcFile); // 获取文件流通道 FileChannel inChannel = fis.getChannel(); // 创建一条文件输出流 FileOutputStream fis = new FileOutputStream(destFile); // 获取文件流通道 FileChannel outChannel = fis.getChannel(); ``` 也可以通过RandomAccessFile文件随机访问类,获取FileChannel文件通道: ```java // 创建RandomAccessFile文件随机访问对象 RandomAccessFile file = new RandomAccessFile("file.txt","rw"); // 获取文件流通道 FileChannel inChannel = file.getChannel(); ``` 2. 读取FileChannel通道 在大部分应用场景,从通道读取数据都会调用通道的int read(ByteBufferbuf)方法,它从通道读取到数据写入到ByteBuffer缓冲区,并且返回读取到的数据量。 ```java RandomAccessFile aFile = new RandomAccessFile(fileName, "rw"); // 获取通道 FileChannel inChannel = aFile.getChannel(); // 获取一个字节缓冲流 ByteBuffer buf = ByteBuffer.allocate(CAPACITY); int length = -1; // 通过通道的read方法,读取数据并写入字节类型的缓冲区 while ((length = inChannel.read(buf)) != -1) { } ``` 3. 写入FileChannel通道 写入数据到通道,在大部分应用场景,都会调用通道的int write(ByteBufferbuf)方法。此方法的参数——ByteBuffer缓冲区,是数据的来源。write方法的作用,是从ByteBuffer缓冲区中读取数据,然后写入到通道自身,而返回值是写入成功的字节数。 ```java // 如果buf 刚写完数据,需要flip翻转buf,时期变成读取模式 buf.flip(); int outlenght=0; // 调用write方法,将buf的数据写入通道 while((outlength=outchannel.write(buf))!=0){ System.out.println("写入的字节数:"+outlength); } ``` 4. 关闭通道 当通道使用完成后,必须将其关闭。关闭非常简单,调用close方法即可。 ```java // 关闭通道 channel.close(); ``` 5. 强制刷新到磁盘 ```java // 强制刷新到磁盘 channel.force(); ``` ##### SocketChannel套接字通道 在NIO中,涉及网络连接的通道有两个,一个是SocketChannel负责连接传输,另一个是ServerSocketChannel负责连接的监听。 NIO中的SocketChannel传输通道,与OIO中的Socket类对应。NIO中的ServerSocketChannel监听通道,对应于OIO中的ServerSocket类。 ServerSocketChannel应用于服务器端,而SocketChannel同时处于服务器端和客户端。换句话说,对应于一个连接,两端都有一个负责传输的SocketChannel传输通道。 无论是ServerSocketChannel,还是SocketChannel,都支持阻塞和非阻塞两种模式。 ###### 获取SocketChannel传输通道 ```java // 获取一个套接字传输通道 SocketChannel socketChannel = SocketChannel.open(); // 设置为非阻塞模式 socketChannel.configueBlocking(false); // 对服务器的ip和端口发起连接 socketChannel.connect(new InteSocketAddress("127.0.0.1",80)); ``` 非阻塞情况下,与服务器的连接可能还没有真正建立,socketChannel.connect方法就返回了,因此需要不断地自旋,检查当前是否是连接到了主机: ```java while(!socketChannel.finishConnect()){ // 不断的自旋,等待,或者做其他事 } ``` 当新连接事件到来时,在服务器端的ServerSocketChannel能成功地查询出一个新连接事件,并且通过调用服务器端ServerSocketChannel监听套接字的accept()方法,来获取新连接的套接字通道: ```java // 新连接事件到来,首先通过事件,获取服务器监听通道 ServerSocketChannel server = (ServerSocketChannel)key.channel(); // 获取新链接的套接字通道 SocketChannel socketChannel = server.accept(); // 设置为非阻塞模式 socketChannel.configueBlocking(false); ``` ###### 读取SocketChannel传输通道 当SocketChannel通道可读时,可以从SocketChannel读取数据,具体方法与前面的文件通道读取方法是相同的。调用read方法,将数据读入缓冲区ByteBuffer。 ```java ByteBuffer buf = ByteBuffer.allocate(1024); int bytesRead = socketChannel.read(buf); ``` 在读取时,因为是异步的,因此我们必须检查read的返回值,以便判断当前是否读取到了数据。read()方法的返回值,是读取的字节数。如果返回-1,那么表示读取到对方的输出结束标志,对方已经输出结束,准备关闭连接。实际上,通过read方法读数据,本身是很简单的,比较困难的是,在非阻塞模式下,如何知道通道何时是可读的呢?这就需要用到NIO的新组件——Selector通道选择器,稍后介绍。 ###### 写入到SocketChannel传输通道 和前面的把数据写入到FileChannel文件通道一样,大部分应用场景都会调用通道的int write(ByteBufferbuf)方法。 ```java buf.flip(); socketChannel.write(buf); ``` ###### 关闭SocketChannel传输通道 在关闭SocketChannel传输通道前,如果传输通道用来写入数据,则建议调用一次shutdownOutput()终止输出方法,向对方发送一个输出的结束标志(-1)。然后调用socketChannel.close()方法,关闭套接字连接。 ##### DatagramChannel数据报通道 在Java NIO中,使用DatagramChannel数据报通道来处理UDP协议的数据传输. ###### 获取DatagramChannel数据报通道 获取数据报通道的方式很简单,调用DatagramChannel类的open静态方法即可。然后调用configureBlocking(false)方法,设置成非阻塞模式。 如果需要接收数据,还需要调用bind方法绑定一个数据报的监听端口,具体如下: ```java channel.socket().bind(new InetSocketAddress(18080)) ``` ###### 读取DatagramChannel数据报通道数据 当DatagramChannel通道可读时,可以从DatagramChannel读取数据。和前面的SocketChannel的读取方式不同,不是调用read方法,而是调用receive(ByteBufferbuf)方法将数据从DatagramChannel读入,再写入到ByteBuffer缓冲区中。 ```java // 创建缓冲区 ByteBuffer buf = ByteBuffer.allocate(1024); // 从DatagramChannel 通道读入,再写入到ByteBuffer缓冲区 SocketAddress clientAddr = datagramChannel.receive(buf); ``` 通道读取receive(ByteBufferbuf)方法的返回值,是SocketAddress类型,表示返回发送端的连接地址(包括IP和端口)。通过receive方法读数据非常简单,但是,在非阻塞模式下,如何知道DatagramChannel通道何时是可读的呢?和SocketChannel一样,同样需要用到NIO的新组件——Selector通道选择器,稍后介绍。 ###### 写入DatagramChannel数据报通道 向DatagramChannel发送数据,和向SocketChannel通道发送数据的方法也是不同的。这里不是调用write方法,而是调用send方法。示例代码如下: ```java // 把缓存区翻转到读取模式 buffer.flip(); // 调用send方法,把数据发送到目标ip+端口 channel.send(buffer,new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,NioDemoConfig.SOCKET_SERVER_PORT)); // 清空缓冲区,切换到写入模式 buffer.clear(); ``` 由于UDP是面向非连接的协议,因此,在调用send方法发送数据的时候,需要指定接收方的地址(IP和端口)。 ###### 关闭DatagramChannel数据报通道 channel.close(); #### 详解NIO Selector选择器 Java NIO的三大核心组件:Channel(通道)、Buffer(缓冲区)、Selector(选择器)。其中通道和缓冲区,二者的联系也比较密切:数据总是从通道读到缓冲区内,或者从缓冲区写入到通道中。 简单地说:选择器的使命是完成IO的多路复用。 一个通道代表一条连接通路,通过**选择器**可以同时监控多个通道的IO(输入输出)状况。选择器和通道的关系,是监控和被监控的关系。 **选择器**提供了独特的API方法,能够选出(select)所监控的通道拥有哪些已经准备好的、就绪的IO操作事件。 一般来说,一个单线程处理一个**选择器**,一个**选择器**可以监控很多通道。通过**选择器**,一个单线程可以处理数百、数千、数万、甚至更多的通道。在极端情况下(数万个连接),只用一个线程就可以处理所有的通道,这样会大量地减少线程之间上下文切换的开销。 **通道**和**选择器**之间的关系,通过register(注册)的方式完成。 调用通道的Channel.register(Selector sel, int ops)方法,可以将通道实例注册到一个选择器中。register方法有两个参数:第一个参数,指定通道注册到的选择器实例;第二个参数,指定选择器要监控的IO事件类型。 可供选择器监控的通道IO事件类型,包括以下四种: (1)可读:SelectionKey.OP_READ (2)可写:SelectionKey.OP_WRITE (3)连接:SelectionKey.OP_CONNECT (4)接收:SelectionKey.OP_ACCEPT 事件类型的定义在SelectionKey类中。如果选择器要监控通道的多种事件,可以用“按位或”运算符来实现。例如,同时监控可读和可写IO事件: ```java // 监控通道的多种事件,用"按位或"运算符来实现 int key = SelectionKey.OP_READ | SelectionKey.OP_WRITE; ``` 这里的IO事件**不是对通道的IO操作**,**而是通道的某个IO操作的一种就绪状态**,表示通道具备完成某个IO操作的条件。 ##### SelectableChannel可选择通道 并不是所有的通道,都是可以被选择器监控或选择的。 比方说,FileChannel文件通道就不能被选择器复用。判断一个通道能否被选择器监控或选择,有一个前提:判断它是否继承了抽象类SelectableChannel(可选择通道)。如果继承了SelectableChannel,则可以被选择,否则不能。 简单地说,一条通道若能被选择,必须继承SelectableChannel类。 Java NIO中所有网络链接Socket套接字通道,都继承了SelectableChannel类,都是可选择的。而FileChannel文件通道,并没有继承SelectableChannel,因此不是可选择通道。 ##### 选择器使用流程 使用选择器,主要有以下三步: (1)获取选择器实例; (2)将通道注册到选择器中; (3)轮询感兴趣的IO就绪事件(选择键集合)。 ###### 第一步:获取选择器实例 选择器实例是通过调用静态工厂方法open()来获取的,具体如下: ```java // 调用静态工厂方法open()来获取Selector实例 Selector selector = Selector.open(); ``` ###### 第二步:将通道注册到选择器实例 ```java // 获取通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 设置为非阻塞 serverSocketChannel.configureBlocking(false); // 绑定连接 serverSocketChannel.bind(new InetScoketAddress(SystemConfig.SOCKET_SERVER_PORT)); // 将通道注册到选择器上,并制定监听器事件为:“接收连接"事件 serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); ``` 如何判断通道支持哪些事件呢?可以在注册之前,可以通过通道的validOps()方法,来获取该通道所有支持的IO事件集合。 ###### 第三步:选出感兴趣的IO就绪事件(选择键集合) 通过Selector选择器的select()方法,选出已经注册的、已经就绪的IO事件,保存到SelectionKey选择键集合中。SelectionKey集合保存在选择器实例内部,是一个元素为SelectionKey类型的集合(Set)。调用选择器的selectedKeys()方法,可以取得选择键集合。 ```java // 轮询,选择感兴趣的IO就绪事件(SekectionKey集合) while(selectior.select()>0){ Set selectedKeys = selector.selectedKeys(); Interator keyInterator = selectedKeys.interator(); while(keyIterator.hasNext()){ SelectionKey key = keyIterator.next(); // 根据具体的IO事件类型,执行对应的业务操作 if(key.isAcceptable()){ //IO事件:ServerSocketChannel 夫妻监听通道有新连接 }else if(key.isConnectable){ //IO事件:传输通道连接成功 }else if(key.isReadable()){ //IO事件:传输通道可读 }else if(key.isWritable()){ //IO事件:传输通道可写 } // 处理完成后,移除选择器 keyIterator.remove(); } } ``` 用于选择就绪的IO事件的select()方法,有多个重载的实现版本,具体如下: (1)select():阻塞调用,一直到至少有一个通道发生了注册的IO事件。 (2)select(long timeout):和select()一样,但最长阻塞时间为timeout指定的毫秒数。 (3)selectNow():非阻塞,不管有没有IO事件,都会立刻返回。 select()方法返回的整数值(int整数类型),表示发生了IO事件的通道数量。更准确地说,是从上一次select到这一次select之间,有多少通道发生了IO事件。强调一下,select()方法返回的数量,指的是通道数,而不是IO事件数,准确地说,是指发生了选择器感兴趣的IO事件的通道数。 ### Reactor反应器模式 #### Reactor反应器模式简介 反应器模式由Reactor反应器线程、Handlers处理器两大角色组成: (1)Reactor反应器:负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中选择器监控的通道IO事件。(2)Handler处理器:与IO事件(或者选择键)绑定,负责IO事件的处理。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。 #### 单线程Reactor反应器模式 什么是单线程版本的Reactor反应器模式呢?简单地说,Reactor反应器和Handers处理器处于一个线程中执行。它是最简单的反应器模型,如图所示。 ![img](D:\workspaces\netty_redis_zookeeper_source_code\image\epub_26174369_10) 基于Java NIO,如何实现简单的单线程版本的反应器模式呢?需要用到SelectionKey选择键的几个重要的成员方法: 方法一:void attach(Object o)此方法可以将任何的Java POJO对象,作为附件添加到SelectionKey实例,相当于附件属性的setter方法。这方法非常重要,因为在单线程版本的反应器模式中,需要将Handler处理器实例,作为附件添加到SelectionKey实例。 方法二:Object attachment()此方法的作用是取出之前通过attach(Object o)添加到SelectionKey选择键实例的附件,相当于附件属性的getter方法,与attach(Object o)配套使用。这个方法同样非常重要,当IO事件发生,选择键被select方法选到,可以直接将事件的附件取出,也就是之前绑定的Handler处理器实例,通过该Handler,完成相应的处理。 总之,在反应器模式中,需要进行attach和attachment结合使用:在选择键注册完成之后,调用attach方法,将Handler处理器绑定到选择键;当事件发生时,调用attachment方法,可以从选择键取出Handler处理器,将事件分发到Handler处理器中,完成业务处理。 #### 多线程的Reactor反应器模式 (1)将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听受到阻塞。 (2)如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器。这样,充分释放了系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。 **反应器模式和观察者模式(Observer Pattern)对比** 相似之处在于: 在反应器模式中,当查询到IO事件后,服务处理程序使用单路/多路分发(Dispatch)策略,同步地分发这些IO事件。 观察者模式(Observer Pattern)也被称作发布/订阅模式,它定义了一种依赖关系,让多个观察者同时监听某一个主题(Topic)。这个主题对象在状态发生变化时,会通知所有观察者,它们能够执行相应的处理。 不同之处在于: 在反应器模式中,Handler处理器实例和IO事件(选择键)的订阅关系,基本上是一个事件绑定到一个Handler处理器;每一个IO事件(选择键)被查询后,反应器会将事件分发给所绑定的Handler处理器;而在观察者模式中,同一个时刻,同一个主题可以被订阅过的多个观察者处理。 ### Future异步回调模式 #### join异步阻塞 ##### 详解join合并方法 join方法的应用场景:A线程调用B线程的join方法,等待B线程执行完成;在B线程没有完成前,A线程阻塞。 join方法是有三个重载版本: (1)void join():A线程等待B线程执行结束后,A线程重新恢复执行。 (2)void join(long millis):A线程等待B线程执行一段时间,最长等待时间为millis毫秒。超过millis毫秒后,不论B线程是否结束,A线程重新恢复执行。 (3)void join(long millis, int nanos):等待B线程执行一段时间,最长等待时间为millis毫秒,加nanos纳秒。超过时间后,不论B线程是否结束,A线程重新恢复执行。 强调一下容易混淆的几点: (1)join是实例方法,不是静态方法,需要使用线程对象去调用,如thread.join()。 (2)join调用时,不是线程所指向的目标线程阻塞,而是当前线程阻塞。 (3)只有等到当前线程所指向的线程执行完成,或者超时,当前线程才能重新恢复执行。 join有一个问题:被合并的线程没有返回值。 形象地说,join线程合并就是一像一个闷葫芦。只能发起合并线程,不能取到执行结果。 #### Future接口 主要提供了3大功能: (1)判断并发任务是否执行完成。 (2)获取并发的任务完成后的结果。 (3)取消并发执行中的任务。 V get():获取并发任务执行的结果。注意,这个方法是阻塞性的。如果并发任务没有执行完成,调用此方法的线程会一直阻塞,直到并发任务执行完成。 V get(Long timeout, TimeUnit unit):获取并发任务执行的结果。也是阻塞性的,但是会有阻塞的时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。 boolean isDone():获取并发任务的执行状态。如果任务执行结束,则返回true。 boolean isCancelled():获取并发任务的取消状态。如果任务完成前被取消,则返回true。 boolean cancel(booleanmayInterruptRunning):取消并发任务的执行。 ### Guava的异步回调 何为Guava?它是谷歌公司提供的Java扩展包,提供了一种异步回调的解决方案。相关的源代码在com.google.common.util.concurrent包中。包中的很多类,都是对java.util.concurrent能力的扩展和增强。 例如,Guava的异步任务接口ListenableFuture,扩展了Java的Future接口,实现了非阻塞获取异步结果的功能。总体来说,Guava的主要手段是增强而不是另起炉灶。 为了实现非阻塞获取异步线程的结果,Guava对Java的异步回调机制,做了以下的增强: (1)引入了一个新的接口ListenableFuture,继承了Java的Future接口,使得Java的Future异步任务,在Guava中能被监控和获得非阻塞异步执行的结果。 (2)引入了一个新的接口FutureCallback,这是一个独立的新接口。该接口的目的,是在异步任务执行完成后,根据异步结果,完成不同的回调处理,并且可以处理异步结果。 #### 详解FutureCallback FutureCallback是一个新增的接口,用来填写异步任务执行完后的监听逻辑。FutureCallback拥有两个回调方法: (1)onSuccess方法,在异步任务执行成功后被回调;调用时,异步任务的执行结果,作为onSuccess方法的参数被传入。 (2)onFailure方法,在异步任务执行过程中,抛出异常时被回调;调用时,异步任务所抛出的异常,作为onFailure方法的参数被传入。 #### 详解ListenableFuture ListenableFuture仅仅增加了一个方法——addListener方法。它的作用就是将FutureCallback善后回调工作,封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行善后处理。 在实际编程中,如何将FutureCallback回调逻辑绑定到异步的ListenableFuture任务呢?可以使用Guava的Futures工具类,它有一个addCallback静态方法,可以将FutureCallback的回调实例绑定到ListenableFuture异步任务 #### ListenableFuture异步任务 如果要获取Guava的ListenableFuture异步任务实例,主要是通过向线程池(ThreadPool)提交Callable任务的方式来获取。不过,这里所说的线程池,不是Java的线程池,而是Guava自己定制的Guava线程池。Guava线程池,是对Java线程池的一种装饰。 创建Guava线程池的方法如下: ```java //java线程池 ExecutorService jPool = Executors.newFixedThreadPool(10); //GUAVA线程池 ListeningExecutorService gPool = MoreExecutors.listeningDecorator(jPool); ``` 首先创建Java线程池,然后以它作为Guava线程池的参数,再构造一个Guava线程池。有了Guava的线程池之后,就可以通过submit方法来提交任务了;任务提交之后的返回结果,就是我们所要的ListenableFuture异步任务实例了。 简单地说,获取异步任务实例的方式,是通过向线程池提交Callable业务逻辑来实现。代码如下: ```java // 调用submit方法来提交任务,返回异步任务实例 ListenableFuturehFuture = gPool.submit(hJob); // 绑定回调实例 Futures.addCallback(listenableFuture,new FutureCallback()){ // 实现回调方法,有两个 }); ``` 获取了ListenableFuture实例之后,通过Futures.addCallback方法,将FutureCallback回调逻辑的实例绑定到ListenableFuture异步任务实例,实现异步执行完成后的回调。 Guava异步回调的流程如下: 第1步:实现Java的Callable接口,创建异步执行逻辑。还有一种情况,如果不需要返回值,异步执行逻辑也可以实现Java的Runnable接口。 第2步:创建Guava线程池。 第3步:将第1步创建的Callable/Runnable异步执行逻辑的实例,通过submit提交到Guava线程池,从而获取ListenableFuture异步任务实例。 第4步:创建FutureCallback回调实例,通过Futures.addCallback将回调实例绑定到ListenableFuture异步任务上。 完成以上四步,当Callable/Runnable异步执行逻辑完成后,就会回调异步回调实例FutureCallback的回调方法onSuccess/onFailure。 Guava异步回调和Java的FutureTask异步回调,本质的不同在于: ​ · Guava是非阻塞的异步回调,调用线程是不阻塞的,可以继续执行自己的业务逻辑。 ​ · FutureTask是阻塞的异步回调,调用线程是阻塞的,在获取异步结果的过程中,一直阻塞,等待异步线程返回结果。 ### Netty的异步回调模式 Netty和Guava一样,实现了自己的异步回调体系:Netty继承和扩展了JDK Future系列异步回调的API,定义了自身的Future系列接口和类,实现了异步任务的监控、异步执行结果的获取。 总体来说,Netty对JavaFuture异步任务的扩展如下: (1)继承Java的Future接口,得到了一个新的属于Netty自己的Future异步任务接口;该接口对原有的接口进行了增强,使得Netty异步任务能够以非阻塞的方式处理回调的结果;注意,Netty没有修改Future的名称,只是调整了所在的包名,Netty的Future类的包名和Java的Future接口的包名不同。 (2)引入了一个新接口——GenericFutureListener,用于表示异步执行完成的监听器。这个接口和Guava的FutureCallbak回调接口不同。Netty使用了监听器的模式,异步任务的执行完成后的回调逻辑抽象成了Listener监听器接口。可以将Netty的GenericFutureListener监听器接口加入Netty异步任务Future中,实现对异步任务执行状态的事件监听。 总体上说,在异步非阻塞回调的设计思路上,Netty和Guava的思路是一致的。对应关系为: ​ · Netty的Future接口,可以对应到Guava的ListenableFuture接口。 ​ · Netty的GenericFutureListener接口,可以对应到Guava的FutureCallback接口。