# my-demo-springboot-sse **Repository Path**: diywindow/my-demo-springboot-sse ## Basic Information - **Project Name**: my-demo-springboot-sse - **Description**: No description available - **Primary Language**: Java - **License**: Not specified - **Default Branch**: master - **Homepage**: None - **GVP Project**: No ## Statistics - **Stars**: 0 - **Forks**: 0 - **Created**: 2023-12-04 - **Last Updated**: 2023-12-04 ## Categories & Tags **Categories**: Uncategorized **Tags**: None ## README @[TOC] # SpringBoot-SSE开发实践 ## SSE SSE(Server-SentEvents,即服务器发送事件)是围绕只读Comet交互推出的API或者模式。SSE API用于创建到服务器的单向连接,服务器通过这个连接可以发送任意数量的数据。服务器响应的MIME类型必须是text/event-stream,而且是浏览器中的JavaScript API能解析格式输出。SSE支持短轮询、长轮询和HTTP流,而且能在断开连接时自动确定何时重新连接。 - **SSE特点**:实现简单、 单向通信、自动重连、··· - **业务场景**:客户端与服务端建立连接后,只需要服务端给客户端发送数据,客户端无需要给服务端发送数据 --- ## 开发实践 ### 项目框架 ``` . ├── README.md ├── pom.xml └── src └── main ├── java │ └── cn │ └── zuster │ └── sse │ ├── SseApplication.java【启动类】 │ ├── controller 【控制器】 │ ├── exception 【异常】 │ ├── service 【服务接口】 │ │ └── impl 【服务实现】 │ ├── session 【SESSION管理】 │ └── task 【任务管理】 └── resources └── application.properties 【配置文件】 ``` ### 项目依赖 SpringBoot中已经有SseEmitter了,所以不需要额外引入其他包。 ```xml 4.0.0 org.springframework.boot spring-boot-starter-parent 2.4.1 cn.zuster my-demo-springboot-sse 0.0.1-SNAPSHOT my-demo-springboot-sse Spring Boot And SSE Demo 1.8 org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-maven-plugin org.projectlombok lombok ``` ### Session管理 服务端和客户端建立连接时,往往会保持很多个SSE会话,为此,我们需要统一的对会话进行管理,此处我们使用 **ConcurrentHashMap**进行会话管理,其中 key 为客户端ID,value 为 **SseEmitter** 对象。当然 value 也可以按照业务进行封装。 主要方法说明: - boolean **exist**(String id):检测指定的客户端Session是否存在。 - boolean **add**(String id, SseEmitter emitter):添加Session,如果有相同的客户端ID,则先结束掉之前的Session,重新建立新的Session。 - boolean **del**(String id):删除指定客户端Session。 - boolean **send**(String id, Object msg):给指定的客户端发送数据,注意,此处没有指定 MediaType ,即默认发送的就是 data,如果需要发送其他类型的数据,可进行自由扩展。 - void **onCompletion**(String id, ScheduledFuture future):当 SseEmitter 触发 onCompletion时业务中需要处理的逻辑,包括停止线程池中的线程执行(比如心跳),移除缓存的Session等。 - void **onError**(String id, SseException e):当 SseEmitter 触发 onError 和 onTimeout 时业务中需要处理的逻辑,这里我取到缓存的Session,然后继续触发 completeWithError(),最终还是会执行到上面的 onCompletion() 方法中。 ```java package cn.zuster.sse.session; // 省略 import /** * SSE Session * * @author zuster * @date 2021/1/5 */ public class SseSession { private static final Logger logger = LoggerFactory.getLogger(SseSession.class); /** * Session维护Map */ private static Map SESSION = new ConcurrentHashMap<>(); /** * 判断Session是否存在 * * @param id 客户端ID * @return */ public static boolean exist(String id) { return SESSION.get(id) == null; } /** * 增加Session * * @param id 客户端ID * @param emitter SseEmitter */ public static void add(String id, SseEmitter emitter) { final SseEmitter oldEmitter = SESSION.get(id); if (oldEmitter != null) { oldEmitter.completeWithError(new SseException("RepeatConnect(Id:" + id + ")")); } SESSION.put(id, emitter); } /** * 删除Session * * @param id 客户端ID * @return */ public static boolean del(String id) { final SseEmitter emitter = SESSION.remove(id); if (emitter != null) { emitter.complete(); return true; } return false; } /** * 发送消息 * * @param id 客户端ID * @param msg 发送的消息 * @return */ public static boolean send(String id, Object msg) { final SseEmitter emitter = SESSION.get(id); if (emitter != null) { try { emitter.send(msg); return true; } catch (IOException e) { logger.error("MSG: SendMessageError-IOException | ID: " + id + " | Date: " + new Date() + " |", e); return false; } } return false; } /** * SseEmitter onCompletion 后执行的逻辑 * * @param id 客户端ID * @param future */ public static void onCompletion(String id, ScheduledFuture future) { SESSION.remove(id); if (future != null) { // SseEmitter断开后需要中断心跳发送 future.cancel(true); } } /** * SseEmitter onTimeout 或 onError 后执行的逻辑 * * @param id * @param e */ public static void onError(String id, SseException e) { final SseEmitter emitter = SESSION.get(id); if (emitter != null) { emitter.completeWithError(e); } } } ``` ### 业务接口 一般使用 SSE 时的业务包括:客户端建立连接、给客户端发送数据、客户端终端连接,接口如下: ```java package cn.zuster.sse.service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; /** * SSE 相关业务接口 * * @author zuster * @date 2021/1/5 */ public interface SseService { /** * 新建连接 * * @param clientId 客户端ID * @return */ SseEmitter start(String clientId); /** * 发送数据 * * @param clientId 客户端ID * @return */ String send(String clientId); /** * 关闭连接 * * @param clientId 客户端ID * @return */ String close(String clientId); } ``` ### 业务实现 业务实现简要介绍: - ScheduledExecutorService **heartbeatExecutors** :使用线程池来管理客户端连接后给客户端发送心跳,我们的业务场景是建立连接后服务端需每隔10秒给客户单发送一个消息,若连续3次未收到心跳,则客户端中断连接,重新进行连接。很多地方使用while(true)...Thread.sleep()方式来实现此业务,但是在真实业务中问题很多,没有用线程池优雅和高效。 - SseEmitter **start**(String clientId) :客户端建立连接,建立连接后,需要将缓存Session,同时设置心跳(如果有其他业务也可以在这里设置),另外在onCompletion、onTimeout、onError回调事件中处理相关的业务。**强调:一定要在回调中处理掉Session和之前设置的Task,否则很容易OOM!** - String **send**(String clientId):向指定客户端发送消息。 - String **close**(String clientId):关闭连接。 ```java package cn.zuster.sse.service.impl; // 省略 import /** * SSE 相关业务实现 * * @author zuster * @date 2021/1/5 */ @Service public class SseServiceImpl implements SseService { private static final Logger logger = LoggerFactory.getLogger(SseServiceImpl.class); /** * 发送心跳线程池 */ private static ScheduledExecutorService heartbeatExecutors = Executors.newScheduledThreadPool(8); /** * 新建连接 * * @param clientId 客户端ID * @return */ @Override public SseEmitter start(String clientId) { // 设置为0L为永不超时 // 次数设置30秒超时,方便测试 timeout 事件 SseEmitter emitter = new SseEmitter(30_000L); logger.info("MSG: SseConnect | EmitterHash: {} | ID: {} | Date: {}", emitter.hashCode(), clientId, new Date()); SseSession.add(clientId, emitter); final ScheduledFuture future = heartbeatExecutors.scheduleAtFixedRate(new HeartBeatTask(clientId), 0, 10, TimeUnit.SECONDS); emitter.onCompletion(() -> { logger.info("MSG: SseConnectCompletion | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date()); SseSession.onCompletion(clientId, future); }); emitter.onTimeout(() -> { logger.error("MSG: SseConnectTimeout | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date()); SseSession.onError(clientId, new SseException("TimeOut(clientId: " + clientId + ")")); }); emitter.onError(t -> { logger.error("MSG: SseConnectError | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date()); SseSession.onError(clientId, new SseException("Error(clientId: " + clientId + ")")); }); return emitter; } /** * 发送数据 * * @param clientId 客户端ID * @return */ @Override public String send(String clientId) { if (SseSession.send(clientId, System.currentTimeMillis())) { return "Succeed!"; } return "error"; } /** * 关闭连接 * * @param clientId 客户端ID * @return */ @Override public String close(String clientId) { logger.info("MSG: SseConnectClose | ID: {} | Date: {}", clientId, new Date()); if (SseSession.del(clientId)) return "Succeed!"; return "Error!"; } } ``` ### 任务 我们的业务为建立连接后发送心跳数据,此处我只设置了客户端ID,如果业务中有其他数据可以扩充。 ```java package cn.zuster.sse.task; // 省略 import /** * 心跳任务 * * @author zuster * @date 2021/1/5 */ public class HeartBeatTask implements Runnable { private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class); private final String clientId; public HeartBeatTask(String clientId) { // 这里可以按照业务传入需要的数据 this.clientId = clientId; } @Override public void run() { logger.info("MSG: SseHeartbeat | ID: {} | Date: {}", clientId, new Date()); SseSession.send(clientId, "ping"); } } ``` ### 异常 ```java package cn.zuster.sse.exception; /** * SSE异常信息 * * @author zuster * @date 2021/1/5 */ public class SseException extends RuntimeException { public SseException() { } public SseException(String message) { super(message); } public SseException(String message, Throwable cause) { super(message, cause); } public SseException(Throwable cause) { super(cause); } public SseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { super(message, cause, enableSuppression, writableStackTrace); } } ``` ### 控制器 ```java package cn.zuster.sse.controller; // 省略 import /** * SSE测试控制器 * * @author songyh * @date 2021/1/5 */ @RestController @RequestMapping("sse") public class SseTestController { private static final Logger logger = LoggerFactory.getLogger(SseTestController.class); @Autowired private SseService sseService; @RequestMapping("start") public SseEmitter start(@RequestParam String clientId) { return sseService.start(clientId); } /** * 将SseEmitter对象设置成完成 * * @param clientId * @return */ @RequestMapping("/end") public String close(String clientId) { return sseService.close(clientId); } } ``` --- ## 测试 代码就上面这么多了,启动起来测试一下吧。 - 建立连接:http://localhost:8080/sse/start?clientId=888 - 关闭连接:http://localhost:8080/sse/end?clientId=111 需要测试的点包括: - 同时开启多个连接 - 启动两个相同的连接 - 启动后直接关掉 - 启动后等待30秒超时 - 发送消息(我controller中删了发送消息的,可以自行加上试试) - 通过关闭连接接口关闭连接 好了,敬请的玩吧