From 961adf31e13a3b0da07d47f18fdccd675ad4bf0d Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Mon, 13 Jan 2025 14:07:00 +0800 Subject: [PATCH] cmd/server: remind message buffer --- .../app/network/controller/pushalarm.go | 11 +++ cmd/server/app/network/controller/user.go | 4 +- cmd/server/app/network/websocket/client.go | 7 +- .../app/network/websocket/client_manager.go | 22 +++--- pkg/global/global.go | 34 +-------- pkg/global/remindMessage.go | 72 +++++++++++++++++++ 6 files changed, 102 insertions(+), 48 deletions(-) create mode 100644 pkg/global/remindMessage.go diff --git a/cmd/server/app/network/controller/pushalarm.go b/cmd/server/app/network/controller/pushalarm.go index d387ac5d..2d7e3535 100644 --- a/cmd/server/app/network/controller/pushalarm.go +++ b/cmd/server/app/network/controller/pushalarm.go @@ -11,6 +11,7 @@ import ( "time" "gitee.com/openeuler/PilotGo/cmd/server/app/network/websocket" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/gin-gonic/gin" ) @@ -31,4 +32,14 @@ func PushAlarmHandler(c *gin.Context) { // 用户连接事件 websocket.CliManager.Register <- client + + messages := websocket.CliManager.SendMsgBuffer.GetAll() + for _, msg := range messages { + _msg, ok := msg.(*global.WebsocketSendMsg) + if !ok { + logger.Error("websocketSendMsg assert error: %+v", msg) + continue + } + client.SendMsg(_msg) + } } diff --git a/cmd/server/app/network/controller/user.go b/cmd/server/app/network/controller/user.go index 46cbc5ef..199582a1 100644 --- a/cmd/server/app/network/controller/user.go +++ b/cmd/server/app/network/controller/user.go @@ -170,7 +170,7 @@ func LoginHandler(c *gin.Context) { global.SendRemindMsg( global.ServerSendMsg, - fmt.Sprintf("用户 %s 登录", u.Username), + fmt.Sprintf("用户 %s 登录, IP: %s", u.Username, c.RemoteIP()), ) response.Success(c, gin.H{"token": token, "departName": departName, "departId": departId, "roleId": roleId}, "登录成功!") @@ -215,7 +215,7 @@ func Logout(c *gin.Context) { global.SendRemindMsg( global.ServerSendMsg, - fmt.Sprintf("用户 %s 登出", u.Username), + fmt.Sprintf("用户 %s 登出, IP: %s", u.Username, c.RemoteIP()), ) response.Success(c, nil, "退出成功!") diff --git a/cmd/server/app/network/websocket/client.go b/cmd/server/app/network/websocket/client.go index fff516f7..09e11f22 100644 --- a/cmd/server/app/network/websocket/client.go +++ b/cmd/server/app/network/websocket/client.go @@ -88,7 +88,7 @@ func (c *Client) Write() { } // 发送数据 -func (c *Client) SendMsg(msg_type global.WebsocketSendMsgType, msg string) { +func (c *Client) SendMsg(msg *global.WebsocketSendMsg) { if c == nil { return } @@ -99,10 +99,7 @@ func (c *Client) SendMsg(msg_type global.WebsocketSendMsgType, msg string) { } }() - c.Send <- &global.WebsocketSendMsg{ - MsgType: msg_type, - Msg: msg, - } + c.Send <- msg } // 监测系统日志警告推送到前端 diff --git a/cmd/server/app/network/websocket/client_manager.go b/cmd/server/app/network/websocket/client_manager.go index a7040bf8..d96244bc 100644 --- a/cmd/server/app/network/websocket/client_manager.go +++ b/cmd/server/app/network/websocket/client_manager.go @@ -21,19 +21,21 @@ var ( // 连接管理 type ClientManager struct { - Clients map[*Client]bool // 全部的连接 - ClientsLock sync.RWMutex // 读写锁 - Register chan *Client // 连接处理 - Unregister chan *Client // 断开连接处理程序 - Broadcast chan *global.WebsocketSendMsg // 广播 向全部成员发送数据 + Clients map[*Client]bool // 全部的连接 + ClientsLock sync.RWMutex // 读写锁 + Register chan *Client // 连接处理 + Unregister chan *Client // 断开连接处理程序 + Broadcast chan *global.WebsocketSendMsg // 广播 向全部成员发送数据 + SendMsgBuffer *global.LimitedList // 发送消息缓冲区 } func NewClientManager() (clientManager *ClientManager) { clientManager = &ClientManager{ - Clients: make(map[*Client]bool), - Register: make(chan *Client, 1000), - Unregister: make(chan *Client, 1000), - Broadcast: make(chan *global.WebsocketSendMsg, 1000), + Clients: make(map[*Client]bool), + Register: make(chan *Client, 1000), + Unregister: make(chan *Client, 1000), + Broadcast: make(chan *global.WebsocketSendMsg, 1000), + SendMsgBuffer: global.NewLimitedList(global.RemindMsgSize), } return } @@ -105,6 +107,8 @@ func (manager *ClientManager) Start(stopCh <-chan struct{}) { // 断开连接事件 manager.EventUnregister(conn) case message := <-manager.Broadcast: + manager.SendMsgBuffer.Store(message) + // 广播事件 clients := manager.GetClients() for conn := range clients { diff --git a/pkg/global/global.go b/pkg/global/global.go index 28857ff4..fe2db5f3 100644 --- a/pkg/global/global.go +++ b/pkg/global/global.go @@ -7,8 +7,6 @@ */ package global -import "gitee.com/openeuler/PilotGo/sdk/logger" - const ( // 新注册机器添加到部门根节点 UncateloguedDepartId = 1 @@ -18,35 +16,7 @@ const ( // 集群规模 ClusterSize = 10 -) - -var WARN_MSG chan *WebsocketSendMsg = make(chan *WebsocketSendMsg, 100) -type WebsocketSendMsgType int - -const ( - MachineSendMsg WebsocketSendMsgType = iota - PluginSendMsg - ServerSendMsg + // 消息提醒缓存 + RemindMsgSize = 20 ) - -type WebsocketSendMsg struct { - MsgType WebsocketSendMsgType `json:"msgtype"` - Msg string `json:"msg"` -} - -/* - WebsocketSendMsgType: pkg/global/global.go -*/ -func SendRemindMsg(msg_type WebsocketSendMsgType, msg string) { - defer func () { - if r := recover(); r != nil { - logger.Error("send remind message to closed channel WARN_MSG: %+v", r) - } - }() - - WARN_MSG <- &WebsocketSendMsg{ - MsgType: msg_type, - Msg: msg, - } -} \ No newline at end of file diff --git a/pkg/global/remindMessage.go b/pkg/global/remindMessage.go new file mode 100644 index 00000000..eb1a3d66 --- /dev/null +++ b/pkg/global/remindMessage.go @@ -0,0 +1,72 @@ +package global + +import ( + "container/list" + "time" + + "gitee.com/openeuler/PilotGo/sdk/logger" +) + +var WARN_MSG chan *WebsocketSendMsg = make(chan *WebsocketSendMsg, 100) + +type WebsocketSendMsgType int + +const ( + MachineSendMsg WebsocketSendMsgType = iota + PluginSendMsg + ServerSendMsg +) + +type WebsocketSendMsg struct { + MsgType WebsocketSendMsgType `json:"msgtype"` + Time string `json:"time"` + Msg string `json:"msg"` +} + +/* +WebsocketSendMsgType: pkg/global/global.go +*/ +func SendRemindMsg(msg_type WebsocketSendMsgType, msg string) { + defer func() { + if r := recover(); r != nil { + logger.Error("send remind message to closed channel WARN_MSG: %+v", r) + } + }() + + WARN_MSG <- &WebsocketSendMsg{ + MsgType: msg_type, + Time: time.Now().Format("2006-01-02 15:04:05"), + Msg: msg, + } +} + +type LimitedList struct { + capacity int + data *list.List +} + +func NewLimitedList(capacity int) *LimitedList { + return &LimitedList{ + capacity: capacity, + data: list.New(), + } +} + +func (l *LimitedList) Store(value interface{}) { + if l.data.Len() >= l.capacity { + l.data.Remove(l.data.Front()) + } + l.data.PushBack(value) +} + +func (l *LimitedList) GetAll() []interface{} { + result := make([]interface{}, 0, l.data.Len()) + for e := l.data.Front(); e != nil; e = e.Next() { + result = append(result, e.Value) + } + return result +} + +func (l *LimitedList) Len() int { + return l.data.Len() +} -- Gitee