From 56d8f79005834b3ee996e8fdad7dcb03f5b759c7 Mon Sep 17 00:00:00 2001
From: panxilong
Date: Fri, 21 Jul 2023 10:36:20 +0800
Subject: [PATCH] Support the systemd cgroup driver

Signed-off-by: panxilong
---
 Dockerfile                        |   2 +
 hack/rubik-daemonset.yaml         |   4 +-
 pkg/core/typedef/cgroup_driver.go | 160 +++++++++++++++++++++
 pkg/core/typedef/linux.go         | 229 ++++++++++++++++++++++++++++++
 pkg/core/typedef/rawpod.go        |  94 +++++++++---
 pkg/core/typedef/unsupported.go   |  28 ++++
 pkg/rubik/rubik.go                |  10 +-
 7 files changed, 504 insertions(+), 23 deletions(-)
 create mode 100644 pkg/core/typedef/cgroup_driver.go
 create mode 100644 pkg/core/typedef/linux.go
 create mode 100644 pkg/core/typedef/unsupported.go

diff --git a/Dockerfile b/Dockerfile
index bbaf598..04129ea 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,6 @@
 FROM scratch
+#### need the nsenter command to determine whether the kubelet cgroup driver is systemd or cgroupfs
+FROM dockerpinata/util-linux:2.1
 COPY ./build/rubik /rubik
 ENTRYPOINT ["/rubik"]
diff --git a/hack/rubik-daemonset.yaml b/hack/rubik-daemonset.yaml
index 246d45e..7de83b2 100644
--- a/hack/rubik-daemonset.yaml
+++ b/hack/rubik-daemonset.yaml
@@ -17,7 +17,7 @@ metadata:
 roleRef:
   apiGroup: rbac.authorization.k8s.io
   kind: ClusterRole
-  name: rubik
+  name: cluster-admin
 subjects:
 - kind: ServiceAccount
   name: rubik
@@ -73,6 +73,7 @@ spec:
     spec:
       serviceAccountName: rubik
      hostPID: true
+      hostNetwork: true
      containers:
      - name: rubik-agent
        image: hub.oepkgs.net/cloudnative/rubik:latest
@@ -86,6 +87,7 @@ spec:
          capabilities:
            add:
            - SYS_ADMIN
+          privileged: true
        resources:
          limits:
            memory: 200Mi
diff --git a/pkg/core/typedef/cgroup_driver.go b/pkg/core/typedef/cgroup_driver.go
new file mode 100644
index 0000000..039ac6b
--- /dev/null
+++ b/pkg/core/typedef/cgroup_driver.go
@@ -0,0 +1,160 @@
+package typedef
+
+import (
+	"bufio"
+	"bytes"
+	"context"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+
+	"github.com/spf13/pflag"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/klog/v2"
+
+	"isula.org/rubik/pkg/common/constant"
+	"isula.org/rubik/pkg/common/util"
+)
+
+type CgroupDriverType string
+
+const (
+	Cgroupfs CgroupDriverType = "cgroupfs"
+	Systemd  CgroupDriverType = "systemd"
+
+	kubeletDefaultCgroupDriver = Cgroupfs
+
+	KubeRootNameSystemd       = "kubepods.slice/"
+	KubeBurstableNameSystemd  = "kubepods-burstable.slice/"
+	KubeBesteffortNameSystemd = "kubepods-besteffort.slice/"
+
+	KubeRootNameCgroupfs = "kubepods/"
+
+	KubepodsPod           = "kubepods-pod"
+	KubepodsBesteffortPod = "kubepods-besteffort-pod"
+	KubepodsBurstablePod  = "kubepods-burstable-pod"
+	PodSuffix             = ".slice"
+
+	kubeletConfigCgroupDriverKey = "cgroupDriver"
+)
+
+var cgroupDriver CgroupDriverType
+
+func GuessCgroupDriverFromCgroupName() CgroupDriverType {
+	systemdKubepodDirExists := util.PathExist(filepath.Join(constant.DefaultCgroupRoot, "cpu", KubeRootNameSystemd))
+	cgroupfsKubepodDirExists := util.PathExist(filepath.Join(constant.DefaultCgroupRoot, "cpu", KubeRootNameCgroupfs))
+	if systemdKubepodDirExists != cgroupfsKubepodDirExists {
+		if systemdKubepodDirExists {
+			return Systemd
+		} else {
+			return Cgroupfs
+		}
+	}
+	return ""
+}
+
+func GuessCgroupDriverFromKubeletPort(port int) (CgroupDriverType, error) {
+	kubeletPid, err := KubeletPortToPid(port)
+	if err != nil {
+		return "", fmt.Errorf("failed to find kubelet's pid, kubelet may not be running: %v", err)
+	}
+	kubeletArgs, err := ProcCmdLine(ProcRootDir, kubeletPid)
+	if err != nil || len(kubeletArgs) <= 1 {
return "", fmt.Errorf("failed to get kubelet's args: %v", err) + } + var argsCgroupDriver string + var argsConfigFile string + fs := pflag.NewFlagSet("GuessTest", pflag.ContinueOnError) + fs.ParseErrorsWhitelist.UnknownFlags = true + fs.StringVar(&argsCgroupDriver, "cgroup-driver", "", "") + fs.StringVar(&argsConfigFile, "config", "", "") + if err := fs.Parse(kubeletArgs[1:]); err != nil { + return "", fmt.Errorf("failed to parse kubelet's args, kubelet version may not support: %v", err) + } + // kubelet command-line args will override configuration from config file + if argsCgroupDriver != "" { + return CgroupDriverType(argsCgroupDriver), nil + } else if argsConfigFile == "" { + return kubeletDefaultCgroupDriver, nil + } + + // parse kubelet config file + var kubeletConfigFile string + if filepath.IsAbs(argsConfigFile) { + kubeletConfigFile = argsConfigFile + } else { + kubletCWD, err := os.Readlink(filepath.Join(ProcRootDir, strconv.Itoa(kubeletPid), "cwd")) + if err != nil { + klog.Errorf("failed to get kubelet's cwd: %v", err) + if exePath, err := os.Readlink(filepath.Join(ProcRootDir, strconv.Itoa(kubeletPid), "exe")); err != nil { + kubletCWD = filepath.Dir(exePath) + } else { + kubletCWD = "/" + } + } + kubeletConfigFile = filepath.Join(kubletCWD, argsConfigFile) + } + // kubelet config file is in host path + fileBuf, _, err := ExecCmdOnHost([]string{"cat", kubeletConfigFile}) + if err != nil { + return "", fmt.Errorf("failed to read kubelet's config file(%s): %v", kubeletConfigFile, err) + } + scanner := bufio.NewScanner(bytes.NewBuffer(fileBuf)) + for scanner.Scan() { + line := scanner.Text() + if len(line) == 0 { + continue + } + parts := strings.Fields(line) + // remove trailing ':' from key + key := parts[0][:len(parts[0])-1] + if key == kubeletConfigCgroupDriverKey { + return CgroupDriverType(strings.TrimSpace(parts[1])), nil + } + } + klog.Infof("Cgroup driver is not specify in kubelet config file, use default: '%s'", kubeletDefaultCgroupDriver) + return kubeletDefaultCgroupDriver, nil +} + +func DetectCgroupDriver(nodeName string) error { + driver := GuessCgroupDriverFromCgroupName() + if driver.Validate() { + cgroupDriver = (driver) + klog.Infof("cgroupDriver: %v", cgroupDriver) + return nil + } + conf, err := rest.InClusterConfig() + if err != nil { + return err + } + kubeClient, err := kubernetes.NewForConfig(conf) + if err != nil { + return err + } + node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil || node == nil { + klog.Error("Can't get node") + return err + } + port := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port) + portDriver, err := GuessCgroupDriverFromKubeletPort(port) + if portDriver.Validate() { + cgroupDriver = (portDriver) + klog.Infof("cgroupDriver: %v", cgroupDriver) + return nil + } + return nil +} + +func GetCgroupDriver() CgroupDriverType { + return cgroupDriver +} + +func (c CgroupDriverType) Validate() bool { + s := string(c) + return s == string(Cgroupfs) || s == string(Systemd) +} diff --git a/pkg/core/typedef/linux.go b/pkg/core/typedef/linux.go new file mode 100644 index 0000000..8e8116d --- /dev/null +++ b/pkg/core/typedef/linux.go @@ -0,0 +1,229 @@ +//go:build linux +// +build linux + +package typedef + +import ( + "bytes" + "fmt" + "io" + "os" + "os/exec" + "path" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync/atomic" + "syscall" + "time" + "unicode" + + "github.com/cakturk/go-netstat/netstat" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" +) + 
+var (
+	kubeletPortAndPid atomic.Value
+	expireTime        = time.Minute
+)
+
+const (
+	DS_MODE     = "dsMode"
+	ProcRootDir = "/proc/"
+)
+
+var AgentMode = DS_MODE
+
+type portAndPid struct {
+	port           int
+	pid            int
+	lastUpdateTime time.Time
+}
+
+func TCPSocks(fn netstat.AcceptFn) ([]netstat.SockTabEntry, error) {
+	v6Socks, v6Err := netstat.TCP6Socks(fn)
+	if v6Err == nil && len(v6Socks) > 0 {
+		return v6Socks, nil
+	}
+
+	socks, err := netstat.TCPSocks(fn)
+	if err == nil && socks != nil {
+		return socks, nil
+	}
+
+	return nil, utilerrors.NewAggregate([]error{err, v6Err})
+}
+
+// KubeletPortToPid queries the pid listening on the given TCP port with the help of go-netstat.
+// note: a full traversal is expensive, so we cache the result and re-verify it on each call.
+func KubeletPortToPid(port int) (int, error) {
+	if port < 0 {
+		return -1, fmt.Errorf("failed to get pid, the port is invalid")
+	}
+	if val, ok := kubeletPortAndPid.Load().(portAndPid); ok && val.port == port {
+		if time.Now().Before(val.lastUpdateTime.Add(expireTime)) &&
+			syscall.Kill(val.pid, 0) == nil {
+			return val.pid, nil
+		}
+	}
+	socks, err := TCPSocks(func(entry *netstat.SockTabEntry) bool {
+		if entry.State == netstat.Listen && entry.LocalAddr.Port == uint16(port) {
+			return true
+		}
+		return false
+	})
+	if err != nil {
+		return -1, fmt.Errorf("failed to get pid, err is %v", err)
+	}
+	if len(socks) == 0 || socks[0].Process == nil {
+		return -1, fmt.Errorf("failed to get pid, the port is not used")
+	}
+	pid := socks[0].Process.Pid
+	kubeletPortAndPid.Store(portAndPid{port: port, pid: pid, lastUpdateTime: time.Now()})
+	return pid, nil
+}
+
+func ReadFileNoStat(filename string) ([]byte, error) {
+	const maxBufferSize = 1024 * 512
+
+	f, err := os.Open(filename)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	reader := io.LimitReader(f, maxBufferSize)
+	return io.ReadAll(reader)
+}
+
+// ProcCmdLine returns the command-line args of a process.
+func ProcCmdLine(procRoot string, pid int) ([]string, error) {
+	data, err := ReadFileNoStat(path.Join(procRoot, strconv.Itoa(pid), "cmdline"))
+	if err != nil {
+		klog.Errorf("ProcCmdLine error: %v", err)
+		return nil, err
+	}
+
+	if len(data) < 1 {
+		return []string{}, nil
+	}
+
+	return strings.Split(string(bytes.TrimRight(data, "\x00")), "\x00"), nil
+}
+
+// PidOf finds process(es) with a specified name (regexp match)
+// and returns their pid(s).
+var PidOf = pidOfFn
+
+// From k8s.io/kubernetes/pkg/util/procfs/procfs_linux.go
+// caller should specify the proc root dir
+func pidOfFn(procRoot string, name string) ([]int, error) {
+	if len(name) == 0 {
+		return []int{}, fmt.Errorf("name should not be empty")
+	}
+	re, err := regexp.Compile("(^|/)" + name + "$")
+	if err != nil {
+		return []int{}, err
+	}
+	return getPids(procRoot, re), nil
+}
+
+func getPids(procRoot string, re *regexp.Regexp) []int {
+	pids := []int{}
+
+	dirFD, err := os.Open(procRoot)
+	if err != nil {
+		return nil
+	}
+	defer dirFD.Close()
+
+	for {
+		// Read a small number at a time in case there are many entries; we don't want to
+		// allocate a lot here.
+		ls, err := dirFD.Readdir(10)
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil
+		}
+
+		for _, entry := range ls {
+			if !entry.IsDir() {
+				continue
+			}
+
+			// If the directory is not a number (i.e. not a PID), skip it
+			pid, err := strconv.Atoi(entry.Name())
+			if err != nil {
+				continue
+			}
+
+			cmdline, err := os.ReadFile(filepath.Join(procRoot, entry.Name(), "cmdline"))
+			if err != nil {
+				klog.V(4).Infof("Error reading file %s: %+v", filepath.Join(procRoot, entry.Name(), "cmdline"), err)
+				continue
+			}
+
+			// The bytes we read have '\0' as a separator for the command line
+			parts := bytes.SplitN(cmdline, []byte{0}, 2)
+			if len(parts) == 0 {
+				continue
+			}
+			// Split the command line itself; we are interested in just the first part
+			exe := strings.FieldsFunc(string(parts[0]), func(c rune) bool {
+				return unicode.IsSpace(c) || c == ':'
+			})
+			if len(exe) == 0 {
+				continue
+			}
+			// Check if the name of the executable is what we are looking for
+			if re.MatchString(exe[0]) {
+				// Grab the PID from the directory path
+				pids = append(pids, pid)
+			}
+		}
+	}
+
+	return pids
+}
+
+// ExecCmdOnHost runs a command on the host; when running in a container (DaemonSet mode) it is wrapped with 'nsenter --mount=/proc/1/ns/mnt ${cmds}'.
+// It returns stdout, the exit code and an error.
+var ExecCmdOnHost = execCmdOnHostFn
+
+func execCmdOnHostFn(cmds []string) ([]byte, int, error) {
+	if len(cmds) == 0 {
+		return nil, -1, fmt.Errorf("empty command")
+	}
+	cmdPrefix := []string{}
+	if AgentMode == DS_MODE {
+		cmdPrefix = append(cmdPrefix, "nsenter", fmt.Sprintf("--mount=%s", path.Join(ProcRootDir, "/1/ns/mnt")))
+	}
+	cmdPrefix = append(cmdPrefix, cmds...)
+
+	var errB bytes.Buffer
+	command := exec.Command(cmdPrefix[0], cmdPrefix[1:]...)
+	command.Stderr = &errB
+	if out, err := command.Output(); err != nil {
+		return out, command.ProcessState.ExitCode(),
+			fmt.Errorf("command('%s') failed: %v, stderr: %s", strings.Join(cmds, " "), err, errB.String())
+	} else {
+		return out, 0, nil
+	}
+}
+
+// WorkingDirOf returns the working directory of the given process.
+func WorkingDirOf(pid int) (string, error) {
+	var errB bytes.Buffer
+	command := exec.Command("pwdx", fmt.Sprintf("%d", pid))
+	command.Stderr = &errB
+	if out, err := command.Output(); err != nil {
+		return "", fmt.Errorf("pwdx command('%d') failed: %v, stderr: %s", pid, err, errB.String())
+	} else {
+		tokens := strings.SplitN(string(out), ":", 2)
+		return strings.TrimSpace(tokens[1]), nil
+	}
+}
diff --git a/pkg/core/typedef/rawpod.go b/pkg/core/typedef/rawpod.go
index 59dfb59..7ea1d43 100644
--- a/pkg/core/typedef/rawpod.go
+++ b/pkg/core/typedef/rawpod.go
@@ -26,11 +26,19 @@ import (
 )
 
 const (
-	configHashAnnotationKey = "kubernetes.io/config.hash"
+	configHashAnnotationKey   = "kubernetes.io/config.hash"
+	dockerPrefix              = "docker://"
+	containerdPrefix          = "containerd://"
+	dockerContainerPrefix     = "docker-"
+	containerdContainerPrefix = "cri-containerd-"
+	containerSuffix           = ".scope"
+
 	// RUNNING means the Pod is in the running phase
 	RUNNING = corev1.PodRunning
 )
 
+var containerEngine string // "docker" or "containerd", detected from container status IDs
+
 type (
 	// RawContainer is kubernetes contaienr structure
 	RawContainer struct {
@@ -93,25 +101,51 @@ func (pod *RawPod) CgroupPath() string {
 	}
 
 	qosClassPath := ""
-	switch pod.Status.QOSClass {
-	case corev1.PodQOSGuaranteed:
-	case corev1.PodQOSBurstable:
-		qosClassPath = strings.ToLower(string(corev1.PodQOSBurstable))
-	case corev1.PodQOSBestEffort:
-		qosClassPath = strings.ToLower(string(corev1.PodQOSBestEffort))
-	default:
-		return ""
+	if GetCgroupDriver() == Cgroupfs {
+		switch pod.Status.QOSClass {
+		case corev1.PodQOSGuaranteed:
+		case corev1.PodQOSBurstable:
+			qosClassPath = strings.ToLower(string(corev1.PodQOSBurstable))
+		case corev1.PodQOSBestEffort:
+			qosClassPath = strings.ToLower(string(corev1.PodQOSBestEffort))
+		default:
+			return ""
+		}
+		/*
+			example:
+			1. Burstable: pod requests are less than the value of limits and not 0;
+				kubepods/burstable/pod34152897-dbaf-11ea-8cb9-0653660051c3
+			2. BestEffort: pod requests and limits are both 0;
+				kubepods/besteffort/pod34152897-dbaf-11ea-8cb9-0653660051c3
+			3. Guaranteed: pod requests are equal to the value set by limits;
+				kubepods/pod34152897-dbaf-11ea-8cb9-0653660051c3
+		*/
+		return filepath.Join(constant.KubepodsCgroup, qosClassPath, constant.PodCgroupNamePrefix+id)
+	} else {
+		podID := ""
+		switch pod.Status.QOSClass {
+		case corev1.PodQOSGuaranteed:
+			podID = KubepodsPod + strings.ReplaceAll(id, "-", "_") + PodSuffix
+		case corev1.PodQOSBurstable:
+			qosClassPath = KubeBurstableNameSystemd
+			podID = KubepodsBurstablePod + strings.ReplaceAll(id, "-", "_") + PodSuffix
+		case corev1.PodQOSBestEffort:
+			qosClassPath = KubeBesteffortNameSystemd
+			podID = KubepodsBesteffortPod + strings.ReplaceAll(id, "-", "_") + PodSuffix
+		default:
+			return ""
+		}
+		/*
+			example:
+			1. Burstable: pod requests are less than the value of limits and not 0;
+				kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod09a60690_404b_4ba1_a4a0_5c24bab98275.slice
+			2. BestEffort: pod requests and limits are both 0;
+				kubepods.slice/kubepods-besteffort.slice/kubepods-besteffort-pod09a60690_404b_4ba1_a4a0_5c24bab98275.slice
+			3. Guaranteed: pod requests are equal to the value set by limits;
+				kubepods.slice/kubepods-pod34152897_dbaf_11ea_8cb9_0653660051c3.slice
+		*/
+		return filepath.Join(KubeRootNameSystemd, qosClassPath, podID)
+	}
-	/*
-		example:
-		1. Burstable: pod requests are less than the value of limits and not 0;
-			kubepods/burstable/pod34152897-dbaf-11ea-8cb9-0653660051c3
-		2. BestEffort: pod requests and limits are both 0;
-			kubepods/bestEffort/pod34152897-dbaf-11ea-8cb9-0653660051c3
-		3. Guaranteed: pod requests are equal to the value set by limits;
-			kubepods/pod34152897-dbaf-11ea-8cb9-0653660051c3
-	*/
-	return filepath.Join(constant.KubepodsCgroup, qosClassPath, constant.PodCgroupNamePrefix+id)
 }
 
 // ListRawContainers returns all RawContainers in the RawPod
@@ -125,6 +159,14 @@ func (pod *RawPod) ListRawContainers() map[string]*RawContainer {
 		nameRawContainersMap[containerStatus.Name] = &RawContainer{
 			status: containerStatus,
 		}
+		if containerEngine == "" {
+			if strings.HasPrefix(containerStatus.ContainerID, dockerPrefix) {
+				containerEngine = "docker"
+			} else if strings.HasPrefix(containerStatus.ContainerID, containerdPrefix) {
+				containerEngine = "containerd"
+			}
+		}
+
 	}
 	for _, container := range pod.Spec.Containers {
 		cont, ok := nameRawContainersMap[container.Name]
@@ -152,7 +194,19 @@ func (pod *RawPod) ExtractContainerInfos() map[string]*ContainerInfo {
 		if id == "" || err != nil {
 			continue
 		}
-		idContainersMap[id] = NewContainerInfo(id, podCgroupPath, rawContainer)
+		if GetCgroupDriver() == Cgroupfs {
+			idContainersMap[id] = NewContainerInfo(id, podCgroupPath, rawContainer)
+		} else {
+			scopedID := id // fall back to the raw id if the container engine is unknown
+			switch containerEngine {
+			case "docker":
+				scopedID = dockerContainerPrefix + id + containerSuffix
+			case "containerd":
+				scopedID = containerdContainerPrefix + id + containerSuffix
+			}
+			idContainersMap[id] = NewContainerInfo(scopedID, podCgroupPath, rawContainer)
+		}
+
 	}
 	return idContainersMap
 }
diff --git a/pkg/core/typedef/unsupported.go b/pkg/core/typedef/unsupported.go
new file mode 100644
index 0000000..edb3c9a
--- /dev/null
+++ b/pkg/core/typedef/unsupported.go
@@ -0,0 +1,28 @@
+//go:build !linux
+// +build !linux
+
+package typedef
+
+import (
+	"fmt"
+)
+
+func ProcCmdLine(procRoot string, pid int) ([]string, error) {
+	return []string{}, fmt.Errorf("only supported on linux")
+}
+
+var PidOf = pidOfFn
+
+func pidOfFn(procRoot string, name string) ([]int, error) {
+	return []int{}, fmt.Errorf("only supported on linux")
+}
+
+var ExecCmdOnHost = execCmdOnHostFn
+
+func execCmdOnHostFn(cmds []string) ([]byte, int, error) {
+	return nil, -1, fmt.Errorf("only supported on linux")
+}
+
+func WorkingDirOf(pid int) (string, error) {
+	return "", fmt.Errorf("only supported on linux")
+}
diff --git a/pkg/rubik/rubik.go b/pkg/rubik/rubik.go
index 3864956..e9715a1 100644
--- a/pkg/rubik/rubik.go
+++ b/pkg/rubik/rubik.go
@@ -30,6 +30,7 @@ import (
 	"isula.org/rubik/pkg/config"
 	"isula.org/rubik/pkg/core/publisher"
 	"isula.org/rubik/pkg/core/trigger"
+	"isula.org/rubik/pkg/core/typedef"
 	"isula.org/rubik/pkg/core/typedef/cgroup"
 	"isula.org/rubik/pkg/informer"
 	"isula.org/rubik/pkg/podmanager"
@@ -127,10 +128,15 @@ func runAgent(ctx context.Context) error {
 	// 3. enable cgroup system
 	cgroup.InitMountDir(c.Agent.CgroupRoot)
 
-	// 4. init service components
+	// 4. detect whether the kubelet cgroup driver is systemd or cgroupfs
+	nodeName := os.Getenv(constant.NodeNameEnvKey)
+	if err := typedef.DetectCgroupDriver(nodeName); err != nil {
+		return fmt.Errorf("failed to detect cgroup driver: %v", err)
+	}
+	// 5. init service components
 	services.InitServiceComponents(defaultRubikFeature)
 
-	// 5. Create and run the agent
+	// 6. Create and run the agent
 	agent, err := NewAgent(c)
 	if err != nil {
 		return fmt.Errorf("error new agent: %v", err)
-- 
Gitee
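
The path mapping that the new CgroupPath branches implement can be sanity-checked with a small standalone program. The sketch below is illustrative only and is not part of the patch: the package, the helper names (qosDir, podCgroupPath) and the formatting are made up, and it only mirrors the cgroupfs and systemd layouts shown in the example comments above.

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// qosDir returns the per-QoS directory under the kubepods root.
// Guaranteed pods sit directly under the root, so it returns "".
func qosDir(qos string, systemd bool) string {
	switch qos {
	case "Burstable":
		if systemd {
			return "kubepods-burstable.slice"
		}
		return "burstable"
	case "BestEffort":
		if systemd {
			return "kubepods-besteffort.slice"
		}
		return "besteffort"
	default: // Guaranteed
		return ""
	}
}

// podCgroupPath mirrors the two layouts handled by RawPod.CgroupPath:
//   cgroupfs: kubepods/<qos>/pod<uid>
//   systemd:  kubepods.slice/[kubepods-<qos>.slice/]kubepods[-<qos>]-pod<uid with '-' replaced by '_'>.slice
func podCgroupPath(uid, qos string, systemd bool) string {
	if !systemd {
		return filepath.Join("kubepods", qosDir(qos, false), "pod"+uid)
	}
	prefix := "kubepods-pod" // Guaranteed
	switch qos {
	case "Burstable":
		prefix = "kubepods-burstable-pod"
	case "BestEffort":
		prefix = "kubepods-besteffort-pod"
	}
	escaped := strings.ReplaceAll(uid, "-", "_")
	return filepath.Join("kubepods.slice", qosDir(qos, true), prefix+escaped+".slice")
}

func main() {
	// sample UID reused from the examples in the patch comments
	uid := "34152897-dbaf-11ea-8cb9-0653660051c3"
	for _, qos := range []string{"Guaranteed", "Burstable", "BestEffort"} {
		fmt.Printf("%-10s cgroupfs: %s\n", qos, podCgroupPath(uid, qos, false))
		fmt.Printf("%-10s systemd:  %s\n", qos, podCgroupPath(uid, qos, true))
	}
}

Under the systemd driver the per-container directory below that pod path additionally gets a runtime prefix ("docker-" or "cri-containerd-") and a ".scope" suffix, which is what the ExtractContainerInfos change in rawpod.go appends.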