diff --git a/server/dao/neo4j.go b/server/dao/neo4j.go index 41fe978ece0701aba9be36d33b35ddce35716df2..80c4908390a7ec36a66084baaf526cb93b3b7a01 100644 --- a/server/dao/neo4j.go +++ b/server/dao/neo4j.go @@ -3,10 +3,14 @@ package dao import ( "fmt" "os" + "sync" + "time" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/conf" + "gitee.com/openeuler/PilotGo-plugin-topology-server/meta" "gitee.com/openeuler/PilotGo-plugin-topology-server/processor" + "gitee.com/openeuler/PilotGo-plugin-topology-server/utils" "github.com/neo4j/neo4j-go-driver/v4/neo4j" "github.com/pkg/errors" ) @@ -50,7 +54,7 @@ func (n *Neo4jclient) Entity_create(cypher string, params map[string]interface{} }) if err != nil { - err = errors.Errorf("neo4j writetransaction failed: %s", err.Error()) + err = errors.Errorf("neo4j writetransaction failed: %s **9", err.Error()) return err } @@ -119,8 +123,13 @@ func (n *Neo4jclient) Relation_query(cypher string) ([]neo4j.Relationship, error return list, err } -func PeriodProcessNeo4j(unixtime int64) { - var cqlIN string +func PeriodProcessNeo4j(unixtime int64, agentnum int) { + start := time.Now() + + var nodeTypeWg sync.WaitGroup + var nodeUuidWg sync.WaitGroup + var edgeBreakWg sync.WaitGroup + _unixtime := unixtime _neo4j := CreateNeo4j(conf.Global_config.Neo4j.Addr, conf.Global_config.Neo4j.Username, conf.Global_config.Neo4j.Password, conf.Global_config.Neo4j.DB) @@ -150,42 +159,89 @@ func PeriodProcessNeo4j(unixtime int64) { } } - for _, node := range nodes.Nodes { - if len(node.Metrics) == 0 { - cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:%d, nid:'%s', name:'%s'} set node:'%s')", - node.Type, unixtime, node.ID, node.Name, node.UUID) - } else { - cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:%d, nid:'%s', name:'%s'}) set node:`%s`, node += $metrics", - node.Type, unixtime, node.ID, node.Name, node.UUID) - } + // TODO: 临时获取运行状态agent的数目 + _agentnum := agentmanager.Topo.GetRunningAgentNumber() + if _agentnum <= 0 { + err := errors.New("no running agent **warn**2") // err top + agentmanager.Topo.ErrCh <- err + return + } - params := map[string]interface{}{ - "metrics": node.Metrics, - } + for _, nodesByUUID := range nodes.LookupByUUID { + nodesbyuuid := nodesByUUID + + nodeUuidWg.Add(1) + go func(_nodesbyuuid []*meta.Node) { + defer nodeUuidWg.Done() + + // TODO: 根据默认断点数拆分nodes + for _, _nodes := range utils.SplitNodesByBreakpoint(_nodesbyuuid, 10) { + __nodes := _nodes + nodeTypeWg.Add(1) + go func(_nodesbytype []*meta.Node) { + defer nodeTypeWg.Done() + + var cqlIN string + + for _, node := range _nodesbytype { + _node := node + if len(_node.Metrics) == 0 { + cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:%d, nid:'%s', name:'%s'} set node:'%s')", + _node.Type, _unixtime, _node.ID, _node.Name, _node.UUID) + } else { + cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:%d, nid:'%s', name:'%s'}) set node:`%s`, node += $metrics", + _node.Type, _unixtime, _node.ID, _node.Name, _node.UUID) + } + + params := map[string]interface{}{ + "metrics": _node.Metrics, + } + + err := _neo4j.Entity_create(cqlIN, params) + if err != nil { + err = errors.Wrapf(err, "create neo4j node failed; %s **warn**2", cqlIN) // err top + agentmanager.Topo.ErrCh <- err + } + } + }(__nodes) + } + nodeTypeWg.Wait() - err := _neo4j.Entity_create(cqlIN, params) - if err != nil { - err = errors.Wrapf(err, "create neo4j node failed **warn**2") // err top - agentmanager.Topo.ErrCh <- err - } + }(nodesbyuuid) + } + nodeUuidWg.Wait() + + for _, _edges := range utils.SplitEdgesByBreakpoint(edges.Edges, _agentnum) { + __edges := _edges + edgeBreakWg.Add(1) + go func(___edges []*meta.Edge) { + defer edgeBreakWg.Done() + + var cqlIN string + + for _, _edge := range ___edges { + if len(_edge.Metrics) == 0 { + cqlIN = fmt.Sprintf("match (src {unixtime:%d, nid:'%s'}), (dst {unixtime:%d, nid:'%s'}) create (src)-[r:`%s` {unixtime:%d, rid:'%s', dir:%t}]->(dst)", + _unixtime, _edge.Src, _unixtime, _edge.Dst, _edge.Type, _unixtime, _edge.ID, _edge.Dir) + } else { + cqlIN = fmt.Sprintf("match (src {unixtime:%d, nid:'%s'}), (dst {unixtime:%d, nid:'%s'}) create (src)-[r:`%s` {unixtime:%d, rid:'%s', dir:%t}]->(dst) set r += $metrics", + _unixtime, _edge.Src, _unixtime, _edge.Dst, _edge.Type, _unixtime, _edge.ID, _edge.Dir) + } + params := map[string]interface{}{ + "metrics": _edge.Metrics, + } + + err := _neo4j.Entity_create(cqlIN, params) + if err != nil { + err = errors.Wrapf(err, "create neo4j edge failed **warn**2") // err top + agentmanager.Topo.ErrCh <- err + } + } + }(__edges) } - for _, edge := range edges.Edges { - if len(edge.Metrics) == 0 { - cqlIN = fmt.Sprintf("match (src {unixtime:%d, nid:'%s'}), (dst {unixtime:%d, nid:'%s'}) create (src)-[r:`%s` {unixtime:%d, rid:'%s', dir:%t}]->(dst)", - unixtime, edge.Src, unixtime, edge.Dst, edge.Type, unixtime, edge.ID, edge.Dir) - } else { - cqlIN = fmt.Sprintf("match (src {unixtime:%d, nid:'%s'}), (dst {unixtime:%d, nid:'%s'}) create (src)-[r:`%s` {unixtime:%d, rid:'%s', dir:%t}]->(dst) set r += $metrics", - unixtime, edge.Src, unixtime, edge.Dst, edge.Type, unixtime, edge.ID, edge.Dir) - } - params := map[string]interface{}{ - "metrics": edge.Metrics, - } + edgeBreakWg.Wait() - err := _neo4j.Entity_create(cqlIN, params) - if err != nil { - err = errors.Wrapf(err, "create neo4j edge failed **warn**2") // err top - agentmanager.Topo.ErrCh <- err - } - } + elapse := time.Since(start) + fmt.Fprintf(agentmanager.Topo.Out, "\033[32mtopo server 数据库写入时间\033[0m: %v\n", elapse) }