# websocket-webflux **Repository Path**: oldersheep/websocket-webflux ## Basic Information - **Project Name**: websocket-webflux - **Description**: webflux整合websocket,实现消息服务 - **Primary Language**: Unknown - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2021-07-04 - **Last Updated**: 2023-05-08 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README # TCP连接管理服务 - 使用Spring Webflux整合WebSocket - 认证方式仅支持URL认证 - 使用spring内置的事件实现解耦 ## 开发流程 ### 配置WebSocket `Spring Webflux`已经内置了`WebSocket`,需要开启一些配置即可: ```java @Configuration public class WebSocketConfiguration { /** * 启动过程中,会最先初始化这个Bean,而这个构造函数仅仅是将 * webSocketService = new HandshakeWebSocketService(); * 而这个握手的服务里面,init了UpgradeStrategy,这里使用的是类型推断, * 推断为:org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy * 具体的推断方式,基本等同于反射。 */ @Bean public WebSocketHandlerAdapter webSocketHandlerAdapter() { return new WebSocketHandlerAdapter(); } /** * 将websocket的请求路径进行映射,映射到自定义的WebSocketHandler * 这里需要注意的是: * 1. handlerMapping.setOrder(Ordered.HIGHEST_PRECEDENCE); 这个必须有,不然ws连接时会报404 */ @Bean public HandlerMapping webSocketMapping(WebSocketHandler imSocketHandler) { final Map mappings = new HashMap<>(); mappings.put("/ws", imSocketHandler); final SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping(); handlerMapping.setOrder(Ordered.HIGHEST_PRECEDENCE); handlerMapping.setUrlMap(mappings); return handlerMapping; } /** * 这个配置完全没有必要了,在初始化WebSocketHandlerAdapter时已经实例化了WebSocketService, * 同时自己推断出来了ReactorNettyRequestUpgradeStrategy */ @Bean public WebSocketService webSocketService() { return new HandshakeWebSocketService(new ReactorNettyRequestUpgradeStrategy()); } } ``` ### 实现自定义的`WebSocketHandler` 看注释,`WebSocketHandler`的`handle(WebSocketSession)`方法只在WebSocket连接时调用一次, 之后这个方法再也不会被执行,而这里就引出了`Publisher`。 根据这个特性,将认证放在了方法的返回值前即可。 这里还有一系列没有搞明白的地方: - 请求如何使被转发到这个Handler中的 - 最后返回的Mono被谁订阅了。 ### 认证的逻辑 定了认证的流程,提供了Redis进行认证的方式,利用Spring的注解 `@ConditionalOnProperty(value = "ws.auth.type", havingValue = "redis")`, 如果需要改变认证逻辑的话,只需要修改yml中的ws.auth.type,并写一个实现类,继承自 `URLAuthenticationProcess`,参考注释将逻辑实现即可。 这里有一个方法 `void publishHeartbeatEvent(WebSocketSession session);`, 这里是通过事件机制,确保认证成功后进行心跳检测。 `processAuthentication`这个方法是具体认证的方法,被默认实现了,建议不要轻易覆盖。 ### 心跳的实现 这里的心跳利用了`Flux.interval(Duration.ofSeconds(1))`创建了一个类似秒表的发射器。当认证成功后, 会发布一个开始心跳的事件,而后`HeartbeatEventListener`监听到之后,开始消费秒表,这时秒表开始从0走针。 而订阅了这个秒表的消费者,记录了上次心跳的时间,与当前的时间进行对比,如果不到心跳周期,就放行,否则执行Ping的逻辑。 这里主要有一个停止消费的问题,就是秒表的消费者消费后的返回值`Disposable`是用来取消订阅的,而取消订阅的操作需要在 消费者里面调用,这就形成了`鸡生蛋蛋生鸡`的问题,暂时未想到更好的办法,使用一个持有类,将对象传入。 ### 心跳复位 这里仍然是利用`Publisher`的一个特性方法`doOnEach`,每一次的接收到消息,就发布复位的事件,这里复位其实就是将 消费者中的一个计数器置为初始值即可。 ### 消息的顶层设计 `MessagePayload` 消息内容 `MessageConverter` 消息转换器 `MessageProcessorHandler` 消息处理器 `WebSocketHandler`的`doOnNext`方法的逻辑是,轮询消息转换器,转换为正确的消息内容,而后轮询消息处理器,根据support 方法进行适配处理。 目前就是这三个顶级的接口,暂时未想到扩展点,后续有问题再修改吧。 ## 未来的一些考虑 ### 多节点部署 `WebSocketSessionRepository`的默认实现已经无法满足了,必须换成Redis或者数据库等 中间介质。同时,还要存储userId与机器IP:PORT的对应关系。