# EasyIm **Repository Path**: firerr/easy-im ## Basic Information - **Project Name**: EasyIm - **Description**: 一个开箱即用的im通讯框架 - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 1 - **Created**: 2021-09-09 - **Last Updated**: 2021-09-09 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # EasyIM [![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme) EasyIM 是一个基于 **Netty** 的, 开箱即用的 **IM** 框架, 目前 **SDK** 实现了客户端之间, **点对点**, **点对多**之间 , 支持并默认使用 **WebSocket** 协议对外通信, **UDP** 和 **MQ** 协议待更新. ## 0) 目录 - [EasyIM](#easyim) - [0) 目录](#0-目录) - [1) 基础组件介绍](#1-基础组件介绍) - [2) 快速使用](#2-快速使用) - [2.1) 启动示例](#21-启动示例) - [2.2) Handler](#22-handler) - [2.2.1) 模板抽象类 AbstractServerHandler](#221-模板抽象类-abstractserverhandler) - [2.2.2) ServerWebSocketCoreHandler](#222-serverwebsocketcorehandler) - [2.3) Listener](#23-listener) - [2.3.1) ServerEventListener](#231-servereventlistener) - [2.3.2) MessageQoSEventListenerS2C](#232-messageqoseventlisteners2c) - [2.4) OnlineChannels](#24-onlinechannels) - [2.5) 小结](#25-小结) - [3) 项目细节](#3-项目细节) - [3.1) 整体工作流程概述](#31-整体工作流程概述) - [3.1.1) 基础连接架构](#311-基础连接架构) - [3.1.1) 整体架构图](#311-整体架构图) - [3.2) 代码目录结构概述](#32-代码目录结构概述) - [3.3) 整体工作流程细节](#33-整体工作流程细节) - [4) 当前依赖组件版本](#4-当前依赖组件版本) - [9) 术语定义](#9-术语定义) ## 1) 基础组件介绍 基础组件工作流程图: ![](./doc/image/EasyIM_Basic.png) * **Agreement(通讯消息包装)**: 客户端向 **SDK** 发起的请求, 会被 **转换** 为一个 **Agreement** 对象, 即 **Agreement**, **SDK** 向客户端发送的消息也遵循该格式. * **Handler(消息处理器)**: 客户端向 **SDK** 发起的请求, 经转换为我们的 **Agreement** 通讯消息包装后, 会交由 **Handler** 处理, **SDK** 内置了 一些基础的 **Handler(AbstractServerHandler, ServerWebSocketCoreHandler)**, 开发者可以根据业务需要, 定义自己的 **Handler**. * **AgreementStrategy(基础业务处理器)**: 请求经由 **SDK 内置**的基础 **Handler** 后 , 根据 **Agreement.type** 属性分发到对应的 **AgreementStrategy** 上(`Agreement.type 对应 AgreementStrategy.myType`). **SDK** 也内置了一些基础的 **AgreementStrategy** 实现 , 开发者可以根据业务需要, 定义自己的 **Handler**. * **Listener(回调监听器)**: 供开发者在使用 **SDK** 时, 可以基于框架定义的模板方法, 实现自己的请求处理器. ## 2) 快速使用 上文介绍了 **SDK** 的一些基础组件 ### 2.1) 启动示例 项目可以基于 **BaseServerApplication** 基础类启动 **SDK**(基于 **Netty**), 启动参考代码: ```java public class Application { public static void main(String[] args) throws Exception { BaseServerApplication application = new BaseServerApplication.Builder() .setPort(7788) // 监听端口 .setOnlineChannels(new CaffeineCache<>()) // 在线用户 Channel 存储容器 .setAbstractServerHandler(new ServerWebSocketCoreHandler()) // 消息处理器 // 回调监听器 .setServerEventListener(new ServerEventListener() { /*...*/ }) .setMessageQoSEventListenerS2C(new MessageQoSEventListenerS2C() { /*...*/ }) .build(); application.startUp(); // 启动 // 加一个钩子,确保在 JVM 退出时释放 Netty 的资源 Runtime.getRuntime().addShutdownHook(new Thread(application::shutdown)); } } ``` 如上, 我们在启动 **SDK** 时, 可以指定几个比较重要的参数实例, 接下来两小节着重讲解这几个参数: * **OnlineChannels**: 在线队列容器 * **ServerEventListener**: 业务相关监听器 * **MessageQosEventListenerS2C**: QOS 相关监听器 * **AbstractServerHandler**: 即上图中的 **Handler** 消息处理器 ### 2.2) Handler 消息处理器这块功能用到了**模板方法**模式, **SDK** 提供了 **AbstractServerHandler** 模板方法抽象类的默认实现可以使用, 可基于此抽象类进行扩展. 同时, **SDK** 也内置了一个 **ServerWebSocketCoreHandler** 的扩展处理器可以开箱即用, 客户端的消息最终会由我们传入的 **Handler** 来执行. #### 2.2.1) 模板抽象类 AbstractServerHandler 我们可以看看模板抽象类的内容:
点击展开: com.cloud.easy.im.handler.AbstractServerHandler ```java public abstract class AbstractServerHandler { /**io线程执行*/ List strategyList = Lists.newArrayList(); protected ServerEventListener serverEventListener = null; /**业务线程池*/ protected EventProducer eventProducer; public AbstractServerHandler() { } /**处理接收消息逻辑*/ public void messageReceived(Channel session, Agreement agreement) throws Exception { for (AgreementStrategy strategy : strategyList) { if (strategy.algorithmInterface(agreement.getType())) { strategy.handler(session, agreement, serverEventListener); return; } } //业务线程执行 eventProducer.onHandler(session, agreement); } /**客户端创建链接*/ public abstract void sessionCreat(Channel session) throws Exception; /**客户端断开链接*/ public abstract void sessionClosed(Channel session) throws Exception; /**异常处理*/ public abstract void exceptionCaught(Channel session, Throwable cause) throws Exception; } ```
**模板抽象类 AbstractServerHandler** 供子类进行实现, 可继承该类重写其方法实现业务. #### 2.2.2) ServerWebSocketCoreHandler **SDK**内置了一个实现供于开箱即用的基于 **WebSocket** 的 **Handler** 实现, 在抽象类基础上扩展了 **WebSocket** 的心跳续约功能以及 **Close** 功能, 可以展开代码简单看一下:
点击展开: com.cloud.easy.im.handler.ServerWebSocketCoreHandler ```java public class ServerWebSocketCoreHandler extends AbstractServerHandler { /**构造核心处理器逻辑*/ public ServerWebSocketCoreHandler() { strategyList.add(new AgreementWebSocketCKeepLiveHandler()); } @Override public void sessionCreat(Channel session) { logger.debug("[WEB_SOCKET_CORE_HANDLER]客户端:" + CharsetHelper.sessionToString(session) + "已建立会话链接"); } @Override public void sessionClosed(Channel session) { String userId = OnlineProcessor.getUserIdFromSession(session); if (!StringUtil.isNullOrEmpty(userId)) { Channel inOnlineSeesion = OnlineProcessor.getInstance().getOnlineSession(userId); logger.info("[WEB_SOCKET_CORE_HANDLER]客户端:" + CharsetHelper.sessionToString(session) + "会话已关闭"); if (Objects.nonNull(inOnlineSeesion) && session == inOnlineSeesion) { OnlineProcessor.getInstance().removeUser(userId); if (serverEventListener != null) { serverEventListener.onUserLogoutAction_CallBack(userId, null, session); } else { logger.debug("[WEB_SOCKET_CORE_HANDLER]>> 会话" + CharsetHelper.sessionToString(session) + "被系统close了,但回调对象是null,没有进行回调通知."); } } else { { logger.warn("[WEB_SOCKET_CORE_HANDLER]【注意】会话" + CharsetHelper.sessionToString(session) + "不在在线列表中,意味着它是被客户端弃用的,本次忽略这条关闭事件即可!"); } } } else { logger.warn("[WEB_SOCKET_CORE_HANDLER]【注意】会话" + CharsetHelper.sessionToString(session) + "被系统close了,但它里面没有存放user_id,它很可能是没有成功合法认证而被提前关闭,从而正常释放资源。"); } } @Override public void exceptionCaught(Channel session, Throwable cause) { logger.warn("客户端:" + CharsetHelper.sessionToString(session) + "抛出异常,原因是:" + cause.getMessage() + ",可以提前close掉了哦!", cause); session.close(); } } ```
### 2.3) Listener 除了上文提到的 **Handler** 之外, **SDK**还提供了 **Listener** 监听接口, **SDK** 没有提供内置实现, 需要开发者自己实现. 我们实现的 **Listener**, 会在客户端操作的**生命周期**中**各个阶段**, 回调 / 调用我们提供的 **Listener** 中的方法, 可以在其中实现我们的业务. **SDK** 定义了两个监听器接口: 接着我们会简单看一下其中的回调方法: * **ServerEventListener**: 业务相关监听器. * **MessageQosEventListenerS2C**: QOS 相关监听器. #### 2.3.1) ServerEventListener ![](./doc/image/EasyIM_Listener.jpg) 用户/客户端的业务操作会触发我们的 **ServerEventListener** 监听器, 提供了以下回调方法: * **用户登陆** * **用户登陆成功** * **用户退出登陆** * **消息发送前** * **单聊发送成功** * **单聊发送失败** * **发送群组消息**
点击展开: com.cloud.easy.im.event.ServerEventListener ```java public interface ServerEventListener { /**用户身份验证回调方法定义. * 服务端的应用层可在本方法中实现用户登陆验证。 * note:本回调在一种特殊情况下——即用户实际未退出登陆但再次发起来登陆包时,本回调是不会被调用的! *

* 本方法中用户验证通过(即方法返回值=0时)后 ,将立即调用回调方法 * {@link ServerEventListener#onUserLoginAction_CallBack(String, String, Channel)} * 否则会将验证结果(本方法返回值错误码通过客户端的 * ChatBaseEvent.onLoginMessage(int dwUserId, int dwErrorCode) 方法进行回调)通知客户端)。*/ byte onVerifyUser_CallBack(String userId, String token, String extra, Channel session); /**用户登录验证成功后的回调方法定义(可理解为上线通知回调). * 服务端的应用层通常可在本方法中实现用户上线通知等。 * note:本回调在一种特殊情况下——即用户实际未退出登陆但再次发起来登陆包时,回调也是一定会被调用。*/ void onUserLoginAction_CallBack(String userId, String extra, Channel session); /**用户退出登录回调方法定义(可理解为下线通知回调)。 * 服务端的应用层通常可在本方法中实现用户下线通知等。*/ void onUserLogoutAction_CallBack(String userId, Object obj, Channel session); /**消息发送之前,前置操作 * 本方法的典型用途:消息落地,适用于所有消息场景,在消息发送逻辑正式执行前执行*/ void messageBefore_CallBack(Agreement dataFromClient); /**收到客户端发送给“其它客户端”的数据回调通知(即:消息路径为“C2C”的消息). * note:本方法当且仅当在数据被服务端成功实时发送(“实时”即意味着对方在线的情况下)出去后被回调调用. * 本方法的典型用途:开发者可在本方法中可以实现用户聊天信息的收集,以便后期监控分析用户的行为等。*/ void messageSendSuccess_C2C_CallBack(Agreement dataFromClient); /**服务端在进行消息直发(S2C消息)或转发(C2C消息)时,当对方在线但实时发送失败、以及其它各种问题导 致消息并没能正常发出时, * 将无条件走本回调通知。 * note:本方法当且仅当在数据被服务端在线发送失败后被回调调用.*/ boolean messageSendFaild_C2C_CallBack(Agreement dataFromClient); /**用户发送群组信息时,通过此回调接口返回群组中除自己以外,所有在线用户的id列表 * note:注意此接口最好只返回在线列表(最佳实践:可以通过位图结构,实时更新在线群组人员)*/ List messageGroupAction_CallBack(String groupId, String userId); } ```

#### 2.3.2) MessageQoSEventListenerS2C **MessageQoSEventListenerS2C** 监听器主要负责 QOS 工作相关的监听, 提供: * **超过最大 QOS 尝试次数** * **收到客户端 ACK 应答包**
点击展开: com.cloud.easy.im.event.MessageQoSEventListenerS2C ```java public interface MessageQoSEventListenerS2C { /**因为超过最大qos尝试次数,而被丢弃的消息。 * 此消息通过使用框架时设置的最大尝试qos次数{@link QoS4SendDaemonRoot#QOS_TRY_COUNT}控制,超过最大次数后,将由系统自行维护后续消息*/ void messagesLost(ArrayList lostMessages); /**服务端收到客户端返回的ack应答包后回调函数*/ void messagesBeReceived(String theFingerPrint); } ```
### 2.4) OnlineChannels OnlineChannels 即 **SDK** 中的在线用户容器, 存储我们在线用户的 Netty Channel, 可通过此 Channel 与客户端进行通讯. 位于 `com.cloud.easy.im.processor.OnlineProcessor#onlineChannels`, 我们可以遵循 `OnlineMap` 接口, 定义自己的在线用户容器. **SDK** 内置的一些在线用户容器实现: * **CaffeineCache**: 基于 **Caffeine** 的容器实现, 包括 Spring 5, 以及 SpringBoot 2 目前都使用它代替了 Guava 用做缓存.. * **DefaultOnlineMap**: 基于 **ConcurrentHashMap** 的容器实现. ### 2.5) 小结 总结一下, 我们在启动 **SDK** 时, 可供自定义配置的组件如下: 1. **Handler**: **AbstractServerHandler** 的实现, **SDK** 内置有 **ServerWebSocketCoreHandler**. 2. **Listener**: 两个监听器 **ServerEventListener** 和 **MessageQosEventListenerS2C** 的实现. 3. **OnlineChannels**: 在线用户 **Channel** 的存储容器, **OnlineMap** 的实现, **SDK** 内置有 **CaffeineCache** 和 **DefaultOnlineMap**. ## 3) 项目细节 ### 3.1) 整体工作流程概述 #### 3.1.1) 基础连接架构 ![](./doc/image/EasyIM_Pool.jpg) #### 3.1.1) 整体架构图 ![](./doc/image/EasyIM_All.jpg) ### 3.2) 代码目录结构概述
点击展开: 代码目录结构树
.
├── BaseServerApplication: 基础启动类, 项目中的启动类需基于此类启动.
│
├── argeement
│   ├── Agreement: 请求相应消息包装, 客户端与服务端之间消息应遵循该格式, 详细信息可参考 WebSocket_Agreement.md.
│   ├── AgreementFactory: Agreement 转换工厂, 基于 FastJSON, 主要用于处理: (客户端消息 <-> Agreement <-> 服务端消息) 之间的转换.
│   ├── AgreementType: Agreement.type 属性枚举.
│   ├── server: 服务端向客户端通用响应封装, 对应序列化为响应时的 Agreement.dataContent.
│   │   ├── ErrorResponse: 通用错误响应.
│   │   ├── GroupResponse: 群聊信息响应(暂未使用).
│   │   └── LoginResponse: 登陆结果响应.
│   ├── ErrorCode: ErrorResponse.errorCode 属性枚举.
│   └── client: 客户端向服务器通用请求封装, 对应请求时的 Agreement.dataContent 的反序列化.
│       └── LoginInfo: 用户登陆请求.
│
├── container: 容器对象, 目前仅存放了在线用户 Channel 容器.
│   ├── OnlineMap: 在线用户缓存容器接口.
│   ├── CaffeineCache: 基于 Caffeine 的在线用户缓存实现, 包括 Spring 5, 以及 SpringBoot 2 目前都使用它代替了 Guava 用做缓存.
│   └── DefaultOnlineMap: 基于 ConcurrentHashMap 的在线用户缓存实现.
│
├── event: 监听以及监听事件相关.
│   ├── MessageQoSEventListenerS2C: QoS 监听器.
│   └── ServerEventListener: 业务相关监听器, 上文也有详细介绍.
│
├── handler: 客户端消息处理器, 客户端消息经过 Netty 后的基础业务处理相关.
│   ├── AbstractServerHandler: Handler 模板抽象类
│   ├── ApplicationPullHandler: 暂未使用, 保留
│   ├── ApplicationPushHandler: 暂未使用, 保留
│   ├── ServerCoreHandler: 暂未使用, 保留
│   ├── ServerWebSocketCoreHandler: Handler 抽象类的一个 WebSocket 实现, 客户端请求会首先分发至此 Handler
│   ├── disruptor: 请求经由 Handler 处理后, 会提交至我们基于 Disruptor 实现的连接池, Disruptor 是一个高吞吐量的环状队列.
│   │   ├── EventProducer: Disruptor 连接池.
│   │   └── event: Disruptor 连接池任务.
│   │       └── WebSocketEvent: Disruptor 连接池任务所需参数包装.
│   │
│   ├── qos: QoS 相关, 保证服务端与客户端之间, 消息的可靠传输, 暂时不细讲.
│   │   ├── QoS4RecivedDaemonC2S
│   │   ├── QoS4RecivedDaemonRoot
│   │   ├── QoS4SendDaemonRoot
│   │   └── QoS4SendDaemonS2C
│   │
│   └── strategy: 客户端消息经 Handler 提交至 EventProducer 后, 大致会根据 Agreement.type 分发至相对应应的, 此包下的业务能力处理器上.
│       ├── AgreementCLoginHandler: 弃用中, 保留
│       ├── AgreementStrategy: 业务能力处理器接口
│       └── websocket: 基于 WebSocket 通信业务能力处理器相关
│           ├── AgreementWebSocketCKeepLiveHandler: 心跳续约处理器, 响应客户端的心跳续约请求.
│           ├── AgreementWebSocketCCommonPullHandler: 客户端发起的, 点对多消息, 通用处理器.
│           ├── AgreementWebSocketCCommonPushHandler: 客户端发起的, 点对点消息, 通用处理器.
│           ├── AgreementWebSocketCLoginHandler: 客户端登陆消息处理器.
│           ├── AgreementWebSocketCLogoutHandler: 客户端退出登陆消息处理器.
│           └── AgreementWebSocketCRecivedHandler: 客户端收到消息后, 返回的确认已收到 ACK 消息处理器.
│
├── netty
│   ├── IMObserver: Netty 异步操作发生时, 可利用此接口, 基于异步处理的结果实现相应业务.
│   ├── channel: 暂时仅存放 UDP 相关的 Channel 包装.
│   │   ├── UDPClientChannel
│   │   └── UDPServerChannel
│   ├── config
│   │   └── UDPServerChannelConfig
│   └── handler
│       ├── NioWebSocketHandler: Netty 这边的消息处理器, 仅仅用于传递消息至 Handler.
│       └── UDPClientInboundHandler
│
├── processor: 基础能力层顶层静态工具类.
│   ├── CommonProcessor: 消息处理器, 底层调用 GlobalHelper, LocalHelper 发送消息, 以及 QoS 相关的静态方法包装.
│   └── OnlineProcessor: 操作在线用户相关处理器, 包括在线用户容器也在其中.
│
└── utils: 基础能力层底层工具类
    ├── BasicThreadFactory: 线程工厂工具类.
    ├── ByteBufJsonConvert: ByteBuf 与 JSON 转换工具类.
    ├── CharsetHelper: 
    ├── GlobalSendHelper: C2C/G(客户端 -> 客户端/服务端) 消息发送工具类.
    ├── LocalSendHelper: S2C(服务端 -> 客户端) 消息发送工具类.
    ├── MQProvider: RabbitMQ 相关工具, 暂未使用
    ├── RedisProvider: Redis 相关工具类.
    ├── SequenceIdutil: 消息标识生成器工具.
    ├── SequenceIdUtil2: 消息标识生成工具.
    └── SystemClock: 获取当前时间工具类, 建议使用其获取当前时间.
### 3.3) 整体工作流程细节 这一节主要来看看框架整体的, 核心工作代码**(有删减)**细节, 有需要可以展开看看:
点击展开 从基础启动类的 **startUp** 方法开始: `com.cloud.easy.im.BaseServerApplication#startUp` ```java // Boss 和 Worker Group 的容量都为 1, 后续会将任务分发至 Disruptor 线程池 private final EventLoopGroup bossGroup=new NioEventLoopGroup(1); private final EventLoopGroup workerGroup=new NioEventLoopGroup(1); private Channel nettyChannel=null; public synchronized void startUp()throws InterruptedException{ // 1. 服务端 Netty 线程池 ServerBootstrap bootstrap=initServerBootstrap(); ChannelFuture cf=bootstrap.bind(builder.port).syncUninterruptibly(); nettyChannel=cf.channel(); initOnlineMap(); // 4. 初始化在线用户 Channel 容器 nettyChannel.closeFuture().await(); } private ServerBootstrap initServerBootstrap(){ return new ServerBootstrap() .group(bossGroup,workerGroup) // 2. 设置请求处理器 .childHandler(initChildChannelHandler4Netty()); } private ChannelHandler initChildChannelHandler4Netty(){ return new ChannelInitializer(){ @Override protected void initChannel(Channel ch)throws Exception{ ch.pipeline() // 3. 请求被 Worker 连接池分发至 NioWebSocketHandler 处理 .addLast("handler",new NioWebSocketHandler(builder.abstractServerHandler)); } }; } private void initOnlineMap(){ // 5. 在线用户容器初始化 OnlineProcessor.setOnlineChannels(builder.onlineChannels); } ``` 1. 请求会分发至 NioWebSocketHandler 中处理, 我们接着来看一下处理代码: `com.cloud.easy.im.netty.handler.NioWebSocketHandler#channelRead0` ```java public class NioWebSocketHandler extends SimpleChannelInboundHandler { private final AbstractServerHandler abstractServerHandler; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { logger.debug("收到消息:" + msg); if (msg instanceof FullHttpRequest) { // 建立 WS 连接 handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { // 1. 处理 WS 消息 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { // 2. 请求过滤... // 3. 获取消息报文字符串, 反序列化为 Agreement 对象 String request = ((TextWebSocketFrame) frame).text(); Agreement dataFromClient = AgreementFactory.parse(request, Agreement.class); // 4. 将 Agreement 交由 Handler 处理 abstractServerHandler.messageReceived(ctx.channel(), dataFromClient); } } ``` **NioWebSocketHandler** 这边主要将请求过滤, 然后将消息报文转为 Agreement 对象, 交由 Handler 执行. 2. 接着我们看一下 Handler 这边的处理 Agreement 代码; `com.cloud.easy.im.handler.AbstractServerHandler#messageReceived` ```java public abstract class AbstractServerHandler { // 业务执行线程池 protected EventProducer eventProducer; public void messageReceived(Channel session, Agreement agreement) throws Exception { for (AgreementStrategy strategy : strategyList) { if (strategy.algorithmInterface(agreement.getType())) { // 消息非业务消息, 交由非业务消息处理器执行如心跳续约处理器 strategy.handler(session, agreement, serverEventListener); return; } } // 1. 消息为业务消息, 交由 EventProducer 线程池异步执行 eventProducer.onHandler(session, agreement); } } ``` **Handler** 这边主要根据 `Agreement.type` 属性判断消息类型, 针对: * 非业务消息: 即不会产生 **IO** 阻塞的消息, 直接交由 **AgreementStrategy** 处理器业务处理器**同步**处理. * 业务消息: 如为业务消息, 则交由我们的 **EventProducer** 异步线程池执行 3. 我们接着看一下 **EventProducer** 这边的整体代码, 这边主要基于 **Disruptor** 实现的线程池. `com.cloud.easy.im.handler.disruptor.EventProducer` ```java public class EventProducer { /** * 业务线程执行 */ private List strategyList = Lists.newArrayList(); public EventProducer(int bufferSize, ServerEventListener serverEventListener, MessageQoSEventListenerS2C messageQoSEventListenerS2C, AbstractServerHandler abstractServerHandler) { strategyList.add(new AgreementWebSocketCCommonPushHandler()); strategyList.add(new AgreementWebSocketCCommonPullHandler()); strategyList.add(new AgreementWebSocketCLoginHandler()); strategyList.add(new AgreementWebSocketCLogoutHandler(abstractServerHandler)); strategyList.add(new AgreementWebSocketCRecivedHandler(messageQoSEventListenerS2C)); // 1. 初始化 Disruptor, 定义每个线程执行逻辑 Disruptor disruptor = new Disruptor(() -> new WebSocketEvent(), bufferSize, new BasicThreadFactory.Builder() .namingPattern("EventProducer-daemon-pool-%d").daemon(true).build(), ProducerType.SINGLE, new BlockingWaitStrategy()); disruptor.handleEventsWith(new EventHandler() { @Override public void onEvent(WebSocketEvent event, long sequence, boolean endOfBatch) throws Exception { // 2. 使用对应的处理器执行 for (AgreementStrategy strategy : strategyList) { if (strategy.algorithmInterface(event.getAgreement().getType())) { // 3. 执行处理器中的 handler 方法 strategy.handler(event.getSession(), event.getAgreement(), serverEventListener); return; } } } }); disruptor.start(); ringBuffer = disruptor.getRingBuffer(); } public void onHandler(Channel session, Agreement agreement) { // 1. 申请任务实例, 填充任务所需参数 long sequence = ringBuffer.next(); try { WebSocketEvent webSocketEvent = ringBuffer.get(sequence); webSocketEvent.setAgreement(agreement); webSocketEvent.setSession(session); } finally { // 2. 提交任务 ringBuffer.publish(sequence); } } } ``` **EventProducer** 这边主要利用连接池, 让业务消息异步执行, 并分发至对应的业务处理器. 4. 上节提到, 我们会根据 **Agreement.type** 属性选择响应的处理器执行, 我们来看一下登陆业务处理器是怎么执行的: `com.cloud.easy.im.handler.strategy.websocket.AgreementWebSocketCLoginHandler#handler` ```java public class AgreementWebSocketCLoginHandler extends AgreementStrategy { public void handler(Channel session, Agreement dataFromClient, ServerEventListener serverEventListener) throws Exception { // 1. 将 Agreement.dataContent 转换为业务 Bean final LoginInfo loginInfo = AgreementFactory.parseLoginInfo(dataFromClient.getDataContent()); logger.debug("[WS_LOGINE]>> loginInfo=" + loginInfo.toString()); // 2. 参数过滤 if (loginInfo.getLoginUserId() == null) { logger.warn("[WS_LOGINE]>> loginInfo或loginInfo.getLoginUserId()是null,无法登陆loginInfo=" + loginInfo + "]!"); return; } if (serverEventListener == null) { logger.warn("[WS_LOGINE]>> 收到客户端登陆信息,但回调对象是null,没有进行回调."); return; } // 3. 判断是否已经登陆(判断 channel 中有无用户 Id) boolean alreadyLogined = OnlineProcessor.isOnline(session); // 4. 定义登陆成功后的回调观察者 IMObserver imObserver = (sucess, extraObj) -> { if (sucess) { //在Attribute中存放用户id,以方便后续消息传递 session.attr(OnlineProcessor.USER_ID_IN_SESSION_ATTRIBUTE_ATTR).set(loginInfo.getLoginUserId()); //存放进在线列表中 OnlineProcessor.getInstance().putOnlineUser(loginInfo.getLoginUserId(), session); //登陆成功后回调 serverEventListener .onUserLoginAction_CallBack(loginInfo.getLoginUserId(), loginInfo.getExtra(), session); } else { logger.warn("[WS_LOGINE]>> 发给客户端[userid:" + loginInfo.getLoginUserId() + "]的登陆成功信息发送失败了!"); } }; // 5. 如果已经登陆, 则调用 LocalSendHelper#sendData 回消息 if (alreadyLogined) { //如果已经登陆 logger.debug("[WS_LOGINE]>> 客户端[userid:" + loginInfo.getLoginUserId() + "]已经登陆过,但还在尝试重新登陆"); LocalSendHelper.sendData(session, AgreementFactory.createLoginInfoResponse((byte) 0, loginInfo.getLoginUserId()), imObserver); return; } // 5. 如首次登陆, 回调监听器登陆校验方法 byte code = serverEventListener.onVerifyUser_CallBack( loginInfo.getLoginUserId(), loginInfo.getLoginToken(), loginInfo.getExtra(), session); if (code != 0) { // 6. 登陆失败,返回失败结果 logger.debug("[WS_LOGINE]>> 客户端[userid:" + loginInfo.getLoginUserId() + "]回调登陆校验接口,接口返回结果失败!code:" + code); LocalSendHelper.sendData(session, AgreementFactory.createLoginInfoResponse(code, loginInfo.getLoginUserId()), null); return; } // 6. 登陆成功 LocalSendHelper.sendData(session, AgreementFactory.createLoginInfoResponse(code, loginInfo.getLoginUserId()), imObserver); } } ``` ## 4) 当前依赖组件版本 | Feature | Version | | :-------------: | :----------: | | Netty | 4.1.42.Final | | Jedis | 3.3.0 | | Guava | 29.0-jre | | Caffeine | 2.8.5 | | Disruptor | 3.3.5 | | FastJSON | 1.2.68 | | AMQP-Client | 5.9.0 | ## 9) 术语定义 * [**模板方法模式**](https://zh.wikipedia.org/wiki/模板方法): 模板方法模型是一种行为设计模型。模板方法是一个定义在父类别的方法 ,在模板方法中会呼叫多个定义在父类别的其他方法,而这些方法有可能只是抽象方法并没有实作 ,模板方法仅决定这些抽象方法的执行顺序,这些抽象方法的实作由子类别负责,并且子类别不允许覆写模板方法。 * ... * ...