diff --git a/cmd/server/graph/meta.go b/cmd/server/graph/meta.go index 6163be432e4411e55ba6a174021a880437f2a48e..a53097bdea5c30c9c0dfcb43a4ff8332baa0e52c 100755 --- a/cmd/server/graph/meta.go +++ b/cmd/server/graph/meta.go @@ -11,8 +11,6 @@ import ( "github.com/shirou/gopsutil/process" ) -var Global_TopoDataBuffer *TopoDataBuffer - type TopoDataBuffer struct { TopoConfId string Nodes *Nodes diff --git a/cmd/server/handler/customTopoHandler.go b/cmd/server/handler/customTopoHandler.go index 9ffaf591261c0f69451408a892606dec4fe375e2..c68f739b38f26afc610db1a9b965ac8e3c78d6d4 100755 --- a/cmd/server/handler/customTopoHandler.go +++ b/cmd/server/handler/customTopoHandler.go @@ -6,8 +6,8 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/db/mysqlmanager" "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/graph" "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/resourcemanage" - "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/service" "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/service/custom" + "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/service/webclient" "gitee.com/openeuler/PilotGo/sdk/response" "github.com/gin-gonic/gin" "github.com/pkg/errors" @@ -123,12 +123,14 @@ func UpdateCustomTopoHandle(ctx *gin.Context) { func RunCustomTopoHandle(ctx *gin.Context) { // TODO: 执行业务之前先判断batch集群中的机器是否部署且运行topo-agent + tcid_str := ctx.Query("id") + webclient_id := ctx.Query("clientId") + doneChan := make(chan *graph.TopoDataBuffer, 1) go func() { var custom_topodata *graph.TopoDataBuffer = new(graph.TopoDataBuffer) - tcid_str := ctx.Query("id") if tcid_str == "" { err := errors.New("id is nil") resourcemanage.ERManager.ErrorTransmit("error", err, false, true) @@ -171,14 +173,21 @@ func RunCustomTopoHandle(ctx *gin.Context) { return } - service.UpdateGlobalTopoDataBuffer(custom_topodata) - doneChan <- graph.Global_TopoDataBuffer + webclient.WebClientsManager.UpdateClientTopoDataBuffer(webclient_id, custom_topodata) + + doneChan <- webclient.WebClientsManager.Get(webclient_id) }() select { case <-ctx.Request.Context().Done(): return case data := <-doneChan: + if data == nil { + err := errors.Errorf("topodatabuff is nill, client: %s, %s", ctx.Request.RemoteAddr, webclient_id) + resourcemanage.ERManager.ErrorTransmit("error", err, false, true) + response.Fail(ctx, nil, err.Error()) + return + } if len(data.Combos) != 0 && len(data.Edges.Edges) != 0 && len(data.Nodes.Nodes) != 0 { response.Success(ctx, map[string]interface{}{ "nodes": data.Nodes.Nodes, diff --git a/cmd/server/main.go b/cmd/server/main.go index 38ad036aba0ab83f2f5587bece4a69b071d21401..4f48177c47b442828879ccb705019d959ac93705 100755 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -10,6 +10,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/pluginclient" "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/resourcemanage" "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/service" + "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/service/webclient" "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/signal" // "github.com/pyroscope-io/pyroscope/pkg/cmd/agent/profiler" ) @@ -52,6 +53,11 @@ func main() { */ db.InitDB() + /* + 浏览器客户端 + */ + webclient.InitWebClientsManager() + /* init web server */ diff --git a/cmd/server/service/topoDataBuffer.go b/cmd/server/service/webclient/webclientmanage.go similarity index 59% rename from cmd/server/service/topoDataBuffer.go rename to cmd/server/service/webclient/webclientmanage.go index d333d3c55b393e425238d342acf6b7d5cc92b812..ffb447204ba704c915bd8b94b0b8b0cacd3622a3 100644 --- a/cmd/server/service/topoDataBuffer.go +++ b/cmd/server/service/webclient/webclientmanage.go @@ -1,8 +1,10 @@ -package service +package webclient import ( + "fmt" "strconv" "sync" + "time" "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/agentmanager" "gitee.com/openeuler/PilotGo-plugin-topology/cmd/server/graph" @@ -10,19 +12,33 @@ import ( "github.com/pkg/errors" ) -// 更新全局图数据缓存 -func UpdateGlobalTopoDataBuffer(custom_topodata *graph.TopoDataBuffer) { - if graph.Global_TopoDataBuffer == nil || graph.Global_TopoDataBuffer.TopoConfId != custom_topodata.TopoConfId { - graph.Global_TopoDataBuffer = custom_topodata +var WebClientsManager *WebClientsManagement + +func InitWebClientsManager() { + WebClientsManager = &WebClientsManagement{ + webClients: make(map[string]*graph.TopoDataBuffer), + } + + // go WebClientsManager.HeartbeatDetect() +} + +type WebClientsManagement struct { + webClients map[string]*graph.TopoDataBuffer +} + +// 更新前端client图数据缓存 +func (wcm *WebClientsManagement) UpdateClientTopoDataBuffer(_token string, _custom_topodata *graph.TopoDataBuffer) { + if wcm.Get(_token) == nil || wcm.Get(_token).TopoConfId != _custom_topodata.TopoConfId { + wcm.Add(_token, _custom_topodata) } else { var wg sync.WaitGroup - for uuid, global_node_slice := range graph.Global_TopoDataBuffer.Nodes.LookupByUUID { + for uuid, global_node_slice := range wcm.Get(_token).Nodes.LookupByUUID { wg.Add(1) go func(_uuid string, _global_node_slice []*graph.Node) { defer wg.Done() // 初始化最新图数据中节点的待匹配状态,若未在缓存图数据中发现对应节点,则该节点为新增节点 custom_node_matched_state_map := make(map[string]bool) - for _, custom_node := range custom_topodata.Nodes.LookupByUUID[_uuid] { + for _, custom_node := range _custom_topodata.Nodes.LookupByUUID[_uuid] { custom_node_matched_state_map[custom_node.ID] = false } // 初始化缓存图数据中节点的待匹配状态,若未在新图数据中发现对应节点,则该节点为删减节点 @@ -32,9 +48,9 @@ func UpdateGlobalTopoDataBuffer(custom_topodata *graph.TopoDataBuffer) { } // 用新图数据更新缓存图数据 for _, global_node := range _global_node_slice { - for _, custom_node := range custom_topodata.Nodes.LookupByUUID[_uuid] { + for _, custom_node := range _custom_topodata.Nodes.LookupByUUID[_uuid] { if global_node.Name == custom_node.Name { - if global_node.Metrics["Pid"] == custom_node.Metrics["Pid"] || isSamePstreeBranch(global_node, custom_node, agentmanager.Global_AgentManager.GetAgent_T(_uuid).Processes_2) { + if global_node.Metrics["Pid"] == custom_node.Metrics["Pid"] || wcm.isSamePstreeBranch(global_node, custom_node, agentmanager.Global_AgentManager.GetAgent_T(_uuid).Processes_2) { // 更新节点数据 global_node.LayoutAttr = custom_node.LayoutAttr global_node.Metrics = custom_node.Metrics @@ -42,23 +58,23 @@ func UpdateGlobalTopoDataBuffer(custom_topodata *graph.TopoDataBuffer) { global_node.Tags = custom_node.Tags global_node.Unixtime = custom_node.Unixtime // 更新边数据 - custom_edge_id_slice_any, ok := custom_topodata.Edges.Node_Edges_map.Load(custom_node.ID) + custom_edge_id_slice_any, ok := _custom_topodata.Edges.Node_Edges_map.Load(custom_node.ID) if !ok { continue } for _, custom_edge_id := range custom_edge_id_slice_any.([]string) { - custom_edge_any, ok := custom_topodata.Edges.Lookup.Load(custom_edge_id) + custom_edge_any, ok := _custom_topodata.Edges.Lookup.Load(custom_edge_id) if !ok { continue } custom_edge := custom_edge_any.(*graph.Edge) - global_edge_id_slice, ok := graph.Global_TopoDataBuffer.Edges.Node_Edges_map.Load(global_node.ID) + global_edge_id_slice, ok := wcm.Get(_token).Edges.Node_Edges_map.Load(global_node.ID) if !ok { continue } for _, global_edge_id := range global_edge_id_slice.([]string) { - global_edge_any, ok := graph.Global_TopoDataBuffer.Edges.Lookup.Load(global_edge_id) + global_edge_any, ok := wcm.Get(_token).Edges.Lookup.Load(global_edge_id) if !ok { continue } @@ -85,43 +101,43 @@ func UpdateGlobalTopoDataBuffer(custom_topodata *graph.TopoDataBuffer) { } } // 将新图数据中的新增节点及边添加到缓存图数据中 - for _, custom_node := range custom_topodata.Nodes.LookupByUUID[_uuid] { + for _, custom_node := range _custom_topodata.Nodes.LookupByUUID[_uuid] { if !custom_node_matched_state_map[custom_node.ID] { - graph.Global_TopoDataBuffer.Nodes.Add(custom_node) - custom_edge_id_slice_any, ok := custom_topodata.Edges.Node_Edges_map.Load(custom_node.ID) + wcm.Get(_token).Nodes.Add(custom_node) + custom_edge_id_slice_any, ok := _custom_topodata.Edges.Node_Edges_map.Load(custom_node.ID) if !ok { continue } for _, custom_edge_id := range custom_edge_id_slice_any.([]string) { - custom_edge_any, ok := custom_topodata.Edges.Lookup.Load(custom_edge_id) + custom_edge_any, ok := _custom_topodata.Edges.Lookup.Load(custom_edge_id) if !ok { continue } custom_edge := custom_edge_any.(*graph.Edge) - graph.Global_TopoDataBuffer.Edges.Add(custom_edge) + wcm.Get(_token).Edges.Add(custom_edge) } } } // 删减缓存图数据中过期的节点及边 for _, global_node := range _global_node_slice { if !global_node_matched_state_map[global_node.ID] { - err := graph.Global_TopoDataBuffer.Nodes.Remove(global_node) + err := wcm.Get(_token).Nodes.Remove(global_node) if err != nil { err = errors.Wrap(err, "->") resourcemanage.ERManager.ErrorTransmit("error", err, false, true) continue } - global_edge_id_slice_any, ok := graph.Global_TopoDataBuffer.Edges.Node_Edges_map.Load(global_node.ID) + global_edge_id_slice_any, ok := wcm.Get(_token).Edges.Node_Edges_map.Load(global_node.ID) if !ok { continue } for _, global_edge_id := range global_edge_id_slice_any.([]string) { - global_edge_any, ok := graph.Global_TopoDataBuffer.Edges.Lookup.Load(global_edge_id) + global_edge_any, ok := wcm.Get(_token).Edges.Lookup.Load(global_edge_id) if !ok { continue } global_edge := global_edge_any.(*graph.Edge) - err := graph.Global_TopoDataBuffer.Edges.Remove(global_edge.ID) + err := wcm.Get(_token).Edges.Remove(global_edge.ID) if err != nil { err = errors.Wrap(err, "->") resourcemanage.ERManager.ErrorTransmit("error", err, false, true) @@ -137,14 +153,14 @@ func UpdateGlobalTopoDataBuffer(custom_topodata *graph.TopoDataBuffer) { } // 在parent_pid的子进程树中搜索target_pid -func searchTargetPid(process_map map[int32][]int32, parent_pid, target_pid int32) bool { +func (wcm *WebClientsManagement) searchTargetPid(process_map map[int32][]int32, parent_pid, target_pid int32) bool { find_target := false for _, sub_pid := range process_map[parent_pid] { if sub_pid == target_pid { find_target = true break } - find_target = searchTargetPid(process_map, sub_pid, target_pid) + find_target = wcm.searchTargetPid(process_map, sub_pid, target_pid) if find_target { return true } @@ -153,7 +169,7 @@ func searchTargetPid(process_map map[int32][]int32, parent_pid, target_pid int32 } // 判断两个process node是否属于同一个应用 -func isSamePstreeBranch(old_node, new_node *graph.Node, new_process_slice []*graph.Process) bool { +func (wcm *WebClientsManagement) isSamePstreeBranch(old_node, new_node *graph.Node, new_process_slice []*graph.Process) bool { var small_pid, big_pid int32 old_node_pid, _ := strconv.Atoi(old_node.Metrics["Pid"]) new_node_pid, _ := strconv.Atoi(new_node.Metrics["Pid"]) @@ -169,5 +185,37 @@ func isSamePstreeBranch(old_node, new_node *graph.Node, new_process_slice []*gra for _, process := range new_process_slice { new_process_map[int32(process.Pid)] = process.Cpid } - return (old_node.Metrics["Ppid"] == new_node.Metrics["Ppid"] || searchTargetPid(new_process_map, small_pid, big_pid)) + return (old_node.Metrics["Ppid"] == new_node.Metrics["Ppid"] || wcm.searchTargetPid(new_process_map, small_pid, big_pid)) +} + +func (wcm *WebClientsManagement) Add(_token string, _topodata *graph.TopoDataBuffer) { + wcm.webClients[_token] = _topodata +} + +func (wcm *WebClientsManagement) Delete(_token string) { + delete(wcm.webClients, _token) +} + +func (wcm *WebClientsManagement) Get(_token string) *graph.TopoDataBuffer { + value, ok := wcm.webClients[_token] + if !ok { + return nil + } + return value +} + +// TODO: +func (wcm *WebClientsManagement) HeartbeatDetect() { + defer resourcemanage.ERManager.Wg.Done() + resourcemanage.ERManager.Wg.Add(1) + for { + select { + case <-resourcemanage.ERManager.GoCancelCtx.Done(): + return + case <-time.After(1 * time.Second): + for k, v := range wcm.webClients { + fmt.Printf(">>>clientId: %s, topoConfId: %s\n", k, v.TopoConfId) + } + } + } }