From 4445fb379b6d7fc37b509d4bb806f01022ed5a4a Mon Sep 17 00:00:00 2001 From: mabofu Date: Mon, 26 Dec 2022 10:10:00 +0800 Subject: [PATCH] Add task update websocket server --- .../dsms/modules/server/TaskUpdateServer.java | 136 ++++++++++++++++++ .../dsms/modules/server/WebSocketServer.java | 14 +- 2 files changed, 143 insertions(+), 7 deletions(-) create mode 100644 dsms-engine-application/src/main/java/com/dsms/modules/server/TaskUpdateServer.java diff --git a/dsms-engine-application/src/main/java/com/dsms/modules/server/TaskUpdateServer.java b/dsms-engine-application/src/main/java/com/dsms/modules/server/TaskUpdateServer.java new file mode 100644 index 0000000..0a49a66 --- /dev/null +++ b/dsms-engine-application/src/main/java/com/dsms/modules/server/TaskUpdateServer.java @@ -0,0 +1,136 @@ +/* + * Copyright 2022 The DSMS Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.dsms.modules.server; + +import cn.hutool.core.thread.ThreadUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; + +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +@ServerEndpoint("/api/node/websocket/taskUpdate/{sessionId}") +@Component +@Slf4j +public class TaskUpdateServer { + + private static final int MAX_RETRY_SIZE = 3; + + private static final String PING = "ping"; + private static final String PONG = "pong"; + + + /** + * 存储每个打开了任务管理页面的TaskUpdateWebsocket对象 + */ + public static Map taskUpdateServers = new ConcurrentHashMap<>(); + /** + * 与某个客户端的连接会话,需要通过它来给客户端更新任务管理器 + */ + private Session session; + /** + * 页面sessionId + */ + private String sessionId = ""; + + /** + * 连接建立成功调用的方法 + */ + @OnOpen + public void onOpen(Session session, @PathParam("sessionId") String sessionId) { + this.session = session; + this.sessionId = sessionId; + + if (!taskUpdateServers.containsKey(sessionId)) { + //加入集合中 + taskUpdateServers.put(sessionId, this); + } + sendMessage("连接成功"); + } + + /** + * 连接关闭调用的方法 + */ + @OnClose + public void onClose() { + if (taskUpdateServers.containsKey(sessionId)) { + taskUpdateServers.remove(sessionId); + } + log.info("page out:{}", sessionId); + } + + /** + * 收到客户端消息后调用的方法 + * + * @param message 客户端发送过来的消息 + */ + @OnMessage + public void onMessage(String message, Session session) { + if (PING.equals(message)) { + sendMessage(PONG); + } + } + + /** + * @param session + * @param error + */ + @OnError + public void onError(Session session, Throwable error) { + log.error("page error, session {},cause:{}", session, error.getMessage(), error); + } + + /** + * 发送消息到客户端 + */ + public void sendMessage(String message) { + boolean sendSuccess = false; + for (int retry = 0; retry < MAX_RETRY_SIZE; retry++) { + if (!sendSuccess) { + try { + this.session.getBasicRemote().sendText(message); + sendSuccess = true; + } catch (IOException e) { + log.error("send message error,session:{},message:{}", session, message); + ThreadUtil.sleep(100); + } + } + + } + } + + + /** + * 实现服务器更新任务状态 + */ + public static void sendInfo(String message, @PathParam("sessionId") String sessionId) throws IOException { + log.debug("taskwebsocket send :" + sessionId + ",message:" + message); + if (StringUtils.hasText(sessionId) && taskUpdateServers.containsKey(sessionId)) { + taskUpdateServers.get(sessionId).sendMessage(message); + } else { + log.error("page: {} offline!", sessionId); + } + } + +} diff --git a/dsms-engine-application/src/main/java/com/dsms/modules/server/WebSocketServer.java b/dsms-engine-application/src/main/java/com/dsms/modules/server/WebSocketServer.java index ba8be29..32c5040 100644 --- a/dsms-engine-application/src/main/java/com/dsms/modules/server/WebSocketServer.java +++ b/dsms-engine-application/src/main/java/com/dsms/modules/server/WebSocketServer.java @@ -62,12 +62,12 @@ public class WebSocketServer { addOnlineCount(); } - log.info("用户连接:{},当前在线人数为:{}", userId, getOnlineCount()); + log.info("user connect:{},current user count:{}", userId, getOnlineCount()); try { sendMessage("连接成功"); } catch (IOException e) { - log.error("用户:" + userId + ",网络异常!!!!!!"); + log.error("user:{},net error!!!!!!", userId); } } @@ -81,7 +81,7 @@ public class WebSocketServer { //从集合中删除 subOnlineCount(); } - log.info("用户退出:{},当前在线人数为:{}", userId, getOnlineCount()); + log.info("user out:{},current user count:{}", userId, getOnlineCount()); } /** @@ -91,7 +91,7 @@ public class WebSocketServer { */ @OnMessage public void onMessage(String message, Session session) { - log.info("【websocket消息】收到客户端发来的消息:{}", message); + log.info("【websocket message】receive a message from the client:{}", message); } /** @@ -100,7 +100,7 @@ public class WebSocketServer { */ @OnError public void onError(Session session, Throwable error) { - log.error("用户错误:" + this.userId + ",原因:" + error.getMessage()); + log.error("user error:" + this.userId + ",cause:" + error.getMessage()); error.printStackTrace(); } @@ -116,11 +116,11 @@ public class WebSocketServer { * 发送自定义消息 */ public static void sendInfo(String message, @PathParam("userId") String userId) throws IOException { - log.info("发送消息到:" + userId + ",报文:" + message); + log.info("send:" + userId + ",message:" + message); if (StringUtils.hasText(userId) && webSocketMap.containsKey(userId)) { webSocketMap.get(userId).sendMessage(message); } else { - log.error("用户" + userId + ",不在线!"); + log.error("user" + userId + ",offline!"); } } -- Gitee