From f1f2c00162e9ae499590918de440c8f423a32512 Mon Sep 17 00:00:00 2001 From: Wangjunqi123 Date: Mon, 8 Apr 2024 14:34:33 +0800 Subject: [PATCH] add matching to container process --- agent/handler/httphandler.go | 14 +- agent/handler/router.go | 3 +- agent/service/container/container.go | 49 +++++++ agent/service/container/init.go | 7 + .../service/{service.go => dataCollector.go} | 0 agent/utils/file.go | 7 +- server/meta/init.go | 1 - server/processor/init.go | 129 +++++++++++++++++- server/processor/processMatching.go | 62 --------- server/processor/processor.go | 4 +- server/utils/file.go | 43 ++++++ 11 files changed, 248 insertions(+), 71 deletions(-) create mode 100644 agent/service/container/container.go create mode 100644 agent/service/container/init.go rename agent/service/{service.go => dataCollector.go} (100%) delete mode 100644 server/processor/processMatching.go create mode 100755 server/utils/file.go diff --git a/agent/handler/httphandler.go b/agent/handler/httphandler.go index b4b20e9..b6c3f35 100755 --- a/agent/handler/httphandler.go +++ b/agent/handler/httphandler.go @@ -6,12 +6,13 @@ import ( "gitee.com/openeuler/PilotGo-plugin-topology-agent/conf" "gitee.com/openeuler/PilotGo-plugin-topology-agent/service" + "gitee.com/openeuler/PilotGo-plugin-topology-agent/service/container" "gitee.com/openeuler/PilotGo/sdk/response" "github.com/gin-gonic/gin" "github.com/pkg/errors" ) -func Raw_metric_data(ctx *gin.Context) { +func RawMetricDataHandle(ctx *gin.Context) { data, err := service.DataCollectorService() if err != nil { err = errors.Wrap(err, "**2") @@ -42,4 +43,15 @@ func HealthCheckHandle(ctx *gin.Context) { } response.Success(ctx, agentinfo, fmt.Sprintf("agent %s is running", conf.Config().Topo.Agent_addr)) +} + +func ContainerListHandle(ctx *gin.Context) { + containers, err := container.ContainerList() + if err != nil { + err = errors.Wrap(err, "") // err top + fmt.Printf("%+v\n", err) + response.Fail(ctx, nil, errors.Cause(err).Error()) + } + + response.Success(ctx, containers, "success") } \ No newline at end of file diff --git a/agent/handler/router.go b/agent/handler/router.go index 24f4a12..eab8976 100755 --- a/agent/handler/router.go +++ b/agent/handler/router.go @@ -8,6 +8,7 @@ func InitRouter(router *gin.Engine) { api := router.Group("/plugin/topology/api") { api.GET("/health", HealthCheckHandle) - api.GET("/rawdata", Raw_metric_data) + api.GET("/rawdata", RawMetricDataHandle) + api.GET("/container_list", ContainerListHandle) } } diff --git a/agent/service/container/container.go b/agent/service/container/container.go new file mode 100644 index 0000000..736e22f --- /dev/null +++ b/agent/service/container/container.go @@ -0,0 +1,49 @@ +package container + +import ( + "encoding/json" + "fmt" + "strings" + + "gitee.com/openeuler/PilotGo-plugin-topology-agent/utils" + docker "github.com/fsouza/go-dockerclient" + "github.com/pkg/errors" +) + +func ContainerList() ([]docker.APIContainers, error) { + if DOCKERD_PORT == "" { + bytes, err := utils.FileReadBytes(DOCKER_CONFIG) + if err != nil { + return nil, errors.Wrap(err, " **errstack**0") + } + var daemoncontent struct { + Hosts []string `json:"hosts"` + } + if err := json.Unmarshal(bytes, &daemoncontent); err != nil { + return nil, errors.Errorf("%s **errstack**0", err.Error()) + } + + for _, host := range daemoncontent.Hosts { + if strings.HasPrefix(host, "tcp") { + DOCKERD_PORT = strings.Split(host, ":")[2] + break + } + } + } + + if DOCKERD_PORT == "" { + return nil, errors.Errorf("no dockerd port found **errstack**0") + } + + client, err := docker.NewClient(fmt.Sprintf("tcp://127.0.0.1:%s", DOCKERD_PORT)) + if err != nil { + return nil, errors.Errorf("%s **errstack**0", err.Error()) + } + + containers, err := client.ListContainers(docker.ListContainersOptions{All: true}) + if err != nil { + return nil, errors.Errorf("%s **errstack**0", err.Error()) + } + + return containers, nil +} diff --git a/agent/service/container/init.go b/agent/service/container/init.go new file mode 100644 index 0000000..7a575d5 --- /dev/null +++ b/agent/service/container/init.go @@ -0,0 +1,7 @@ +package container + +const ( + DOCKER_CONFIG = "/etc/docker/daemon.json" +) + +var DOCKERD_PORT = "" diff --git a/agent/service/service.go b/agent/service/dataCollector.go similarity index 100% rename from agent/service/service.go rename to agent/service/dataCollector.go diff --git a/agent/utils/file.go b/agent/utils/file.go index 01e4a43..9573784 100755 --- a/agent/utils/file.go +++ b/agent/utils/file.go @@ -3,12 +3,13 @@ package utils import ( "io" "os" + "github.com/pkg/errors" ) func FileReadString(filePath string) (string, error) { content, err := os.ReadFile(filePath) if err != nil { - return "", err + return "", errors.New(err.Error()) } return string(content), nil @@ -17,7 +18,7 @@ func FileReadString(filePath string) (string, error) { func FileReadBytes(filePath string) ([]byte, error) { f, err := os.Open(filePath) if err != nil { - return nil, err + return nil, errors.New(err.Error()) } defer f.Close() @@ -32,7 +33,7 @@ func FileReadBytes(filePath string) ([]byte, error) { } break } - return nil, err + return nil, errors.New(err.Error()) } content = append(content, readbuff[:n]...) } diff --git a/server/meta/init.go b/server/meta/init.go index c201a9f..a6fa9f2 100755 --- a/server/meta/init.go +++ b/server/meta/init.go @@ -1,4 +1,3 @@ -// 禁止调用utils包 package meta var ( diff --git a/server/processor/init.go b/server/processor/init.go index e133dfc..8bcd5f1 100755 --- a/server/processor/init.go +++ b/server/processor/init.go @@ -1 +1,128 @@ -package processor \ No newline at end of file +package processor + +import ( + "encoding/json" + "strconv" + "strings" + + "gitee.com/openeuler/PilotGo-plugin-topology-server/agentmanager" + "gitee.com/openeuler/PilotGo/sdk/utils/httputils" + docker "github.com/fsouza/go-dockerclient" + + "github.com/pkg/errors" +) + +func ContainerList(agent *agentmanager.Agent_m) ([]docker.APIContainers, error) { + resp, err := httputils.Get("http://"+agent.IP+":"+agent.Port+"/plugin/topology/api/container_list", nil) + if err != nil { + return nil, errors.Errorf("get container list from agent %s failed: %s **errstack**0", agent.IP, err.Error()) + } + + if resp == nil || resp.StatusCode != 200 { + return nil, errors.Errorf("get container list from agent %s failed: %+v **errstack**0", agent.IP, resp) + } + + resp_body := struct { + Code int `json:"code"` + Data []docker.APIContainers `json:"data"` + Msg string `json:"msg"` + }{} + + err = json.Unmarshal(resp.Body, &resp_body) + if err != nil { + return nil, errors.Errorf("json unmarshal from agent %s failed: %s **errstack**2", agent.IP, err.Error()) + } + + return resp_body.Data, nil +} + +func ProcessMatching(agent *agentmanager.Agent_m, exename, cmdline, component string) bool { + component_lower := strings.ToLower(component) + cmdline_lower := strings.ToLower(cmdline) + cmdline_lower_arr := strings.Split(cmdline_lower, " ") + + switch exename { + case "java": + mainclass_match := false + match_count := 0 + + for i := 1; i < len(cmdline_lower_arr); i++ { + if !strings.HasPrefix(cmdline_lower_arr[i], "-") && !strings.HasPrefix(cmdline_lower_arr[i], "/") { + if strings.Contains(cmdline_lower_arr[i], component_lower) { + mainclass_match = true + } + } + + if strings.Contains(cmdline_lower_arr[i], component_lower) { + match_count = match_count + 1 + } + + if mainclass_match && match_count >= 1 { + return true + } + } + case "python", "python3", "python2": + for i := 1; i < len(cmdline_lower_arr); i++ { + if strings.HasSuffix(cmdline_lower_arr[i], ".py") && strings.Split(strings.Split(cmdline_lower_arr[i], "/")[len(strings.Split(cmdline_lower_arr[i], "/"))-1], ".py")[0] == component_lower { + return true + } + } + case "ruby": + for i := 1; i < len(cmdline_lower_arr); i++ { + if strings.HasSuffix(cmdline_lower_arr[i], ".rb") && strings.Split(strings.Split(cmdline_lower_arr[i], "/")[len(strings.Split(cmdline_lower_arr[i], "/"))-1], ".rb")[0] == component_lower { + return true + } + } + case "node": + for i := 1; i < len(cmdline_lower_arr); i++ { + if strings.HasSuffix(cmdline_lower_arr[i], ".js") || strings.HasSuffix(cmdline_lower_arr[i], ".ts") && strings.Split(strings.Split(cmdline_lower_arr[i], "/")[len(strings.Split(cmdline_lower_arr[i], "/"))-1], ".")[0] == component_lower { + return true + } + } + case "perl": + for i := 1; i < len(cmdline_lower_arr); i++ { + if strings.HasSuffix(cmdline_lower_arr[i], ".pl") && strings.Split(strings.Split(cmdline_lower_arr[i], "/")[len(strings.Split(cmdline_lower_arr[i], "/"))-1], ".pl")[0] == component_lower { + return true + } + } + // ①组件名与容器名匹配;②进程命令行中的-container-port与容器信息中的port匹配 + case "docker-proxy": + cmdline_container_port := "" + for i := 1; i < len(cmdline_lower_arr); i++ { + if cmdline_lower_arr[i] == "-container-port" { + cmdline_container_port = cmdline_lower_arr[i+1] + break + } + } + + containers, err := ContainerList(agent) + if err != nil { + err = errors.Wrap(err, " **errstack**0") // err top + agentmanager.ErrorTransmit(agentmanager.Topo.Tctx, err, agentmanager.Topo.ErrCh, false) + break + } + + for _, container := range containers { + for _, name := range container.Names { + name_lower := strings.ToLower(name) + if strings.HasPrefix(name_lower, "/") { + name_lower = strings.Replace(name_lower, "/", "", -1) + } + if name_lower != component_lower { + continue + } + for _, port := range container.Ports { + if strconv.Itoa(int(port.PrivatePort)) == cmdline_container_port { + return true + } + } + } + } + default: + if strings.ToLower(exename) == component_lower { + return true + } + } + + return false +} diff --git a/server/processor/processMatching.go b/server/processor/processMatching.go deleted file mode 100644 index 7bf041f..0000000 --- a/server/processor/processMatching.go +++ /dev/null @@ -1,62 +0,0 @@ -package processor - -import "strings" - -func ProcessMatching(exename, cmdline, component string) bool { - component_lower := strings.ToLower(component) - cmdline_lower := strings.ToLower(cmdline) - cmdline_lower_arr := strings.Split(cmdline_lower, " ") - - switch exename { - case "java": - mainclass_match := false - match_count := 0 - - for i := 1; i < len(cmdline_lower_arr); i++ { - if !strings.HasPrefix(cmdline_lower_arr[i], "-") && !strings.HasPrefix(cmdline_lower_arr[i], "/") { - if strings.Contains(cmdline_lower_arr[i], component_lower) { - mainclass_match = true - } - } - - if strings.Contains(cmdline_lower_arr[i], component_lower) { - match_count = match_count + 1 - } - - if mainclass_match && match_count >= 1 { - return true - } - } - case "python", "python3", "python2": - for i := 1; i < len(cmdline_lower_arr); i++ { - if strings.HasSuffix(cmdline_lower_arr[i], ".py") && cmdline_lower_arr[i] == component_lower { - return true - } - } - case "ruby": - for i := 1; i < len(cmdline_lower_arr); i++ { - if strings.HasSuffix(cmdline_lower_arr[i], ".rb") && cmdline_lower_arr[i] == component_lower { - return true - - } - } - case "node": - for i := 1; i < len(cmdline_lower_arr); i++ { - if strings.HasSuffix(cmdline_lower_arr[i], ".js") || strings.HasSuffix(cmdline_lower_arr[i], ".ts") && cmdline_lower_arr[i] == component_lower { - return true - } - } - case "perl": - for i := 1; i < len(cmdline_lower_arr); i++ { - if strings.HasSuffix(cmdline_lower_arr[i], ".pl") && cmdline_lower_arr[i] == component_lower { - return true - } - } - default: - if strings.Contains(exename, component_lower) { - return true - } - } - - return false -} diff --git a/server/processor/processor.go b/server/processor/processor.go index 4452cea..5553948 100755 --- a/server/processor/processor.go +++ b/server/processor/processor.go @@ -303,7 +303,7 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent_m, no if _name, ok := condition.Rule_condition["name"]; !ok { atomic.AddInt32(&d.agent_node_count, int32(1)) return errors.Errorf("there is no name field in node rule_condition: %+v **errstack**3", condition.Rule_condition) - } else if ProcessMatching(process.ExeName, process.Cmdline, _name.(string)) { + } else if ProcessMatching(agent, process.ExeName, process.Cmdline, _name.(string)) { proc_node := &meta.Node{ ID: fmt.Sprintf("%s%s%s%s%d", agent.UUID, meta.NODE_CONNECTOR, meta.NODE_PROCESS, meta.NODE_CONNECTOR, process.Pid), Name: _name.(string), @@ -331,7 +331,7 @@ func (d *DataProcesser) CustomCreateNodeEntities(agent *agentmanager.Agent_m, no if _tag, ok := condition.Rule_condition["tag_name"]; !ok { atomic.AddInt32(&d.agent_node_count, int32(1)) return errors.Errorf("there is no tag_name field in node rule_condition: %+v **errstack**3", condition.Rule_condition) - } else if ProcessMatching(process.ExeName, process.Cmdline, _tag.(string)) { + } else if ProcessMatching(agent, process.ExeName, process.Cmdline, _tag.(string)) { proc_node := &meta.Node{ ID: fmt.Sprintf("%s%s%s%s%d", agent.UUID, meta.NODE_CONNECTOR, meta.NODE_PROCESS, meta.NODE_CONNECTOR, process.Pid), Name: _tag.(string), diff --git a/server/utils/file.go b/server/utils/file.go new file mode 100755 index 0000000..c12f322 --- /dev/null +++ b/server/utils/file.go @@ -0,0 +1,43 @@ +package utils + +import ( + "io" + "os" + + "github.com/pkg/errors" +) + +func FileReadString(filePath string) (string, error) { + content, err := os.ReadFile(filePath) + if err != nil { + return "", errors.Errorf("%s **errstack**0", err.Error()) + } + + return string(content), nil +} + +func FileReadBytes(filePath string) ([]byte, error) { + f, err := os.Open(filePath) + if err != nil { + return nil, errors.Errorf("%s **errstack**0", err.Error()) + } + defer f.Close() + + var content []byte + readbuff := make([]byte, 1024*4) + for { + n, err := f.Read(readbuff) + if err != nil { + if err == io.EOF { + if n != 0 { + content = append(content, readbuff[:n]...) + } + break + } + return nil, errors.Errorf("%s **errstack**0", err.Error()) + } + content = append(content, readbuff[:n]...) + } + + return content, nil +} -- Gitee