diff --git a/server/agentmanager/PAgentMap.go b/server/agentmanager/PAgentMap.go index 5281df9253f6e619f9eee29ce564178ae794608a..d4d38efee0b03afe59c707874d7eac2e598a4ada 100644 --- a/server/agentmanager/PAgentMap.go +++ b/server/agentmanager/PAgentMap.go @@ -7,6 +7,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/conf" "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" "gitee.com/openeuler/PilotGo/sdk/utils/httputils" "github.com/pkg/errors" @@ -30,13 +31,20 @@ func WaitingForHandshake() { } func Wait4TopoServerReady() { + defer global.Global_wg.Done() + global.Global_wg.Add(1) for { - url := "http://" + conf.Global_Config.Topo.Addr + "/plugin_manage/info" - resp, err := httputils.Get(url, nil) - if err == nil && resp != nil && resp.StatusCode == http.StatusOK { + select { + case <-global.Global_cancelCtx.Done(): break + default: + url := "http://" + conf.Global_Config.Topo.Addr + "/plugin_manage/info" + resp, err := httputils.Get(url, nil) + if err == nil && resp != nil && resp.StatusCode == http.StatusOK { + break + } + time.Sleep(100 * time.Millisecond) } - time.Sleep(100 * time.Millisecond) } } diff --git a/server/db/dbmanager.go b/server/db/dbmanager.go index e71844f3ee52fc345d0cc2be266f3fe8729dca73..046d41f39d04107738c110bd2b6c8acdf1df5837 100644 --- a/server/db/dbmanager.go +++ b/server/db/dbmanager.go @@ -19,7 +19,7 @@ import ( func InitDB() { if conf.Global_Config.Topo.GraphDB != "" { initGraphDB() - go ClearGraphData(conf.Global_Config.Neo4j.Retention) + ClearGraphData(conf.Global_Config.Neo4j.Retention) } else { err := errors.New("do not save graph data **warn**0") errormanager.ErrorTransmit(pluginclient.Global_Context, err, false) @@ -91,22 +91,33 @@ func ClearGraphData(retention int64) { graphmanager.Global_GraphDB.ClearExpiredData(retention) - for { - current := time.Now() - clear, err := time.Parse("15:04:05", conf.Global_Config.Neo4j.Cleartime) - if err != nil { - logger.Error("ClearGraphData time parse error: %s, %s", err.Error(), conf.Global_Config.Neo4j.Cleartime) - } + global.Global_wg.Add(1) + go func() { + defer global.Global_wg.Done() + for { + select { + case <-global.Global_cancelCtx.Done(): + return + default: + current := time.Now() + clear, err := time.Parse("15:04:05", conf.Global_Config.Neo4j.Cleartime) + if err != nil { + logger.Error("ClearGraphData time parse error: %s, %s", err.Error(), conf.Global_Config.Neo4j.Cleartime) + } - next := time.Date(current.Year(), current.Month(), current.Day()+1, clear.Hour(), clear.Minute(), clear.Second(), 0, current.Location()) - if next.Before(current) { - next = next.Add(24 * time.Hour) - } + next := time.Date(current.Year(), current.Month(), current.Day()+1, clear.Hour(), clear.Minute(), clear.Second(), 0, current.Location()) + if next.Before(current) { + next = next.Add(24 * time.Hour) + } - timer := time.NewTimer(next.Sub(current)) + timer := time.NewTimer(next.Sub(current)) - <-timer.C + <-timer.C + + graphmanager.Global_GraphDB.ClearExpiredData(retention) + } + + } + }() - graphmanager.Global_GraphDB.ClearExpiredData(retention) - } } diff --git a/server/global/close.go b/server/global/close.go index f5f4cc4d2fe06f96305817ab6321d6c50dc94155..d60843a9ccadd00d348d92efcacd32f14f02265b 100644 --- a/server/global/close.go +++ b/server/global/close.go @@ -25,4 +25,8 @@ func Close() { Global_influx_client.Close() logger.Info("close the connection to influx\n") } + + Global_cancelFunc() + + Global_wg.Wait() } diff --git a/server/global/global.go b/server/global/global.go index 3a746d446ed6a7e4e4c53603e241894cf4db2e01..63f93453ac38eb123f2528677da8b4cd426753bd 100755 --- a/server/global/global.go +++ b/server/global/global.go @@ -1,9 +1,12 @@ package global import ( + "context" + "sync" + "github.com/go-redis/redis/v8" - "github.com/neo4j/neo4j-go-driver/v4/neo4j" influx "github.com/influxdata/influxdb-client-go/v2" + "github.com/neo4j/neo4j-go-driver/v4/neo4j" ) var ( @@ -12,6 +15,13 @@ var ( DEFAULT_TAGS []string ) +var Global_wg sync.WaitGroup + +var ( + RootContext = context.Background() + Global_cancelCtx, Global_cancelFunc = context.WithCancel(context.Background()) +) + var ( Global_graph_database string diff --git a/server/handler/router.go b/server/handler/router.go index 03a9967291cfaf499207f173fd4e3e485ff742bb..b236cb919fdd4df1cb1cfe4645d7c85f703c717f 100644 --- a/server/handler/router.go +++ b/server/handler/router.go @@ -9,7 +9,9 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/conf" "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" + "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/gin-gonic/gin" "github.com/pkg/errors" ) @@ -21,27 +23,48 @@ func InitWebServer() { return } + engine := gin.Default() + gin.SetMode(gin.ReleaseMode) + pluginclient.Global_Client.RegisterHandlers(engine) + InitRouter(engine) + StaticRouter(engine) + + webserver := &http.Server{ + Addr: conf.Global_Config.Topo.Addr, + Handler: engine, + } + + global.Global_wg.Add(1) go func() { - engine := gin.Default() - gin.SetMode(gin.ReleaseMode) - pluginclient.Global_Client.RegisterHandlers(engine) - InitRouter(engine) - StaticRouter(engine) + defer global.Global_wg.Done() if conf.Global_Config.Topo.Https_enabled { - err := engine.RunTLS(conf.Global_Config.Topo.Addr, conf.Global_Config.Topo.Public_certificate, conf.Global_Config.Topo.Private_key) - if err != nil { + if err := webserver.ListenAndServeTLS(conf.Global_Config.Topo.Addr, conf.Global_Config.Topo.Public_certificate, conf.Global_Config.Topo.Private_key); err != nil { err = errors.Errorf("%s, addr: %s **errstackfatal**2", err.Error(), conf.Global_Config.Topo.Addr) // err top errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) } + } + if err := webserver.ListenAndServe(); err != nil { + err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top + errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) + } + }() + + go func() { + <-global.Global_cancelCtx.Done() + + logger.Info("shutting down web server...") + + ctx, cancel := context.WithTimeout(global.RootContext, 1*time.Second) + defer cancel() + + if err := webserver.Shutdown(ctx); err != nil { + logger.Error("web server shutdown error: %s", err.Error()) } else { - err := engine.Run(conf.Global_Config.Topo.Addr) - if err != nil { - err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top - errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) - } + logger.Info("web server stopped gracefully") } }() + } func InitRouter(router *gin.Engine) { diff --git a/server/pluginclient/pluginClient.go b/server/pluginclient/pluginClient.go index 75171428cd6a112c0d7e4243c9d52976a490266a..b4b6ca556f73d381ea2e7714804fa71fe1ec2a6f 100644 --- a/server/pluginclient/pluginClient.go +++ b/server/pluginclient/pluginClient.go @@ -1,7 +1,6 @@ package pluginclient import ( - "context" "fmt" "os" "time" @@ -15,8 +14,6 @@ import ( var Global_Client *client.Client -var Global_Context context.Context - func InitPluginClient() { if conf.Global_Config != nil && conf.Global_Config.Topo.Https_enabled { PluginInfo.Url = fmt.Sprintf("https://%s", conf.Global_Config.Topo.Addr_target) @@ -33,8 +30,6 @@ func InitPluginClient() { GetTags() - Global_Context = context.Background() - go uploadResource() } diff --git a/server/service/periodcollect.go b/server/service/periodcollect.go index 439190cacdea8747ff9d50672d6b1a5ca5d6c0c8..110575226236c77b948b9043d3db6933f4e654b3 100644 --- a/server/service/periodcollect.go +++ b/server/service/periodcollect.go @@ -13,9 +13,10 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/db/mysqlmanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/db/redismanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology/server/generator" + "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/graph" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" - "gitee.com/openeuler/PilotGo-plugin-topology/server/generator" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/pkg/errors" ) @@ -43,13 +44,21 @@ func InitPeriodCollectWorking(batch []string, noderules [][]mysqlmanager.Filter_ agentmanager.Global_AgentManager.UpdateMachineList() + global.Global_wg.Add(1) go func(_interval int64, _gdb graphmanager.GraphdbIface, _noderules [][]mysqlmanager.Filter_rule) { + defer global.Global_wg.Done() for { - redismanager.Global_Redis.ActiveHeartbeatDetection(batch) - running_agent_num := redismanager.Global_Redis.UpdateTopoRunningAgentList(batch, false) - unixtime_now := time.Now().Unix() - DataProcessWorking(unixtime_now, running_agent_num, _gdb, nil, _noderules) - time.Sleep(time.Duration(_interval) * time.Second) + select { + case <-global.Global_cancelCtx.Done(): + logger.Info("cancelCtx is done, exit period collect goroutine") + return + default: + redismanager.Global_Redis.ActiveHeartbeatDetection(batch) + running_agent_num := redismanager.Global_Redis.UpdateTopoRunningAgentList(batch, false) + unixtime_now := time.Now().Unix() + DataProcessWorking(unixtime_now, running_agent_num, _gdb, nil, _noderules) + time.Sleep(time.Duration(_interval) * time.Second) + } } }(graphperiod, graphmanager.Global_GraphDB, noderules) }