diff --git a/conf/config_server.yaml.templete b/conf/config_server.yaml.templete index 7bbb2614f87ec88e57454fba0f7c6a111d3ba305..4525f757a990d1aa590c0ee7315a1a37b43758d4 100644 --- a/conf/config_server.yaml.templete +++ b/conf/config_server.yaml.templete @@ -2,7 +2,7 @@ topo: server_addr: "localhost:9991" agent_port: "9992" database: "neo4j" - period: 60 + period: 5 PilotGo: http_addr: "localhost:8888" log: diff --git a/server/conf/config.go b/server/conf/config.go index adaf289f70ee9bd6125060755e33dd3c6d20342a..a0084e66033bbd15f21e030f9a7c4015bb0a2473 100644 --- a/server/conf/config.go +++ b/server/conf/config.go @@ -10,7 +10,7 @@ type TopoConf struct { Server_addr string `yaml:"server_addr"` Agent_port string `yaml:"agent_port"` Database string `yaml:"database"` - Period int `yaml:"period"` + Period int64 `yaml:"period"` } type PilotGoConf struct { diff --git a/server/dao/init.go b/server/dao/init.go index 07a0cc0fc69fdc13eda603a897b9116884ad9a61..0d978b3f8dd93386e9eb4d72fb5f6e31573cc1a0 100755 --- a/server/dao/init.go +++ b/server/dao/init.go @@ -1 +1,41 @@ package dao + +import ( + "os" + "time" + + "github.com/pkg/errors" + + "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/conf" +) + +func InitDB() { + var unixtime_now int64 + var period int64 + period = conf.Global_config.Topo.Period + + switch conf.Global_config.Topo.Database { + case "neo4j": + go func(pt int64) { + for true { + unixtime_now = time.Now().Unix() + + PeriodProcessNeo4j(unixtime_now) + + time.Sleep(time.Duration(period) * time.Second) + + // break + } + }(period) + case "otherDB": + + default: + err := errors.Errorf("unknown database in config_server.yaml: %s **fatal**4", conf.Global_config.Topo.Database) // err top + agentmanager.Topo.ErrCh <- err + agentmanager.Topo.ErrGroup.Add(1) + agentmanager.Topo.ErrGroup.Wait() + close(agentmanager.Topo.ErrCh) + os.Exit(1) + } +} diff --git a/server/dao/neo4j.go b/server/dao/neo4j.go index 18c29e4173029acd7a93e3bed86fca392ed0be57..41fe978ece0701aba9be36d33b35ddce35716df2 100644 --- a/server/dao/neo4j.go +++ b/server/dao/neo4j.go @@ -2,8 +2,13 @@ package dao import ( "fmt" + "os" + "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/conf" + "gitee.com/openeuler/PilotGo-plugin-topology-server/processor" "github.com/neo4j/neo4j-go-driver/v4/neo4j" + "github.com/pkg/errors" ) type Neo4jclient struct { @@ -32,24 +37,27 @@ func (n *Neo4jclient) Close_driver() error { return n.driver.Close() } -func (n *Neo4jclient) Entity_create(cypher string) error { - +func (n *Neo4jclient) Entity_create(cypher string, params map[string]interface{}) error { session := n.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite, DatabaseName: n.DB}) defer session.Close() _, err := session.WriteTransaction(func(tx neo4j.Transaction) (interface{}, error) { - result, err := tx.Run(cypher, nil) + result, err := tx.Run(cypher, params) if err != nil { return nil, err } return result.Consume() }) - return err + if err != nil { + err = errors.Errorf("neo4j writetransaction failed: %s", err.Error()) + return err + } + + return nil } func (n *Neo4jclient) Node_query(cypher string) ([]neo4j.Node, error) { - var list []neo4j.Node session := n.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead, DatabaseName: n.DB}) defer session.Close() @@ -82,7 +90,6 @@ func (n *Neo4jclient) Node_query(cypher string) ([]neo4j.Node, error) { } func (n *Neo4jclient) Relation_query(cypher string) ([]neo4j.Relationship, error) { - var list []neo4j.Relationship session := n.driver.NewSession(neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead, DatabaseName: n.DB}) defer session.Close() @@ -111,3 +118,74 @@ func (n *Neo4jclient) Relation_query(cypher string) ([]neo4j.Relationship, error } return list, err } + +func PeriodProcessNeo4j(unixtime int64) { + var cqlIN string + + _neo4j := CreateNeo4j(conf.Global_config.Neo4j.Addr, conf.Global_config.Neo4j.Username, conf.Global_config.Neo4j.Password, conf.Global_config.Neo4j.DB) + + d, err := _neo4j.Create_driver() + if err != nil { + err := errors.Errorf("create neo4j driver failed: %s **fatal**2", err.Error()) // err top + agentmanager.Topo.ErrCh <- err + agentmanager.Topo.ErrGroup.Add(1) + agentmanager.Topo.ErrGroup.Wait() + close(agentmanager.Topo.ErrCh) + os.Exit(1) + } + _neo4j.driver = d + defer _neo4j.Close_driver() + + dataprocesser := processor.CreateDataProcesser() + nodes, edges, collect_errlist, process_errlist := dataprocesser.Process_data() + if len(collect_errlist) != 0 || len(process_errlist) != 0 { + for i, cerr := range collect_errlist { + collect_errlist[i] = errors.Wrap(cerr, "**warn**3") // err top + agentmanager.Topo.ErrCh <- collect_errlist[i] + } + + for i, perr := range process_errlist { + process_errlist[i] = errors.Wrap(perr, "**warn**8") // err top + agentmanager.Topo.ErrCh <- process_errlist[i] + } + } + + for _, node := range nodes.Nodes { + if len(node.Metrics) == 0 { + cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:%d, nid:'%s', name:'%s'} set node:'%s')", + node.Type, unixtime, node.ID, node.Name, node.UUID) + } else { + cqlIN = fmt.Sprintf("create (node:`%s` {unixtime:%d, nid:'%s', name:'%s'}) set node:`%s`, node += $metrics", + node.Type, unixtime, node.ID, node.Name, node.UUID) + } + + params := map[string]interface{}{ + "metrics": node.Metrics, + } + + err := _neo4j.Entity_create(cqlIN, params) + if err != nil { + err = errors.Wrapf(err, "create neo4j node failed **warn**2") // err top + agentmanager.Topo.ErrCh <- err + } + } + + for _, edge := range edges.Edges { + if len(edge.Metrics) == 0 { + cqlIN = fmt.Sprintf("match (src {unixtime:%d, nid:'%s'}), (dst {unixtime:%d, nid:'%s'}) create (src)-[r:`%s` {unixtime:%d, rid:'%s', dir:%t}]->(dst)", + unixtime, edge.Src, unixtime, edge.Dst, edge.Type, unixtime, edge.ID, edge.Dir) + } else { + cqlIN = fmt.Sprintf("match (src {unixtime:%d, nid:'%s'}), (dst {unixtime:%d, nid:'%s'}) create (src)-[r:`%s` {unixtime:%d, rid:'%s', dir:%t}]->(dst) set r += $metrics", + unixtime, edge.Src, unixtime, edge.Dst, edge.Type, unixtime, edge.ID, edge.Dir) + } + params := map[string]interface{}{ + "metrics": edge.Metrics, + } + + err := _neo4j.Entity_create(cqlIN, params) + if err != nil { + err = errors.Wrapf(err, "create neo4j edge failed **warn**2") // err top + agentmanager.Topo.ErrCh <- err + } + } +} diff --git a/server/handler/router.go b/server/handler/router.go index 5bf663bebbcb6f01497c0edc05faf21c2f88d115..936ab48b7582f0325af870d7ba9471f488ce67bd 100644 --- a/server/handler/router.go +++ b/server/handler/router.go @@ -14,11 +14,13 @@ import ( ) func InitWebServer() { + // go func() { engine := gin.Default() engine.Use(TimeoutMiddleware()) agentmanager.Topo.Sdkmethod.RegisterHandlers(engine) InitRouter(engine) StaticRouter(engine) + err := engine.Run(conf.Config().Topo.Server_addr) if err != nil { err = errors.Errorf("%s **fatal**2", err.Error()) // err top @@ -28,6 +30,7 @@ func InitWebServer() { close(agentmanager.Topo.ErrCh) os.Exit(1) } + // }() } func InitRouter(router *gin.Engine) { diff --git a/server/main.go b/server/main.go index e592915a38163066cf80412ad4a24eb917511d27..cf49d6cc070ee23235a1c8ca91fb31869fdab289 100644 --- a/server/main.go +++ b/server/main.go @@ -4,6 +4,7 @@ import ( "fmt" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/dao" "gitee.com/openeuler/PilotGo-plugin-topology-server/handler" ) @@ -30,12 +31,6 @@ func main() { */ agentmanager.Topo.InitLogger() - /* - init JanusGraph - TODO: 图数据库 - */ - agentmanager.Topo.InitJanusGraph() - /* init machine agent list TODO: 实时更新machine agent、topo agent的状态 @@ -43,8 +38,9 @@ func main() { agentmanager.Topo.InitMachineList() /* - init topo agent status + init database */ + dao.InitDB() /* init web server