From 25a70bf07dd6e1aadbe37657ac8b902d6d71b32a Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Mon, 30 Oct 2023 10:13:42 +0800 Subject: [PATCH] modify the multihost service to apply the graph database --- server/service/multihost.go | 137 +++++++++++++++++++++++------------- 1 file changed, 90 insertions(+), 47 deletions(-) diff --git a/server/service/multihost.go b/server/service/multihost.go index 4a44cfc..87e28f8 100755 --- a/server/service/multihost.go +++ b/server/service/multihost.go @@ -2,95 +2,138 @@ package service import ( "fmt" + "os" + "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/dao" "gitee.com/openeuler/PilotGo-plugin-topology-server/meta" - "gitee.com/openeuler/PilotGo-plugin-topology-server/processor" "github.com/pkg/errors" ) -func MultiHostService() ([]*meta.Node, []*meta.Edge, []error, []error) { - dataprocesser := processor.CreateDataProcesser() - nodes, edges, collect_errlist, process_errlist := dataprocesser.Process_data() - if len(collect_errlist) != 0 || len(process_errlist) != 0 { - for i, cerr := range collect_errlist { - collect_errlist[i] = errors.Wrap(cerr, "**3") - } +func MultiHostService() ([]*meta.Node, []*meta.Edge, error) { + var latest string + var cqlOUT string + nodes := make([]*meta.Node, 0) + nodes_map := make(map[int64]*meta.Node) + edges := make([]*meta.Edge, 0) + edges_map := make(map[int64]*meta.Edge) + hostids := make([]int64, 0) + multi_nodes_map := make(map[int64]*meta.Node) + multi_nodes := make([]*meta.Node, 0) + multi_edges_map := make(map[int64]*meta.Edge) + multi_edges := make([]*meta.Edge, 0) - for i, perr := range process_errlist { - process_errlist[i] = errors.Wrap(perr, "**7") - } + driver, err := dao.Neo4j.Create_driver() + if err != nil { + err := errors.Errorf("create neo4j driver failed: %s **fatal**2", err.Error()) // err top + agentmanager.Topo.ErrCh <- err + agentmanager.Topo.Errmu.Lock() + agentmanager.Topo.ErrCond.Wait() + agentmanager.Topo.Errmu.Unlock() + close(agentmanager.Topo.ErrCh) + os.Exit(1) + } + defer dao.Neo4j.Close_driver(driver) + + cqlOUT = "match (n:host) return collect(distinct n.unixtime) as times" + times, err := dao.Neo4j.General_query(cqlOUT, "times", driver) + if err != nil { + err = errors.Wrap(err, " **2") + return nil, nil, err } - hostids := []string{} - multi_nodes_map := make(map[string]*meta.Node) - multi_nodes := []*meta.Node{} - multi_edges_map := make(map[string]*meta.Edge) - multi_edges := []*meta.Edge{} + if len(times) < 2 { + latest = times[0] + } else { + latest = times[len(times)-2] + } + + cqlOUT = fmt.Sprintf("match (nodes) where nodes.unixtime='%s' return nodes", latest) + nodes, err = dao.Neo4j.Node_query(cqlOUT, "nodes", driver) + if err != nil { + err = errors.Wrap(err, " **2") + return nil, nil, err + } + + for _, _node := range nodes { + nodes_map[_node.DBID] = _node + } + + cqlOUT = fmt.Sprintf("match ()-[relas]->() where relas.unixtime='%s' return relas", latest) + edges, err = dao.Neo4j.Relation_query(cqlOUT, "relas", driver) + if err != nil { + err = errors.Wrap(err, " **2") + return nil, nil, err + } + + for _, _edge := range edges { + edges_map[_edge.DBID] = _edge + } // 添加 host node - for _, node := range nodes.Nodes { + for _, node := range nodes { if node.Type == "host" { - if _, ok := multi_nodes_map[node.ID]; !ok { - multi_nodes_map[node.ID] = node + if _, ok := multi_nodes_map[node.DBID]; !ok { + multi_nodes_map[node.DBID] = node multi_nodes = append(multi_nodes, node) } - hostids = append(hostids, node.ID) + hostids = append(hostids, node.DBID) } } - for _, edge := range edges.Edges { + for _, edge := range edges { if edge.Type == "tcp" || edge.Type == "udp" { - if _, ok := multi_edges_map[edge.ID]; !ok { - multi_edges_map[edge.ID] = edge + if _, ok := multi_edges_map[edge.DBID]; !ok { + multi_edges_map[edge.DBID] = edge multi_edges = append(multi_edges, edge) } - if _, ok := multi_nodes_map[nodes.Lookup[edge.Src].ID]; !ok { - multi_nodes_map[nodes.Lookup[edge.Src].ID] = nodes.Lookup[edge.Src] - multi_nodes = append(multi_nodes, nodes.Lookup[edge.Src]) + if _, ok := multi_nodes_map[edge.SrcID]; !ok { + multi_nodes_map[edge.SrcID] = nodes_map[edge.SrcID] + multi_nodes = append(multi_nodes, nodes_map[edge.SrcID]) } - if _, ok := multi_nodes_map[nodes.Lookup[edge.Dst].ID]; !ok { - multi_nodes_map[nodes.Lookup[edge.Dst].ID] = nodes.Lookup[edge.Dst] - multi_nodes = append(multi_nodes, nodes.Lookup[edge.Dst]) + if _, ok := multi_nodes_map[edge.DstID]; !ok { + multi_nodes_map[edge.DstID] = nodes_map[edge.DstID] + multi_nodes = append(multi_nodes, nodes_map[edge.DstID]) } } else if edge.Type == "server" || edge.Type == "client" { - if _, ok := multi_edges_map[edge.ID]; !ok { - multi_edges_map[edge.ID] = edge + if _, ok := multi_edges_map[edge.DBID]; !ok { + multi_edges_map[edge.DBID] = edge multi_edges = append(multi_edges, edge) } - if _, ok := multi_nodes_map[nodes.Lookup[edge.Src].ID]; !ok { - multi_nodes_map[nodes.Lookup[edge.Src].ID] = nodes.Lookup[edge.Src] - multi_nodes = append(multi_nodes, nodes.Lookup[edge.Src]) + if _, ok := multi_nodes_map[edge.SrcID]; !ok { + multi_nodes_map[edge.SrcID] = nodes_map[edge.SrcID] + multi_nodes = append(multi_nodes, nodes_map[edge.SrcID]) } - if _, ok := multi_nodes_map[nodes.Lookup[edge.Dst].ID]; !ok { - multi_nodes_map[nodes.Lookup[edge.Dst].ID] = nodes.Lookup[edge.Dst] - multi_nodes = append(multi_nodes, nodes.Lookup[edge.Dst]) + if _, ok := multi_nodes_map[edge.DstID]; !ok { + multi_nodes_map[edge.DstID] = nodes_map[edge.DstID] + multi_nodes = append(multi_nodes, nodes_map[edge.DstID]) } // 创建 net 节点相连的 process 节点与 host 节点的边实例 for _, hostid := range hostids { - if nodes.Lookup[edge.Dst].UUID == nodes.Lookup[hostid].UUID { - net_process__host_edge := &meta.Edge{ - ID: fmt.Sprintf("%s_%s_%s", edge.Dst, meta.EDGE_BELONG, hostid), + if nodes_map[edge.DstID].UUID == nodes_map[hostid].UUID { + net_process_host_edge := &meta.Edge{ + ID: fmt.Sprintf("%s_%s_%s", nodes_map[edge.DstID].ID, meta.EDGE_BELONG, nodes_map[hostid].ID), Type: meta.EDGE_BELONG, Src: edge.Dst, - Dst: hostid, - Dir: true, + Dst: nodes_map[hostid].ID, + Dir: "direct", } - if _, ok := multi_edges_map[net_process__host_edge.ID]; !ok { - multi_edges_map[net_process__host_edge.ID] = net_process__host_edge - multi_edges = append(multi_edges, net_process__host_edge) - } + // TODO: multi_edges_map未包含新创建的边, multi_edges中新创建的边没有DBID、SrcID、DstID + // multi_edges_map[net_process__host_edge.ID] = net_process__host_edge + multi_edges = append(multi_edges, net_process_host_edge) break } } } } - return multi_nodes, multi_edges, collect_errlist, process_errlist + + return multi_nodes, multi_edges, nil } -- Gitee