From 4550e5f7f87071ea41c36b6fadf2b14d3ed9d655 Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Thu, 31 Oct 2024 13:53:18 +0800 Subject: [PATCH] =?UTF-8?q?server:=20add=20resourcemanage=20module(error?= =?UTF-8?q?=20control=E3=80=81resource=20release=E3=80=81goroutine=20end)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/agentmanager/PAgentMap.go | 9 +- server/db/dbmanager.go | 8 +- server/errormanager/errormanager.go | 157 ----------------------- server/global/close.go | 25 +--- server/handler/router.go | 7 +- server/logger/sdkLogger.go | 8 +- server/main.go | 21 ++-- server/resourcemanage/meta.go | 14 +++ server/resourcemanage/resourcemanage.go | 160 ++++++++++++++++++++++++ server/service/periodcollect.go | 10 +- server/signal/signalMonitor.go | 4 +- 11 files changed, 207 insertions(+), 216 deletions(-) delete mode 100644 server/errormanager/errormanager.go create mode 100644 server/resourcemanage/meta.go create mode 100644 server/resourcemanage/resourcemanage.go diff --git a/server/agentmanager/PAgentMap.go b/server/agentmanager/PAgentMap.go index 1c3fd2f..7e624f9 100644 --- a/server/agentmanager/PAgentMap.go +++ b/server/agentmanager/PAgentMap.go @@ -6,9 +6,8 @@ import ( "time" "gitee.com/openeuler/PilotGo-plugin-topology/server/conf" - "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" - "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" + "gitee.com/openeuler/PilotGo-plugin-topology/server/resourcemanage" "gitee.com/openeuler/PilotGo/sdk/utils/httputils" "github.com/pkg/errors" ) @@ -31,11 +30,11 @@ func WaitingForHandshake() { } func Wait4TopoServerReady() { - defer global.END.Wg.Done() - global.END.Wg.Add(1) + defer resourcemanage.ERManager.Wg.Done() + resourcemanage.ERManager.Wg.Add(1) for { select { - case <-global.END.CancelCtx.Done(): + case <-resourcemanage.ERManager.GoCancelCtx.Done(): break default: url := "http://" + conf.Global_Config.Topo.Addr + "/plugin_manage/info" diff --git a/server/db/dbmanager.go b/server/db/dbmanager.go index 1c701d2..81f8e63 100644 --- a/server/db/dbmanager.go +++ b/server/db/dbmanager.go @@ -10,9 +10,9 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/db/influxmanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/db/mysqlmanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/db/redismanager" - "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" + "gitee.com/openeuler/PilotGo-plugin-topology/server/resourcemanage" "gitee.com/openeuler/PilotGo/sdk/logger" ) @@ -91,12 +91,12 @@ func ClearGraphData(retention int64) { graphmanager.Global_GraphDB.ClearExpiredData(retention) - global.END.Wg.Add(1) + resourcemanage.ERManager.Wg.Add(1) go func() { - defer global.END.Wg.Done() + defer resourcemanage.ERManager.Wg.Done() for { select { - case <-global.END.CancelCtx.Done(): + case <-resourcemanage.ERManager.GoCancelCtx.Done(): return default: current := time.Now() diff --git a/server/errormanager/errormanager.go b/server/errormanager/errormanager.go deleted file mode 100644 index b2cadea..0000000 --- a/server/errormanager/errormanager.go +++ /dev/null @@ -1,157 +0,0 @@ -package errormanager - -import ( - "context" - "fmt" - "io" - "os" - - "gitee.com/openeuler/PilotGo-plugin-topology/server/conf" - "gitee.com/openeuler/PilotGo-plugin-topology/server/global" - "gitee.com/openeuler/PilotGo/sdk/logger" - "github.com/pkg/errors" -) - -type ResourceReleaseIface interface { - Close() -} - -type FinalError struct { - Err error - - Severity string - - Cancel context.CancelFunc - - PrintStack bool - - ExitAfterPrint bool -} - -func (e *FinalError) Error() string { - return e.Err.Error() -} - -var ErrorManager *ErrorManagement - -type ErrorManagement struct { - ErrCh chan error - - out io.Writer - - cancelCtx context.Context - cancelFunc context.CancelFunc - - end ResourceReleaseIface -} - -func CreateErrorManager(_end ResourceReleaseIface) { - ErrorManager = &ErrorManagement{ - ErrCh: make(chan error, 20), - end: _end, - } - ErrorManager.cancelCtx, ErrorManager.cancelFunc = context.WithCancel(global.END.CancelCtx) - - switch conf.Global_Config.Logopts.Driver { - case "stdout": - ErrorManager.out = os.Stdout - case "file": - logfile, err := os.OpenFile(conf.Global_Config.Logopts.Path, os.O_WRONLY|os.O_CREATE, 0666) - if err != nil { - panic(err) - } - ErrorManager.out = logfile - } - - global.END.Wg.Add(1) - go func(ch <-chan error) { - defer global.END.Wg.Done() - for { - select { - case <-ErrorManager.cancelCtx.Done(): - return - case _error := <-ch: - _terror, ok := _error.(*FinalError) - if !ok { - fmt.Fprintf(ErrorManager.out, "%+v\n", _error) - continue - } - - if _terror.Err != nil { - if !_terror.PrintStack && !_terror.ExitAfterPrint { - switch _terror.Severity { - case "debug": - logger.Debug(errors.Cause(_terror.Err).Error()) - case "info": - logger.Info(errors.Cause(_terror.Err).Error()) - case "warn": - logger.Warn(errors.Cause(_terror.Err).Error()) - case "error": - logger.Error(errors.Cause(_terror.Err).Error()) - default: - logger.Error("only support \"debug info warn error\" type: %s\n", errors.Cause(_terror.Err).Error()) - } - } else if _terror.PrintStack && !_terror.ExitAfterPrint { - logger.ErrorStack("%+v", _terror.Err) - // errors.EORE(err) - } else if !_terror.PrintStack && _terror.ExitAfterPrint { - switch _terror.Severity { - case "debug": - logger.Debug(errors.Cause(_terror.Err).Error()) - case "info": - logger.Info(errors.Cause(_terror.Err).Error()) - case "warn": - logger.Warn(errors.Cause(_terror.Err).Error()) - case "error": - logger.Error(errors.Cause(_terror.Err).Error()) - default: - logger.Error("only support \"debug info warn error\" type: %s\n", errors.Cause(_terror.Err).Error()) - } - _terror.Cancel() - } else if _terror.PrintStack && _terror.ExitAfterPrint { - logger.ErrorStack("%+v", _terror.Err) - // errors.EORE(err) - _terror.Cancel() - } - } - } - } - }(ErrorManager.ErrCh) -} - -/* -@ctx: 插件服务端初始上下文(默认为pluginclient.Global_Context) - -@err: 最终生成的error - -@exit_after_print: 打印完错误链信息后是否结束主程序 -*/ -func ErrorTransmit(_severity string, _err error, _exit_after_print, _print_stack bool) { - if ErrorManager == nil { - logger.Error("globalerrormanager is nil") - ErrorManager.end.Close() - os.Exit(1) - } - - if _exit_after_print { - ctx, cancel := context.WithCancel(ErrorManager.cancelCtx) - ErrorManager.ErrCh <- &FinalError{ - Err: _err, - Cancel: cancel, - Severity: _severity, - PrintStack: _print_stack, - ExitAfterPrint: _exit_after_print, - } - <-ctx.Done() - close(ErrorManager.ErrCh) - ErrorManager.end.Close() - os.Exit(1) - } - - ErrorManager.ErrCh <- &FinalError{ - Err: _err, - PrintStack: _print_stack, - ExitAfterPrint: _exit_after_print, - Cancel: nil, - } -} diff --git a/server/global/close.go b/server/global/close.go index ca56c5e..f5f4cc4 100644 --- a/server/global/close.go +++ b/server/global/close.go @@ -1,31 +1,12 @@ package global import ( - "context" "fmt" - "sync" "gitee.com/openeuler/PilotGo/sdk/logger" ) -var END *ResourceRelease - -func init() { - ctx, cancel := context.WithCancel(RootContext) - END = &ResourceRelease{ - CancelCtx: ctx, - CancelFunc: cancel, - } -} - -type ResourceRelease struct { - Wg sync.WaitGroup - - CancelCtx context.Context - CancelFunc context.CancelFunc -} - -func (rr *ResourceRelease) Close() { +func Close() { switch Global_graph_database { case "neo4j": if Global_neo4j_driver != nil { @@ -44,8 +25,4 @@ func (rr *ResourceRelease) Close() { Global_influx_client.Close() logger.Info("close the connection to influx\n") } - - rr.CancelFunc() - - rr.Wg.Wait() } diff --git a/server/handler/router.go b/server/handler/router.go index 27b2394..fd26452 100644 --- a/server/handler/router.go +++ b/server/handler/router.go @@ -11,6 +11,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" + "gitee.com/openeuler/PilotGo-plugin-topology/server/resourcemanage" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/gin-gonic/gin" "github.com/pkg/errors" @@ -34,9 +35,9 @@ func InitWebServer() { Handler: engine, } - global.END.Wg.Add(1) + resourcemanage.ERManager.Wg.Add(1) go func() { - defer global.END.Wg.Done() + defer resourcemanage.ERManager.Wg.Done() if conf.Global_Config.Topo.Https_enabled { if err := webserver.ListenAndServeTLS(conf.Global_Config.Topo.Addr, conf.Global_Config.Topo.Public_certificate, conf.Global_Config.Topo.Private_key); err != nil { @@ -51,7 +52,7 @@ func InitWebServer() { }() go func() { - <-global.END.CancelCtx.Done() + <-resourcemanage.ERManager.GoCancelCtx.Done() logger.Info("shutting down web server...") diff --git a/server/logger/sdkLogger.go b/server/logger/sdkLogger.go index dd62931..1938d05 100644 --- a/server/logger/sdkLogger.go +++ b/server/logger/sdkLogger.go @@ -1,17 +1,15 @@ package logger import ( + "fmt" + "gitee.com/openeuler/PilotGo-plugin-topology/server/conf" - "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" - "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" "gitee.com/openeuler/PilotGo/sdk/logger" - "github.com/pkg/errors" ) func InitLogger() { err := logger.Init(conf.Global_Config.Logopts) if err != nil { - err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top - errormanager.ErrorTransmit(pluginclient.Global_Context, err, true) + fmt.Printf("logger module init failed: %s\n", err.Error()) } } diff --git a/server/main.go b/server/main.go index bb1d9c5..63a97b0 100755 --- a/server/main.go +++ b/server/main.go @@ -5,11 +5,10 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/conf" "gitee.com/openeuler/PilotGo-plugin-topology/server/db" "gitee.com/openeuler/PilotGo-plugin-topology/server/db/mysqlmanager" - "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" - "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/handler" "gitee.com/openeuler/PilotGo-plugin-topology/server/logger" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" + "gitee.com/openeuler/PilotGo-plugin-topology/server/resourcemanage" "gitee.com/openeuler/PilotGo-plugin-topology/server/service" "gitee.com/openeuler/PilotGo-plugin-topology/server/signal" // "github.com/pyroscope-io/pyroscope/pkg/agent/profiler" @@ -28,14 +27,19 @@ func main() { conf.InitConfig() /* - init plugin client + init logger */ - pluginclient.InitPluginClient() + logger.InitLogger() + + /* + init error control、resource release、goroutine end + */ + resourcemanage.InitResourceManage() /* - init error control + init plugin client */ - errormanager.CreateErrorManager(global.END) + pluginclient.InitPluginClient() /* init agent manager @@ -53,11 +57,6 @@ func main() { */ handler.InitWebServer() - /* - init logger - */ - logger.InitLogger() - /* init machine agent list */ diff --git a/server/resourcemanage/meta.go b/server/resourcemanage/meta.go new file mode 100644 index 0000000..44ccce7 --- /dev/null +++ b/server/resourcemanage/meta.go @@ -0,0 +1,14 @@ +package resourcemanage + +import ( + "gitee.com/openeuler/PilotGo-plugin-topology/server/global" + "gitee.com/openeuler/PilotGo/sdk/logger" +) + +func InitResourceManage() { + ermanager, err := CreateErrorReleaseManager(global.RootContext, global.Close) + if err != nil { + logger.Fatal(err.Error()) + } + ERManager = ermanager +} diff --git a/server/resourcemanage/resourcemanage.go b/server/resourcemanage/resourcemanage.go new file mode 100644 index 0000000..6edea5e --- /dev/null +++ b/server/resourcemanage/resourcemanage.go @@ -0,0 +1,160 @@ +package resourcemanage + +import ( + "context" + "fmt" + "os" + "sync" + + "gitee.com/openeuler/PilotGo/sdk/logger" + "github.com/pkg/errors" +) + +var ERManager *ErrorReleaseManagement + +type ResourceReleaseFunction func() + +type FinalError struct { + Err error + + Severity string + + Cancel context.CancelFunc + + PrintStack bool + + ExitAfterPrint bool +} + +func (e *FinalError) Error() string { + return e.Err.Error() +} + +type ErrorReleaseManagement struct { + errChan chan error + + cancelCtx context.Context + cancelFunc context.CancelFunc + GoCancelCtx context.Context + GoCancelFunc context.CancelFunc + + Wg sync.WaitGroup + + errEndChan chan struct{} + + releaseFunc ResourceReleaseFunction +} + +func CreateErrorReleaseManager(_ctx context.Context, _releaseFunc ResourceReleaseFunction) (*ErrorReleaseManagement, error) { + if _ctx == nil || _releaseFunc == nil { + return nil, fmt.Errorf("context or closeFunc is nil") + } + + ErrorM := &ErrorReleaseManagement{ + errChan: make(chan error, 20), + errEndChan: make(chan struct{}), + releaseFunc: _releaseFunc, + } + ErrorM.cancelCtx, ErrorM.cancelFunc = context.WithCancel(_ctx) + ErrorM.GoCancelCtx, ErrorM.GoCancelFunc = context.WithCancel(_ctx) + + go ErrorM.errorFactory() + + return ErrorM, nil +} + +func (erm *ErrorReleaseManagement) errorFactory() { + for { + select { + case <-erm.errEndChan: + logger.Info("errormanager exit") + return + case _error := <-erm.errChan: + _terror, ok := _error.(*FinalError) + if !ok { + logger.Error("plain error: %s", _error.Error()) + continue + } + + if _terror.Err != nil { + if !_terror.PrintStack && !_terror.ExitAfterPrint { + switch _terror.Severity { + case "debug": + logger.Debug(errors.Cause(_terror.Err).Error()) + case "info": + logger.Info(errors.Cause(_terror.Err).Error()) + case "warn": + logger.Warn(errors.Cause(_terror.Err).Error()) + case "error": + logger.Error(errors.Cause(_terror.Err).Error()) + default: + logger.Error("only support \"debug info warn error\" type: %s\n", errors.Cause(_terror.Err).Error()) + } + } else if _terror.PrintStack && !_terror.ExitAfterPrint { + logger.ErrorStack("%+v", _terror.Err) + // errors.EORE(err) + } else if !_terror.PrintStack && _terror.ExitAfterPrint { + switch _terror.Severity { + case "debug": + logger.Debug(errors.Cause(_terror.Err).Error()) + case "info": + logger.Info(errors.Cause(_terror.Err).Error()) + case "warn": + logger.Warn(errors.Cause(_terror.Err).Error()) + case "error": + logger.Error(errors.Cause(_terror.Err).Error()) + default: + logger.Error("only support \"debug info warn error\" type: %s\n", errors.Cause(_terror.Err).Error()) + } + _terror.Cancel() + } else if _terror.PrintStack && _terror.ExitAfterPrint { + logger.ErrorStack("%+v", _terror.Err) + // errors.EORE(err) + _terror.Cancel() + } + } + } + } +} + +func (erm *ErrorReleaseManagement) ResourceRelease() { + erm.releaseFunc() + + erm.GoCancelFunc() + + erm.Wg.Wait() + + close(erm.errEndChan) + + close(erm.errChan) +} + +/* +@ctx: 插件服务端初始上下文(默认为pluginclient.Global_Context) + +@err: 最终生成的error + +@exit_after_print: 打印完错误链信息后是否结束主程序 +*/ +func (erm *ErrorReleaseManagement) ErrorTransmit(_severity string, _err error, _exit_after_print, _print_stack bool) { + if _exit_after_print { + ctx, cancel := context.WithCancel(erm.cancelCtx) + erm.errChan <- &FinalError{ + Err: _err, + Cancel: cancel, + Severity: _severity, + PrintStack: _print_stack, + ExitAfterPrint: _exit_after_print, + } + <-ctx.Done() + erm.ResourceRelease() + os.Exit(1) + } + + erm.errChan <- &FinalError{ + Err: _err, + PrintStack: _print_stack, + ExitAfterPrint: _exit_after_print, + Cancel: nil, + } +} diff --git a/server/service/periodcollect.go b/server/service/periodcollect.go index 6660156..5b776f7 100644 --- a/server/service/periodcollect.go +++ b/server/service/periodcollect.go @@ -14,9 +14,9 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology/server/db/redismanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology/server/generator" - "gitee.com/openeuler/PilotGo-plugin-topology/server/global" "gitee.com/openeuler/PilotGo-plugin-topology/server/graph" "gitee.com/openeuler/PilotGo-plugin-topology/server/pluginclient" + "gitee.com/openeuler/PilotGo-plugin-topology/server/resourcemanage" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/pkg/errors" ) @@ -25,7 +25,7 @@ func InitPeriodCollectWorking(batch []string, noderules [][]mysqlmanager.Filter_ if graphmanager.Global_GraphDB == nil { err := errors.New("global_graphdb is nil **debug**0") errormanager.ErrorTransmit(pluginclient.Global_Context, err, false) - return + return } graphperiod := conf.Global_Config.Neo4j.Period @@ -44,12 +44,12 @@ func InitPeriodCollectWorking(batch []string, noderules [][]mysqlmanager.Filter_ agentmanager.Global_AgentManager.UpdateMachineList() - global.END.Wg.Add(1) + resourcemanage.ERManager.Wg.Add(1) go func(_interval int64, _gdb graphmanager.GraphdbIface, _noderules [][]mysqlmanager.Filter_rule) { - defer global.END.Wg.Done() + defer resourcemanage.ERManager.Wg.Done() for { select { - case <-global.END.CancelCtx.Done(): + case <-resourcemanage.ERManager.GoCancelCtx.Done(): logger.Info("cancelCtx is done, exit period collect goroutine") return default: diff --git a/server/signal/signalMonitor.go b/server/signal/signalMonitor.go index f688a0f..ae132dd 100644 --- a/server/signal/signalMonitor.go +++ b/server/signal/signalMonitor.go @@ -5,7 +5,7 @@ import ( "os/signal" "syscall" - "gitee.com/openeuler/PilotGo-plugin-topology/server/global" + "gitee.com/openeuler/PilotGo-plugin-topology/server/resourcemanage" "gitee.com/openeuler/PilotGo/sdk/logger" ) @@ -16,7 +16,7 @@ func SignalMonitoring() { for s := range ch { switch s { case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: - global.END.Close() + resourcemanage.ERManager.ResourceRelease() os.Exit(1) default: logger.Warn("unknown signal-> %s\n", s.String()) -- Gitee