From 60e3ab0f52af930eb0603e33c5e85239b23b331a Mon Sep 17 00:00:00 2001 From: vhbkbk Date: Mon, 16 Oct 2023 13:39:27 +0800 Subject: [PATCH] feat(hextech): optimization service run --- hextech/config/configs.go | 33 +++++++-------- hextech/config/options.go | 72 ++++++++++++++++++++++++--------- hextech/core/rpcxserver.go | 31 ++++++-------- hextech/core/webserver.go | 22 ++++++---- hextech/server/server.go | 82 +++++++++++++------------------------- user/conf/dev.yml | 4 +- user/main.go | 2 +- 7 files changed, 125 insertions(+), 121 deletions(-) diff --git a/hextech/config/configs.go b/hextech/config/configs.go index 9c2c27b..41bb443 100644 --- a/hextech/config/configs.go +++ b/hextech/config/configs.go @@ -6,7 +6,6 @@ import ( "time" "github.com/fsnotify/fsnotify" - "github.com/smallnest/rpcx/server" "github.com/spf13/viper" ) @@ -20,7 +19,7 @@ type Config struct { HTTPServer HTTPServer `mapstructure:"httpServer"` FilePath FilePath `mapstructure:"filePath"` Minio Minio `mapstructure:"minio"` - RpcX RpcX `mapstructure:"rpcServer"` + RpcXServer RpcXServer `mapstructure:"rpcServer"` } type APP struct { @@ -83,22 +82,20 @@ type Minio struct { Secret string `mapstructure:"secret"` } -type RpcX struct { - ServerName string `mapstructure:"serverName"` - Address string `mapstructure:"address"` - Port int `mapstructure:"port"` - ReadTimeout int64 `mapstructure:"readTimeout"` // 单位秒 - WriteTimeout int64 `mapstructure:"writeTimeout"` // 单位秒 - EtcdNodes []string `mapstructure:"etcdNodes"` - EtcdBasePath string `mapstructure:"etcdBasePath"` - EtcdUpdateInterval int64 `mapstructure:"etcdUpdateInterval"` // 单位秒 - EtcdExpired int64 `mapstructure:"etcdExpired"` // 单位秒 - PondPoolMaxWorkers int `mapstructure:"pondPoolMaxWorkers"` // 协程配置,和pondPoolMaxCapacity必须都有值才生效 - PondPoolMaxCapacity int `mapstructure:"pondPoolMaxCapacity"` - CertFile string `mapstructure:"certFile"` - KeyFile string `mapstructure:"keyFile"` - Receiver interface{} // main函数初始化填充路由结构体指针,不在配置文件中 - Plugins []*server.Plugin // main函数初始化填充路由组件,不在配置文件中 +type RpcXServer struct { + ServerName string `mapstructure:"serverName"` + Address string `mapstructure:"address"` + Port int `mapstructure:"port"` + ReadTimeout int64 `mapstructure:"readTimeout"` // 单位秒 + WriteTimeout int64 `mapstructure:"writeTimeout"` // 单位秒 + EtcdNodes []string `mapstructure:"etcdNodes"` + EtcdBasePath string `mapstructure:"etcdBasePath"` + EtcdUpdateInterval int64 `mapstructure:"etcdUpdateInterval"` // 单位秒 + EtcdExpired int64 `mapstructure:"etcdExpired"` // 单位秒 + PondPoolMaxWorkers int `mapstructure:"pondPoolMaxWorkers"` // 协程配置,和pondPoolMaxCapacity必须都有值才生效 + PondPoolMaxCapacity int `mapstructure:"pondPoolMaxCapacity"` + CertFile string `mapstructure:"certFile"` + KeyFile string `mapstructure:"keyFile"` } func InitConfig(configName string, configPath string) { diff --git a/hextech/config/options.go b/hextech/config/options.go index 0961311..6dbd64a 100644 --- a/hextech/config/options.go +++ b/hextech/config/options.go @@ -47,31 +47,29 @@ func (o *Options) register() error { fmt.Printf("[REGISTER] register logger failed, err: %v\n", err.Error()) return err } - fmt.Println("[REGISTER] register logger successful") + // 注册数据库 if err := o.registerDatabase(); err != nil { fmt.Printf("[REGISTER] register db failed, err: %v\n", err.Error()) return err } - fmt.Println("[REGISTER] register db successful") + + // 注册Redis if err := o.registerRedis(); err != nil { fmt.Printf("[REGISTER] register redis failed, err: %v\n", err.Error()) return err } - fmt.Println("[REGISTER] register redis successful") + // 注册web服务器 if err := o.registerWebServer(); err != nil { fmt.Printf("[REGISTER] register web server failed, err: %v\n", err.Error()) return err } - fmt.Println("[REGISTER] register web server successful") - // 注册RPC服务 - if o.ComponentConfig.RpcX.Receiver != nil { - if err := o.registerRpcXServer(); err != nil { - fmt.Printf("[REGISTER] register rpc server failed, err: %v\n", err.Error()) - return err - } - fmt.Println("[REGISTER] register rpc server successful") + + // 注册rpc服务 + if err := o.registerRpcXServer(); err != nil { + fmt.Printf("[REGISTER] register rpc server failed, err: %v\n", err.Error()) + return err } // 注册clusterSets @@ -83,6 +81,40 @@ func (o *Options) register() error { return nil } +// Close 关闭服务和资源 +func (o *Options) Close() { + // 关闭数据库 + if o.Db != nil { + d, _ := o.Db.DB() + _ = d.Close() + } + // 关闭redis + if o.Redis != nil { + _ = o.Redis.Close() + } + // 关闭http服务 + if o.WebServer != nil { + o.WebServer.Close() + } + // 关闭rpc服务 + if o.RpcXServer != nil { + o.RpcXServer.Close() + } +} + +// RunServer 启动服务 +func (o *Options) RunServer() { + // 启动http服务 + if o.WebServer != nil { + o.WebServer.Run() + } + + // 启动rpc服务 + if o.RpcXServer != nil { + o.RpcXServer.Run() + } +} + func (o *Options) registerEnv() error { env.SetAppName(o.ComponentConfig.APP.Name) return nil @@ -101,6 +133,7 @@ func (o *Options) registerLogger() error { }, } nlog.InitLog(logOptions) + fmt.Println("[REGISTER] register logger successful") return nil } @@ -122,15 +155,17 @@ func (o *Options) registerDatabase() (err error) { if err != nil { return err } - + fmt.Println("[REGISTER] register db successful") return nil } func (o *Options) registerWebServer() error { - webServerConfig := o.ComponentConfig.HTTPServer + conf := o.ComponentConfig.HTTPServer webServerOptions := &core.WebServerOptions{ - Mode: webServerConfig.Mode, - Port: webServerConfig.Port, + Mode: conf.Mode, + Address: conf.Address, + Port: conf.Port, + RouterFunc: nil, // 注册时无值,Set进去 } webServer, err := core.NewWebServer( webServerOptions, @@ -139,6 +174,7 @@ func (o *Options) registerWebServer() error { return err } o.WebServer = webServer + fmt.Println("[REGISTER] register web server successful") return nil } @@ -157,11 +193,12 @@ func (o *Options) registerRedis() error { return err } o.Redis = client + fmt.Println("[REGISTER] register redis successful") return nil } func (o *Options) registerRpcXServer() error { - conf := o.ComponentConfig.RpcX + conf := o.ComponentConfig.RpcXServer var poolConf *core.RpcXServerOptionsPoolConfig if conf.PondPoolMaxWorkers > 0 && conf.PondPoolMaxCapacity > 0 { poolConf = &core.RpcXServerOptionsPoolConfig{ @@ -174,8 +211,6 @@ func (o *Options) registerRpcXServer() error { Port: conf.Port, Address: conf.Address, ServerName: conf.ServerName, - Receiver: conf.Receiver, - Plugins: conf.Plugins, CertFile: conf.CertFile, KeyFile: conf.KeyFile, }, @@ -201,5 +236,6 @@ func (o *Options) registerRpcXServer() error { return err } o.RpcXServer = srv + fmt.Println("[REGISTER] register rpc server successful") return nil } diff --git a/hextech/core/rpcxserver.go b/hextech/core/rpcxserver.go index b48e2e2..64e486a 100644 --- a/hextech/core/rpcxserver.go +++ b/hextech/core/rpcxserver.go @@ -25,13 +25,11 @@ type RpcXServerEtcdConfig struct { } type RpcXServerBaseConfig struct { - Port int // rpc服务端口 - Address string // rpc服务地址 - ServerName string // rpc服务名 - Receiver interface{} // 路由结构 - 指针 - Plugins []*server.Plugin // 自定义插件 - CertFile string // tls证书 - KeyFile string // tls证书 + Port int // rpc服务端口 + Address string // rpc服务地址 + ServerName string // rpc服务名 + CertFile string // tls证书 + KeyFile string // tls证书 } type RpcXServerOptionsConfig struct { @@ -56,13 +54,13 @@ type RpcXServer struct { options []server.OptionFn server *server.Server etcdV3Plugin *serverplugin.EtcdV3RegisterPlugin - plugins []*server.Plugin - ServerName string // rpc服务名 - Receiver interface{} // 路由结构 - 指针 + Plugins []*server.Plugin // 启动rpc服务时,自定义插件 + ServerName string // rpc服务名 + Receiver interface{} // 路由结构 - 指针 } func (rpc *RpcXServer) Run() { - if rpc == nil { + if rpc == nil || rpc.Receiver == nil { return } ctx := context.Background() @@ -75,8 +73,8 @@ func (rpc *RpcXServer) Run() { } rpc.server.Plugins.Add(rpc.etcdV3Plugin) // 添加自定义插件 - if len(rpc.plugins) > 0 { - rpc.server.Plugins.Add(rpc.plugins) + if len(rpc.Plugins) > 0 { + rpc.server.Plugins.Add(rpc.Plugins) } // 注册服务路由 if err = rpc.server.RegisterName(rpc.ServerName, rpc.Receiver, ""); err != nil { @@ -102,10 +100,6 @@ func (rpc *RpcXServer) Close() { } func NewRpcXServer(config *RpcXServerConfig) (*RpcXServer, error) { - // 如果路由结构体不存在,则不初始化 - if config.Base.Receiver == nil { - return nil, nil - } base := config.Base opt := config.Options @@ -148,6 +142,7 @@ func NewRpcXServer(config *RpcXServerConfig) (*RpcXServer, error) { rpcXserver := &RpcXServer{ Address: base.getAddress(), Network: base.getNetwork(), + options: opts, etcdV3Plugin: &serverplugin.EtcdV3RegisterPlugin{ ServiceAddress: base.getNetwork() + "@" + base.getAddress(), EtcdServers: etcd.Nodes, @@ -155,9 +150,7 @@ func NewRpcXServer(config *RpcXServerConfig) (*RpcXServer, error) { UpdateInterval: etcd.getUpdateInterval(), Expired: etcd.getExpired(), }, - plugins: base.Plugins, ServerName: base.ServerName, - Receiver: base.Receiver, } return rpcXserver, nil diff --git a/hextech/core/webserver.go b/hextech/core/webserver.go index 55c65c9..451c075 100644 --- a/hextech/core/webserver.go +++ b/hextech/core/webserver.go @@ -14,16 +14,18 @@ import ( ) type WebServerOptions struct { - Mode string - Address string - Port int + Mode string + Address string + Port int + RouterFunc func(engine *gin.Engine) } type WebServer struct { - engine *gin.Engine - Port int - Address string - server *http.Server + engine *gin.Engine + Port int + Address string + server *http.Server + RouterFunc func(engine *gin.Engine) } func (web *WebServer) Get() *gin.Engine { @@ -35,9 +37,12 @@ func (web *WebServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { } func (web *WebServer) Run() { - if web == nil { + if web == nil || web.RouterFunc == nil { return } + // 注册路由 + web.RouterFunc(web.Get()) + srv := &http.Server{ Addr: fmt.Sprintf("%s:%v", web.Address, web.Port), Handler: web.engine, @@ -70,6 +75,7 @@ func NewWebServer(opt *WebServerOptions) (*WebServer, error) { gin.SetMode(opt.Mode) server := WebServer{engine: gin.Default(), Address: opt.Address, Port: opt.Port} + // 注册限制访问 server.engine.Use(cors.New(cors.Options{ AllowedOrigins: []string{"*"}, AllowedMethods: []string{ diff --git a/hextech/server/server.go b/hextech/server/server.go index fd55dee..7629b96 100644 --- a/hextech/server/server.go +++ b/hextech/server/server.go @@ -20,49 +20,24 @@ import ( var APP *Server type Server struct { - gs *shutdown.GracefulShutdown - options *config.Options - ctx context.Context - name string - httpRouterFunc func(engine *gin.Engine) + gs *shutdown.GracefulShutdown + options *config.Options + ctx context.Context + name string } -// ConfigFile 配置文件 -type ConfigFile struct { - Name, Path string -} - -type DynamicRpcConfig struct { - Receiver interface{} // 如果要启动rpc服务,则传递路由结构体的指针 - Plugins []*server.Plugin // 路由对应的过滤插件,可以没有 -} - -// cert, _ := tls.LoadX509KeyPair("server.pem", "server.key") -// config := &tls.Config{Certificates: []tls.Certificate{cert}} - -// DynamicConfig 动态配置 -type DynamicConfig struct { - Rpc DynamicRpcConfig -} - -func (d *Server) SetDynamicConfig(dc DynamicConfig) *Server { - appendDynamicOptions(d.options, dc) - return d -} - -func New(params *ConfigFile) *Server { +func New(configName, configPath string) *Server { ctx := context.Background() gs := shutdown.New() gs.AddShutdownManager(posixsignal.NewPosixSignalManager()) - configName := params.Name - configPath := params.Path + // 加载静态配置 opts, err := config.NewOptions(configName, configPath) if err != nil { nlog.Fatalf(ctx, "unable to initialize command options: %v", err) } - // 追加动态参数 + // 资源注册 err = opts.Complete() if err != nil { nlog.Fatalf(ctx, "unable to complete options: %v", err) @@ -75,11 +50,6 @@ func New(params *ConfigFile) *Server { return APP } -func appendDynamicOptions(opts *config.Options, dc DynamicConfig) { - opts.ComponentConfig.RpcX.Receiver = dc.Rpc.Receiver - opts.ComponentConfig.RpcX.Plugins = dc.Rpc.Plugins -} - func (d *Server) Run() { if d == nil { return @@ -89,40 +59,42 @@ func (d *Server) Run() { defer nlog.CloseLogger() - if d.options.WebServer != nil { - d.httpRouterFunc(d.options.WebServer.Get()) - } - d.runServer() <-stopCh } func (d *Server) runServer() { + // 添加退出回调 d.gs.AddShutdownCallback(shutdown.ShutdownFunc(func(string) error { - // close db - db, _ := d.options.Db.DB() - _ = db.Close() - // close web server - d.options.WebServer.Close() + d.options.Close() return nil })) - // start shutdown managers + // 启动退出管理 if err := d.gs.Start(); err != nil { nlog.Fatalf(d.ctx, "start shutdown manager failed: %s", err.Error()) } - // start web server - if d.httpRouterFunc != nil { - d.options.WebServer.Run() - } + // 启动服务 + d.options.RunServer() +} - // todo: judgment rpcx server options - d.options.RpcXServer.Run() +// SetHttpRouter 启动Http服务,则必须传递web路由函数 +func (d *Server) SetHttpRouter(router func(engine *gin.Engine)) *Server { + if d.options.WebServer != nil { + d.options.WebServer.RouterFunc = router + } + return d } -func (d *Server) RegisterHttpRouterFunc(router func(engine *gin.Engine)) *Server { - d.httpRouterFunc = router +// SetRpcXReceiver 启动RPC服务,则必须要传递路由结构体的指针 +// Receiver: RpcX路由结构体的指针 +// plugins: 路由对应的过滤插件,可以没有 +func (d *Server) SetRpcXReceiver(receiver interface{}, plugins []*server.Plugin) *Server { + if d.options.RpcXServer != nil { + d.options.RpcXServer.Receiver = receiver + d.options.RpcXServer.Plugins = plugins + } return d } diff --git a/user/conf/dev.yml b/user/conf/dev.yml index d6abaad..dcd340c 100644 --- a/user/conf/dev.yml +++ b/user/conf/dev.yml @@ -17,9 +17,9 @@ httpServer: port: 8100 address: "0.0.0.0" -Mysql: +mysql: user: root - password: 103243916 + password: 123456 addr: "127.0.0.1" port: 3306 database: neuronet diff --git a/user/main.go b/user/main.go index 44be13e..187673c 100644 --- a/user/main.go +++ b/user/main.go @@ -9,5 +9,5 @@ const LocalConfigFile = "dev.yml" const LocalConfigPath = "conf" func main() { - server.New(&server.ConfigFile{Name: LocalConfigFile, Path: LocalConfigPath}).RegisterHttpRouterFunc(router.HTTP).Run() + server.New(LocalConfigFile, LocalConfigPath).SetHttpRouter(router.HTTP).Run() } -- Gitee