From df8b5d9002209808e44e984ab1c748d64638e0e0 Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Thu, 26 Oct 2023 11:08:40 +0800 Subject: [PATCH] add atomic to agent_node_count variable in data process --- server/agentmanager/mach_agent.go | 6 ++---- server/dao/neo4j.go | 2 +- server/meta/node.go | 9 ++------- server/processor/processor.go | 14 ++++++-------- 4 files changed, 11 insertions(+), 20 deletions(-) diff --git a/server/agentmanager/mach_agent.go b/server/agentmanager/mach_agent.go index 94501dd..39d1b27 100644 --- a/server/agentmanager/mach_agent.go +++ b/server/agentmanager/mach_agent.go @@ -44,10 +44,8 @@ func (t *Topoclient) DeleteAgent(uuid string) { } // 获取运行状态agent的数目 -func (t *Topoclient) GetRunningAgentNumber() int { - var agent_count int - - agent_count = 0 +func (t *Topoclient) GetRunningAgentNumber() int32 { + var agent_count int32 Topo.AgentMap.Range(func(key, value interface{}) bool { agent := value.(*Agent_m) diff --git a/server/dao/neo4j.go b/server/dao/neo4j.go index 80c4908..3070750 100644 --- a/server/dao/neo4j.go +++ b/server/dao/neo4j.go @@ -211,7 +211,7 @@ func PeriodProcessNeo4j(unixtime int64, agentnum int) { } nodeUuidWg.Wait() - for _, _edges := range utils.SplitEdgesByBreakpoint(edges.Edges, _agentnum) { + for _, _edges := range utils.SplitEdgesByBreakpoint(edges.Edges, int(_agentnum)) { __edges := _edges edgeBreakWg.Add(1) go func(___edges []*meta.Edge) { diff --git a/server/meta/node.go b/server/meta/node.go index 5827cf1..d109525 100755 --- a/server/meta/node.go +++ b/server/meta/node.go @@ -8,7 +8,7 @@ import ( ) type Nodes struct { - Lock sync.Mutex + Lock sync.Mutex Lookup map[string]*Node LookupByType map[string][]*Node LookupByUUID map[string][]*Node @@ -25,18 +25,13 @@ type Node struct { func (ns *Nodes) Add(node *Node) { ns.Lock.Lock() + defer ns.Lock.Unlock() if _, ok := ns.Lookup[node.ID]; !ok { - // for k, v := range node.Attrs { - // if _, ok := old_n.Attrs[k]; !ok { - // old_n.Attrs[k] = v - // } - // } ns.Lookup[node.ID] = node ns.LookupByType[node.Type] = append(ns.LookupByType[node.Type], node) ns.LookupByUUID[node.UUID] = append(ns.LookupByUUID[node.UUID], node) ns.Nodes = append(ns.Nodes, node) } - ns.Lock.Unlock() } func (ns *Nodes) Remove(node *Node) error { diff --git a/server/processor/processor.go b/server/processor/processor.go index 0b708a1..4479e46 100644 --- a/server/processor/processor.go +++ b/server/processor/processor.go @@ -5,6 +5,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" @@ -16,8 +17,7 @@ import ( type DataProcesser struct{} -var agent_node_count int -var agent_node_count_rwlock *sync.RWMutex +var agent_node_count int32 func CreateDataProcesser() *DataProcesser { return &DataProcesser{} @@ -41,8 +41,6 @@ func (d *DataProcesser) Process_data() (*meta.Nodes, *meta.Edges, []error, []err } var wg sync.WaitGroup - agent_node_count = 0 - agent_node_count_rwlock = &sync.RWMutex{} var collect_errorlist []error var process_errorlist []error @@ -75,7 +73,7 @@ func (d *DataProcesser) Process_data() (*meta.Nodes, *meta.Edges, []error, []err } for { - if agent_node_count == agent_count { + if atomic.LoadInt32(&agent_node_count) == agent_count { break } } @@ -92,6 +90,8 @@ func (d *DataProcesser) Process_data() (*meta.Nodes, *meta.Edges, []error, []err ) wg.Wait() + atomic.StoreInt32(&agent_node_count, int32(0)) + elapse := time.Since(start) fmt.Fprintf(agentmanager.Topo.Out, "\033[32mtopo server 采集数据处理时间\033[0m: %v\n", elapse) @@ -194,9 +194,7 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes nodes.Add(iface_node) } - agent_node_count_rwlock.Lock() - agent_node_count++ - agent_node_count_rwlock.Unlock() + atomic.AddInt32(&agent_node_count, int32(1)) return nil } -- Gitee