diff --git a/server/dao/neo4j.go b/server/dao/neo4j.go index c36372984c51db54387c2c085c45840e8542d08b..60fb9887225d71827e29e1a7e56e7a3282df074e 100644 --- a/server/dao/neo4j.go +++ b/server/dao/neo4j.go @@ -3,6 +3,7 @@ package dao import ( "fmt" "os" + "strconv" "sync" "time" @@ -53,33 +54,70 @@ func (n *Neo4jclient) Entity_create(cypher string, params map[string]interface{} }) if err != nil { - err = errors.Errorf("neo4j writetransaction failed: %s **9", err.Error()) + err = errors.Errorf("neo4j writetransaction failed: %s, %s **9", err.Error(), cypher) return err } return nil } -func (n *Neo4jclient) Node_query(cypher string, driver neo4j.Driver) ([]neo4j.Node, error) { - var list []neo4j.Node +func (n *Neo4jclient) General_query(cypher string, varia string, driver neo4j.Driver) ([]string, error) { + var list []string session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead, DatabaseName: n.DB}) defer session.Close() _, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) { result, err := tx.Run(cypher, nil) if err != nil { - fmt.Println("NodeQuery failed: ", err) + err = errors.Errorf("neo4j query failed: %s, %s **2", err.Error(), cypher) return nil, err } for result.Next() { record := result.Record() - if value, ok := record.Get("n"); ok { - node := value.(neo4j.Node) - list = append(list, node) + if value, ok := record.Get(varia); ok { + _value := value.([]interface{}) + for _, v := range _value { + list = append(list, v.(string)) + } } } - if err = result.Err(); err != nil { - fmt.Println("iterate node result failed: ", err) + + if err := result.Err(); err != nil { + err = errors.Errorf("iterate result failed: %s, %s **1", err.Error(), cypher) + return nil, err + } + + return list, result.Err() + }) + + if err != nil { + err = errors.Errorf("query Readtransaction error: %s, %s **26", err.Error(), cypher) + } + + return list, nil +} + +func (n *Neo4jclient) Node_query(cypher string, varia string, driver neo4j.Driver) ([]*meta.Node, error) { + var list []*meta.Node + session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead, DatabaseName: n.DB}) + defer session.Close() + _, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) { + result, err := tx.Run(cypher, nil) + if err != nil { + err = errors.Errorf("neo4j query failed: %s, %s **2", err.Error(), cypher) + return nil, err + } + + for result.Next() { + record := result.Record() + if value, ok := record.Get(varia); ok { + neo4jnode := value.(neo4j.Node) + toponode := utils.Neo4jnodeToToponode(neo4jnode) + list = append(list, toponode) + } + } + if err := result.Err(); err != nil { + err = errors.Errorf("iterate node result failed: %s, %s **1", err.Error(), cypher) return nil, err } @@ -87,37 +125,38 @@ func (n *Neo4jclient) Node_query(cypher string, driver neo4j.Driver) ([]neo4j.No }) if err != nil { - fmt.Println("node Readtransaction error:", err) + err = errors.Errorf("node Readtransaction error: %s, %s **24", err.Error(), cypher) } return list, err } -func (n *Neo4jclient) Relation_query(cypher string, driver neo4j.Driver) ([]neo4j.Relationship, error) { - var list []neo4j.Relationship +func (n *Neo4jclient) Relation_query(cypher string, varia string, driver neo4j.Driver) ([]*meta.Edge, error) { + var list []*meta.Edge session := driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead, DatabaseName: n.DB}) defer session.Close() _, err := session.ReadTransaction(func(tx neo4j.Transaction) (interface{}, error) { result, err := tx.Run(cypher, nil) if err != nil { - fmt.Println("RelationshipQuery failed: ", err) + err = errors.Errorf("RelationshipQuery failed: %s, %s **2", err.Error(), cypher) return nil, err } for result.Next() { record := result.Record() - if value, ok := record.Get("r"); ok { + if value, ok := record.Get(varia); ok { relationship := value.(neo4j.Relationship) - list = append(list, relationship) + toporelation := utils.Neo4jrelaToToporela(relationship) + list = append(list, toporelation) } } if err = result.Err(); err != nil { - fmt.Println("iterate relation result failed: ", err) + err = errors.Errorf("iterate relation result failed: %s, %s **1", err.Error(), cypher) return nil, err } return list, result.Err() }) if err != nil { - fmt.Println("relation Readtransaction error:", err) + err = errors.Errorf("relation Readtransaction error: %s, %s **22", err.Error(), cypher) } return list, err } @@ -128,7 +167,7 @@ func PeriodProcessNeo4j(unixtime int64, agentnum int) { var nodeTypeWg sync.WaitGroup var nodeUuidWg sync.WaitGroup var edgeBreakWg sync.WaitGroup - _unixtime := unixtime + _unixtime := strconv.Itoa(int(unixtime)) dri, err := Neo4j.Create_driver() if err != nil { @@ -183,10 +222,10 @@ func PeriodProcessNeo4j(unixtime int64, agentnum int) { for _, node := range _nodesbytype { _node := node if len(_node.Metrics) == 0 { - cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:%d, nid:'%s', name:'%s'} set node:'%s')", + cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:'%s', nid:'%s', name:'%s'} set node:'%s')", _node.Type, _unixtime, _node.ID, _node.Name, _node.UUID) } else { - cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:%d, nid:'%s', name:'%s'}) set node:`%s`, node += $metrics", + cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:'%s', nid:'%s', name:'%s'}) set node:`%s`, node += $metrics", _node.Type, _unixtime, _node.ID, _node.Name, _node.UUID) } @@ -218,11 +257,11 @@ func PeriodProcessNeo4j(unixtime int64, agentnum int) { for _, _edge := range ___edges { if len(_edge.Metrics) == 0 { - cqlIN = fmt.Sprintf("match (src {unixtime:%d, nid:'%s'}), (dst {unixtime:%d, nid:'%s'}) create (src)-[r:`%s` {unixtime:%d, rid:'%s', dir:%t}]->(dst)", - _unixtime, _edge.Src, _unixtime, _edge.Dst, _edge.Type, _unixtime, _edge.ID, _edge.Dir) + cqlIN = fmt.Sprintf("match (src {unixtime:'%s', nid:'%s'}), (dst {unixtime:'%s', nid:'%s'}) create (src)-[r:`%s` {unixtime:'%s', rid:'%s', dir:'%s', src:'%s', dst:'%s'}]->(dst)", + _unixtime, _edge.Src, _unixtime, _edge.Dst, _edge.Type, _unixtime, _edge.ID, _edge.Dir, _edge.Src, _edge.Dst) } else { - cqlIN = fmt.Sprintf("match (src {unixtime:%d, nid:'%s'}), (dst {unixtime:%d, nid:'%s'}) create (src)-[r:`%s` {unixtime:%d, rid:'%s', dir:%t}]->(dst) set r += $metrics", - _unixtime, _edge.Src, _unixtime, _edge.Dst, _edge.Type, _unixtime, _edge.ID, _edge.Dir) + cqlIN = fmt.Sprintf("match (src {unixtime:'%s', nid:'%s'}), (dst {unixtime:'%s', nid:'%s'}) create (src)-[r:`%s` {unixtime:'%s', rid:'%s', dir:'%s', src:'%s', dst:'%s'}]->(dst) set r += $metrics", + _unixtime, _edge.Src, _unixtime, _edge.Dst, _edge.Type, _unixtime, _edge.ID, _edge.Dir, _edge.Src, _edge.Dst) } params := map[string]interface{}{ "metrics": _edge.Metrics,