From ffcc17b0574929b322b18447e2d6c0db43d31d2f Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Thu, 11 Apr 2024 16:54:10 +0800 Subject: [PATCH] add generator package & remove processor/collector package --- agent/main.go | 3 + agent/service/dataCollector.go | 7 +- server/collector/init.go | 3 - server/conf/config.go | 2 + .../processor.go => generator/customTopo.go} | 416 ++---------------- .../collector.go => generator/generator.go} | 121 ++++- server/generator/meta.go | 12 + server/generator/publicTopo.go | 268 +++++++++++ .../utils/processMatch.go} | 56 +-- server/generator/utils/tagInjection.go | 59 +++ server/handler/customTopoHandler.go | 12 +- ...ultTopoHandler.go => publicTopoHandler.go} | 6 +- server/main.go | 3 +- .../customService.go} | 51 ++- server/service/periodcollect.go | 23 +- server/service/{ => public}/multihost.go | 2 +- .../service/{ => public}/multihostentire.go | 2 +- server/service/{ => public}/singlehost.go | 2 +- server/service/{ => public}/singlehosttree.go | 2 +- 19 files changed, 557 insertions(+), 493 deletions(-) delete mode 100755 server/collector/init.go rename server/{processor/processor.go => generator/customTopo.go} (42%) mode change 100755 => 100644 rename server/{collector/collector.go => generator/generator.go} (50%) mode change 100644 => 100755 create mode 100644 server/generator/meta.go create mode 100644 server/generator/publicTopo.go rename server/{processor/init.go => generator/utils/processMatch.go} (69%) create mode 100644 server/generator/utils/tagInjection.go rename server/handler/{defaultTopoHandler.go => publicTopoHandler.go} (95%) rename server/service/{customTopo.go => custom/customService.go} (71%) rename server/service/{ => public}/multihost.go (99%) rename server/service/{ => public}/multihostentire.go (99%) rename server/service/{ => public}/singlehost.go (99%) rename server/service/{ => public}/singlehosttree.go (99%) diff --git a/agent/main.go b/agent/main.go index bde0cd2..7672b16 100755 --- a/agent/main.go +++ b/agent/main.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "runtime" "gitee.com/openeuler/PilotGo-plugin-topology-agent/conf" "gitee.com/openeuler/PilotGo-plugin-topology-agent/handler" @@ -12,6 +13,8 @@ import ( ) func main() { + numCPUs := runtime.NumCPU() + runtime.GOMAXPROCS(numCPUs) InitLogger() diff --git a/agent/service/dataCollector.go b/agent/service/dataCollector.go index 0d3d131..43e088e 100755 --- a/agent/service/dataCollector.go +++ b/agent/service/dataCollector.go @@ -9,6 +9,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-agent/conf" "gitee.com/openeuler/PilotGo-plugin-topology-agent/utils" "github.com/pkg/errors" + "gitee.com/openeuler/PilotGo/sdk/logger" ) func DataCollectorService() (utils.Data_collector, error) { @@ -116,11 +117,11 @@ func DataCollectorService() (utils.Data_collector, error) { wg.Wait() - fmt.Println("============================") + logger.Info("============================") for _, t := range cost_time { - fmt.Println(t) + logger.Info(t) } - fmt.Println("============================") + logger.Info("============================") return collector.Psutildata, nil default: diff --git a/server/collector/init.go b/server/collector/init.go deleted file mode 100755 index c0c7fbd..0000000 --- a/server/collector/init.go +++ /dev/null @@ -1,3 +0,0 @@ -package collector - - diff --git a/server/conf/config.go b/server/conf/config.go index 252ba9b..8e97cf3 100644 --- a/server/conf/config.go +++ b/server/conf/config.go @@ -49,6 +49,8 @@ func InitConfig() { os.Exit(1) } + Global_Config = &ServerConfig{} + err = yaml.Unmarshal(bytes, Global_Config) if err != nil { err = errors.Errorf("yaml unmarshal failed: %s", err.Error()) // err top diff --git a/server/processor/processor.go b/server/generator/customTopo.go old mode 100755 new mode 100644 similarity index 42% rename from server/processor/processor.go rename to server/generator/customTopo.go index bf66098..2edb40c --- a/server/processor/processor.go +++ b/server/generator/customTopo.go @@ -1,260 +1,27 @@ -package processor +package generator import ( - "context" "fmt" "strconv" "strings" - "sync" "sync/atomic" - "time" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" - "gitee.com/openeuler/PilotGo-plugin-topology-server/collector" "gitee.com/openeuler/PilotGo-plugin-topology-server/db/mysqlmanager" - "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/global" "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" - "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" - "gitee.com/openeuler/PilotGo/sdk/logger" + "gitee.com/openeuler/PilotGo-plugin-topology-server/generator/utils" "github.com/pkg/errors" ) -type DataProcesser struct { - agent_node_count int32 -} - -func CreateDataProcesser() *DataProcesser { - return &DataProcesser{} -} - -func (d *DataProcesser) ProcessData(agentnum int, tagrules []mysqlmanager.Tag_rule, noderules [][]mysqlmanager.Filter_rule) (*graph.Nodes, *graph.Edges, []error, []error) { - nodes := &graph.Nodes{ - Lock: sync.Mutex{}, - Lookup: make(map[string]*graph.Node, 0), - LookupByType: make(map[string][]*graph.Node, 0), - LookupByUUID: make(map[string][]*graph.Node, 0), - Nodes: make([]*graph.Node, 0), - } - edges := &graph.Edges{ - Lock: sync.Mutex{}, - Lookup: sync.Map{}, - SrcToDsts: make(map[string][]string, 0), - DstToSrcs: make(map[string][]string, 0), - Edges: make([]*graph.Edge, 0), - } - - var wg sync.WaitGroup - var collect_errorlist []error - var process_errorlist []error - var process_errorlist_rwlock sync.RWMutex - - datacollector := collector.CreateDataCollector() - collect_errorlist = datacollector.CollectInstantData() - if len(collect_errorlist) != 0 { - for i, err := range collect_errorlist { - collect_errorlist[i] = errors.Wrap(err, "**7") - } - } - - start := time.Now() - - ctx1, cancel1 := context.WithCancel(pluginclient.Global_Context) - go func(cancelfunc context.CancelFunc) { - for { - if atomic.LoadInt32(&d.agent_node_count) == int32(agentnum) { - cancelfunc() - break - } - } - }(cancel1) - - if agentmanager.Global_AgentManager == nil { - err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top - errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) - return nil, nil, nil, nil - } - - agentmanager.Global_AgentManager.TAgentMap.Range( - func(key, value interface{}) bool { - wg.Add(1) - - agent := value.(*agentmanager.Agent) - - go func(ctx context.Context, _agent *agentmanager.Agent, _nodes *graph.Nodes, _edges *graph.Edges, _tagrules []mysqlmanager.Tag_rule, _noderules [][]mysqlmanager.Filter_rule) { - defer wg.Done() - - if _agent.Host_2 != nil && _agent.Processes_2 != nil && _agent.Netconnections_2 != nil { - if len(_noderules) != 0 { - err := d.CustomCreateNodeEntities(_agent, _nodes, _tagrules, _noderules) - if err != nil { - process_errorlist_rwlock.Lock() - process_errorlist = append(process_errorlist, errors.Wrap(err, "**2")) - process_errorlist_rwlock.Unlock() - } - - <-ctx.Done() - - err = d.CustomCreateEdgeEntities(_agent, _edges, _nodes) - if err != nil { - process_errorlist_rwlock.Lock() - process_errorlist = append(process_errorlist, errors.Wrap(err, "**2")) - process_errorlist_rwlock.Unlock() - } - } else { - err := d.CreateNodeEntities(_agent, _nodes) - if err != nil { - process_errorlist_rwlock.Lock() - process_errorlist = append(process_errorlist, errors.Wrap(err, "**2")) - process_errorlist_rwlock.Unlock() - } - - <-ctx.Done() - - err = d.CreateEdgeEntities(_agent, _edges, _nodes) - if err != nil { - process_errorlist_rwlock.Lock() - process_errorlist = append(process_errorlist, errors.Wrap(err, "**2")) - process_errorlist_rwlock.Unlock() - } - } - } - }(ctx1, agent, nodes, edges, tagrules, noderules) - - return true - }, - ) - wg.Wait() - - atomic.StoreInt32(&d.agent_node_count, int32(0)) - - elapse := time.Since(start) - logger.Info("\033[32mtopo server 采集数据处理时间\033[0m: %v\n", elapse) - - return nodes, edges, collect_errorlist, process_errorlist -} - -func (d *DataProcesser) CreateNodeEntities(agent *agentmanager.Agent, nodes *graph.Nodes) error { - host_node := &graph.Node{ - ID: fmt.Sprintf("%s%s%s%s%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_HOST, global.NODE_CONNECTOR, agent.IP), - Name: agent.UUID, - Type: global.NODE_HOST, - UUID: agent.UUID, - LayoutAttr: global.INNER_LAYOUT_1, - ComboId: agent.UUID, - Metrics: *graph.HostToMap(agent.Host_2, &agent.AddrInterfaceMap_2), - } - - nodes.Add(host_node) - - for _, process := range agent.Processes_2 { - proc_node := &graph.Node{ - ID: fmt.Sprintf("%s%s%s%s%d", agent.UUID, global.NODE_CONNECTOR, global.NODE_PROCESS, global.NODE_CONNECTOR, process.Pid), - Name: process.ExeName, - Type: global.NODE_PROCESS, - UUID: agent.UUID, - LayoutAttr: global.INNER_LAYOUT_2, - ComboId: agent.UUID, - Metrics: *graph.ProcessToMap(process), - } - - nodes.Add(proc_node) - - for _, thread := range process.Threads { - thre_node := &graph.Node{ - ID: fmt.Sprintf("%s%s%s%s%d", agent.UUID, global.NODE_CONNECTOR, global.NODE_THREAD, global.NODE_CONNECTOR, thread.Tid), - Name: strconv.Itoa(int(thread.Tid)), - Type: global.NODE_THREAD, - UUID: agent.UUID, - LayoutAttr: global.INNER_LAYOUT_3, - ComboId: agent.UUID, - Metrics: *graph.ThreadToMap(&thread), - } - - nodes.Add(thre_node) - } - - // for _, net := range process.NetIOCounters { - // net_node := &graph.Node{ - // ID: fmt.Sprintf("%s-%s-%d", agent.UUID, global.NODE_NET, process.Pid), - // Name: net.Name, - // Type: global.NODE_NET, - // UUID: agent.UUID, - // Metrics: *utils.NetToMap(&net, &agent.AddrInterfaceMap_2), - // } - - // nodes.Add(net_node) - // } - } - - // 临时定义不含网络流量metric的网络节点 - for _, net := range agent.Netconnections_2 { - if laddr_slice := strings.Split(net.Laddr, ":"); len(laddr_slice) != 0 { - net_node := &graph.Node{ - ID: fmt.Sprintf("%s%s%s%s%d:%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_NET, global.NODE_CONNECTOR, net.Pid, laddr_slice[1]), - Name: net.Laddr, - Type: global.NODE_NET, - UUID: agent.UUID, - LayoutAttr: global.INNER_LAYOUT_5, - ComboId: agent.UUID, - Metrics: *graph.NetToMap(net), - } - - nodes.Add(net_node) - } else { - err := errors.Errorf("syntax error: %s **errstack**13", net.Laddr) // err top - errormanager.ErrorTransmit(pluginclient.Global_Context, err, false) - } - } - - for _, disk := range agent.Disks_2 { - disk_node := &graph.Node{ - ID: fmt.Sprintf("%s%s%s%s%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_RESOURCE, global.NODE_CONNECTOR, disk.Partition.Device), - Name: disk.Partition.Device, - Type: global.NODE_RESOURCE, - UUID: agent.UUID, - LayoutAttr: global.INNER_LAYOUT_4, - ComboId: agent.UUID, - Metrics: *graph.DiskToMap(disk), - } - - nodes.Add(disk_node) - } +type CustomTopo struct { + Tagrules []mysqlmanager.Tag_rule + Noderules [][]mysqlmanager.Filter_rule - for _, cpu := range agent.Cpus_2 { - cpu_node := &graph.Node{ - ID: fmt.Sprintf("%s%s%s%s%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_RESOURCE, global.NODE_CONNECTOR, "CPU"+strconv.Itoa(int(cpu.Info.CPU))), - Name: "CPU" + strconv.Itoa(int(cpu.Info.CPU)), - Type: global.NODE_RESOURCE, - UUID: agent.UUID, - LayoutAttr: global.INNER_LAYOUT_4, - ComboId: agent.UUID, - Metrics: *graph.CpuToMap(cpu), - } - - nodes.Add(cpu_node) - } - - for _, ifaceio := range agent.NetIOcounters_2 { - iface_node := &graph.Node{ - ID: fmt.Sprintf("%s%s%s%s%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_RESOURCE, global.NODE_CONNECTOR, "NC"+ifaceio.Name), - Name: "NC" + ifaceio.Name, - Type: global.NODE_RESOURCE, - UUID: agent.UUID, - LayoutAttr: global.INNER_LAYOUT_4, - ComboId: agent.UUID, - Metrics: *graph.InterfaceToMap(ifaceio), - } - - nodes.Add(iface_node) - } - - atomic.AddInt32(&d.agent_node_count, int32(1)) - - return nil + Agent_node_count *int32 } -func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, nodes *graph.Nodes, tagrules []mysqlmanager.Tag_rule, noderules [][]mysqlmanager.Filter_rule) error { +func (c *CustomTopo) CreateNodeEntities(agent *agentmanager.Agent, nodes *graph.Nodes) error { allconnections := []graph.Netconnection{} for _, net := range agent.Netconnections_2 { allconnections = append(allconnections, *net) @@ -272,19 +39,19 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node } host_node.Tags = append(host_node.Tags, host_node.UUID, host_node.Type) - if err := TagInjection(host_node, tagrules); err != nil { - atomic.AddInt32(&d.agent_node_count, int32(1)) + if err := utils.TagInjection(host_node, c.Tagrules); err != nil { + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Wrap(err, "**3") } nodes.Add(host_node) - for _, rules := range noderules { + for _, rules := range c.Noderules { uuid := "" for _, condition := range rules { if condition.Rule_type == mysqlmanager.FILTER_TYPE_HOST { if _uuid, ok := condition.Rule_condition["uuid"]; !ok { - atomic.AddInt32(&d.agent_node_count, int32(1)) + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Errorf("there is no uuid field in node host rule_condition: %+v **errstack**3", condition.Rule_condition) } else { uuid = _uuid.(string) @@ -303,9 +70,9 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node case mysqlmanager.FILTER_TYPE_PROCESS: for _, process := range agent.Processes_2 { if _name, ok := condition.Rule_condition["name"]; !ok { - atomic.AddInt32(&d.agent_node_count, int32(1)) + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Errorf("there is no name field in node rule_condition: %+v **errstack**3", condition.Rule_condition) - } else if ProcessMatching(agent, process.ExeName, process.Cmdline, _name.(string)) { + } else if utils.ProcessMatching(agent, process.ExeName, process.Cmdline, _name.(string)) { proc_node := &graph.Node{ ID: fmt.Sprintf("%s%s%s%s%d", agent.UUID, global.NODE_CONNECTOR, global.NODE_PROCESS, global.NODE_CONNECTOR, process.Pid), Name: _name.(string), @@ -318,8 +85,8 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node } proc_node.Tags = append(proc_node.Tags, proc_node.UUID, proc_node.Type) - if err := TagInjection(proc_node, tagrules); err != nil { - atomic.AddInt32(&d.agent_node_count, int32(1)) + if err := utils.TagInjection(proc_node, c.Tagrules); err != nil { + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Wrap(err, "**3") } @@ -331,9 +98,9 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node case mysqlmanager.FILTER_TYPE_TAG: for _, process := range agent.Processes_2 { if _tag, ok := condition.Rule_condition["tag_name"]; !ok { - atomic.AddInt32(&d.agent_node_count, int32(1)) + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Errorf("there is no tag_name field in node rule_condition: %+v **errstack**3", condition.Rule_condition) - } else if ProcessMatching(agent, process.ExeName, process.Cmdline, _tag.(string)) { + } else if utils.ProcessMatching(agent, process.ExeName, process.Cmdline, _tag.(string)) { proc_node := &graph.Node{ ID: fmt.Sprintf("%s%s%s%s%d", agent.UUID, global.NODE_CONNECTOR, global.NODE_PROCESS, global.NODE_CONNECTOR, process.Pid), Name: _tag.(string), @@ -346,8 +113,8 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node } proc_node.Tags = append(proc_node.Tags, proc_node.UUID, proc_node.Type) - if err := TagInjection(proc_node, tagrules); err != nil { - atomic.AddInt32(&d.agent_node_count, int32(1)) + if err := utils.TagInjection(proc_node, c.Tagrules); err != nil { + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Wrap(err, "**3") } @@ -369,8 +136,8 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node } disk_node.Tags = append(disk_node.Tags, disk_node.UUID, disk_node.Type) - if err := TagInjection(disk_node, tagrules); err != nil { - atomic.AddInt32(&d.agent_node_count, int32(1)) + if err := utils.TagInjection(disk_node, c.Tagrules); err != nil { + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Wrap(err, "") } @@ -389,8 +156,8 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node } cpu_node.Tags = append(cpu_node.Tags, cpu_node.UUID, cpu_node.Type) - if err := TagInjection(cpu_node, tagrules); err != nil { - atomic.AddInt32(&d.agent_node_count, int32(1)) + if err := utils.TagInjection(cpu_node, c.Tagrules); err != nil { + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Wrap(err, "**3") } @@ -409,8 +176,8 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node } iface_node.Tags = append(iface_node.Tags, iface_node.UUID, iface_node.Type) - if err := TagInjection(iface_node, tagrules); err != nil { - atomic.AddInt32(&d.agent_node_count, int32(1)) + if err := utils.TagInjection(iface_node, c.Tagrules); err != nil { + atomic.AddInt32(c.Agent_node_count, int32(1)) return errors.Wrap(err, "**3") } @@ -421,137 +188,12 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent, node } - atomic.AddInt32(&d.agent_node_count, int32(1)) - - return nil -} - -func (d *DataProcesser) CreateEdgeEntities(agent *agentmanager.Agent, edges *graph.Edges, nodes *graph.Nodes) error { - nodes_map := map[string][]*graph.Node{} - - for _, node := range nodes.Nodes { - switch node.Type { - case global.NODE_HOST: - nodes_map[global.NODE_HOST] = append(nodes_map[global.NODE_HOST], node) - case global.NODE_PROCESS: - nodes_map[global.NODE_PROCESS] = append(nodes_map[global.NODE_PROCESS], node) - case global.NODE_THREAD: - nodes_map[global.NODE_THREAD] = append(nodes_map[global.NODE_THREAD], node) - case global.NODE_NET: - nodes_map[global.NODE_NET] = append(nodes_map[global.NODE_NET], node) - case global.NODE_RESOURCE: - nodes_map[global.NODE_RESOURCE] = append(nodes_map[global.NODE_RESOURCE], node) - } - } - - for _, sub := range nodes_map[global.NODE_HOST] { - for _, obj := range nodes_map[global.NODE_PROCESS] { - if obj.UUID == sub.UUID && obj.Metrics["Pid"] == "1" { - belong_edge := &graph.Edge{ - ID: fmt.Sprintf("%s%s%s%s%s", obj.ID, global.EDGE_CONNECTOR, global.EDGE_BELONG, global.EDGE_CONNECTOR, sub.ID), - Type: global.EDGE_BELONG, - Src: obj.ID, - Dst: sub.ID, - Dir: "direct", - } - - edges.Add(belong_edge) - } - } - } - - for _, sub := range nodes_map[global.NODE_HOST] { - for _, obj := range nodes_map[global.NODE_RESOURCE] { - if sub.UUID == obj.UUID { - belong_edge := &graph.Edge{ - ID: fmt.Sprintf("%s%s%s%s%s", obj.ID, global.EDGE_CONNECTOR, global.EDGE_BELONG, global.EDGE_CONNECTOR, sub.ID), - Type: global.EDGE_BELONG, - Src: obj.ID, - Dst: sub.ID, - Dir: "direct", - } - - edges.Add(belong_edge) - } - } - } - - for _, sub := range nodes_map[global.NODE_PROCESS] { - for _, obj := range nodes_map[global.NODE_PROCESS] { - if obj.UUID == sub.UUID && obj.Metrics["Pid"] == sub.Metrics["Ppid"] { - belong_edge := &graph.Edge{ - ID: fmt.Sprintf("%s%s%s%s%s", sub.ID, global.EDGE_CONNECTOR, global.EDGE_BELONG, global.EDGE_CONNECTOR, obj.ID), - Type: global.EDGE_BELONG, - Src: sub.ID, - Dst: obj.ID, - Dir: "direct", - } - - edges.Add(belong_edge) - } - } - } - - // TODO: 暂定net节点关系的type均为server,暂时无法判断socket连接中的server端和agent端,需要借助其他网络工具 - for _, sub := range nodes_map[global.NODE_NET] { - for _, obj := range nodes_map[global.NODE_PROCESS] { - if obj.Metrics["Pid"] == sub.Metrics["Pid"] { - server_edge := &graph.Edge{ - ID: fmt.Sprintf("%s%s%s%s%s", sub.ID, global.EDGE_CONNECTOR, global.EDGE_SERVER, global.EDGE_CONNECTOR, obj.ID), - Type: global.EDGE_SERVER, - Src: sub.ID, - Dst: obj.ID, - Dir: "direct", - } - - edges.Add(server_edge) - } - } - } - - // 生成跨主机对等网络关系实例 - for _, net := range agent.Netconnections_2 { - var peernode1 *graph.Node - var peernode2 *graph.Node - - for _, netnode := range nodes_map[global.NODE_NET] { - switch netnode.Metrics["Laddr"] { - case net.Laddr: - peernode1 = netnode - case net.Raddr: - peernode2 = netnode - } - - if peernode1 != nil && peernode2 != nil { - break - } - } - - if peernode1 != nil && peernode2 != nil { - var edgetype string - switch peernode1.Metrics["Type"] { - case "1": - edgetype = global.EDGE_TCP - case "2": - edgetype = global.EDGE_UDP - } - - peernet_edge := &graph.Edge{ - ID: fmt.Sprintf("%s%s%s%s%s", peernode1.ID, global.EDGE_CONNECTOR, edgetype, global.EDGE_CONNECTOR, peernode2.ID), - Type: edgetype, - Src: peernode1.ID, - Dst: peernode2.ID, - Dir: "undirect", - } - - edges.Add(peernet_edge) - } - } + atomic.AddInt32(c.Agent_node_count, int32(1)) return nil } -func (d *DataProcesser) CustomCreateEdgeEntities(agent *agentmanager.Agent, edges *graph.Edges, nodes *graph.Nodes) error { +func (c *CustomTopo) CreateEdgeEntities(agent *agentmanager.Agent, edges *graph.Edges, nodes *graph.Nodes) error { nodes_map := map[string][]*graph.Node{} for _, node := range nodes.Nodes { @@ -687,3 +329,7 @@ func (d *DataProcesser) CustomCreateEdgeEntities(agent *agentmanager.Agent, edge return nil } + +func (c *CustomTopo) Return_Agent_node_count() *int32 { + return c.Agent_node_count +} diff --git a/server/collector/collector.go b/server/generator/generator.go old mode 100644 new mode 100755 similarity index 50% rename from server/collector/collector.go rename to server/generator/generator.go index ccb3492..d18b078 --- a/server/collector/collector.go +++ b/server/generator/generator.go @@ -1,15 +1,18 @@ -package collector +package generator import ( "bytes" + "context" "encoding/json" "io" "os" "sync" + "sync/atomic" "time" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/conf" + "gitee.com/openeuler/PilotGo-plugin-topology-server/db/mysqlmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" @@ -19,13 +22,117 @@ import ( "github.com/pkg/errors" ) -type DataCollector struct{} +type TopoGenerator struct { + Factory TopoInterface +} + +func CreateTopoGenerator(trules []mysqlmanager.Tag_rule, nrules [][]mysqlmanager.Filter_rule) *TopoGenerator { + _topogenerator := &TopoGenerator{} + + if len(nrules) != 0 { + _topogenerator.Factory = &CustomTopo{ + Tagrules: trules, + Noderules: nrules, + Agent_node_count: new(int32), + } + return _topogenerator + } + + _topogenerator.Factory = &PublicTopo{ + Agent_node_count: new(int32), + } + return _topogenerator +} + +func (t *TopoGenerator) ProcessingData(agentnum int) (*graph.Nodes, *graph.Edges, []error, []error) { + nodes := &graph.Nodes{ + Lock: sync.Mutex{}, + Lookup: make(map[string]*graph.Node, 0), + LookupByType: make(map[string][]*graph.Node, 0), + LookupByUUID: make(map[string][]*graph.Node, 0), + Nodes: make([]*graph.Node, 0), + } + edges := &graph.Edges{ + Lock: sync.Mutex{}, + Lookup: sync.Map{}, + SrcToDsts: make(map[string][]string, 0), + DstToSrcs: make(map[string][]string, 0), + Edges: make([]*graph.Edge, 0), + } + + var wg sync.WaitGroup + var collect_errorlist []error + var process_errorlist []error + var process_errorlist_rwlock sync.RWMutex + + collect_errorlist = t.collectInstantData() + if len(collect_errorlist) != 0 { + for i, err := range collect_errorlist { + collect_errorlist[i] = errors.Wrap(err, "**7") + } + } + + start := time.Now() + + ctx1, cancel1 := context.WithCancel(pluginclient.Global_Context) + go func(cancelfunc context.CancelFunc) { + for { + if atomic.LoadInt32(t.Factory.Return_Agent_node_count()) == int32(agentnum) { + cancelfunc() + break + } + } + }(cancel1) + + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top + errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) + return nil, nil, nil, nil + } + + agentmanager.Global_AgentManager.TAgentMap.Range( + func(key, value interface{}) bool { + wg.Add(1) + + agent := value.(*agentmanager.Agent) + + go func(ctx context.Context, _agent *agentmanager.Agent, _nodes *graph.Nodes, _edges *graph.Edges) { + defer wg.Done() + + if _agent.Host_2 != nil && _agent.Processes_2 != nil && _agent.Netconnections_2 != nil { + err := t.Factory.CreateNodeEntities(_agent, _nodes) + if err != nil { + process_errorlist_rwlock.Lock() + process_errorlist = append(process_errorlist, errors.Wrap(err, "**2")) + process_errorlist_rwlock.Unlock() + } + + <-ctx.Done() + + err = t.Factory.CreateEdgeEntities(_agent, _edges, _nodes) + if err != nil { + process_errorlist_rwlock.Lock() + process_errorlist = append(process_errorlist, errors.Wrap(err, "**2")) + process_errorlist_rwlock.Unlock() + } + + } + }(ctx1, agent, nodes, edges) + + return true + }, + ) + wg.Wait() + + atomic.StoreInt32(t.Factory.Return_Agent_node_count(), int32(0)) + + elapse := time.Since(start) + logger.Info("\033[32mtopo server 采集数据处理时间\033[0m: %v\n", elapse) -func CreateDataCollector() *DataCollector { - return &DataCollector{} + return nodes, edges, collect_errorlist, process_errorlist } -func (d *DataCollector) CollectInstantData() []error { +func (t *TopoGenerator) collectInstantData() []error { start := time.Now() var wg sync.WaitGroup var errorlist []error @@ -47,7 +154,7 @@ func (d *DataCollector) CollectInstantData() []error { temp_start := time.Now() agent := value.(*agentmanager.Agent) agent.Port = conf.Global_Config.Topo.Agent_port - err := d.GetCollectDataFromTopoAgent(agent) + err := t.getCollectDataFromTopoAgent(agent) if err != nil { errorlist_rwlock.Lock() errorlist = append(errorlist, errors.Wrapf(err, "%s**2", agent.IP)) @@ -75,7 +182,7 @@ func (d *DataCollector) CollectInstantData() []error { return nil } -func (d *DataCollector) GetCollectDataFromTopoAgent(agent *agentmanager.Agent) error { +func (t *TopoGenerator) getCollectDataFromTopoAgent(agent *agentmanager.Agent) error { url := "http://" + agent.IP + ":" + agent.Port + "/plugin/topology/api/rawdata" resp, err := httputils.Get(url, nil) diff --git a/server/generator/meta.go b/server/generator/meta.go new file mode 100644 index 0000000..e29fa07 --- /dev/null +++ b/server/generator/meta.go @@ -0,0 +1,12 @@ +package generator + +import ( + "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" +) + +type TopoInterface interface { + CreateNodeEntities(*agentmanager.Agent, *graph.Nodes) error + CreateEdgeEntities(*agentmanager.Agent, *graph.Edges, *graph.Nodes) error + Return_Agent_node_count() *int32 +} diff --git a/server/generator/publicTopo.go b/server/generator/publicTopo.go new file mode 100644 index 0000000..33611b2 --- /dev/null +++ b/server/generator/publicTopo.go @@ -0,0 +1,268 @@ +package generator + +import ( + "fmt" + "strconv" + "strings" + "sync/atomic" + + "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/global" + "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" + "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" + "github.com/pkg/errors" +) + +type PublicTopo struct { + Agent_node_count *int32 +} + +func (p *PublicTopo) CreateNodeEntities(agent *agentmanager.Agent, nodes *graph.Nodes) error { + host_node := &graph.Node{ + ID: fmt.Sprintf("%s%s%s%s%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_HOST, global.NODE_CONNECTOR, agent.IP), + Name: agent.UUID, + Type: global.NODE_HOST, + UUID: agent.UUID, + LayoutAttr: global.INNER_LAYOUT_1, + ComboId: agent.UUID, + Metrics: *graph.HostToMap(agent.Host_2, &agent.AddrInterfaceMap_2), + } + + nodes.Add(host_node) + + for _, process := range agent.Processes_2 { + proc_node := &graph.Node{ + ID: fmt.Sprintf("%s%s%s%s%d", agent.UUID, global.NODE_CONNECTOR, global.NODE_PROCESS, global.NODE_CONNECTOR, process.Pid), + Name: process.ExeName, + Type: global.NODE_PROCESS, + UUID: agent.UUID, + LayoutAttr: global.INNER_LAYOUT_2, + ComboId: agent.UUID, + Metrics: *graph.ProcessToMap(process), + } + + nodes.Add(proc_node) + + for _, thread := range process.Threads { + thre_node := &graph.Node{ + ID: fmt.Sprintf("%s%s%s%s%d", agent.UUID, global.NODE_CONNECTOR, global.NODE_THREAD, global.NODE_CONNECTOR, thread.Tid), + Name: strconv.Itoa(int(thread.Tid)), + Type: global.NODE_THREAD, + UUID: agent.UUID, + LayoutAttr: global.INNER_LAYOUT_3, + ComboId: agent.UUID, + Metrics: *graph.ThreadToMap(&thread), + } + + nodes.Add(thre_node) + } + + // for _, net := range process.NetIOCounters { + // net_node := &graph.Node{ + // ID: fmt.Sprintf("%s-%s-%d", agent.UUID, global.NODE_NET, process.Pid), + // Name: net.Name, + // Type: global.NODE_NET, + // UUID: agent.UUID, + // Metrics: *utils.NetToMap(&net, &agent.AddrInterfaceMap_2), + // } + + // nodes.Add(net_node) + // } + } + + // 临时定义不含网络流量metric的网络节点 + for _, net := range agent.Netconnections_2 { + if laddr_slice := strings.Split(net.Laddr, ":"); len(laddr_slice) != 0 { + net_node := &graph.Node{ + ID: fmt.Sprintf("%s%s%s%s%d:%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_NET, global.NODE_CONNECTOR, net.Pid, laddr_slice[1]), + Name: net.Laddr, + Type: global.NODE_NET, + UUID: agent.UUID, + LayoutAttr: global.INNER_LAYOUT_5, + ComboId: agent.UUID, + Metrics: *graph.NetToMap(net), + } + + nodes.Add(net_node) + } else { + err := errors.Errorf("syntax error: %s **errstack**13", net.Laddr) // err top + errormanager.ErrorTransmit(pluginclient.Global_Context, err, false) + } + } + + for _, disk := range agent.Disks_2 { + disk_node := &graph.Node{ + ID: fmt.Sprintf("%s%s%s%s%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_RESOURCE, global.NODE_CONNECTOR, disk.Partition.Device), + Name: disk.Partition.Device, + Type: global.NODE_RESOURCE, + UUID: agent.UUID, + LayoutAttr: global.INNER_LAYOUT_4, + ComboId: agent.UUID, + Metrics: *graph.DiskToMap(disk), + } + + nodes.Add(disk_node) + } + + for _, cpu := range agent.Cpus_2 { + cpu_node := &graph.Node{ + ID: fmt.Sprintf("%s%s%s%s%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_RESOURCE, global.NODE_CONNECTOR, "CPU"+strconv.Itoa(int(cpu.Info.CPU))), + Name: "CPU" + strconv.Itoa(int(cpu.Info.CPU)), + Type: global.NODE_RESOURCE, + UUID: agent.UUID, + LayoutAttr: global.INNER_LAYOUT_4, + ComboId: agent.UUID, + Metrics: *graph.CpuToMap(cpu), + } + + nodes.Add(cpu_node) + } + + for _, ifaceio := range agent.NetIOcounters_2 { + iface_node := &graph.Node{ + ID: fmt.Sprintf("%s%s%s%s%s", agent.UUID, global.NODE_CONNECTOR, global.NODE_RESOURCE, global.NODE_CONNECTOR, "NC"+ifaceio.Name), + Name: "NC" + ifaceio.Name, + Type: global.NODE_RESOURCE, + UUID: agent.UUID, + LayoutAttr: global.INNER_LAYOUT_4, + ComboId: agent.UUID, + Metrics: *graph.InterfaceToMap(ifaceio), + } + + nodes.Add(iface_node) + } + + atomic.AddInt32(p.Agent_node_count, int32(1)) + + return nil +} + +func (p *PublicTopo) CreateEdgeEntities(agent *agentmanager.Agent, edges *graph.Edges, nodes *graph.Nodes) error { + nodes_map := map[string][]*graph.Node{} + + for _, node := range nodes.Nodes { + switch node.Type { + case global.NODE_HOST: + nodes_map[global.NODE_HOST] = append(nodes_map[global.NODE_HOST], node) + case global.NODE_PROCESS: + nodes_map[global.NODE_PROCESS] = append(nodes_map[global.NODE_PROCESS], node) + case global.NODE_THREAD: + nodes_map[global.NODE_THREAD] = append(nodes_map[global.NODE_THREAD], node) + case global.NODE_NET: + nodes_map[global.NODE_NET] = append(nodes_map[global.NODE_NET], node) + case global.NODE_RESOURCE: + nodes_map[global.NODE_RESOURCE] = append(nodes_map[global.NODE_RESOURCE], node) + } + } + + for _, sub := range nodes_map[global.NODE_HOST] { + for _, obj := range nodes_map[global.NODE_PROCESS] { + if obj.UUID == sub.UUID && obj.Metrics["Pid"] == "1" { + belong_edge := &graph.Edge{ + ID: fmt.Sprintf("%s%s%s%s%s", obj.ID, global.EDGE_CONNECTOR, global.EDGE_BELONG, global.EDGE_CONNECTOR, sub.ID), + Type: global.EDGE_BELONG, + Src: obj.ID, + Dst: sub.ID, + Dir: "direct", + } + + edges.Add(belong_edge) + } + } + } + + for _, sub := range nodes_map[global.NODE_HOST] { + for _, obj := range nodes_map[global.NODE_RESOURCE] { + if sub.UUID == obj.UUID { + belong_edge := &graph.Edge{ + ID: fmt.Sprintf("%s%s%s%s%s", obj.ID, global.EDGE_CONNECTOR, global.EDGE_BELONG, global.EDGE_CONNECTOR, sub.ID), + Type: global.EDGE_BELONG, + Src: obj.ID, + Dst: sub.ID, + Dir: "direct", + } + + edges.Add(belong_edge) + } + } + } + + for _, sub := range nodes_map[global.NODE_PROCESS] { + for _, obj := range nodes_map[global.NODE_PROCESS] { + if obj.UUID == sub.UUID && obj.Metrics["Pid"] == sub.Metrics["Ppid"] { + belong_edge := &graph.Edge{ + ID: fmt.Sprintf("%s%s%s%s%s", sub.ID, global.EDGE_CONNECTOR, global.EDGE_BELONG, global.EDGE_CONNECTOR, obj.ID), + Type: global.EDGE_BELONG, + Src: sub.ID, + Dst: obj.ID, + Dir: "direct", + } + + edges.Add(belong_edge) + } + } + } + + // TODO: 暂定net节点关系的type均为server,暂时无法判断socket连接中的server端和agent端,需要借助其他网络工具 + for _, sub := range nodes_map[global.NODE_NET] { + for _, obj := range nodes_map[global.NODE_PROCESS] { + if obj.Metrics["Pid"] == sub.Metrics["Pid"] { + server_edge := &graph.Edge{ + ID: fmt.Sprintf("%s%s%s%s%s", sub.ID, global.EDGE_CONNECTOR, global.EDGE_SERVER, global.EDGE_CONNECTOR, obj.ID), + Type: global.EDGE_SERVER, + Src: sub.ID, + Dst: obj.ID, + Dir: "direct", + } + + edges.Add(server_edge) + } + } + } + + // 生成跨主机对等网络关系实例 + for _, net := range agent.Netconnections_2 { + var peernode1 *graph.Node + var peernode2 *graph.Node + + for _, netnode := range nodes_map[global.NODE_NET] { + switch netnode.Metrics["Laddr"] { + case net.Laddr: + peernode1 = netnode + case net.Raddr: + peernode2 = netnode + } + + if peernode1 != nil && peernode2 != nil { + break + } + } + + if peernode1 != nil && peernode2 != nil { + var edgetype string + switch peernode1.Metrics["Type"] { + case "1": + edgetype = global.EDGE_TCP + case "2": + edgetype = global.EDGE_UDP + } + + peernet_edge := &graph.Edge{ + ID: fmt.Sprintf("%s%s%s%s%s", peernode1.ID, global.EDGE_CONNECTOR, edgetype, global.EDGE_CONNECTOR, peernode2.ID), + Type: edgetype, + Src: peernode1.ID, + Dst: peernode2.ID, + Dir: "undirect", + } + + edges.Add(peernet_edge) + } + } + + return nil +} + +func (p *PublicTopo) Return_Agent_node_count() *int32 { + return p.Agent_node_count +} diff --git a/server/processor/init.go b/server/generator/utils/processMatch.go similarity index 69% rename from server/processor/init.go rename to server/generator/utils/processMatch.go index 5c21fc0..7ffb2ad 100755 --- a/server/processor/init.go +++ b/server/generator/utils/processMatch.go @@ -1,4 +1,4 @@ -package processor +package utils import ( "encoding/json" @@ -6,9 +6,7 @@ import ( "strings" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" - "gitee.com/openeuler/PilotGo-plugin-topology-server/db/mysqlmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" - "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" "gitee.com/openeuler/PilotGo/sdk/utils/httputils" docker "github.com/fsouza/go-dockerclient" @@ -130,55 +128,3 @@ func ProcessMatching(agent *agentmanager.Agent, exename, cmdline, component stri return false } - -func TagInjection(n *graph.Node, tags []mysqlmanager.Tag_rule) error { - for _, tagrule := range tags { - for _, rules := range tagrule.Rules { - // 判断是否为同一台机器 - uuid := "" - for _, condition := range rules { - if condition.Rule_type == mysqlmanager.FILTER_TYPE_HOST { - _uuid, ok := condition.Rule_condition["uuid"] - if !ok { - return errors.Errorf("there is no uuid field in tag rule_condition: %+v **errstack**2", condition.Rule_condition) - } - uuid = _uuid.(string) - break - } - } - if uuid != n.UUID { - continue - } - - // 为host节点添加标签 - if n.Type == "host" && len(rules) == 1 { - n.Tags = append(n.Tags, tagrule.Tag_name) - break - } - - for _, condition := range rules { - switch condition.Rule_type { - case mysqlmanager.FILTER_TYPE_HOST: - continue - case mysqlmanager.FILTER_TYPE_PROCESS: - if _name, ok := condition.Rule_condition["name"]; !ok { - return errors.Errorf("there is no name field in tag rule_condition: %+v **errstack**2", condition.Rule_condition) - } else if _name.(string) == n.Name { - n.Tags = append(n.Tags, tagrule.Tag_name) - } - case mysqlmanager.FILTER_TYPE_TAG: - if _tag, ok := condition.Rule_condition["tag_name"]; !ok { - return errors.Errorf("there is no tag_name field in tag rule_condition: %+v **errstack**2", condition.Rule_condition) - } else if _tag.(string) == n.Name { - n.Tags = append(n.Tags, tagrule.Tag_name) - } - case mysqlmanager.FILTER_TYPE_RESOURCE: - // TODO: 暂时不区分disk cpu nc等资源节点 - n.Tags = append(n.Tags, tagrule.Tag_name) - } - } - } - } - - return nil -} diff --git a/server/generator/utils/tagInjection.go b/server/generator/utils/tagInjection.go new file mode 100644 index 0000000..3b2336c --- /dev/null +++ b/server/generator/utils/tagInjection.go @@ -0,0 +1,59 @@ +package utils + +import ( + "gitee.com/openeuler/PilotGo-plugin-topology-server/db/mysqlmanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" + "github.com/pkg/errors" +) + +func TagInjection(n *graph.Node, tags []mysqlmanager.Tag_rule) error { + for _, tagrule := range tags { + for _, rules := range tagrule.Rules { + // 判断是否为同一台机器 + uuid := "" + for _, condition := range rules { + if condition.Rule_type == mysqlmanager.FILTER_TYPE_HOST { + _uuid, ok := condition.Rule_condition["uuid"] + if !ok { + return errors.Errorf("there is no uuid field in tag rule_condition: %+v **errstack**2", condition.Rule_condition) + } + uuid = _uuid.(string) + break + } + } + if uuid != n.UUID { + continue + } + + // 为host节点添加标签 + if n.Type == "host" && len(rules) == 1 { + n.Tags = append(n.Tags, tagrule.Tag_name) + break + } + + for _, condition := range rules { + switch condition.Rule_type { + case mysqlmanager.FILTER_TYPE_HOST: + continue + case mysqlmanager.FILTER_TYPE_PROCESS: + if _name, ok := condition.Rule_condition["name"]; !ok { + return errors.Errorf("there is no name field in tag rule_condition: %+v **errstack**2", condition.Rule_condition) + } else if _name.(string) == n.Name { + n.Tags = append(n.Tags, tagrule.Tag_name) + } + case mysqlmanager.FILTER_TYPE_TAG: + if _tag, ok := condition.Rule_condition["tag_name"]; !ok { + return errors.Errorf("there is no tag_name field in tag rule_condition: %+v **errstack**2", condition.Rule_condition) + } else if _tag.(string) == n.Name { + n.Tags = append(n.Tags, tagrule.Tag_name) + } + case mysqlmanager.FILTER_TYPE_RESOURCE: + // TODO: 暂时不区分disk cpu nc等资源节点 + n.Tags = append(n.Tags, tagrule.Tag_name) + } + } + } + } + + return nil +} diff --git a/server/handler/customTopoHandler.go b/server/handler/customTopoHandler.go index e81e745..727ae8c 100755 --- a/server/handler/customTopoHandler.go +++ b/server/handler/customTopoHandler.go @@ -8,7 +8,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" - "gitee.com/openeuler/PilotGo-plugin-topology-server/service" + "gitee.com/openeuler/PilotGo-plugin-topology-server/service/custom" "gitee.com/openeuler/PilotGo/sdk/response" "github.com/gin-gonic/gin" "github.com/pkg/errors" @@ -31,7 +31,7 @@ func CustomTopoListHandle(ctx *gin.Context) { return } - tcs, total, err := service.CustomTopoListService(query) + tcs, total, err := custom.CustomTopoListService(query) if err != nil { switch strings.Split(errors.Cause(err).Error(), "**")[1] { case "errstack": @@ -67,7 +67,7 @@ func CreateCustomTopoHandle(ctx *gin.Context) { return } - tcdb_id, err := service.CreateCustomTopoService(tc) + tcdb_id, err := custom.CreateCustomTopoService(tc) if err != nil { switch strings.Split(errors.Cause(err).Error(), "**")[1] { case "errstack": @@ -108,7 +108,7 @@ func UpdateCustomTopoHandle(ctx *gin.Context) { return } - tcdb_id, err := service.UpdateCustomTopoService(req_body.TC, *req_body.ID) + tcdb_id, err := custom.UpdateCustomTopoService(req_body.TC, *req_body.ID) if err != nil { switch strings.Split(errors.Cause(err).Error(), "**")[1] { case "errstack": @@ -157,7 +157,7 @@ func RunCustomTopoHandle(ctx *gin.Context) { return } - topodata.Nodes, topodata.Edges, topodata.Combos, err = service.RunCustomTopoService(uint(tcid_int)) + topodata.Nodes, topodata.Edges, topodata.Combos, err = custom.RunCustomTopoService(uint(tcid_int)) if err != nil { switch strings.Split(errors.Cause(err).Error(), "**")[1] { case "errstack": @@ -213,7 +213,7 @@ func DeleteCustomTopoHandle(ctx *gin.Context) { return } - if err := service.DeleteCustomTopoService(req_body.IDs); err != nil { + if err := custom.DeleteCustomTopoService(req_body.IDs); err != nil { switch strings.Split(errors.Cause(err).Error(), "**")[1] { case "errstack": err = errors.Wrap(err, "**errstack**1") // err top diff --git a/server/handler/defaultTopoHandler.go b/server/handler/publicTopoHandler.go similarity index 95% rename from server/handler/defaultTopoHandler.go rename to server/handler/publicTopoHandler.go index be2b549..972fc70 100644 --- a/server/handler/defaultTopoHandler.go +++ b/server/handler/publicTopoHandler.go @@ -5,7 +5,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" - "gitee.com/openeuler/PilotGo-plugin-topology-server/service" + "gitee.com/openeuler/PilotGo-plugin-topology-server/service/public" "gitee.com/openeuler/PilotGo/sdk/response" "github.com/gin-gonic/gin" "github.com/pkg/errors" @@ -51,7 +51,7 @@ import ( func SingleHostTreeHandle(ctx *gin.Context) { uuid := ctx.Param("uuid") - nodes, err := service.SingleHostTreeService(uuid) + nodes, err := public.SingleHostTreeService(uuid) if err != nil { switch strings.Split(errors.Cause(err).Error(), "**")[1] { case "errstack": @@ -95,7 +95,7 @@ func SingleHostTreeHandle(ctx *gin.Context) { } func MultiHostHandle(ctx *gin.Context) { - nodes, edges, combos, err := service.MultiHostService() + nodes, edges, combos, err := public.MultiHostService() if err != nil { switch strings.Split(errors.Cause(err).Error(), "**")[1] { case "errstack": diff --git a/server/main.go b/server/main.go index a12a304..37f0592 100755 --- a/server/main.go +++ b/server/main.go @@ -62,13 +62,14 @@ func main() { /* init database + neo4j mysql redis prometheus */ db.InitDB() /* topo插件自身数据采集模块周期性数据采集: 全局网络拓扑、单机拓扑 */ - service.PeriodCollectWorking([]string{}, [][]mysqlmanager.Filter_rule{}) + service.InitPeriodCollectWorking([]string{}, [][]mysqlmanager.Filter_rule{}) /* 终止进程信号监听 diff --git a/server/service/customTopo.go b/server/service/custom/customService.go similarity index 71% rename from server/service/customTopo.go rename to server/service/custom/customService.go index ede37e3..c8d832f 100755 --- a/server/service/customTopo.go +++ b/server/service/custom/customService.go @@ -1,15 +1,18 @@ -package service +package custom import ( "strconv" + "strings" "time" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" - "gitee.com/openeuler/PilotGo-plugin-topology-server/db/graphmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/db/mysqlmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/db/redismanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" + "gitee.com/openeuler/PilotGo-plugin-topology-server/generator" + "gitee.com/openeuler/PilotGo/sdk/response" "github.com/pkg/errors" ) @@ -62,13 +65,47 @@ func RunCustomTopoService(tcid uint) ([]*graph.Node, []*graph.Edge, []map[string return nil, nil, nil, errors.New("redis client not init **errstack**1") } - unixtime_now := time.Now().Unix() - nodes, edges, combos, err := DataProcessWorking(unixtime_now, running_agent_num, graphmanager.Global_GraphDB, tc.TagRules, tc.NodeRules) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "**2") + topogenerator := generator.CreateTopoGenerator(tc.TagRules, tc.NodeRules) + nodes, edges, collect_errlist, process_errlist := topogenerator.ProcessingData(running_agent_num) + if len(collect_errlist) != 0 { + for i, cerr := range collect_errlist { + collect_errlist[i] = errors.Wrap(cerr, "**errstack**3") // err top + errormanager.ErrorTransmit(pluginclient.Global_Context, collect_errlist[i], false) + } + collect_errlist_string := []string{} + for _, e := range collect_errlist { + collect_errlist_string = append(collect_errlist_string, e.Error()) + } + return nil, nil, nil, errors.Errorf("collect data failed: %+v **errstack**10", strings.Join(collect_errlist_string, "/e/")) + } + if len(process_errlist) != 0 { + for i, perr := range process_errlist { + process_errlist[i] = errors.Wrap(perr, "**errstack**14") // err top + errormanager.ErrorTransmit(pluginclient.Global_Context, process_errlist[i], false) + } + process_errlist_string := []string{} + for _, e := range process_errlist { + process_errlist_string = append(process_errlist_string, e.Error()) + } + return nil, nil, nil, errors.Errorf("process data failed: %+v **errstack**21", strings.Join(process_errlist_string, "/e/")) + } + if nodes == nil || edges == nil { + err := errors.New("nodes or edges is nil **errstack**24") // err top + errormanager.ErrorTransmit(pluginclient.Global_Context, err, false) + return nil, nil, nil, err + } + + combos := make([]map[string]string, 0) + for _, node := range nodes.Nodes { + if node.Type == "host" { + combos = append(combos, map[string]string{ + "id": node.UUID, + "label": node.UUID, + }) + } } - return nodes, edges, combos, nil + return nodes.Nodes, edges.Edges, combos, nil } func CustomTopoListService(query *response.PaginationQ) ([]*mysqlmanager.Topo_configuration, int, error) { diff --git a/server/service/periodcollect.go b/server/service/periodcollect.go index ec652d8..bce37a6 100644 --- a/server/service/periodcollect.go +++ b/server/service/periodcollect.go @@ -15,12 +15,12 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/graph" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" - "gitee.com/openeuler/PilotGo-plugin-topology-server/processor" + "gitee.com/openeuler/PilotGo-plugin-topology-server/generator" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/pkg/errors" ) -func PeriodCollectWorking(batch []string, noderules [][]mysqlmanager.Filter_rule) { +func InitPeriodCollectWorking(batch []string, noderules [][]mysqlmanager.Filter_rule) { graphperiod := conf.Global_Config.Topo.Period if agentmanager.Global_AgentManager == nil { @@ -54,8 +54,8 @@ func DataProcessWorking(unixtime int64, agentnum int, graphdb graphmanager.Graph var edgeBreakWg sync.WaitGroup _unixtime := strconv.Itoa(int(unixtime)) - dataprocesser := processor.CreateDataProcesser() - nodes, edges, collect_errlist, process_errlist := dataprocesser.ProcessData(agentnum, tagrules, noderules) + topogenerator := generator.CreateTopoGenerator(tagrules, noderules) + nodes, edges, collect_errlist, process_errlist := topogenerator.ProcessingData(agentnum) if len(collect_errlist) != 0 { for i, cerr := range collect_errlist { collect_errlist[i] = errors.Wrap(cerr, "**errstack**3") // err top @@ -84,21 +84,6 @@ func DataProcessWorking(unixtime int64, agentnum int, graphdb graphmanager.Graph return nil, nil, nil, err } - if len(noderules) != 0 { - combos := make([]map[string]string, 0) - - for _, node := range nodes.Nodes { - if node.Type == "host" { - combos = append(combos, map[string]string{ - "id": node.UUID, - "label": node.UUID, - }) - } - } - - return nodes.Nodes, edges.Edges, combos, nil - } - if graphmanager.Global_GraphDB == nil { err := errors.New("Global_GraphDB is nil **errstackfatal**0") // err top errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) diff --git a/server/service/multihost.go b/server/service/public/multihost.go similarity index 99% rename from server/service/multihost.go rename to server/service/public/multihost.go index c2782b3..2f81822 100755 --- a/server/service/multihost.go +++ b/server/service/public/multihost.go @@ -1,4 +1,4 @@ -package service +package public import ( "fmt" diff --git a/server/service/multihostentire.go b/server/service/public/multihostentire.go similarity index 99% rename from server/service/multihostentire.go rename to server/service/public/multihostentire.go index eabea37..43f4e54 100755 --- a/server/service/multihostentire.go +++ b/server/service/public/multihostentire.go @@ -1,4 +1,4 @@ -package service +package public // import ( // "fmt" diff --git a/server/service/singlehost.go b/server/service/public/singlehost.go similarity index 99% rename from server/service/singlehost.go rename to server/service/public/singlehost.go index 97dc586..c95d89a 100755 --- a/server/service/singlehost.go +++ b/server/service/public/singlehost.go @@ -1,4 +1,4 @@ -package service +package public // import ( // "strings" diff --git a/server/service/singlehosttree.go b/server/service/public/singlehosttree.go similarity index 99% rename from server/service/singlehosttree.go rename to server/service/public/singlehosttree.go index 2b4a731..c489496 100755 --- a/server/service/singlehosttree.go +++ b/server/service/public/singlehosttree.go @@ -1,4 +1,4 @@ -package service +package public import ( "gitee.com/openeuler/PilotGo-plugin-topology-server/db/graphmanager" -- Gitee