diff --git a/topology/agent/collector/psutil_collector.go b/topology/agent/collector/psutil_collector.go index fc402c664aba9fe9e9bb793681b379034124cb4c..5b5580796737718619c03420001b84261d7cb766 100644 --- a/topology/agent/collector/psutil_collector.go +++ b/topology/agent/collector/psutil_collector.go @@ -8,6 +8,7 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-agent/utils" "gitee.com/openeuler/PilotGo-plugins/sdk/logger" + "github.com/shirou/gopsutil/disk" "github.com/shirou/gopsutil/process" "github.com/shirou/gopsutil/v3/host" "github.com/shirou/gopsutil/v3/net" @@ -18,6 +19,11 @@ type PsutilCollector struct { Processes_1 []*utils.Process Netconnections_1 []*utils.Netconnection AddrInterfaceMap_1 map[string][]string + Disks_1 []*utils.Disk +} + +func CreatePsutilCollector() *PsutilCollector { + return &PsutilCollector{} } func (pc *PsutilCollector) Collect_host_data() error { @@ -247,4 +253,46 @@ func (pc *PsutilCollector) Collect_addrInterfaceMap_data() error { pc.AddrInterfaceMap_1 = addrinterfacemap return nil -} \ No newline at end of file +} + +func (pc *PsutilCollector) Collect_disk_data() error { + partitions, err := disk.Partitions(false) + if err != nil { + pro_c, filepath, line, ok := runtime.Caller(0) + if ok { + logger.Error("file: %s, line: %d, func: %s, err: %s\n", filepath, line-2, runtime.FuncForPC(pro_c).Name(), err.Error()) + } + return fmt.Errorf("file: %s, line: %d, func: %s, err: %s", filepath, line-2, runtime.FuncForPC(pro_c).Name(), err.Error()) + } + + for _, partition := range partitions { + disk_entity := &utils.Disk{} + disk_entity.Partition = partition + + iocounter, err := disk.IOCounters([]string{disk_entity.Partition.Device}...) + if err != nil { + pro_c, filepath, line, ok := runtime.Caller(0) + if ok { + logger.Error("file: %s, line: %d, func: %s, err: %s\n", filepath, line-2, runtime.FuncForPC(pro_c).Name(), err.Error()) + } + return fmt.Errorf("file: %s, line: %d, func: %s, err: %s", filepath, line-2, runtime.FuncForPC(pro_c).Name(), err.Error()) + } + + disk_entity.IOcounter = iocounter[partition.Device] + + usage, err := disk.Usage(partition.Mountpoint) + if err != nil { + pro_c, filepath, line, ok := runtime.Caller(0) + if ok { + logger.Error("file: %s, line: %d, func: %s, err: %s\n", filepath, line-2, runtime.FuncForPC(pro_c).Name(), err.Error()) + } + return fmt.Errorf("file: %s, line: %d, func: %s, err: %s", filepath, line-2, runtime.FuncForPC(pro_c).Name(), err.Error()) + } + + disk_entity.Usage = *usage + + pc.Disks_1 = append(pc.Disks_1, disk_entity) + } + + return nil +} diff --git a/topology/agent/service/service.go b/topology/agent/service/service.go index 16a123fd54f783032dc4d28a7d895b26bc320624..bfc797eac7e9f1ace18a7f657e00e2cabe230a09 100644 --- a/topology/agent/service/service.go +++ b/topology/agent/service/service.go @@ -13,7 +13,7 @@ func DataCollectorService() (utils.Data_collector, error) { datasource := conf.Config().Topo.Datasource switch datasource { case "gopsutil": - gops := &collector.PsutilCollector{} + gops := collector.CreatePsutilCollector() err := gops.Collect_host_data() if err != nil { filepath, line, funcname := utils.CallerInfo(err) @@ -42,6 +42,13 @@ func DataCollectorService() (utils.Data_collector, error) { return nil, fmt.Errorf("file: %s, line: %d, func: %s, err -> %s", filepath, line, funcname, err.Error()) } + err = gops.Collect_disk_data() + if err != nil { + filepath, line, funcname := utils.CallerInfo(err) + logger.Error("file: %s, line: %d, func: %s, err: %s\n", filepath, line, funcname, err.Error()) + return nil, fmt.Errorf("file: %s, line: %d, func: %s, err -> %s", filepath, line, funcname, err.Error()) + } + return gops, nil case "ebpf": diff --git a/topology/agent/utils/client_data_meta.go b/topology/agent/utils/client_data_meta.go index 05675eaed4500281abce9762d432f8fe085a47e4..ae6ad37b9f743303d907f6108ad1eff213fafd41 100644 --- a/topology/agent/utils/client_data_meta.go +++ b/topology/agent/utils/client_data_meta.go @@ -3,12 +3,14 @@ package utils import ( "github.com/shirou/gopsutil/net" "github.com/shirou/gopsutil/process" + "github.com/shirou/gopsutil/disk" ) type Data_collector interface { Collect_process_instant_data() error Collect_host_data() error Collect_netconnection_all_data() error + Collect_addrInterfaceMap_data() error } type Host struct { @@ -92,8 +94,8 @@ type Netconnection struct { type NetIOcounters struct { } -type Resource struct { -} - -type Container struct { +type Disk struct { + Partition disk.PartitionStat `json:"partition"` + IOcounter disk.IOCountersStat `json:"iocounter"` + Usage disk.UsageStat `json:"usage"` } diff --git a/topology/server/agentmanager/mach_agent.go b/topology/server/agentmanager/mach_agent.go index d5b2d4fa9119508f9143d24a10dafa3fe66b3d8f..51b14088bfc4b2a1b62b3d108323a989bc6d8d38 100644 --- a/topology/server/agentmanager/mach_agent.go +++ b/topology/server/agentmanager/mach_agent.go @@ -19,6 +19,7 @@ type Agent_m struct { Processes_2 []*meta.Process `json:"processes"` Netconnections_2 []*meta.Netconnection `json:"netconnections"` AddrInterfaceMap_2 map[string][]string `json:"addrinterfacemap"` + Disks_2 []*meta.Disk `json:"disks"` } func (t *Topoclient) AddAgent(a *Agent_m) { diff --git a/topology/server/agentmanager/topoclient.go b/topology/server/agentmanager/topoclient.go index 06afd6c76add0ac928c5702e2045ce68096037f7..dbcf4bd1fe22dec8ea22c34ba074b93bd99fd7d0 100644 --- a/topology/server/agentmanager/topoclient.go +++ b/topology/server/agentmanager/topoclient.go @@ -26,14 +26,17 @@ func (t *Topoclient) InitMachineList() { resp, err := httputils.Get(url, nil) if err != nil { - fmt.Printf("%+v\n", errors.Errorf("%s**2", err.Error())) // err top + err = errors.Errorf("%s**2", err) + fmt.Printf("%+v\n", err) // err top + // errors.EORE(err) os.Exit(1) } statuscode := resp.StatusCode if statuscode != 200 { - fmt.Printf("%+v\n", errors.New("http返回状态码异常**2")) // err top - + err = errors.New("http返回状态码异常**2") + fmt.Printf("%+v\n", err) // err top + // errors.EORE(err) } result := &struct { @@ -43,7 +46,9 @@ func (t *Topoclient) InitMachineList() { err = json.Unmarshal(resp.Body, result) if err != nil { - fmt.Printf("%+v\n", errors.Errorf("%s**2", err.Error())) // err top + err = errors.Errorf("%s**2", err.Error()) + fmt.Printf("%+v\n", err) // err top + // errors.EORE(err) } for _, m := range result.Data.([]interface{}) { @@ -57,7 +62,9 @@ func (t *Topoclient) InitMachineList() { func (t *Topoclient) InitLogger() { err := logger.Init(conf.Config().Logopts) if err != nil { - fmt.Printf("%+v\n", errors.Errorf("%s**2", err.Error())) // err top + err = errors.Errorf("%s**2", err.Error()) + fmt.Printf("%+v\n", err) // err top + // errors.EORE(err) os.Exit(1) } } @@ -69,3 +76,7 @@ func (t *Topoclient) InitPluginClient() { Sdkmethod: PluginClient, } } + +func (t *Topoclient) InitArangodb() { + +} diff --git a/topology/server/collector/collector.go b/topology/server/collector/collector.go index f9af68e654113d5bd0c838a00d73d9bd84a662fb..f20745e072be70dbe72ebb4eb0c25c789dc734b7 100644 --- a/topology/server/collector/collector.go +++ b/topology/server/collector/collector.go @@ -84,6 +84,7 @@ func (d *DataCollector) GetCollectDataFromTopoAgent(agent *agentmanager.Agent_m) Processes_1 []*meta.Process Netconnections_1 []*meta.Netconnection AddrInterfaceMap_1 map[string][]string + Disks_1 []*meta.Disk }{} mapstructure.Decode(results.Data, collectdata) @@ -91,6 +92,7 @@ func (d *DataCollector) GetCollectDataFromTopoAgent(agent *agentmanager.Agent_m) agent.Processes_2 = collectdata.Processes_1 agent.Netconnections_2 = collectdata.Netconnections_1 agent.AddrInterfaceMap_2 = collectdata.AddrInterfaceMap_1 + agent.Disks_2 = collectdata.Disks_1 return nil } diff --git a/topology/server/go.sum b/topology/server/go.sum index 5f250b657e87636615d1440d4218b71231a606d6..b30d65d70ff7f648455cfbd61af6b147ee782409 100644 --- a/topology/server/go.sum +++ b/topology/server/go.sum @@ -55,6 +55,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ= github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shirou/gopsutil v3.21.11+incompatible h1:+1+c1VGhc88SSonWP6foOcLhvnKlUeu/erjjvaPEYiI= diff --git a/topology/server/handler/handler.go b/topology/server/handler/handler.go index 7e58724c05ec906257aa642783b37d8b37dbba3a..bce20ce1d5a0c4099031afc70ae459bafae732cc 100644 --- a/topology/server/handler/handler.go +++ b/topology/server/handler/handler.go @@ -10,7 +10,7 @@ import ( ) func SingleHostHandle(ctx *gin.Context) { - nodes, edges, collect_errlist, process_errlist := service.DataProcessService() + nodes, edges, collect_errlist, process_errlist := service.SingleHostService() if len(collect_errlist) != 0 || len(process_errlist) != 0 { for i, cerr := range collect_errlist { diff --git a/topology/server/main.go b/topology/server/main.go index 525b7d05fc1c3cc43d09edf3a6f8d6779d3697e3..dc0300277fed4552b5b768a74bd55d4d1d6e9304 100644 --- a/topology/server/main.go +++ b/topology/server/main.go @@ -24,6 +24,7 @@ func main() { init arangodb TODO: */ + agentmanager.Topo.InitArangodb() /* init machine agent list diff --git a/topology/server/meta/client_data_meta.go b/topology/server/meta/client_data_meta.go index 4ab5e15647379af5ee827f3d85070a0c75d4fe4f..3f08c6826b5ba73eb9a1fc605fd26cee65d36aeb 100644 --- a/topology/server/meta/client_data_meta.go +++ b/topology/server/meta/client_data_meta.go @@ -3,6 +3,7 @@ package meta import ( "github.com/shirou/gopsutil/net" "github.com/shirou/gopsutil/process" + "github.com/shirou/gopsutil/disk" ) type Host struct { @@ -86,8 +87,8 @@ type Netconnection struct { type NetIOcounters struct { } -type Resource struct { -} - -type Container struct { +type Disk struct { + Partition disk.PartitionStat `json:"partition"` + IOcounter disk.IOCountersStat `json:"iocounter"` + Usage disk.UsageStat `json:"usage"` } diff --git a/topology/server/meta/init.go b/topology/server/meta/init.go index aa08162fc0474449c068efb2c9ae2559aa37e3e2..1fbe213d357b15ff4f6252405120e1e899af8c1f 100644 --- a/topology/server/meta/init.go +++ b/topology/server/meta/init.go @@ -1,11 +1,12 @@ package meta const ( - NODE_HOST = "host" - NODE_PROCESS = "process" - NODE_THREAD = "thread" - NODE_NET = "net" - NODE_APP = "app" + NODE_HOST = "host" + NODE_PROCESS = "process" + NODE_THREAD = "thread" + NODE_NET = "net" + NODE_APP = "app" + NODE_RESOURCE = "resource" ) const ( diff --git a/topology/server/processor/processor.go b/topology/server/processor/processor.go index 631bd93fcd1ff43f0c8685b48c28959bbfff69ca..fbaf0d1f79464de59f5351d312873b023bde20e1 100644 --- a/topology/server/processor/processor.go +++ b/topology/server/processor/processor.go @@ -167,6 +167,20 @@ func (d *DataProcesser) Create_node_entities(agent *agentmanager.Agent_m, nodes mu.Unlock() } + for _, disk := range agent.Disks_2 { + disk_node := &meta.Node{ + ID: fmt.Sprintf("%s_%s_%s", agent.UUID, meta.NODE_RESOURCE, disk.Partition.Device), + Name: disk.Partition.Device, + Type: meta.NODE_RESOURCE, + UUID: agent.UUID, + Metrics: *utils.DiskToMap(disk), + } + + mu.Lock() + nodes.Add(disk_node) + mu.Unlock() + } + agent_node_count_rwlock.Lock() agent_node_count++ agent_node_count_rwlock.Unlock() @@ -187,12 +201,30 @@ func (d *DataProcesser) Create_edge_entities(agent *agentmanager.Agent_m, edges nodes_map[meta.NODE_THREAD] = append(nodes_map[meta.NODE_THREAD], node) case meta.NODE_NET: nodes_map[meta.NODE_NET] = append(nodes_map[meta.NODE_NET], node) + case meta.NODE_RESOURCE: + nodes_map[meta.NODE_RESOURCE] = append(nodes_map[meta.NODE_RESOURCE], node) } } for _, obj := range nodes_map[meta.NODE_HOST] { for _, sub := range nodes_map[meta.NODE_PROCESS] { - if sub.Metrics["Ppid"] == "1" && sub.UUID == obj.UUID { + if sub.Metrics["Pid"] == "1" && sub.UUID == obj.UUID { + belong_edge := &meta.Edge{ + ID: fmt.Sprintf("%s_%s_%s", sub.ID, meta.EDGE_BELONG, obj.ID), + Type: meta.EDGE_BELONG, + Src: sub.ID, + Dst: obj.ID, + Dir: true, + } + + edges.Add(belong_edge) + } + } + } + + for _, obj := range nodes_map[meta.NODE_HOST] { + for _, sub := range nodes_map[meta.NODE_RESOURCE] { + if sub.UUID == obj.UUID { belong_edge := &meta.Edge{ ID: fmt.Sprintf("%s_%s_%s", sub.ID, meta.EDGE_BELONG, obj.ID), Type: meta.EDGE_BELONG, diff --git a/topology/server/service/service.go b/topology/server/service/service.go index e3bcc0afcc0c6799536b04450b0cfa91a69cbe2b..2c3f0174950adb156b621dd1b6b4ab4203e4a223 100644 --- a/topology/server/service/service.go +++ b/topology/server/service/service.go @@ -6,20 +6,21 @@ import ( "github.com/pkg/errors" ) -func DataProcessService() ([]*meta.Node, []*meta.Edge, []error, []error) { +func SingleHostService() ([]*meta.Node, []*meta.Edge, []error, []error) { dataprocesser := processor.CreateDataProcesser() nodes, edges, collect_errlist, process_errlist := dataprocesser.Process_data() if len(collect_errlist) != 0 || len(process_errlist) != 0 { for i, cerr := range collect_errlist { - collect_errlist[i] = errors.Wrap(cerr, "**3") + collect_errlist[i] = errors.Wrap(cerr, "**4") } for i, perr := range process_errlist { - process_errlist[i] = errors.Wrap(perr, "**7") + process_errlist[i] = errors.Wrap(perr, "**8") } } + return nodes.Nodes, edges.Edges, collect_errlist, process_errlist // if len(collect_errlist) != 0 && len(process_errlist) != 0 { // for i, cerr := range collect_errlist { // collect_errlist[i] = errors.Wrap(cerr, "**3") @@ -43,6 +44,4 @@ func DataProcessService() ([]*meta.Node, []*meta.Edge, []error, []error) { // return nil, nil, nil, process_errlist // } - - return nodes.Nodes, edges.Edges, collect_errlist, process_errlist } diff --git a/topology/server/utils/structTomap.go b/topology/server/utils/structTomap.go index 3c3510b7c0d10088000a179e083e0b124ade3c47..8aa0da387db26713d5cbd6ce2906a42ffeaeb69c 100644 --- a/topology/server/utils/structTomap.go +++ b/topology/server/utils/structTomap.go @@ -32,6 +32,9 @@ func StructToMap(obj interface{}) map[string]string { case reflect.Uint64: fieldvalue_uint64 := fieldValue.Interface().(uint64) m[field.Name] = strconv.Itoa(int(fieldvalue_uint64)) + case reflect.Float64: + fieldvalue_float64 := fieldValue.Interface().(float64) + m[field.Name] = strconv.FormatFloat(fieldvalue_float64, 'f', -1, 64) } } @@ -162,3 +165,28 @@ func NetToMap(net *meta.Netconnection) *map[string]string { // "Fifoout": strconv.Itoa(int(net.Fifoout)), // } // } + +func DiskToMap(disk *meta.Disk) *map[string]string { + disk_map := make(map[string]string) + partition_map := StructToMap(disk.Partition) + iocounter_map := StructToMap(disk.IOcounter) + usage_map := StructToMap(disk.Usage) + + for k, v := range partition_map { + disk_map[k] = v + } + + for k, v := range iocounter_map { + if k != "Name" { + disk_map[k] = v + } + } + + for k, v := range usage_map { + if k != "Path" && k != "Fstype" { + disk_map[k] = v + } + } + + return &disk_map +}