diff --git a/server/agentmanager/PAgentMap.go b/server/agentmanager/PAgentMap.go index 82d41dd4e313c4687ce3b4bc0b574b1afbffaf3d..01d95b658206ce44c14204d22e5115513a3c3c87 100644 --- a/server/agentmanager/PAgentMap.go +++ b/server/agentmanager/PAgentMap.go @@ -15,7 +15,7 @@ func WaitingForHandshake() { i := 0 loop := []string{`*.....`, `.*....`, `..*...`, `...*..`, `....*.`, `.....*`} for { - if pluginclient.GlobalClient != nil && pluginclient.GlobalClient.Server() != "" { + if pluginclient.Global_Client != nil && pluginclient.Global_Client.Server() != "" { break } fmt.Printf("\r Waiting for handshake with pilotgo server%s", loop[i]) @@ -30,7 +30,7 @@ func WaitingForHandshake() { func Wait4TopoServerReady() { for { - url := "http://" + conf.Config().Topo.Server_addr + "/plugin_manage/info" + url := "http://" + conf.Global_Config.Topo.Server_addr + "/plugin_manage/info" resp, err := http.Get(url) if err == nil && resp != nil && resp.StatusCode == http.StatusOK { break @@ -43,15 +43,15 @@ func Wait4TopoServerReady() { func (am *AgentManager) InitMachineList() { Wait4TopoServerReady() - if pluginclient.GlobalClient == nil { - err := errors.New("globalclient is nil **errstackfatal**2") // err top + if pluginclient.Global_Client == nil { + err := errors.New("Global_Client is nil **errstackfatal**2") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } - pluginclient.GlobalClient.Wait4Bind() + pluginclient.Global_Client.Wait4Bind() - machine_list, err := pluginclient.GlobalClient.MachineList() + machine_list, err := pluginclient.Global_Client.MachineList() if err != nil { err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) @@ -69,21 +69,16 @@ func (am *AgentManager) InitMachineList() { // 更新PAgentMap中的agent func (am *AgentManager) UpdateMachineList() { - machine_list, err := pluginclient.GlobalClient.MachineList() + machine_list, err := pluginclient.Global_Client.MachineList() if err != nil { err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) } - if Topo != nil { - am.PAgentMap.Range(func(key, value interface{}) bool { - am.DeleteAgent_P(key.(string)) - return true - }) - } else { - err := errors.New("agentmanager.Topo is nil, can not clear Topo.PAgentMap **errstackfatal**6") // err top - errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) - } + am.PAgentMap.Range(func(key, value interface{}) bool { + am.DeleteAgent_P(key.(string)) + return true + }) for _, m := range machine_list { p := &Agent{} diff --git a/server/agentmanager/agent.go b/server/agentmanager/agent.go index 12ecb6b13289da8f5737add351eaede3d8405dba..429bb4fc8b8f4dbccd431d2df23b8ab8d32a34ed 100644 --- a/server/agentmanager/agent.go +++ b/server/agentmanager/agent.go @@ -10,7 +10,7 @@ import ( "github.com/pkg/errors" ) -var GlobalAgentManager *AgentManager +var Global_AgentManager *AgentManager type AgentManager struct { PAgentMap sync.Map @@ -39,8 +39,8 @@ type Agent struct { } func InitAgentManager() { - GlobalAgentManager = &AgentManager{ - AgentPort: conf.Config().Topo.Agent_port, + Global_AgentManager = &AgentManager{ + AgentPort: conf.Global_Config.Topo.Agent_port, } } diff --git a/server/agentmanager/topoclient.go b/server/agentmanager/topoclient.go deleted file mode 100644 index 4f6a2c5e7b047e6a57ee85ba7c4d2047ed4f676f..0000000000000000000000000000000000000000 --- a/server/agentmanager/topoclient.go +++ /dev/null @@ -1,69 +0,0 @@ -package agentmanager - -import ( - "flag" - "fmt" - "os" - "os/signal" - "syscall" - - "gitee.com/openeuler/PilotGo-plugin-topology-server/conf" - "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" - "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" - "gitee.com/openeuler/PilotGo/sdk/logger" - "github.com/go-redis/redis/v8" - "github.com/neo4j/neo4j-go-driver/v4/neo4j" - "github.com/pkg/errors" - "gopkg.in/yaml.v2" -) - -var Topo *Topoclient - -type Topoclient struct { -} - -func (t *Topoclient) InitLogger() { - err := logger.Init(conf.Config().Logopts) - if err != nil { - err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top - errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) - } -} - -func (t *Topoclient) InitConfig() { - flag.StringVar(&conf.Config_dir, "conf", "/opt/PilotGo/plugin/topology/server", "topo-server configuration directory") - flag.Parse() - - bytes, err := os.ReadFile(conf.Config_file()) - if err != nil { - err = errors.Errorf("open file failed: %s, %s", conf.Config_file(), err.Error()) // err top - fmt.Printf("%+v\n", err) - os.Exit(-1) - } - - err = yaml.Unmarshal(bytes, &conf.Global_config) - if err != nil { - err = errors.Errorf("yaml unmarshal failed: %s", err.Error()) // err top - fmt.Printf("%+v\n", err) - os.Exit(-1) - } -} - -func (t *Topoclient) SignalMonitoring(neo4jclient neo4j.Driver, redisclient redis.Client) { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) - for { - s := <-c - switch s { - case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: - neo4jclient.Close() - fmt.Println() - logger.Info("close the connection to neo4j\n") - redisclient.Close() - logger.Info("close the connection to redis\n") - os.Exit(-1) - default: - logger.Warn("unknown signal-> %s\n", s.String()) - } - } -} diff --git a/server/collector/collector.go b/server/collector/collector.go index 6335b9504977ace5b5981d197d7624fca846ee42..1606943e6cb4acd0112f0385342aaaa3531462e2 100644 --- a/server/collector/collector.go +++ b/server/collector/collector.go @@ -31,13 +31,13 @@ func (d *DataCollector) CollectInstantData() []error { var errorlist []error var errorlist_rwlock sync.RWMutex - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return nil } - agentmanager.GlobalAgentManager.TAgentMap.Range( + agentmanager.Global_AgentManager.TAgentMap.Range( func(key, value interface{}) bool { wg.Add(1) @@ -46,14 +46,14 @@ func (d *DataCollector) CollectInstantData() []error { // ttcode temp_start := time.Now() agent := value.(*agentmanager.Agent) - agent.Port = conf.Config().Topo.Agent_port + agent.Port = conf.Global_Config.Topo.Agent_port err := d.GetCollectDataFromTopoAgent(agent) if err != nil { errorlist_rwlock.Lock() errorlist = append(errorlist, errors.Wrapf(err, "%s**2", agent.IP)) errorlist_rwlock.Unlock() } - agentmanager.GlobalAgentManager.AddAgent_T(agent) + agentmanager.Global_AgentManager.AddAgent_T(agent) // ttcode temp_elapse := time.Since(temp_start) logger.Info("\033[32mtopo server 采集数据获取时间\033[0m: %s, %v, total\n", agent.UUID, temp_elapse) diff --git a/server/conf/config.go b/server/conf/config.go index e751b460a35fcd49a6892f914fcfc02f8f1d5cf2..360077245cdf6e8986baab5742e590f9d132202a 100644 --- a/server/conf/config.go +++ b/server/conf/config.go @@ -1,54 +1,22 @@ package conf import ( + "flag" + "fmt" + "os" "path" - "time" + "gitee.com/openeuler/PilotGo-plugin-topology-server/utils" "gitee.com/openeuler/PilotGo/sdk/logger" + "github.com/pkg/errors" + "gopkg.in/yaml.v2" ) -type TopoConf struct { - Server_addr string `yaml:"server_addr"` - Agent_port string `yaml:"agent_port"` - GraphDB string `yaml:"graphDB"` - Period int64 `yaml:"period"` - Retention int64 `yaml:"retention"` - Cleartime string `yaml:"cleartime"` -} - -type PilotGoConf struct { - Addr string `yaml:"http_addr"` -} - -type ArangodbConf struct { - Addr string `yaml:"addr"` - Database string `yaml:"database"` -} - -type Neo4jConf struct { - Addr string `yaml:"addr"` - Username string `yaml:"username"` - Password string `yaml:"password"` - DB string `yaml:"DB"` -} - -type PrometheusConf struct { - Addr string `yaml:"addr"` -} +var Global_Config *ServerConfig -type RedisConf struct { - Addr string `yaml:"addr"` - Password string `yaml:"password"` - DB int `yaml:"DB"` - DialTimeout time.Duration `yaml:"dialTimeout"` -} +const config_type = "topo_server.yaml" -type MysqlConf struct { - Addr string `yaml:"addr"` - Username string `yaml:"username"` - Password string `yaml:"password"` - DB string `yaml:"DB"` -} +var config_dir string type ServerConfig struct { Topo *TopoConf @@ -61,23 +29,30 @@ type ServerConfig struct { Mysql *MysqlConf } -const config_type = "topo_server.yaml" - -var Config_dir string - -func Config_file() string { - // _, thisfilepath, _, _ := runtime.Caller(0) - // dirpath := filepath.Dir(thisfilepath) - // configfilepath := path.Join(dirpath, "..", "..", "conf", config_type) - - // ttcode: - configfilepath := path.Join(Config_dir, config_type) +func ConfigFile() string { + configfilepath := path.Join(config_dir, config_type) return configfilepath } -var Global_config ServerConfig - -func Config() *ServerConfig { - return &Global_config +func InitConfig() { + flag.StringVar(&config_dir, "conf", "/opt/PilotGo/plugin/topology/server", "topo-server configuration directory") + flag.Usage = func() { + fmt.Fprintf(os.Stderr, "Usage: %s -conf /PATH/TO/TOPO_SERVER.YAML(default:/opt/PilotGo/plugin/topology/server) \n", os.Args[0]) + } + flag.Parse() + + bytes, err := utils.FileReadBytes(ConfigFile()) + if err != nil { + err = errors.Wrapf(err, "open file failed: %s, %s", ConfigFile(), err.Error()) // err top + fmt.Printf("%+v\n", err) + os.Exit(1) + } + + err = yaml.Unmarshal(bytes, Global_Config) + if err != nil { + err = errors.Errorf("yaml unmarshal failed: %s", err.Error()) // err top + fmt.Printf("%+v\n", err) + os.Exit(1) + } } diff --git a/server/conf/meta.go b/server/conf/meta.go new file mode 100644 index 0000000000000000000000000000000000000000..a669fb1c6124c3f5957ae3c43a1c14f1cf21cbbc --- /dev/null +++ b/server/conf/meta.go @@ -0,0 +1,46 @@ +package conf + +import "time" + +type TopoConf struct { + Server_addr string `yaml:"server_addr"` + Agent_port string `yaml:"agent_port"` + GraphDB string `yaml:"graphDB"` + Period int64 `yaml:"period"` + Retention int64 `yaml:"retention"` + Cleartime string `yaml:"cleartime"` +} + +type PilotGoConf struct { + Addr string `yaml:"http_addr"` +} + +type ArangodbConf struct { + Addr string `yaml:"addr"` + Database string `yaml:"database"` +} + +type Neo4jConf struct { + Addr string `yaml:"addr"` + Username string `yaml:"username"` + Password string `yaml:"password"` + DB string `yaml:"DB"` +} + +type PrometheusConf struct { + Addr string `yaml:"addr"` +} + +type RedisConf struct { + Addr string `yaml:"addr"` + Password string `yaml:"password"` + DB int `yaml:"DB"` + DialTimeout time.Duration `yaml:"dialTimeout"` +} + +type MysqlConf struct { + Addr string `yaml:"addr"` + Username string `yaml:"username"` + Password string `yaml:"password"` + DB string `yaml:"DB"` +} diff --git a/server/dao/mysql.go b/server/dao/mysql.go index b13697e1e11744209bfedb18a97e4376d97df39f..c4f32ffcfd54f3ad63f21709973e4870c8c183a1 100644 --- a/server/dao/mysql.go +++ b/server/dao/mysql.go @@ -17,7 +17,7 @@ import ( "gorm.io/gorm/schema" ) -var Global_mysql *MysqlClient +var Global_Mysql *MysqlClient type MysqlClient struct { ip string @@ -117,7 +117,7 @@ func ensureDatabase(conf *conf.MysqlConf) error { func (m *MysqlClient) QuerySingleTopoConfiguration(tcid uint) (*meta.Topo_configuration_DB, error) { var tcdb *meta.Topo_configuration_DB = new(meta.Topo_configuration_DB) - if Global_mysql == nil { + if Global_Mysql == nil { return nil, errors.New("mysql client not init **errstack**1") } @@ -130,7 +130,7 @@ func (m *MysqlClient) QuerySingleTopoConfiguration(tcid uint) (*meta.Topo_config } func (m *MysqlClient) QueryTopoConfigurationList(query *response.PaginationQ) ([]*meta.Topo_configuration_DB, int, error) { - if Global_mysql == nil { + if Global_Mysql == nil { return nil, 0, errors.New("mysql client not init **errstack**1") } @@ -148,7 +148,7 @@ func (m *MysqlClient) QueryTopoConfigurationList(query *response.PaginationQ) ([ } func (m *MysqlClient) AddTopoConfiguration(tc *meta.Topo_configuration_DB) (int, error) { - if Global_mysql == nil { + if Global_Mysql == nil { return -1, errors.New("mysql client not init **errstack**1") } @@ -162,7 +162,7 @@ func (m *MysqlClient) AddTopoConfiguration(tc *meta.Topo_configuration_DB) (int, } func (m *MysqlClient) DeleteTopoConfiguration(tcid uint) error { - if Global_mysql == nil { + if Global_Mysql == nil { return errors.New("mysql client not init **errstack**1") } diff --git a/server/dao/redis.go b/server/dao/redis.go index 63e091a046fd84163f4cb69cd4eaf348904a2cb0..6255684c57b00c6e4d7169ce7b24ba339172ebf3 100644 --- a/server/dao/redis.go +++ b/server/dao/redis.go @@ -20,6 +20,8 @@ import ( "github.com/pkg/errors" ) +var Global_Redis *RedisClient + type RedisClient struct { Addr string Password string @@ -27,8 +29,6 @@ type RedisClient struct { Client redis.Client } -var Global_redis *RedisClient - func RedisInit(url, pass string, db int, dialTimeout time.Duration) *RedisClient { r := &RedisClient{ Addr: url, @@ -55,64 +55,61 @@ func RedisInit(url, pass string, db int, dialTimeout time.Duration) *RedisClient } func (r *RedisClient) Set(key string, value interface{}) error { - if agentmanager.Topo != nil && Global_redis != nil { - bytes, _ := json.Marshal(value) - err := Global_redis.Client.Set(pluginclient.GlobalContext, key, string(bytes), 0).Err() - if err != nil { - err = errors.Errorf("failed to set key-value: %s **errstack**2", err.Error()) - return err - } + if key == "" { + return errors.New("key is empty **errstack**1") + } - return nil + bytes, _ := json.Marshal(value) + err := r.Client.Set(pluginclient.GlobalContext, key, string(bytes), 0).Err() + if err != nil { + err = errors.Errorf("failed to set key-value: %s **errstack**2", err.Error()) + return err } - err := errors.New("global_redis is nil **errstack**11") - return err + return nil } func (r *RedisClient) Get(key string, obj interface{}) (interface{}, error) { - if agentmanager.Topo != nil && Global_redis != nil { - data, err := Global_redis.Client.Get(pluginclient.GlobalContext, key).Result() - if err != nil { - err = errors.Errorf("failed to get value: %s, %s **errstack**2", key, err.Error()) - return nil, err - } - json.Unmarshal([]byte(data), obj) - return obj, nil + if key == "" { + return nil, errors.New("key is empty **errstack**1") } - return nil, errors.New("global_redis is nil **errstack**11") + data, err := r.Client.Get(pluginclient.GlobalContext, key).Result() + if err != nil { + err = errors.Errorf("failed to get value: %s, %s **errstack**2", key, err.Error()) + return nil, err + } + json.Unmarshal([]byte(data), obj) + return obj, nil } func (r *RedisClient) Scan(key string) ([]string, error) { keys := []string{} - if agentmanager.Topo != nil && Global_redis != nil { - iterator := Global_redis.Client.Scan(pluginclient.GlobalContext, 0, key, 0).Iterator() - for iterator.Next(pluginclient.GlobalContext) { - key := iterator.Val() - keys = append(keys, key) - } + if key == "" { + return nil, errors.New("key is empty **errstack**1") + } - return keys, nil + iterator := r.Client.Scan(pluginclient.GlobalContext, 0, key, 0).Iterator() + for iterator.Next(pluginclient.GlobalContext) { + key := iterator.Val() + keys = append(keys, key) } - err := errors.New("global_redis is nil **errstack**11") - return nil, err + return keys, nil } func (r *RedisClient) Delete(key string) error { - if agentmanager.Topo != nil && Global_redis != nil { - err := Global_redis.Client.Del(pluginclient.GlobalContext, key).Err() - if err != nil { - err = errors.Errorf("failed to del key-value: %s **errstack**2", err.Error()) - return err - } - return nil + if key == "" { + return errors.New("key is empty **errstack**1") } - err := errors.New("global_redis is nil **errstack**11") - return err + err := r.Client.Del(pluginclient.GlobalContext, key).Err() + if err != nil { + err = errors.Errorf("failed to del key-value: %s **errstack**2", err.Error()) + return err + } + return nil } // 基于batch中的机器列表和PAgentMap更新TAgentMap中运行状态的agent @@ -122,19 +119,15 @@ func (r *RedisClient) UpdateTopoRunningAgentList(uuids []string, updateonce bool var wg sync.WaitGroup var abort_reason []string - if Global_redis == nil { - return -1 - } - - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return -1 } // 重置TAgentMap - agentmanager.GlobalAgentManager.TAgentMap.Range(func(key, value interface{}) bool { - agentmanager.GlobalAgentManager.TAgentMap.Delete(key) + agentmanager.Global_AgentManager.TAgentMap.Range(func(key, value interface{}) bool { + agentmanager.Global_AgentManager.TAgentMap.Delete(key) return true }) @@ -182,19 +175,19 @@ func (r *RedisClient) UpdateTopoRunningAgentList(uuids []string, updateonce bool return } - agentp := agentmanager.GlobalAgentManager.GetAgent_P(agentvalue.UUID) + agentp := agentmanager.Global_AgentManager.GetAgent_P(agentvalue.UUID) if agentp == nil { abort_reason = append(abort_reason, fmt.Sprintf("%s:未被pilotgo纳管", agentvalue.UUID)) return } - if ok, err := utils.IsIPandPORTValid(agentp.IP, agentmanager.GlobalAgentManager.AgentPort); !ok { - err := errors.Errorf("%s:%s is unreachable (%s) %s **warn**1", agentp.IP, agentmanager.GlobalAgentManager.AgentPort, err.Error(), agentp.UUID) // err top + if ok, err := utils.IsIPandPORTValid(agentp.IP, agentmanager.Global_AgentManager.AgentPort); !ok { + err := errors.Errorf("%s:%s is unreachable (%s) %s **warn**1", agentp.IP, agentmanager.Global_AgentManager.AgentPort, err.Error(), agentp.UUID) // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, false) abort_reason = append(abort_reason, fmt.Sprintf("%s:ip||port不可达", agentvalue.UUID)) return } - agentmanager.GlobalAgentManager.AddAgent_T(agentp) + agentmanager.Global_AgentManager.AddAgent_T(agentp) atomic.AddInt32(&running_agent_num, int32(1)) }(agentkey) @@ -212,7 +205,6 @@ func (r *RedisClient) UpdateTopoRunningAgentList(uuids []string, updateonce bool logger.Warn("no running agent......") }) - // ttcode if len(abort_reason) != 0 { logger.Debug(">>>>>>>>>>>>获取agent状态信息") for _, r := range abort_reason { @@ -237,14 +229,8 @@ func (r *RedisClient) UpdateTopoRunningAgentList(uuids []string, updateonce bool func (r *RedisClient) ActiveHeartbeatDetection(uuids []string) { var wg sync.WaitGroup - if Global_redis == nil { - err := errors.New("redis client not init **errstack**1") - errormanager.ErrorTransmit(pluginclient.GlobalContext, err, false) - return - } - activeHeartbeatDetection := func(agent *agentmanager.Agent) { - url := "http://" + agent.IP + ":" + conf.Config().Topo.Agent_port + "/plugin/topology/api/health" + url := "http://" + agent.IP + ":" + conf.Global_Config.Topo.Agent_port + "/plugin/topology/api/health" if resp, err := httputils.Get(url, nil); err == nil && resp != nil && resp.StatusCode == 200 { type agentinfo struct { Interval int `json:"interval"` @@ -266,12 +252,12 @@ func (r *RedisClient) ActiveHeartbeatDetection(uuids []string) { key := "heartbeat-topoagent-" + agent.UUID value := meta.AgentHeartbeat{ UUID: agent.UUID, - Addr: agent.IP + ":" + conf.Config().Topo.Agent_port, + Addr: agent.IP + ":" + conf.Global_Config.Topo.Agent_port, HeartbeatInterval: resp_body.Data.Interval, Time: time.Now(), } - err = Global_redis.Set(key, value) + err = r.Set(key, value) if err != nil { err = errors.Wrap(err, " **errstack**2") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, false) @@ -280,20 +266,20 @@ func (r *RedisClient) ActiveHeartbeatDetection(uuids []string) { } } - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } if len(uuids) == 0 { - agentmanager.GlobalAgentManager.PAgentMap.Range(func(key, value interface{}) bool { + agentmanager.Global_AgentManager.PAgentMap.Range(func(key, value interface{}) bool { agent := value.(*agentmanager.Agent) wg.Add(1) go func(a *agentmanager.Agent) { defer wg.Done() - if ok, _ := utils.IsIPandPORTValid(a.IP, agentmanager.GlobalAgentManager.AgentPort); !ok { + if ok, _ := utils.IsIPandPORTValid(a.IP, agentmanager.Global_AgentManager.AgentPort); !ok { // err := errors.Errorf("%s:%s is unreachable (%s) %s **warn**1", a.IP, agentmanager.Topo.AgentPort, err.Error(), a.UUID) // err top // agentmanager.ErrorTransmit(agentmanager.Topo.Tctx, err, agentmanager.Topo.ErrCh, false) return @@ -314,12 +300,12 @@ func (r *RedisClient) ActiveHeartbeatDetection(uuids []string) { go func(_uuid string) { defer wg.Done() - agent := agentmanager.GlobalAgentManager.GetAgent_P(_uuid) + agent := agentmanager.Global_AgentManager.GetAgent_P(_uuid) if agent == nil { return } - if ok, _ := utils.IsIPandPORTValid(agent.IP, agentmanager.GlobalAgentManager.AgentPort); !ok { + if ok, _ := utils.IsIPandPORTValid(agent.IP, agentmanager.Global_AgentManager.AgentPort); !ok { // err := errors.Errorf("%s:%s is unreachable (%s) %s **warn**1", agent.IP, agentmanager.Topo.AgentPort, err.Error(), agent.UUID) // err top // agentmanager.ErrorTransmit(agentmanager.Topo.Tctx, err, agentmanager.Topo.ErrCh, false) return diff --git a/server/errormanager/errormanager.go b/server/errormanager/errormanager.go index f0690ee5de001305afc7af8ca1602b5f03c027b3..3c1a0dce7b7f376986ef33801fbc74a0a91353f2 100644 --- a/server/errormanager/errormanager.go +++ b/server/errormanager/errormanager.go @@ -11,7 +11,7 @@ import ( "github.com/pkg/errors" ) -var GlobalErrorManager *ErrorManager +var Global_ErrorManager *ErrorManager type ErrorManager struct { ErrCh chan *Topoerror @@ -20,19 +20,19 @@ type ErrorManager struct { } func InitErrorManager() { - GlobalErrorManager = &ErrorManager{ + Global_ErrorManager = &ErrorManager{ ErrCh: make(chan *Topoerror, 20), } - switch conf.Config().Logopts.Driver { + switch conf.Global_Config.Logopts.Driver { case "stdout": - GlobalErrorManager.Out = os.Stdout + Global_ErrorManager.Out = os.Stdout case "file": - logfile, err := os.OpenFile(conf.Global_config.Logopts.Path, os.O_WRONLY|os.O_CREATE, 0666) + logfile, err := os.OpenFile(conf.Global_Config.Logopts.Path, os.O_WRONLY|os.O_CREATE, 0666) if err != nil { panic(err) } - GlobalErrorManager.Out = logfile + Global_ErrorManager.Out = logfile } go func(ch <-chan *Topoerror) { @@ -55,10 +55,10 @@ func InitErrorManager() { case "warn": // 只打印最底层error的message,不展开错误链的调用栈 logger.Warn("%+v\n", strings.Split(errors.Cause(topoerr.Err).Error(), "**")[0]) case "errstack": // 打印错误链的调用栈 - fmt.Fprintf(GlobalErrorManager.Out, "%+v\n", topoerr.Err) + fmt.Fprintf(Global_ErrorManager.Out, "%+v\n", topoerr.Err) // errors.EORE(err) case "errstackfatal": // 打印错误链的调用栈,并结束程序 - fmt.Fprintf(GlobalErrorManager.Out, "%+v\n", topoerr.Err) + fmt.Fprintf(Global_ErrorManager.Out, "%+v\n", topoerr.Err) // errors.EORE(err) topoerr.Cancel() default: @@ -67,5 +67,5 @@ func InitErrorManager() { } } } - }(GlobalErrorManager.ErrCh) + }(Global_ErrorManager.ErrCh) } diff --git a/server/errormanager/meta.go b/server/errormanager/meta.go index 4acfd74b7cf1fa9e6915b45ca13ebe83a91aac8d..69fd6c80341337a5b3c86761dcbeb4b4842f8582 100644 --- a/server/errormanager/meta.go +++ b/server/errormanager/meta.go @@ -20,23 +20,23 @@ type Topoerror struct { @exit_after_print: 打印完错误链信息后是否结束主程序 */ func ErrorTransmit(ctx context.Context, err error, exit_after_print bool) { - if GlobalErrorManager == nil { + if Global_ErrorManager == nil { logger.Error("globalerrormanager is nil") os.Exit(1) } if exit_after_print { cctx, cancelF := context.WithCancel(ctx) - GlobalErrorManager.ErrCh <- &Topoerror{ + Global_ErrorManager.ErrCh <- &Topoerror{ Err: err, Cancel: cancelF, } <-cctx.Done() - close(GlobalErrorManager.ErrCh) + close(Global_ErrorManager.ErrCh) os.Exit(1) } - GlobalErrorManager.ErrCh <- &Topoerror{ + Global_ErrorManager.ErrCh <- &Topoerror{ Err: err, } } diff --git a/server/handler/basicHandler.go b/server/handler/basicHandler.go index e2692478588e07f4b4aa99aa67e5129ede52822d..0bf36f07935d566ff831b989d74c797801317e32 100644 --- a/server/handler/basicHandler.go +++ b/server/handler/basicHandler.go @@ -8,9 +8,9 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/dao" + "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/meta" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" - "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo/sdk/response" "github.com/gin-gonic/gin" "github.com/pkg/errors" @@ -31,8 +31,8 @@ func HeartbeatHandle(ctx *gin.Context) { Time: time.Now(), } - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top ctx.JSON(http.StatusInternalServerError, gin.H{ "code": -1, "error": err.Error(), @@ -42,19 +42,29 @@ func HeartbeatHandle(ctx *gin.Context) { return } - if agentmanager.GlobalAgentManager.GetAgent_P(uuid) == nil { + if agentmanager.Global_AgentManager.GetAgent_P(uuid) == nil { err := errors.Errorf("unknown agent's heartbeat: %s, %s **warn**1", uuid, addr) // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, false) + ctx.JSON(http.StatusUnauthorized, gin.H{ + "code": -1, + "error": err.Error(), + "data": nil, + }) + return + } + if dao.Global_Redis == nil { + err := errors.New("Global_Redis is nil **errstackfatal**0") // err top ctx.JSON(http.StatusUnauthorized, gin.H{ "code": -1, - "error": fmt.Sprintf("%+v", fmt.Errorf("unknown agent's heartbeat: %s, %s", uuid, addr)), + "error": err.Error(), "data": nil, }) + errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } - err := dao.Global_redis.Set(key, value) + err := dao.Global_Redis.Set(key, value) if err != nil { err = errors.Wrap(err, " **errstack**2") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, false) @@ -90,14 +100,14 @@ func TimestampsHandle(ctx *gin.Context) { func AgentListHandle(ctx *gin.Context) { agentmap := make(map[string]string) - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top response.Fail(ctx, nil, err.Error()) errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } - agentmanager.GlobalAgentManager.TAgentMap.Range(func(key, value interface{}) bool { + agentmanager.Global_AgentManager.TAgentMap.Range(func(key, value interface{}) bool { agent := value.(*agentmanager.Agent) if agent.Host_2 != nil { agentmap[agent.UUID] = agent.IP + ":" + agent.Port @@ -112,14 +122,14 @@ func AgentListHandle(ctx *gin.Context) { } func BatchListHandle(ctx *gin.Context) { - if pluginclient.GlobalClient == nil { - err := errors.New("globalclient is nil **errstackfatal**2") // err top + if pluginclient.Global_Client == nil { + err := errors.New("Global_Client is nil **errstackfatal**2") // err top response.Fail(ctx, nil, err.Error()) errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } - - batchlist, err := pluginclient.GlobalClient.BatchList() + + batchlist, err := pluginclient.Global_Client.BatchList() if err != nil { err = errors.Errorf("%+v **errstack**2", err.Error()) // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, false) @@ -139,14 +149,14 @@ func BatchMachineListHandle(ctx *gin.Context) { return } - if pluginclient.GlobalClient == nil { - err := errors.New("globalclient is nil **errstackfatal**2") // err top + if pluginclient.Global_Client == nil { + err := errors.New("Global_Client is nil **errstackfatal**2") // err top response.Fail(ctx, nil, err.Error()) errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } - machine_uuids, err := pluginclient.GlobalClient.BatchUUIDList(BatchId) + machine_uuids, err := pluginclient.Global_Client.BatchUUIDList(BatchId) if err != nil { err = errors.Errorf("%+v **errstack**2", err.Error()) // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, false) @@ -154,14 +164,14 @@ func BatchMachineListHandle(ctx *gin.Context) { return } - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top response.Fail(ctx, nil, err.Error()) errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } - agentmanager.GlobalAgentManager.PAgentMap.Range(func(key, value interface{}) bool { + agentmanager.Global_AgentManager.PAgentMap.Range(func(key, value interface{}) bool { uuid := key.(string) agent := value.(*agentmanager.Agent) for _, _uuid := range machine_uuids { diff --git a/server/handler/router.go b/server/handler/router.go index bfce5fbe29f7c6bdd4787564a27e78085bf7f35a..c563ef5375818a6130b1b0a75b4890acb9bd7c77 100644 --- a/server/handler/router.go +++ b/server/handler/router.go @@ -15,20 +15,20 @@ import ( ) func InitWebServer() { - if pluginclient.GlobalClient == nil { - err := errors.New("globalclient is nil **errstackfatal**2") // err top + if pluginclient.Global_Client == nil { + err := errors.New("Global_Client is nil **errstackfatal**2") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } - + go func() { engine := gin.Default() gin.SetMode(gin.ReleaseMode) - pluginclient.GlobalClient.RegisterHandlers(engine) + pluginclient.Global_Client.RegisterHandlers(engine) InitRouter(engine) StaticRouter(engine) - err := engine.Run(conf.Config().Topo.Server_addr) + err := engine.Run(conf.Global_Config.Topo.Server_addr) if err != nil { err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) diff --git a/server/logger/logger.go b/server/logger/logger.go new file mode 100644 index 0000000000000000000000000000000000000000..0093f2beafc7e53a07fef0c6bcacba70dce1e290 --- /dev/null +++ b/server/logger/logger.go @@ -0,0 +1,17 @@ +package logger + +import ( + "gitee.com/openeuler/PilotGo-plugin-topology-server/conf" + "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" + "gitee.com/openeuler/PilotGo/sdk/logger" + "github.com/pkg/errors" +) + +func InitLogger() { + err := logger.Init(conf.Global_Config.Logopts) + if err != nil { + err = errors.Errorf("%s **errstackfatal**2", err.Error()) // err top + errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) + } +} diff --git a/server/main.go b/server/main.go index 09de7196194c5295b5a0f52670b2c5aad25c344c..def1b8ec32b65644862cb6f52b2ba6c5fb88a180 100755 --- a/server/main.go +++ b/server/main.go @@ -4,12 +4,15 @@ import ( "fmt" "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/conf" "gitee.com/openeuler/PilotGo-plugin-topology-server/dao" + "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/handler" + "gitee.com/openeuler/PilotGo-plugin-topology-server/logger" "gitee.com/openeuler/PilotGo-plugin-topology-server/meta" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" - "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" service "gitee.com/openeuler/PilotGo-plugin-topology-server/service/background" + "gitee.com/openeuler/PilotGo-plugin-topology-server/utils" // "github.com/pyroscope-io/pyroscope/pkg/agent/profiler" ) @@ -24,7 +27,7 @@ func main() { /* init config */ - agentmanager.Topo.InitConfig() + conf.InitConfig() /* init plugin client @@ -49,12 +52,12 @@ func main() { /* init logger */ - agentmanager.Topo.InitLogger() + logger.InitLogger() /* init machine agent list */ - agentmanager.GlobalAgentManager.InitMachineList() + agentmanager.Global_AgentManager.InitMachineList() /* init database @@ -70,5 +73,5 @@ func main() { /* 终止进程信号监听 */ - agentmanager.Topo.SignalMonitoring(dao.Global_Neo4j.Driver, dao.Global_redis.Client) + utils.SignalMonitoring(dao.Global_Neo4j.Driver, dao.Global_Redis.Client) } diff --git a/server/pluginclient/meta.go b/server/pluginclient/meta.go new file mode 100644 index 0000000000000000000000000000000000000000..5ad6b3680fa0a010f60242d36854f96817a3f1c9 --- /dev/null +++ b/server/pluginclient/meta.go @@ -0,0 +1,15 @@ +package pluginclient + +import "gitee.com/openeuler/PilotGo/sdk/plugin/client" + +const Version = "1.0.1" + +var PluginInfo = &client.PluginInfo{ + Name: "topology", + Version: Version, + Description: "System application architecture detection.", + Author: "wangjunqi", + Email: "wangjunqi@kylinos.cn", + Url: "http://10.1.10.131:9991", + PluginType: "micro-app", +} diff --git a/server/pluginclient/pluginClient.go b/server/pluginclient/pluginClient.go index 3f6d9b2865f8e81f2010848bb4dccec6016d9090..7f8603722eb204d253c60e0259562c4a42ebeedd 100644 --- a/server/pluginclient/pluginClient.go +++ b/server/pluginclient/pluginClient.go @@ -8,25 +8,13 @@ import ( "gitee.com/openeuler/PilotGo/sdk/plugin/client" ) -var GlobalClient *client.Client +var Global_Client *client.Client var GlobalContext context.Context -const Version = "1.0.1" - -var PluginInfo = &client.PluginInfo{ - Name: "topology", - Version: Version, - Description: "System application architecture detection.", - Author: "wangjunqi", - Email: "wangjunqi@kylinos.cn", - Url: "http://10.1.10.131:9991", - PluginType: "micro-app", -} - func InitPluginClient() { - PluginInfo.Url = "http://" + conf.Config().Topo.Server_addr - GlobalClient = client.DefaultClient(PluginInfo) + PluginInfo.Url = "http://" + conf.Global_Config.Topo.Server_addr + Global_Client = client.DefaultClient(PluginInfo) // 注册插件扩展点 var ex []common.Extention @@ -43,7 +31,7 @@ func InitPluginClient() { Permission: "plugin.topology.page/menu", } ex = append(ex, pe1, pe2) - GlobalClient.RegisterExtention(ex) + Global_Client.RegisterExtention(ex) GlobalContext = context.Background() } diff --git a/server/processor/processor.go b/server/processor/processor.go index 63e1b3f351179dbccdda31434813349e5a97e79a..5bfc38e4fbf9774c79ddc832f1934bc11d93f774 100755 --- a/server/processor/processor.go +++ b/server/processor/processor.go @@ -68,13 +68,13 @@ func (d *DataProcesser) ProcessData(agentnum int, tagrules []meta.Tag_rule, node } }(cancel1) - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return nil, nil, nil, nil } - agentmanager.GlobalAgentManager.TAgentMap.Range( + agentmanager.Global_AgentManager.TAgentMap.Range( func(key, value interface{}) bool { wg.Add(1) diff --git a/server/service/background/initdb.go b/server/service/background/initdb.go index b47d79f959d5a2a28dbbbad15b4919bd634eee11..0e7507059b34558929e45897bc4b0ef4bcb28dae 100644 --- a/server/service/background/initdb.go +++ b/server/service/background/initdb.go @@ -7,8 +7,8 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-server/conf" "gitee.com/openeuler/PilotGo-plugin-topology-server/dao" - "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" + "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" "gitee.com/openeuler/PilotGo/sdk/logger" ) @@ -19,19 +19,19 @@ func InitDB() { initMysql() - go ClearGraphData(conf.Config().Topo.Retention) + go ClearGraphData(conf.Global_Config.Topo.Retention) } // 初始化图数据库 func initGraphDB() { - switch conf.Global_config.Topo.GraphDB { + switch conf.Global_Config.Topo.GraphDB { case "neo4j": - dao.Global_Neo4j = dao.Neo4jInit(conf.Global_config.Neo4j.Addr, conf.Global_config.Neo4j.Username, conf.Global_config.Neo4j.Password, conf.Global_config.Neo4j.DB) + dao.Global_Neo4j = dao.Neo4jInit(conf.Global_Config.Neo4j.Addr, conf.Global_Config.Neo4j.Username, conf.Global_Config.Neo4j.Password, conf.Global_Config.Neo4j.DB) dao.Global_GraphDB = dao.Global_Neo4j case "otherDB": default: - err := errors.Errorf("unknown database in topo_server.yaml: %s **errstackfatal**4", conf.Global_config.Topo.GraphDB) // err top + err := errors.Errorf("unknown database in topo_server.yaml: %s **errstackfatal**4", conf.Global_Config.Topo.GraphDB) // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) } @@ -44,8 +44,8 @@ func initGraphDB() { // 初始化redis func initRedis() { - dao.Global_redis = dao.RedisInit(conf.Config().Redis.Addr, conf.Config().Redis.Password, conf.Config().Redis.DB, conf.Config().Redis.DialTimeout) - if dao.Global_redis != nil { + dao.Global_Redis = dao.RedisInit(conf.Global_Config.Redis.Addr, conf.Global_Config.Redis.Password, conf.Global_Config.Redis.DB, conf.Global_Config.Redis.DialTimeout) + if dao.Global_Redis != nil { logger.Debug("redis database initialization successful") } else { logger.Error("redis database initialization failed") @@ -53,8 +53,8 @@ func initRedis() { } func initMysql() { - dao.Global_mysql = dao.MysqldbInit(conf.Config().Mysql) - if dao.Global_mysql != nil { + dao.Global_Mysql = dao.MysqldbInit(conf.Global_Config.Mysql) + if dao.Global_Mysql != nil { logger.Debug("mysql database initialization successful") } else { logger.Error("mysql database initialization failed") @@ -66,9 +66,9 @@ func ClearGraphData(retention int64) { for { current := time.Now() - clear, err := time.Parse("15:04:05", conf.Config().Topo.Cleartime) + clear, err := time.Parse("15:04:05", conf.Global_Config.Topo.Cleartime) if err != nil { - logger.Error("ClearGraphData time parse error: %s, %s", err.Error(), conf.Config().Topo.Cleartime) + logger.Error("ClearGraphData time parse error: %s, %s", err.Error(), conf.Global_Config.Topo.Cleartime) } next := time.Date(current.Year(), current.Month(), current.Day()+1, clear.Hour(), clear.Minute(), clear.Second(), 0, current.Location()) diff --git a/server/service/background/periodcollect.go b/server/service/background/periodcollect.go index 45986c546b41e08b13cba381710f429055b58aa1..158b15b0a743ea111964d54fc7ce4f468118f8e4 100644 --- a/server/service/background/periodcollect.go +++ b/server/service/background/periodcollect.go @@ -20,20 +20,26 @@ import ( ) func PeriodCollectWorking(batch []string, noderules [][]meta.Filter_rule) { - graphperiod := conf.Global_config.Topo.Period + graphperiod := conf.Global_Config.Topo.Period - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return } - agentmanager.GlobalAgentManager.UpdateMachineList() + if dao.Global_Redis == nil { + err := errors.New("Global_Redis is nil **errstackfatal**1") // err top + errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) + return + } + + agentmanager.Global_AgentManager.UpdateMachineList() go func(_interval int64, _gdb dao.GraphdbIface, _noderules [][]meta.Filter_rule) { for { - dao.Global_redis.ActiveHeartbeatDetection(batch) - running_agent_num := dao.Global_redis.UpdateTopoRunningAgentList(batch, false) + dao.Global_Redis.ActiveHeartbeatDetection(batch) + running_agent_num := dao.Global_Redis.UpdateTopoRunningAgentList(batch, false) unixtime_now := time.Now().Unix() DataProcessWorking(unixtime_now, running_agent_num, _gdb, nil, _noderules) time.Sleep(time.Duration(_interval) * time.Second) diff --git a/server/service/customTopo.go b/server/service/customTopo.go index 5c1cb334f7aab0c94f30398693926398c9705016..10c836eda55992dd4140ca7a668dffb84891488b 100755 --- a/server/service/customTopo.go +++ b/server/service/customTopo.go @@ -6,47 +6,54 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/dao" + "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" "gitee.com/openeuler/PilotGo-plugin-topology-server/meta" "gitee.com/openeuler/PilotGo-plugin-topology-server/pluginclient" - "gitee.com/openeuler/PilotGo-plugin-topology-server/errormanager" back "gitee.com/openeuler/PilotGo-plugin-topology-server/service/background" "gitee.com/openeuler/PilotGo/sdk/response" "github.com/pkg/errors" ) func RunCustomTopoService(tcid uint) ([]*meta.Node, []*meta.Edge, []map[string]string, error) { - tcdb, err := dao.Global_mysql.QuerySingleTopoConfiguration(tcid) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "**2") + if pluginclient.Global_Client == nil { + err := errors.New("Global_Client is nil **errstackfatal**2") // err top + errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) + return nil, nil, nil, err } - tc, err := dao.Global_mysql.DBToTopoConfiguration(tcdb) - if err != nil { - return nil, nil, nil, errors.Wrap(err, "**2") + if agentmanager.Global_AgentManager == nil { + err := errors.New("Global_AgentManager is nil **errstackfatal**0") // err top + errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) + return nil, nil, nil, err } - if pluginclient.GlobalClient == nil { - err := errors.New("globalclient is nil **errstackfatal**2") // err top + if dao.Global_Redis == nil { + err := errors.New("global_redis is nil **errstackfatal**1") // err top errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) return nil, nil, nil, err } - machine_uuids, err := pluginclient.GlobalClient.BatchUUIDList(strconv.Itoa(int(tc.BatchId))) + tcdb, err := dao.Global_Mysql.QuerySingleTopoConfiguration(tcid) if err != nil { return nil, nil, nil, errors.Wrap(err, "**2") } - // ctxv := context.WithValue(agentmanager.Topo.Tctx, "custom_name", "pilotgo-topo") + tc, err := dao.Global_Mysql.DBToTopoConfiguration(tcdb) + if err != nil { + return nil, nil, nil, errors.Wrap(err, "**2") + } - if agentmanager.GlobalAgentManager == nil { - err := errors.New("globalagentmanager is nil **errstackfatal**0") // err top - errormanager.ErrorTransmit(pluginclient.GlobalContext, err, true) - return nil, nil, nil, err + machine_uuids, err := pluginclient.Global_Client.BatchUUIDList(strconv.Itoa(int(tc.BatchId))) + if err != nil { + return nil, nil, nil, errors.Wrap(err, "**2") } - agentmanager.GlobalAgentManager.UpdateMachineList() - dao.Global_redis.ActiveHeartbeatDetection(machine_uuids) - running_agent_num := dao.Global_redis.UpdateTopoRunningAgentList(machine_uuids, true) + // ctxv := context.WithValue(agentmanager.Topo.Tctx, "custom_name", "pilotgo-topo") + + agentmanager.Global_AgentManager.UpdateMachineList() + + dao.Global_Redis.ActiveHeartbeatDetection(machine_uuids) + running_agent_num := dao.Global_Redis.UpdateTopoRunningAgentList(machine_uuids, true) if running_agent_num == 0 { return nil, nil, nil, errors.Errorf("no running agent for custom id %d **errstack**2", tc.ID) } else if running_agent_num == -1 { @@ -65,13 +72,13 @@ func RunCustomTopoService(tcid uint) ([]*meta.Node, []*meta.Edge, []map[string]s func CustomTopoListService(query *response.PaginationQ) ([]*meta.Topo_configuration, int, error) { tcs := make([]*meta.Topo_configuration, 0) - tcdbs, total, err := dao.Global_mysql.QueryTopoConfigurationList(query) + tcdbs, total, err := dao.Global_Mysql.QueryTopoConfigurationList(query) if err != nil { return nil, 0, errors.Wrap(err, "**2") } for _, tcdb := range tcdbs { - tc, err := dao.Global_mysql.DBToTopoConfiguration(tcdb) + tc, err := dao.Global_Mysql.DBToTopoConfiguration(tcdb) if err != nil { return nil, 0, errors.Wrap(err, "**2") } @@ -83,14 +90,14 @@ func CustomTopoListService(query *response.PaginationQ) ([]*meta.Topo_configurat } func CreateCustomTopoService(topoconfig *meta.Topo_configuration) (int, error) { - tcdb, err := dao.Global_mysql.TopoConfigurationToDB(topoconfig) + tcdb, err := dao.Global_Mysql.TopoConfigurationToDB(topoconfig) if err != nil { return -1, errors.Wrap(err, "**2") } tcdb.CreatedAt = time.Now().Format("2006-01-02 15:04:05") tcdb.UpdatedAt = time.Now().Format("2006-01-02 15:04:05") - tcdb_id, err := dao.Global_mysql.AddTopoConfiguration(tcdb) + tcdb_id, err := dao.Global_Mysql.AddTopoConfiguration(tcdb) if err != nil { return -1, errors.Wrap(err, "**2") } @@ -99,12 +106,12 @@ func CreateCustomTopoService(topoconfig *meta.Topo_configuration) (int, error) { } func UpdateCustomTopoService(tc *meta.Topo_configuration, tcdb_id_old uint) (int, error) { - tcdb, err := dao.Global_mysql.TopoConfigurationToDB(tc) + tcdb, err := dao.Global_Mysql.TopoConfigurationToDB(tc) if err != nil { return -1, errors.Wrap(err, "**2") } - tcdb_old, err := dao.Global_mysql.QuerySingleTopoConfiguration(tcdb_id_old) + tcdb_old, err := dao.Global_Mysql.QuerySingleTopoConfiguration(tcdb_id_old) if err != nil { return -1, errors.Wrap(err, "**2") } @@ -118,7 +125,7 @@ func UpdateCustomTopoService(tc *meta.Topo_configuration, tcdb_id_old uint) (int tcdb_old.Version = tcdb.Version tcdb_old.Preserve = tcdb.Preserve - tcdb_old_id, err := dao.Global_mysql.AddTopoConfiguration(tcdb_old) + tcdb_old_id, err := dao.Global_Mysql.AddTopoConfiguration(tcdb_old) if err != nil { return -1, errors.Wrap(err, "**2") } @@ -128,7 +135,7 @@ func UpdateCustomTopoService(tc *meta.Topo_configuration, tcdb_id_old uint) (int func DeleteCustomTopoService(ids []uint) error { for _, tcid := range ids { - if err := dao.Global_mysql.DeleteTopoConfiguration(tcid); err != nil { + if err := dao.Global_Mysql.DeleteTopoConfiguration(tcid); err != nil { return errors.Wrap(err, "**errstack**2") } } diff --git a/server/utils/signalMonitor.go b/server/utils/signalMonitor.go new file mode 100644 index 0000000000000000000000000000000000000000..754987ca1d3861f6c0b09b65615ab432b46f010d --- /dev/null +++ b/server/utils/signalMonitor.go @@ -0,0 +1,30 @@ +package utils + +import ( + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/go-redis/redis/v8" + "github.com/neo4j/neo4j-go-driver/v4/neo4j" + "gitee.com/openeuler/PilotGo/sdk/logger" +) + +func SignalMonitoring(neo4jclient neo4j.Driver, redisclient redis.Client) { + ch := make(chan os.Signal, 1) + signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + for s := range ch { + switch s { + case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT: + neo4jclient.Close() + fmt.Println() + logger.Info("close the connection to neo4j\n") + redisclient.Close() + logger.Info("close the connection to redis\n") + os.Exit(1) + default: + logger.Warn("unknown signal-> %s\n", s.String()) + } + } +}