From be1b38b7359e83a3b4bb4803ab1c7a15d49971c0 Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Wed, 25 Oct 2023 16:39:17 +0800 Subject: [PATCH] improve data processing --- .gitignore | 1 + agent/main.go | 4 +-- server/agentmanager/topoclient.go | 2 +- server/dao/init.go | 11 ++++++- server/go.mod | 2 +- server/processor/processor.go | 50 +++++++++++++------------------ 6 files changed, 35 insertions(+), 35 deletions(-) diff --git a/.gitignore b/.gitignore index ad26911..007838b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,7 @@ config_agent.yaml build toserver toagent +test # static files server/handler/assets/ diff --git a/agent/main.go b/agent/main.go index 3e17859..ee8b4f6 100755 --- a/agent/main.go +++ b/agent/main.go @@ -11,13 +11,13 @@ import ( ) func main() { - + InitLogger() engine := gin.Default() handler.InitRouter(engine) if err := engine.Run(conf.Config().Topo.Agent_addr); err != nil { - logger.Fatal("failed to run server") + logger.Fatal("failed to run web server") } } diff --git a/server/agentmanager/topoclient.go b/server/agentmanager/topoclient.go index 885aeb4..0147c7b 100644 --- a/server/agentmanager/topoclient.go +++ b/server/agentmanager/topoclient.go @@ -43,7 +43,7 @@ func (t *Topoclient) InitMachineList() { statuscode := resp.StatusCode if statuscode != 200 { - err = errors.Errorf("http返回状态码异常: %d **fatal**2", statuscode) // err top + err = errors.Errorf("http返回状态码异常: %d; %s **fatal**2", statuscode, url) // err top t.ErrCh <- err t.ErrGroup.Add(1) t.ErrGroup.Wait() diff --git a/server/dao/init.go b/server/dao/init.go index 0d978b3..40653cc 100755 --- a/server/dao/init.go +++ b/server/dao/init.go @@ -13,15 +13,24 @@ import ( func InitDB() { var unixtime_now int64 var period int64 + var runningAgents int period = conf.Global_config.Topo.Period switch conf.Global_config.Topo.Database { case "neo4j": go func(pt int64) { for true { + // if runningAgents = agentmanager.Topo.GetRunningAgentNumber(); runningAgents <= 0 { + // err := errors.New("no running agent **warn**1") + // agentmanager.Topo.ErrCh <- err + + // time.Sleep(5 * time.Second) + // continue + // } + unixtime_now = time.Now().Unix() - PeriodProcessNeo4j(unixtime_now) + PeriodProcessNeo4j(unixtime_now, runningAgents) time.Sleep(time.Duration(period) * time.Second) diff --git a/server/go.mod b/server/go.mod index ca85ff1..1dd1505 100644 --- a/server/go.mod +++ b/server/go.mod @@ -45,4 +45,4 @@ require ( gopkg.in/yaml.v2 v2.4.0 ) -// replace github.com/pkg/errors v0.9.1 => /usr/local/lib/errors@v1.2.3 +// replace github.com/pkg/errors v0.9.1 => /home/wjq/桌面/study/tool/errors@v1.2.3 diff --git a/server/processor/processor.go b/server/processor/processor.go index 3cc0ba0..0b708a1 100644 --- a/server/processor/processor.go +++ b/server/processor/processor.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" "sync" + "time" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/collector" @@ -23,28 +24,28 @@ func CreateDataProcesser() *DataProcesser { } func (d *DataProcesser) Process_data() (*meta.Nodes, *meta.Edges, []error, []error) { + start := time.Now() nodes := &meta.Nodes{ + Lock: sync.Mutex{}, Lookup: make(map[string]*meta.Node, 0), LookupByType: make(map[string][]*meta.Node, 0), LookupByUUID: make(map[string][]*meta.Node, 0), Nodes: make([]*meta.Node, 0), } edges := &meta.Edges{ + Lock: sync.Mutex{}, + Lookup: sync.Map{}, SrcToDsts: make(map[string][]string, 0), DstToSrcs: make(map[string][]string, 0), Edges: make([]*meta.Edge, 0), } var wg sync.WaitGroup - create_node_rwlock := &sync.RWMutex{} agent_node_count = 0 agent_node_count_rwlock = &sync.RWMutex{} var collect_errorlist []error var process_errorlist []error - // 获取运行状态agent的数目 - agent_count := agentmanager.Topo.GetRunningAgentNumber() - datacollector := collector.CreateDataCollector() collect_errorlist = datacollector.Collect_instant_data() if len(collect_errorlist) != 0 { @@ -55,16 +56,20 @@ func (d *DataProcesser) Process_data() (*meta.Nodes, *meta.Edges, []error, []err // return nil, nil, collect_errorlist, nil } + // TODO: 临时获取运行状态agent的数目 + agent_count := agentmanager.Topo.GetRunningAgentNumber() + agentmanager.Topo.AgentMap.Range( func(key, value interface{}) bool { wg.Add(1) - go func() { + agent := value.(*agentmanager.Agent_m) + + go func(_agent *agentmanager.Agent_m, _nodes *meta.Nodes, _edges *meta.Edges) { defer wg.Done() - agent := value.(*agentmanager.Agent_m) - if agent.Host_2 != nil && agent.Processes_2 != nil && agent.Netconnections_2 != nil { - err := d.Create_node_entities(agent, nodes, create_node_rwlock) + if _agent.Host_2 != nil && _agent.Processes_2 != nil && _agent.Netconnections_2 != nil { + err := d.Create_node_entities(_agent, _nodes) if err != nil { process_errorlist = append(process_errorlist, errors.Wrap(err, "**2")) } @@ -73,27 +78,27 @@ func (d *DataProcesser) Process_data() (*meta.Nodes, *meta.Edges, []error, []err if agent_node_count == agent_count { break } - // ttcode - // fmt.Printf("\033[32m agent_node_count\033[0m: %d\n", agent_node_count) - // fmt.Printf("\033[32magent_count\033[0m: %d\n", agent_count) } - err = d.Create_edge_entities(agent, edges, nodes) + err = d.Create_edge_entities(_agent, _edges, _nodes) if err != nil { process_errorlist = append(process_errorlist, errors.Wrap(err, "**2")) } } - }() + }(agent, nodes, edges) return true }, ) wg.Wait() + elapse := time.Since(start) + fmt.Fprintf(agentmanager.Topo.Out, "\033[32mtopo server 采集数据处理时间\033[0m: %v\n", elapse) + return nodes, edges, collect_errorlist, process_errorlist } -func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes *meta.Nodes, mu *sync.RWMutex) error { +func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes *meta.Nodes) error { host_node := &meta.Node{ ID: fmt.Sprintf("%s_%s_%s", agent.UUID, meta.NODE_HOST, agent.IP), Name: agent.UUID, @@ -102,9 +107,7 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes Metrics: *utils.HostToMap(agent.Host_2, &agent.AddrInterfaceMap_2), } - mu.Lock() nodes.Add(host_node) - mu.Unlock() for _, process := range agent.Processes_2 { proc_node := &meta.Node{ @@ -115,9 +118,7 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes Metrics: *utils.ProcessToMap(process), } - mu.Lock() nodes.Add(proc_node) - mu.Unlock() for _, thread := range process.Threads { thre_node := &meta.Node{ @@ -128,9 +129,7 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes Metrics: *utils.ThreadToMap(&thread), } - mu.Lock() nodes.Add(thre_node) - mu.Unlock() } // for _, net := range process.NetIOCounters { @@ -156,9 +155,7 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes Metrics: *utils.NetToMap(net), } - mu.Lock() nodes.Add(net_node) - mu.Unlock() } for _, disk := range agent.Disks_2 { @@ -170,9 +167,7 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes Metrics: *utils.DiskToMap(disk), } - mu.Lock() nodes.Add(disk_node) - mu.Unlock() } for _, cpu := range agent.Cpus_2 { @@ -184,9 +179,7 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes Metrics: *utils.CpuToMap(cpu), } - mu.Lock() nodes.Add(cpu_node) - mu.Unlock() } for _, ifaceio := range agent.NetIOcounters_2 { @@ -198,9 +191,7 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes Metrics: *utils.InterfaceToMap(ifaceio), } - mu.Lock() nodes.Add(iface_node) - mu.Unlock() } agent_node_count_rwlock.Lock() @@ -228,10 +219,9 @@ func (d *DataProcesser) Create_edge_entities(agent *agentmanager.Agent_m, edges } } - // TODO: edge实例重复 for _, obj := range nodes_map[meta.NODE_HOST] { for _, sub := range nodes_map[meta.NODE_PROCESS] { - if sub.Metrics["Pid"] == "1" && sub.UUID == obj.UUID { + if sub.UUID == obj.UUID && sub.Metrics["Pid"] == "1" { belong_edge := &meta.Edge{ ID: fmt.Sprintf("%s_%s_%s", sub.ID, meta.EDGE_BELONG, obj.ID), Type: meta.EDGE_BELONG, -- Gitee