From 63c6f219c66345157329dcd5186bc7c547bc44e0 Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Thu, 2 Jan 2025 09:17:27 +0800 Subject: [PATCH] cmd/server: improve message remind timeline --- cmd/server/app/agentmanager/agent.go | 18 +++++++--- cmd/server/app/agentmanager/agentmanager.go | 2 +- cmd/server/app/network/controller/batch.go | 20 +++++++++++ cmd/server/app/network/controller/user.go | 12 +++++++ cmd/server/app/network/websocket/client.go | 34 +++++++++++------- .../app/network/websocket/client_manager.go | 24 +++++++++---- cmd/server/app/service/plugin/heartbeat.go | 35 +++++++++++++++++++ pkg/global/global.go | 33 +++++++++++++++++ 8 files changed, 154 insertions(+), 24 deletions(-) diff --git a/cmd/server/app/agentmanager/agent.go b/cmd/server/app/agentmanager/agent.go index e3e519e9..7f07dfd3 100644 --- a/cmd/server/app/agentmanager/agent.go +++ b/cmd/server/app/agentmanager/agent.go @@ -18,6 +18,7 @@ import ( configservice "gitee.com/openeuler/PilotGo/cmd/server/app/service/configfile" machineservice "gitee.com/openeuler/PilotGo/cmd/server/app/service/machine" "gitee.com/openeuler/PilotGo/cmd/server/app/service/plugin" + pkgglobal "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/pkg/utils" pnet "gitee.com/openeuler/PilotGo/pkg/utils/message/net" "gitee.com/openeuler/PilotGo/pkg/utils/message/protocol" @@ -28,8 +29,6 @@ import ( type AgentMessageHandler func(*Agent, *protocol.Message) error -var WARN_MSG chan interface{} - type Agent struct { UUID string Version string @@ -64,6 +63,11 @@ func NewAgent(conn net.Conn) (*Agent, error) { return nil, err } + pkgglobal.SendRemindMsg( + pkgglobal.MachineSendMsg, + fmt.Sprintf("agent机器 %s 已连接", agent.IP), + ) + return agent, nil } @@ -91,7 +95,7 @@ func (a *Agent) startListen() { logger.Error("update machine status failed: %s", err.Error()) } DeleteAgent(a.UUID) - str := "agent机器" + a.IP + "已断开连接" + str := "agent机器 " + a.IP + " 已断开连接" logger.Warn("agent %s disconnected, ip:%s", a.UUID, a.IP) // 发布“平台主机离线”事件 msgData := commonSDK.MessageData{ @@ -110,7 +114,7 @@ func (a *Agent) startListen() { } plugin.PublishEvent(ms) // 消息推送到前端 - WARN_MSG <- str + pkgglobal.SendRemindMsg(pkgglobal.MachineSendMsg, str) return } readBuff = append(readBuff, buff[:n]...) @@ -137,7 +141,7 @@ func (a *Agent) Init() error { }) a.bindHandler(protocol.FileMonitor, func(a *Agent, msg *protocol.Message) error { logger.Info("process file monitor from processor:%s", msg.String()) - WARN_MSG <- msg.Data.(string) + pkgglobal.SendRemindMsg(pkgglobal.MachineSendMsg, msg.Data.(string)) return nil }) @@ -267,4 +271,8 @@ func ConfigMessageInfo(Data interface{}) { logger.Error("配置文件添加失败%s", err.Error()) } } + pkgglobal.SendRemindMsg( + pkgglobal.MachineSendMsg, + fmt.Sprintf("uuid %s 添加配置文件 %s", p["Machine_uuid"].(string), p["ConfigName"].(string)), + ) } diff --git a/cmd/server/app/agentmanager/agentmanager.go b/cmd/server/app/agentmanager/agentmanager.go index 51fd0c62..2cac284c 100644 --- a/cmd/server/app/agentmanager/agentmanager.go +++ b/cmd/server/app/agentmanager/agentmanager.go @@ -94,7 +94,7 @@ func DeleteAgent(uuid string) { } func AddandRunAgent(c net.Conn) error { - if globalAgentManager.ReturnAgentNumber() == global.ClusterSize { + if globalAgentManager.ReturnAgentNumber() >= global.ClusterSize { c.Close() return errors.Errorf("cluster size is full, can't add new agent") } diff --git a/cmd/server/app/network/controller/batch.go b/cmd/server/app/network/controller/batch.go index 02c107c7..b270dc5a 100644 --- a/cmd/server/app/network/controller/batch.go +++ b/cmd/server/app/network/controller/batch.go @@ -8,11 +8,13 @@ package controller import ( + "fmt" "strconv" "gitee.com/openeuler/PilotGo/cmd/server/app/network/jwt" "gitee.com/openeuler/PilotGo/cmd/server/app/service/auditlog" "gitee.com/openeuler/PilotGo/cmd/server/app/service/batch" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/pkg/utils/message/net" "gitee.com/openeuler/PilotGo/sdk/response" "github.com/gin-gonic/gin" @@ -48,6 +50,12 @@ func CreateBatchHandler(c *gin.Context) { response.Fail(c, nil, err.Error()) return } + + global.SendRemindMsg( + global.ServerSendMsg, + fmt.Sprintf("用户 %s 创建批次 %s", u.Username, params.Name), + ) + response.Success(c, nil, "批次入库成功") } @@ -103,6 +111,12 @@ func DeleteBatchHandler(c *gin.Context) { response.Fail(c, gin.H{"status": false}, err.Error()) return } + + global.SendRemindMsg( + global.ServerSendMsg, + fmt.Sprintf("用户 %s 删除批次 %v", u.Username, batchdel.BatchID), + ) + response.Success(c, nil, "批次删除成功") } @@ -139,6 +153,12 @@ func UpdateBatchHandler(c *gin.Context) { response.Fail(c, gin.H{"status": false}, "update batch failed: "+err.Error()) return } + + global.SendRemindMsg( + global.ServerSendMsg, + fmt.Sprintf("用户 %s 更新批次 %s", u.Username, batchinfo.BatchName), + ) + response.Success(c, nil, "批次修改成功") } diff --git a/cmd/server/app/network/controller/user.go b/cmd/server/app/network/controller/user.go index 3c336e2e..46cbc5ef 100644 --- a/cmd/server/app/network/controller/user.go +++ b/cmd/server/app/network/controller/user.go @@ -21,6 +21,7 @@ import ( "gitee.com/openeuler/PilotGo/cmd/server/app/service/plugin" "gitee.com/openeuler/PilotGo/cmd/server/app/service/role" userservice "gitee.com/openeuler/PilotGo/cmd/server/app/service/user" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/pkg/utils/message/net" commonSDK "gitee.com/openeuler/PilotGo/sdk/common" "gitee.com/openeuler/PilotGo/sdk/logger" @@ -166,6 +167,12 @@ func LoginHandler(c *gin.Context) { response.Fail(c, nil, err.Error()) return } + + global.SendRemindMsg( + global.ServerSendMsg, + fmt.Sprintf("用户 %s 登录", u.Username), + ) + response.Success(c, gin.H{"token": token, "departName": departName, "departId": departId, "roleId": roleId}, "登录成功!") } @@ -206,6 +213,11 @@ func Logout(c *gin.Context) { } plugin.PublishEvent(ms) + global.SendRemindMsg( + global.ServerSendMsg, + fmt.Sprintf("用户 %s 登出", u.Username), + ) + response.Success(c, nil, "退出成功!") } diff --git a/cmd/server/app/network/websocket/client.go b/cmd/server/app/network/websocket/client.go index e6ef424b..fff516f7 100644 --- a/cmd/server/app/network/websocket/client.go +++ b/cmd/server/app/network/websocket/client.go @@ -8,10 +8,11 @@ package websocket import ( + "encoding/json" "runtime/debug" "time" - "gitee.com/openeuler/PilotGo/cmd/server/app/agentmanager" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/gorilla/websocket" "k8s.io/klog/v2" @@ -24,10 +25,10 @@ const ( // 用户连接 type Client struct { - Addr string // 客户端地址 - Socket *websocket.Conn // 用户连接 - Send chan []byte // 待发送的数据 - HeartbeatTime uint64 // 用户上次心跳时间 + Addr string // 客户端地址 + Socket *websocket.Conn // 用户连接 + Send chan *global.WebsocketSendMsg // 待发送的数据 + HeartbeatTime uint64 // 用户上次心跳时间 } // 初始化 @@ -35,7 +36,7 @@ func NewClient(addr string, socket *websocket.Conn, firstTime uint64) (client *C client = &Client{ Addr: addr, Socket: socket, - Send: make(chan []byte, 100), + Send: make(chan *global.WebsocketSendMsg, 100), HeartbeatTime: firstTime, } @@ -75,12 +76,19 @@ func (c *Client) Write() { logger.Debug("Client发送数据 关闭连接: %s, OK: %t", c.Addr, ok) return } - c.Socket.WriteMessage(websocket.TextMessage, message) + msg_json, err := json.Marshal(message) + if err != nil { + logger.Error("fail to marshal json data in ws client.write(): %s, %+v, %s", c.Addr, message, err.Error()) + return + } + if err := c.Socket.WriteMessage(websocket.TextMessage, msg_json); err != nil { + logger.Error("err while writing message to websocket client(remote: %s): %s", c.Socket.RemoteAddr(), err.Error()) + } } } // 发送数据 -func (c *Client) SendMsg(msg []byte) { +func (c *Client) SendMsg(msg_type global.WebsocketSendMsgType, msg string) { if c == nil { return } @@ -91,7 +99,10 @@ func (c *Client) SendMsg(msg []byte) { } }() - c.Send <- msg + c.Send <- &global.WebsocketSendMsg{ + MsgType: msg_type, + Msg: msg, + } } // 监测系统日志警告推送到前端 @@ -101,9 +112,8 @@ func SendWarnMsgToWeb(stopCh <-chan struct{}) { case <-stopCh: klog.Warningln("SendWarnMsgToWeb success exit") return - case data := <-agentmanager.WARN_MSG: - CliManager.Broadcast <- []byte(data.(string)) - + case data := <-global.WARN_MSG: + CliManager.Broadcast <- data } } diff --git a/cmd/server/app/network/websocket/client_manager.go b/cmd/server/app/network/websocket/client_manager.go index 73391c8a..a7040bf8 100644 --- a/cmd/server/app/network/websocket/client_manager.go +++ b/cmd/server/app/network/websocket/client_manager.go @@ -10,6 +10,7 @@ package websocket import ( "sync" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/sdk/logger" "k8s.io/klog/v2" ) @@ -20,11 +21,11 @@ var ( // 连接管理 type ClientManager struct { - Clients map[*Client]bool // 全部的连接 - ClientsLock sync.RWMutex // 读写锁 - Register chan *Client // 连接处理 - Unregister chan *Client // 断开连接处理程序 - Broadcast chan []byte // 广播 向全部成员发送数据 + Clients map[*Client]bool // 全部的连接 + ClientsLock sync.RWMutex // 读写锁 + Register chan *Client // 连接处理 + Unregister chan *Client // 断开连接处理程序 + Broadcast chan *global.WebsocketSendMsg // 广播 向全部成员发送数据 } func NewClientManager() (clientManager *ClientManager) { @@ -32,7 +33,7 @@ func NewClientManager() (clientManager *ClientManager) { Clients: make(map[*Client]bool), Register: make(chan *Client, 1000), Unregister: make(chan *Client, 1000), - Broadcast: make(chan []byte, 1000), + Broadcast: make(chan *global.WebsocketSendMsg, 1000), } return } @@ -95,6 +96,7 @@ func (manager *ClientManager) Start(stopCh <-chan struct{}) { select { case <-stopCh: klog.Warningln("websocket CliManager success exit") + manager.Close() return case conn := <-manager.Register: // 建立连接事件 @@ -116,6 +118,16 @@ func (manager *ClientManager) Start(stopCh <-chan struct{}) { } } +func (manager *ClientManager) Close() { + clients := CliManager.GetClients() + for client := range clients { + manager.ClientsLock.Lock() + delete(manager.Clients, client) + manager.ClientsLock.Unlock() + client.Socket.Close() + } +} + /************************** manager info ***************************************/ // 定时清理超时连接 func ClearTimeoutWebSocketConnections() { diff --git a/cmd/server/app/service/plugin/heartbeat.go b/cmd/server/app/service/plugin/heartbeat.go index 0dbd90e5..077052f9 100644 --- a/cmd/server/app/service/plugin/heartbeat.go +++ b/cmd/server/app/service/plugin/heartbeat.go @@ -8,10 +8,13 @@ package plugin import ( + "fmt" "time" eventSDK "gitee.com/openeuler/PilotGo-plugins/event/sdk" + "gitee.com/openeuler/PilotGo/cmd/server/app/service/internal/dao" "gitee.com/openeuler/PilotGo/pkg/dbmanager/redismanager" + "gitee.com/openeuler/PilotGo/pkg/global" "gitee.com/openeuler/PilotGo/sdk/logger" "gitee.com/openeuler/PilotGo/sdk/plugin/client" @@ -41,6 +44,22 @@ func checkAndRebind() { if !plugin_status.(*client.PluginStatus).Connected || time.Since(plugin_status.(*client.PluginStatus).LastConnect) > client.HeartbeatInterval+1*time.Second { err := Handshake(p.Url, p) if err != nil { + if time.Since(plugin_status.(*client.PluginStatus).LastConnect) <= client.HeartbeatInterval*2+1*time.Second { + _addr := p.Url + if p.Url == "localhost" || p.Url == "127.0.0.1" { + node, err := dao.MachineInfoByUUID(p.UUID) + if err == nil { + _addr = node.IP + } else { + logger.Error("fail to get machineinfo by uuid: %s", err.Error()) + } + } + global.SendRemindMsg( + global.PluginSendMsg, + fmt.Sprintf("%s 插件离线 %s", p.Name, _addr), + ) + } + logger.Error("rebind plugin and pilotgo server failed:%v", err.Error()) value := client.PluginStatus{ Connected: false, @@ -80,6 +99,22 @@ func checkAndRebind() { PublishEvent(ms) } } else { + if time.Since(plugin_status.(*client.PluginStatus).LastConnect) > client.HeartbeatInterval*2+1*time.Second { + _addr := p.Url + if p.Url == "localhost" || p.Url == "127.0.0.1" { + node, err := dao.MachineInfoByUUID(p.UUID) + if err == nil { + _addr = node.IP + } else { + logger.Error("fail to get machineinfo by uuid: %s", err.Error()) + } + } + global.SendRemindMsg( + global.PluginSendMsg, + fmt.Sprintf("%s 插件上线 %s", p.Name, _addr), + ) + } + value := client.PluginStatus{ Connected: true, LastConnect: time.Now(), diff --git a/pkg/global/global.go b/pkg/global/global.go index 9b44403a..28857ff4 100644 --- a/pkg/global/global.go +++ b/pkg/global/global.go @@ -7,6 +7,8 @@ */ package global +import "gitee.com/openeuler/PilotGo/sdk/logger" + const ( // 新注册机器添加到部门根节点 UncateloguedDepartId = 1 @@ -17,3 +19,34 @@ const ( // 集群规模 ClusterSize = 10 ) + +var WARN_MSG chan *WebsocketSendMsg = make(chan *WebsocketSendMsg, 100) + +type WebsocketSendMsgType int + +const ( + MachineSendMsg WebsocketSendMsgType = iota + PluginSendMsg + ServerSendMsg +) + +type WebsocketSendMsg struct { + MsgType WebsocketSendMsgType `json:"msgtype"` + Msg string `json:"msg"` +} + +/* + WebsocketSendMsgType: pkg/global/global.go +*/ +func SendRemindMsg(msg_type WebsocketSendMsgType, msg string) { + defer func () { + if r := recover(); r != nil { + logger.Error("send remind message to closed channel WARN_MSG: %+v", r) + } + }() + + WARN_MSG <- &WebsocketSendMsg{ + MsgType: msg_type, + Msg: msg, + } +} \ No newline at end of file -- Gitee