diff --git a/server/agentmanager/topoclient.go b/server/agentmanager/topoclient.go index 0147c7b77ae7012152ba82b2f4d4476772f28b28..71c48c8a386e38d3455e18d6f71068c23a06aa56 100644 --- a/server/agentmanager/topoclient.go +++ b/server/agentmanager/topoclient.go @@ -23,7 +23,8 @@ var Topo *Topoclient type Topoclient struct { Sdkmethod *client.Client AgentMap sync.Map - ErrGroup *sync.WaitGroup + Errmu sync.Locker + ErrCond *sync.Cond ErrCh chan error Out io.Writer } @@ -35,18 +36,20 @@ func (t *Topoclient) InitMachineList() { if err != nil { err = errors.Errorf("%s **fatal**2", err.Error()) // err top t.ErrCh <- err - t.ErrGroup.Add(1) - t.ErrGroup.Wait() + t.Errmu.Lock() + t.ErrCond.Wait() + t.Errmu.Unlock() close(t.ErrCh) os.Exit(1) } statuscode := resp.StatusCode if statuscode != 200 { - err = errors.Errorf("http返回状态码异常: %d; %s **fatal**2", statuscode, url) // err top + err := errors.Errorf("http返回状态码异常: %d, %s **fatal**2", statuscode, url) // err top t.ErrCh <- err - t.ErrGroup.Add(1) - t.ErrGroup.Wait() + t.Errmu.Lock() + t.ErrCond.Wait() + t.Errmu.Unlock() close(t.ErrCh) os.Exit(1) } @@ -60,8 +63,9 @@ func (t *Topoclient) InitMachineList() { if err != nil { err = errors.Errorf("%s **fatal**2", err.Error()) // err top t.ErrCh <- err - t.ErrGroup.Add(1) - t.ErrGroup.Wait() + t.Errmu.Lock() + t.ErrCond.Wait() + t.Errmu.Unlock() close(t.ErrCh) os.Exit(1) } @@ -81,8 +85,9 @@ func (t *Topoclient) UpdateMachineList() { if err != nil { err = errors.Errorf("%s **fatal**2", err.Error()) // err top t.ErrCh <- err - t.ErrGroup.Add(1) - t.ErrGroup.Wait() + t.Errmu.Lock() + t.ErrCond.Wait() + t.Errmu.Unlock() close(t.ErrCh) os.Exit(1) } @@ -91,8 +96,9 @@ func (t *Topoclient) UpdateMachineList() { if statuscode != 200 { err = errors.Errorf("http返回状态码异常: %d **fatal**2", statuscode) // err top t.ErrCh <- err - t.ErrGroup.Add(1) - t.ErrGroup.Wait() + t.Errmu.Lock() + t.ErrCond.Wait() + t.Errmu.Unlock() close(t.ErrCh) os.Exit(1) } @@ -106,8 +112,9 @@ func (t *Topoclient) UpdateMachineList() { if err != nil { err = errors.Errorf("%s **fatal**2", err.Error()) // err top t.ErrCh <- err - t.ErrGroup.Add(1) - t.ErrGroup.Wait() + t.Errmu.Lock() + t.ErrCond.Wait() + t.Errmu.Unlock() close(t.ErrCh) os.Exit(1) } @@ -140,25 +147,29 @@ func (t *Topoclient) InitLogger() { if err != nil { err = errors.Errorf("%s **fatal**2", err.Error()) // err top t.ErrCh <- err - t.ErrGroup.Add(1) - t.ErrGroup.Wait() + t.Errmu.Lock() + t.ErrCond.Wait() + t.Errmu.Unlock() close(t.ErrCh) os.Exit(1) } } func (t *Topoclient) InitPluginClient() { + var errcondmu sync.Mutex PluginInfo.Url = "http://" + conf.Config().Topo.Server_addr + "/plugin/topology" PluginClient := client.DefaultClient(PluginInfo) PluginClient.Server = "http://" + conf.Config().PilotGo.Addr + Topo = &Topoclient{ Sdkmethod: PluginClient, - ErrGroup: &sync.WaitGroup{}, + Errmu: &errcondmu, + ErrCond: sync.NewCond(&errcondmu), ErrCh: make(chan error, 10), } } -func (t *Topoclient) InitErrorControl(errch <-chan error, errgroup *sync.WaitGroup) { +func (t *Topoclient) InitErrorControl(errch <-chan error, emu sync.Locker, econd *sync.Cond) { switch conf.Global_config.Logopts.Driver { case "stdout": t.Out = os.Stdout @@ -170,9 +181,9 @@ func (t *Topoclient) InitErrorControl(errch <-chan error, errgroup *sync.WaitGro t.Out = logfile } - go func(ch <-chan error, group *sync.WaitGroup) { + go func(ch <-chan error, mu sync.Locker, ec *sync.Cond) { for { - err, ok := <-errch + err, ok := <-ch if !ok { break } @@ -184,16 +195,18 @@ func (t *Topoclient) InitErrorControl(errch <-chan error, errgroup *sync.WaitGro fmt.Fprintf(t.Out, "%+v\n", err) // errors.EORE(err) case "fatal": + mu.Lock() fmt.Fprintf(t.Out, "%+v\n", err) // errors.EORE(err) - errgroup.Done() + ec.Broadcast() + mu.Unlock() default: fmt.Printf("only support warn and fatal error type: %+v\n", err) os.Exit(1) } } } - }(errch, errgroup) + }(errch, emu, econd) } func (t *Topoclient) InitConfig() { diff --git a/server/collector/collector.go b/server/collector/collector.go index 5c9f848bdd823d5ba18e95e7266861da23e52857..8796a7291380eac3c0685e51967e143b55a37889 100644 --- a/server/collector/collector.go +++ b/server/collector/collector.go @@ -60,7 +60,11 @@ func (d *DataCollector) GetCollectDataFromTopoAgent(agent *agentmanager.Agent_m) resp, err := httputils.Get(url, nil) if err != nil { - return errors.Errorf("%s**2", err.Error()) + return errors.Errorf("%v, %s, %s **2", resp.StatusCode, url, err.Error()) + } + + if statuscode := resp.StatusCode; statuscode != 200 { + return errors.Errorf("%v, %s **2", resp.StatusCode, url) } results := &struct { @@ -71,12 +75,7 @@ func (d *DataCollector) GetCollectDataFromTopoAgent(agent *agentmanager.Agent_m) err = json.Unmarshal(resp.Body, &results) if err != nil { - return errors.Errorf("%s**2", err.Error()) - } - - statuscode := results.Code - if statuscode != 0 { - return errors.Errorf("%s**2", results.Error) + return errors.Errorf("%s **2", err.Error()) } collectdata := &struct { diff --git a/server/dao/init.go b/server/dao/init.go index 40653ccd9bdcac8fe93e79861d167abf97c343c6..3057eae268c9a8d37b1c46d867efb79f9fe07924 100755 --- a/server/dao/init.go +++ b/server/dao/init.go @@ -42,8 +42,9 @@ func InitDB() { default: err := errors.Errorf("unknown database in config_server.yaml: %s **fatal**4", conf.Global_config.Topo.Database) // err top agentmanager.Topo.ErrCh <- err - agentmanager.Topo.ErrGroup.Add(1) - agentmanager.Topo.ErrGroup.Wait() + agentmanager.Topo.Errmu.Lock() + agentmanager.Topo.ErrCond.Wait() + agentmanager.Topo.Errmu.Unlock() close(agentmanager.Topo.ErrCh) os.Exit(1) } diff --git a/server/dao/neo4j.go b/server/dao/neo4j.go index 30707504e6f39b8abaf6d9c2df2fdaf3393036cd..e9a3ca8798bc296b686400b7ed6c0c7c020a325b 100644 --- a/server/dao/neo4j.go +++ b/server/dao/neo4j.go @@ -137,8 +137,9 @@ func PeriodProcessNeo4j(unixtime int64, agentnum int) { if err != nil { err := errors.Errorf("create neo4j driver failed: %s **fatal**2", err.Error()) // err top agentmanager.Topo.ErrCh <- err - agentmanager.Topo.ErrGroup.Add(1) - agentmanager.Topo.ErrGroup.Wait() + agentmanager.Topo.Errmu.Lock() + agentmanager.Topo.ErrCond.Wait() + agentmanager.Topo.Errmu.Unlock() close(agentmanager.Topo.ErrCh) os.Exit(1) } diff --git a/server/handler/router.go b/server/handler/router.go index 936ab48b7582f0325af870d7ba9471f488ce67bd..cff117a0600ff404b7f899e2dccdb4c6ed02a268 100644 --- a/server/handler/router.go +++ b/server/handler/router.go @@ -25,8 +25,9 @@ func InitWebServer() { if err != nil { err = errors.Errorf("%s **fatal**2", err.Error()) // err top agentmanager.Topo.ErrCh <- err - agentmanager.Topo.ErrGroup.Add(1) - agentmanager.Topo.ErrGroup.Wait() + agentmanager.Topo.Errmu.Lock() + agentmanager.Topo.ErrCond.Wait() + agentmanager.Topo.Errmu.Unlock() close(agentmanager.Topo.ErrCh) os.Exit(1) } diff --git a/server/main.go b/server/main.go index cf49d6cc070ee23235a1c8ca91fb31869fdab289..d42f721efd1c75c52ffb152165b893059afda153 100644 --- a/server/main.go +++ b/server/main.go @@ -24,7 +24,7 @@ func main() { /* init error control */ - agentmanager.Topo.InitErrorControl(agentmanager.Topo.ErrCh, agentmanager.Topo.ErrGroup) + agentmanager.Topo.InitErrorControl(agentmanager.Topo.ErrCh, agentmanager.Topo.Errmu, agentmanager.Topo.ErrCond) /* init logger