From 7765855cd35cda322fc3be2a7b1a8f4b698264a6 Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Wed, 30 Oct 2024 20:28:20 +0800 Subject: [PATCH] server: end goroutine gracefully --- server/agentmanager/PAgentMap.go | 16 +++++++--- server/db/dbmanager.go | 41 ++++++++++++++++--------- server/global/close.go | 4 +++ server/global/global.go | 12 +++++++- server/handler/router.go | 47 +++++++++++++++++++++-------- server/pluginclient/pluginClient.go | 5 --- server/service/periodcollect.go | 21 +++++++++---- 7 files changed, 103 insertions(+), 43 deletions(-) diff --git a/server/agentmanager/PAgentMap.go b/server/agentmanager/PAgentMap.go index 5281df9..d4d38ef 100644 --- a/server/agentmanager/PAgentMap.go +++ b/server/agentmanager/PAgentMap.go @@ -7,6 +7,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/conf" "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" "gitee.com/openeuler/PilotGo/sdk/utils/httputils" "github.com/pkg/errors" @@ -30,13 +31,20 @@ func WaitingForHandshake() { } func Wait4TopoServerReady() { + defer global.Global_wg.Done() + global.Global_wg.Add(1) for { - url := "http://" + conf.Global_Config.Topo.Addr + "/plugin_manage/info" - resp, err := httputils.Get(url, nil) - if err == nil && resp != nil && resp.StatusCode == http.StatusOK { + select { + case <-global.Global_cancelCtx.Done(): break + default: + url := "http://" + conf.Global_Config.Topo.Addr + "/plugin_manage/info" + resp, err := httputils.Get(url, nil) + if err == nil && resp != nil && resp.StatusCode == http.StatusOK { + break + } + time.Sleep(100 * time.Millisecond) } - time.Sleep(100 * time.Millisecond) } } diff --git a/server/db/dbmanager.go b/server/db/dbmanager.go index e71844f..046d41f 100644 --- a/server/db/dbmanager.go +++ b/server/db/dbmanager.go @@ -19,7 +19,7 @@ import ( func InitDB() { if conf.Global_Config.Topo.GraphDB != "" { initGraphDB() - go ClearGraphData(conf.Global_Config.Neo4j.Retention) + ClearGraphData(conf.Global_Config.Neo4j.Retention) } else { err := errors.New("do not save graph data **warn**0") errormanager.ErrorTransmit(pluginclient.Global_Context, err, false) @@ -91,22 +91,33 @@ func ClearGraphData(retention int64) { graphmanager.Global_GraphDB.ClearExpiredData(retention) - for { - current := time.Now() - clear, err := time.Parse("15:04:05", conf.Global_Config.Neo4j.Cleartime) - if err != nil { - logger.Error("ClearGraphData time parse error: %s, %s", err.Error(), conf.Global_Config.Neo4j.Cleartime) - } + global.Global_wg.Add(1) + go func() { + defer global.Global_wg.Done() + for { + select { + case <-global.Global_cancelCtx.Done(): + return + default: + current := time.Now() + clear, err := time.Parse("15:04:05", conf.Global_Config.Neo4j.Cleartime) + if err != nil { + logger.Error("ClearGraphData time parse error: %s, %s", err.Error(), conf.Global_Config.Neo4j.Cleartime) + } - next := time.Date(current.Year(), current.Month(), current.Day()+1, clear.Hour(), clear.Minute(), clear.Second(), 0, current.Location()) - if next.Before(current) { - next = next.Add(24 * time.Hour) - } + next := time.Date(current.Year(), current.Month(), current.Day()+1, clear.Hour(), clear.Minute(), clear.Second(), 0, current.Location()) + if next.Before(current) { + next = next.Add(24 * time.Hour) + } - timer := time.NewTimer(next.Sub(current)) + timer := time.NewTimer(next.Sub(current)) - <-timer.C + <-timer.C + + graphmanager.Global_GraphDB.ClearExpiredData(retention) + } + + } + }() - graphmanager.Global_GraphDB.ClearExpiredData(retention) - } } diff --git a/server/global/close.go b/server/global/close.go index f5f4cc4..d60843a 100644 --- a/server/global/close.go +++ b/server/global/close.go @@ -25,4 +25,8 @@ func Close() { Global_influx_client.Close() logger.Info("close the connection to influx\n") } + + Global_cancelFunc() + + Global_wg.Wait() } diff --git a/server/global/global.go b/server/global/global.go index 3a746d4..63f9345 100755 --- a/server/global/global.go +++ b/server/global/global.go @@ -1,9 +1,12 @@ package global import ( + "context" + "sync" + "github.com/go-redis/redis/v8" - "github.com/neo4j/neo4j-go-driver/v4/neo4j" influx "github.com/influxdata/influxdb-client-go/v2" + "github.com/neo4j/neo4j-go-driver/v4/neo4j" ) var ( @@ -12,6 +15,13 @@ var ( DEFAULT_TAGS []string ) +var Global_wg sync.WaitGroup + +var ( + RootContext = context.Background() + Global_cancelCtx, Global_cancelFunc = context.WithCancel(context.Background()) +) + var ( Global_graph_database string diff --git a/server/handler/router.go b/server/handler/router.go index 03a9967..b236cb9 100644 --- a/server/handler/router.go +++ b/server/handler/router.go @@ -9,7 +9,9 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/conf" "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" + "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/gin-gonic/gin" "github.com/pkg/errors" ) @@ -21,27 +23,48 @@ func InitWebServer() { return } + engine := gin.Default() + gin.SetMode(gin.ReleaseMode) + pluginclient.Global_Client.RegisterHandlers(engine) + InitRouter(engine) + StaticRouter(engine) + + webserver := &http.Server{ + Addr: conf.Global_Config.Topo.Addr, + Handler: engine, + } + + global.Global_wg.Add(1) go func() { - engine := gin.Default() - gin.SetMode(gin.ReleaseMode) - pluginclient.Global_Client.RegisterHandlers(engine) - InitRouter(engine) - StaticRouter(engine) + defer global.Global_wg.Done() if conf.Global_Config.Topo.Https_enabled { - err := engine.RunTLS(conf.Global_Config.Topo.Addr, conf.Global_Config.Topo.Public_certificate, conf.Global_Config.Topo.Private_key) - if err != nil { + if err := webserver.ListenAndServeTLS(conf.Global_Config.Topo.Addr, conf.Global_Config.Topo.Public_certificate, conf.Global_Config.Topo.Private_key); err != nil { err = errors.Errorf("%s, addr: %s **errstackfatal**2", err.Error(), conf.Global_Config.Topo.Addr) // err top errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) } + } + if err := webserver.ListenAndServe(); err != nil { + err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top + errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) + } + }() + + go func() { + <-global.Global_cancelCtx.Done() + + logger.Info("shutting down web server...") + + ctx, cancel := context.WithTimeout(global.RootContext, 1*time.Second) + defer cancel() + + if err := webserver.Shutdown(ctx); err != nil { + logger.Error("web server shutdown error: %s", err.Error()) } else { - err := engine.Run(conf.Global_Config.Topo.Addr) - if err != nil { - err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top - errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) - } + logger.Info("web server stopped gracefully") } }() + } func InitRouter(router *gin.Engine) { diff --git a/server/pluginclient/pluginClient.go b/server/pluginclient/pluginClient.go index 7517142..b4b6ca5 100644 --- a/server/pluginclient/pluginClient.go +++ b/server/pluginclient/pluginClient.go @@ -1,7 +1,6 @@ package pluginclient import ( - "context" "fmt" "os" "time" @@ -15,8 +14,6 @@ import ( var Global_Client *client.Client -var Global_Context context.Context - func InitPluginClient() { if conf.Global_Config != nil && conf.Global_Config.Topo.Https_enabled { PluginInfo.Url = fmt.Sprintf("https://%s", conf.Global_Config.Topo.Addr_target) @@ -33,8 +30,6 @@ func InitPluginClient() { GetTags() - Global_Context = context.Background() - go uploadResource() } diff --git a/server/service/periodcollect.go b/server/service/periodcollect.go index 439190c..1105752 100644 --- a/server/service/periodcollect.go +++ b/server/service/periodcollect.go @@ -13,9 +13,10 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/db/mysqlmanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/db/redismanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology/server/generator" + "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/graph" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" - "gitee.com/openeuler/PilotGo-plugin-topology/server/generator" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/pkg/errors" ) @@ -43,13 +44,21 @@ func InitPeriodCollectWorking(batch []string, noderules [][]mysqlmanager.Filter_ agentmanager.Global_AgentManager.UpdateMachineList() + global.Global_wg.Add(1) go func(_interval int64, _gdb graphmanager.GraphdbIface, _noderules [][]mysqlmanager.Filter_rule) { + defer global.Global_wg.Done() for { - redismanager.Global_Redis.ActiveHeartbeatDetection(batch) - running_agent_num := redismanager.Global_Redis.UpdateTopoRunningAgentList(batch, false) - unixtime_now := time.Now().Unix() - DataProcessWorking(unixtime_now, running_agent_num, _gdb, nil, _noderules) - time.Sleep(time.Duration(_interval) * time.Second) + select { + case <-global.Global_cancelCtx.Done(): + logger.Info("cancelCtx is done, exit period collect goroutine") + return + default: + redismanager.Global_Redis.ActiveHeartbeatDetection(batch) + running_agent_num := redismanager.Global_Redis.UpdateTopoRunningAgentList(batch, false) + unixtime_now := time.Now().Unix() + DataProcessWorking(unixtime_now, running_agent_num, _gdb, nil, _noderules) + time.Sleep(time.Duration(_interval) * time.Second) + } } }(graphperiod, graphmanager.Global_GraphDB, noderules) } -- Gitee