From 7e02654d2055931b508035911c9d5045eab7ae4e Mon Sep 17 00:00:00 2001
From: liuyuqing <1475439461@qq.com>
Date: Wed, 30 Oct 2024 20:31:39 +0800
Subject: [PATCH] Add resource manager
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .gitignore                |   2 +-
 pkg/resmgr/flags.go       |  28 ++
 pkg/resmgr/kubeclient.go  |  53 ++++
 pkg/resmgr/kubeletstub.go | 152 ++++++++++
 pkg/resmgr/nri.go         | 244 +++++++++++++++
 pkg/resmgr/pod.go         |  67 ++++
 pkg/resmgr/regulator.go   | 622 ++++++++++++++++++++++++++++++++++++++
 pkg/resmgr/resmgr.go      | 121 ++++++++
 8 files changed, 1288 insertions(+), 1 deletion(-)
 create mode 100644 pkg/resmgr/flags.go
 create mode 100644 pkg/resmgr/kubeclient.go
 create mode 100644 pkg/resmgr/kubeletstub.go
 create mode 100644 pkg/resmgr/nri.go
 create mode 100644 pkg/resmgr/pod.go
 create mode 100644 pkg/resmgr/regulator.go
 create mode 100644 pkg/resmgr/resmgr.go

diff --git a/.gitignore b/.gitignore
index 66fd13c..942f1f5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,4 +12,4 @@
 *.out
 
 # Dependency directories (remove the comment below to include it)
-# vendor/
+vendor/
\ No newline at end of file
diff --git a/pkg/resmgr/flags.go b/pkg/resmgr/flags.go
new file mode 100644
index 0000000..ca961d1
--- /dev/null
+++ b/pkg/resmgr/flags.go
@@ -0,0 +1,28 @@
+package resmgr
+
+import (
+    "flag"
+    "time"
+
+    nri "github.com/containerd/nri/pkg/api"
+)
+
+const (
+    defaultPluginName  = "numaadj"
+    defaultPluginIndex = "00"
+)
+
+type options struct {
+    RebalanceTimer time.Duration
+    NriPluginName  string
+    NriPluginIdx   string
+    NriSocket      string
+}
+
+var opt = options{}
+
+func init() {
+    flag.StringVar(&opt.NriPluginName, "nri-plugin-name", defaultPluginName, "NRI plugin name to register.")
+    flag.StringVar(&opt.NriPluginIdx, "nri-plugin-index", defaultPluginIndex, "NRI plugin index to register.")
+    flag.StringVar(&opt.NriSocket, "nri-socket", nri.DefaultSocketPath, "NRI unix domain socket path to connect to.")
+}
diff --git a/pkg/resmgr/kubeclient.go b/pkg/resmgr/kubeclient.go
new file mode 100644
index 0000000..77e938c
--- /dev/null
+++ b/pkg/resmgr/kubeclient.go
@@ -0,0 +1,53 @@
+package resmgr
+
+import (
+    "context"
+    "fmt"
+
+    corev1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/client-go/kubernetes"
+    "k8s.io/client-go/rest"
+    "k8s.io/klog/v2"
+)
+
+type KubeClient struct {
+    // kind configKind
+    cfg *rest.Config
+    cli *kubernetes.Clientset
+}
+
+func NewKubeCli(cfg *rest.Config) (*KubeClient, error) {
+    kc := &KubeClient{
+        cfg: cfg,
+    }
+    if err := kc.SetKubeClient(cfg); err != nil {
+        return nil, err
+    }
+    return kc, nil
+}
+
+func (kc *KubeClient) SetKubeClient(cfg *rest.Config) error {
+    kc.cfg = cfg
+
+    cli, err := kubernetes.NewForConfig(cfg)
+    if err != nil {
+        klog.Errorf("Failed to create kubernetes client: %v", err)
+        return err
+    }
+    kc.cli = cli
+    return nil
+}
+
+func (kc *KubeClient) GetPodListOnNode(nodeName string) (*corev1.PodList, error) {
+    ctx := context.TODO()
+
+    podList, err := kc.cli.CoreV1().Pods("").List(ctx, metav1.ListOptions{
+        FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
+    })
+    if err != nil {
+        return nil, fmt.Errorf("failed to list pods on node %s: %w", nodeName, err)
+    }
+
+    return podList, nil
+}
diff --git a/pkg/resmgr/kubeletstub.go b/pkg/resmgr/kubeletstub.go
new file mode 100644
index 0000000..4ad848d
--- /dev/null
+++ b/pkg/resmgr/kubeletstub.go
@@ -0,0 +1,152 @@
+package resmgr
+
+import (
+    "encoding/json"
+    "fmt"
+    "io"
+    "net/http"
+    "net/url"
+    "os"
+    "time"
+
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/client-go/rest"
+    "k8s.io/klog/v2"
+    kubeletconfigv1beta1 "k8s.io/kubelet/config/v1beta1"
+    kubeletoptions "k8s.io/kubernetes/cmd/kubelet/app/options"
+    kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
+    kubeletscheme "k8s.io/kubernetes/pkg/kubelet/apis/config/scheme"
+)
+
+type CliConfig struct {
+    KubeletPreferredAddressTypes string
+    KubeletEndpoint              string
+    KubeletConfigPath            string
+    InCluster                    bool
+    InsecureKubeletTLS           bool
+    KubeletReadOnlyPort          uint
+    KubeletSyncInterval          time.Duration
+    KubeletSyncTimeout           time.Duration
+    Scheme                       string
+}
+
+func NewConfig() *CliConfig {
+    nodeName := os.Getenv("NODE_NAME")
+
+    klog.Infof("NODE_NAME is %v", nodeName)
+
+    return &CliConfig{
+        KubeletPreferredAddressTypes: string(corev1.NodeInternalIP),
+        KubeletEndpoint:              "https://" + nodeName + ":10250",
+        KubeletConfigPath:            "",
+        InCluster:                    true,
+        KubeletSyncInterval:          10 * time.Second,
+        KubeletSyncTimeout:           3 * time.Second,
+        Scheme:                       "https",
+    }
+}
+
+type KubeletStub interface {
+    GetAllPods() (corev1.PodList, error)
+
+    GetKubeletConfig() (*kubeletconfig.KubeletConfiguration, error)
+}
+
+type kubeletStub struct {
+    kubeletURL string
+    scheme     string
+    client     *http.Client
+}
+
+// NewKubeletStub builds a kubelet client on top of the in-cluster rest.Config.
+func NewKubeletStub(cfg *rest.Config, kubeletURL string) (*kubeletStub, error) {
+    client := &http.Client{
+        Timeout: cfg.Timeout,
+    }
+
+    trans, err := rest.TransportFor(cfg)
+    if err != nil {
+        klog.Error(err, "Failed to create transport")
+        return nil, err
+    }
+    client.Transport = trans
+
+    return &kubeletStub{
+        kubeletURL: kubeletURL,
+        scheme:     "https",
+        client:     client,
+    }, nil
+}
+
+func (k *kubeletStub) GetAllPods() (corev1.PodList, error) {
+    path := "/pods/"
+    podsURL, err := url.Parse(k.kubeletURL + path)
+    if err != nil {
+        klog.Error("kubeletURL is invalid.")
+        return corev1.PodList{}, err
+    }
+
+    podList := corev1.PodList{}
+
+    resp, err := k.client.Get(podsURL.String())
+    if err != nil {
+        return podList, err
+    }
+    defer resp.Body.Close()
+
+    if resp.StatusCode != http.StatusOK {
+        return podList, fmt.Errorf("request %s failed, code %d", podsURL.String(), resp.StatusCode)
+    }
+
+    body, err := io.ReadAll(resp.Body)
+    if err != nil {
+        return podList, err
+    }
+
+    err = json.Unmarshal(body, &podList)
+    if err != nil {
+        return podList, fmt.Errorf("parse kubelet pod list failed, err: %v", err)
+    }
+
+    return podList, nil
+}
+
+func (k *kubeletStub) GetKubeletConfig() (*kubeletconfig.KubeletConfiguration, error) {
+    path := "/configz"
+    configURL, err := url.Parse(k.kubeletURL + path)
+
+    resp, err := k.client.Get(configURL.String())
+    if err != nil {
+        return nil, err
+    }
+    defer resp.Body.Close()
+
+    if resp.StatusCode != http.StatusOK {
+        return nil, fmt.Errorf("failed to request %s, code %d", configURL.String(), resp.StatusCode)
+    }
+
+    body, err := io.ReadAll(resp.Body)
+    if err != nil {
+        return nil, err
+    }
+    klog.V(4).Infof("Got kubelet config: %s", string(body))
+
+    // Decode the v1beta1 payload, then convert it to the internal KubeletConfiguration type.
+    var configz kubeletconfigv1beta1.KubeletConfiguration
+    if err = json.Unmarshal(body, &configz); err != nil {
+        return nil, err
+    }
+
+    kubeletConfig, err := kubeletoptions.NewKubeletConfiguration()
+
+    scheme, _, err := kubeletscheme.NewSchemeAndCodecs()
+    if err != nil {
+        return nil, err
+    }
+
+    if err = scheme.Convert(&configz, kubeletConfig, nil); err != nil {
+        return nil, err
+    }
+
+    return kubeletConfig, nil
+}
diff --git a/pkg/resmgr/nri.go b/pkg/resmgr/nri.go
new file mode 100644
index 0000000..e7e7b82
--- /dev/null
+++ b/pkg/resmgr/nri.go
@@ -0,0 +1,244 @@
+package resmgr
+
+import (
+    "context"
+    "fmt"
+    "os"
+
+    "github.com/containerd/nri/pkg/api"
+    "github.com/containerd/nri/pkg/stub"
+    "github.com/containerd/otelttrpc"
+    "github.com/containerd/ttrpc"
+    "gopkg.in/yaml.v2"
+    "k8s.io/klog/v2"
+)
+
+type nriPlugin struct {
+    stub   stub.Stub
+    resmgr *resmgr
+    // ctx context.Context
+    // eventCh chan api.Event
+}
+
+var (
+    _ = stub.ConfigureInterface(&nriPlugin{})
+    _ = stub.SynchronizeInterface(&nriPlugin{})
+    _ = stub.RunPodInterface(&nriPlugin{})
+    _ = stub.RemovePodInterface(&nriPlugin{})
+    _ = stub.CreateContainerInterface(&nriPlugin{})
+    _ = stub.PostCreateContainerInterface(&nriPlugin{})
+    _ = stub.RemoveContainerInterface(&nriPlugin{})
+    // _ = stub.UpdateContainerInterface(&nriPlugin{})
+)
+
+func newNRIPlugin(resmgr *resmgr) (*nriPlugin, error) {
+    p := &nriPlugin{
+        resmgr: resmgr,
+        // eventCh: make(chan api.Event),
+    }
+
+    klog.Infof("creating plugin...")
+
+    return p, nil
+}
+
+func (p *nriPlugin) createStub() error {
+    var (
+        opts = []stub.Option{
+            stub.WithPluginName(opt.NriPluginName),
+            stub.WithPluginIdx(opt.NriPluginIdx),
+            stub.WithSocketPath(opt.NriSocket),
+            stub.WithOnClose(p.onClose),
+            stub.WithTTRPCOptions(
+                []ttrpc.ClientOpts{
+                    ttrpc.WithUnaryClientInterceptor(
+                        otelttrpc.UnaryClientInterceptor(),
+                    ),
+                },
+                []ttrpc.ServerOpt{
+                    ttrpc.WithUnaryServerInterceptor(
+                        otelttrpc.UnaryServerInterceptor(),
+                    ),
+                },
+            ),
+        }
+        err error
+    )
+
+    klog.Info("creating plugin stub")
+
+    if p.stub, err = stub.New(p, opts...); err != nil {
+        return fmt.Errorf("failed to create NRI plugin stub: %w", err)
+    }
+
+    return nil
+}
+
+func (p *nriPlugin) start() error {
+    if p == nil {
+        return nil
+    }
+    klog.Info("starting plugin...")
+
+    if err := p.createStub(); err != nil {
+        return err
+    }
+
+    if err := p.stub.Start(context.Background()); err != nil {
+        return fmt.Errorf("failed to start NRI plugin: %w", err)
+    }
+    // TODO: receive event notifications and re-synchronize state on each one.
+    // go func() {
+    //     for {
+    //         select {
+    //         case event := <-p.eventCh:
+    //             switch event {
+    //             case api.Event_CREATE_CONTAINER:
+    //                 // TODO: Synchronize cannot be invoked repeatedly for syncing.
+    //                 if err := p.Synchronize(); err != nil {
+    //                     klog.Errorf("failed to synchronize: %v", err)
+    //                 }
+    //             }
+    //             klog.V(2).Infof("Received event: %v", event)
+    //         case <-p.ctx.Done():
+    //             fmt.Println("Shutting down event loop")
+    //             return
+    //         }
+    //     }
+    // }()
+    klog.Info("NRI plugin started successfully.")
+    return nil
+}
+
+func (p *nriPlugin) stop() {
+    if p == nil {
+        return
+    }
+    klog.Info("stop plugin...")
+    p.stub.Stop()
+}
+
+func (p *nriPlugin) onClose() {
+    klog.Error("connection to NRI/runtime lost, exiting...")
+    os.Exit(1)
+}
+
+func (p *nriPlugin) Configure(ctx context.Context, config, runtime, version string) (stub.EventMask, error) {
+    klog.V(2).Infof("Connected to %s %s...", runtime, version)
+
+    if config == "" {
+        return 0, nil
+    }
+
+    err := yaml.Unmarshal([]byte(config), &config)
+    if err != nil {
+        klog.Errorf("Failed to parse configuration: %v", err)
+        return 0, err
+    }
+    klog.V(2).Infof("Got configuration data: %+v...", config)
+
+    return 0, nil
+}
+
+func (p *nriPlugin) RunPodSandbox(ctx context.Context, pod *api.PodSandbox) error {
+    klog.Infof("Pod created --- name: %v, id: %v", pod.GetName(), pod.GetId())
+
+    return nil
+}
+
+// TODO: how to trigger this proactively; pass event info and re-sync state on every creation.
+func (p *nriPlugin) Synchronize(ctx context.Context, pods []*api.PodSandbox, containers []*api.Container) ([]*api.ContainerUpdate, error) {
+    klog.Info("Starting Synchronize........................")
+
+    var updates []*api.ContainerUpdate
+
+    if len(containers) == 0 {
+        klog.V(2).Info("No Container")
+    }
+
+    for _, container := range containers {
+        klog.V(2).Infof("Checking container: %s\n", container.Id)
+    }
+    klog.V(2).Info("Starting Sync Info........................")
+    p.resmgr.regulator.SyncInfo(pods, containers)
+    klog.V(2).Info("Sync Info Done........................")
+
+    klog.Info("Synchronize Done..............................")
+    // Return the containers whose cpuset settings need to be updated.
+    return updates, nil
+}
+
+// func (p *nriPlugin) adjustContainerResources(container *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
+//     adjustment := &api.ContainerAdjustment{}
+//     updates := []*api.ContainerUpdate{}
+
+//     p.resmgr.Adjust(container, adjustment, updates)
+//     return adjustment, updates, nil
+// }
+
+func (p *nriPlugin) RemovePodSandbox(ctx context.Context, pod *api.PodSandbox) error {
+    klog.Info("........................Remove pod sandbox..............")
+    p.resmgr.regulator.Record(pod, nil, RemovePodOps)
+    return nil
+}
+
+func (p *nriPlugin) CreateContainer(ctx context.Context, pod *api.PodSandbox, container *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
+    klog.Info("Starting to create container..............")
+    klog.Infof("Adjusting container %s resources\n", container.Id)
+
+    adjustment := &api.ContainerAdjustment{}
+    updates := []*api.ContainerUpdate{}
+
+    // FIXME: trigger state synchronization proactively by forwarding NRI events
+    // p.nriEventCh <- api.Event_CREATE_CONTAINER
+
+    adjustment, updates, err := p.resmgr.Regulate(pod, container)
+    if err != nil {
+        klog.Errorf("Regulate failed, error: %v", err)
+    }
+
+    klog.Info("Regulate Done!!!!!")
+    klog.Infof("adjustment = %v", adjustment)
+    if adjustment != nil {
+        klog.Infof("After Regulate, container info: cpus - %v, shares - %v, quota - %v, ", adjustment.GetLinux().GetResources().GetCpu().GetCpus(),
+            adjustment.GetLinux().GetResources().GetCpu().GetShares().GetValue(), adjustment.GetLinux().GetResources().GetCpu().GetQuota().GetValue())
+    }
+    return adjustment, updates, nil
+}
+
+func (p *nriPlugin) PostCreateContainer(ctx context.Context, pod *api.PodSandbox, container *api.Container) error {
+    klog.Info("...................After creating container..............")
+    p.resmgr.regulator.Record(pod, container, CreateConOps)
+    return nil
+}
+
+func (p *nriPlugin) RemoveContainer(ctx context.Context, pod *api.PodSandbox, container *api.Container) error {
+    klog.Info("........................Remove container..............")
+    p.resmgr.regulator.Record(pod, container, RemoveConOps)
+    return nil
+}
+
+// func (p *nriPlugin) UpdateContainer(context.Context, *api.PodSandbox, *api.Container, *api.LinuxResources) ([]*api.ContainerUpdate, error) {
+//     _, err := p.stub.UpdateContainers([]*api.ContainerUpdate{update})

+//     if err != nil {
+//         klog.V(2).Infof("Failed to update container %s, err: %v", container.Id, err)

+//     }
+// }
+
+// func (p *nriPlugin) updateContainers(oenuma *v1alpha1.Oenuma) error {
+//     _ = oenuma

+//     // for _, node := range oenuma.Spec.Node {
+//     //     for _, podAfi := range node.PodAffinity {

+//     //     }
+//     // }

+//     return nil
+// }
+
+// func (p *nriPlugin) getAdjustmentContainer() {

+// }
diff --git a/pkg/resmgr/pod.go b/pkg/resmgr/pod.go
new file mode 100644
index 0000000..cbd8e40
--- /dev/null
+++ b/pkg/resmgr/pod.go
@@ -0,0 +1,67 @@
+package resmgr
+
+import (
+    "github.com/containerd/nri/pkg/api"
+    resources "numaadj.huawei.com/pkg/numaresources"
+)
+
+// NUMANodeMap records, for every NUMA node, which Pods and containers are placed on it.
+type NUMANodeMap struct {
+    topo     resources.CPUTopology
+    numaNode map[int]NUMANodeInfo
+
+    // // CPU cores currently occupied
+    // sharedPool []cpuset.CPUSet
+    // // exclusively assigned CPU cores
+    // exclusivePool []cpuset.CPUSet
+}
+
+func NewNUMANodeMap() *NUMANodeMap {
+    return &NUMANodeMap{
+        topo:     resources.CPUTopology{},
+        numaNode: make(map[int]NUMANodeInfo),
+    }
+}
+
+type CPUSpec struct {
+    CPURequests string // TODO: confirm whether a numeric type fits better
+    CPULimits   string
+}
+
+type NUMANodeInfo struct {
+    pods []PodMeta
+    // bestEffort []PodMeta
+    // burstable []PodMeta
+    // guaranteed []PodMeta
+    containers []ContainerMeta
+}
+
+type QoSClass string
+
+const (
+    QoSBE         QoSClass = "BE"
+    QoSBurstable  QoSClass = "Burstable"
+    QoSGuaranteed QoSClass = "Guaranteed"
+    QoSNone       QoSClass = "None"
+)
+
+type PodMeta struct {
+    Name      string
+    Uid       string
+    Namespace string
+    Resource  *api.LinuxResources
+    QosClass  QoSClass
+}
+
+type ContainerMeta struct {
+    Name      string // podname/container_name
+    Pid       uint32
+    ID        string
+    PodID     string
+    CPUShares uint64
+
+    State         api.ContainerState
+    Resource      *api.LinuxResources
+    CgroupsPath   string
+    CPUAllocation CPUSpec
+}
diff --git a/pkg/resmgr/regulator.go b/pkg/resmgr/regulator.go
new file mode 100644
index 0000000..c7c7cda
--- /dev/null
+++ b/pkg/resmgr/regulator.go
@@ -0,0 +1,622 @@
+package resmgr
+
+import (
+    "fmt"
+    "math"
+    "os"
+    "sort"
+    "strconv"
+    "sync"
+
+    "github.com/containerd/nri/pkg/api"
+    corev1 "k8s.io/api/core/v1"
+    "k8s.io/client-go/rest"
+    "k8s.io/klog/v2"
+    resources "numaadj.huawei.com/pkg/numaresources"
+)
+
+// Regulator combines the kubelet stub and the Kubernetes client to collect node state.
+type Regulator struct {
+    sync.Mutex
+    nodeName      string
+    podInfo       map[string]PodMeta       // uid - podMeta
+    containerInfo map[string]ContainerMeta // id - containerMeta
+    nodemap       *NUMANodeMap             // container distribution across NUMA nodes
+    // grpcClient *nf.GrpcClient
+    kubeletStub *kubeletStub
+    kubeCli     *KubeClient
+}
+
+func NewRegulator() (*Regulator, error) {
+    // TODO: make this configurable
+    cliConf := NewConfig()
+
+    k8sConf, err := rest.InClusterConfig()
+    if err != nil {
+        return nil, fmt.Errorf("failed to create in-cluster config: %v", err)
+    }
+
+    k8sConf.Timeout = cliConf.KubeletSyncTimeout
+
+    kubeletStub, err := NewKubeletStub(k8sConf, cliConf.KubeletEndpoint)
+    if err != nil {
+        return nil, fmt.Errorf("failed to create kubelet stub: %v", err)
+    }
+
+    kubecli, err := NewKubeCli(k8sConf)
+    if err != nil {
+        klog.Error("Failed to create kubernetes client")
+    }
+
+    r := &Regulator{
+        nodeName:      os.Getenv("NODE_NAME"),
+        podInfo:       make(map[string]PodMeta),
+        containerInfo: make(map[string]ContainerMeta),
+        kubeletStub:   kubeletStub,
+        kubeCli:       kubecli,
+        nodemap:       NewNUMANodeMap(),
+    }
+
+    if r.nodeName == "" {
+        return nil, fmt.Errorf("failed to create regulator: couldn't get node name")
+    }
+
+    return r, nil
+}
+
+func (r *Regulator) Start() error {
+    klog.Infof("running regulator...")
+    // for {
+    //     //
+    // }
+    return nil
+}
+
+// Working unit for computing CPUSet demand
+// type CPUAccumulator struct {
+//     topo *topology.CPUTopology
+//     details topology.CPUDetails
+//     numCPUsNeeded int
+//     result cpuset.CPUSet
+// }
+
+// SyncInfo stores the collected pod/container state in the Regulator and maps it onto NUMA nodes.
+func (r *Regulator) SyncInfo(pods []*api.PodSandbox, containers []*api.Container) {
+
+    // Map pods to NUMA nodes
+    r.syncPodInfo(pods)
+
+    // Map containers to NUMA nodes
+    r.syncContainerInfo(containers)
+
+    r.syncNumaNodeMap()
+}
+
+func (r *Regulator) syncPodInfo(pods []*api.PodSandbox) {
+    podList, err := r.kubeCli.GetPodListOnNode(r.nodeName)
+    if err != nil {
+        klog.Errorf("Failed to get pods on %v", r.nodeName)
+    }
+
+    for _, podSandBox := range pods {
+        uid := podSandBox.Uid
+        name := podSandBox.Name
+        namespace := podSandBox.Namespace
+        resource := podSandBox.Linux.Resources
+        qos := getQosClass(podList, uid)
+        klog.Infof("uid = %v, name = %v, namespace = %v resource = %v qos = %v", uid, name, namespace, resource, qos)
+        if resource != nil {
+            r.podInfo[uid] = PodMeta{
+                Name:      name,
+                Uid:       uid,
+                Namespace: namespace,
+                Resource:  resource,
+                QosClass:  qos,
+            }
+        } else {
+            klog.Infof("Pod: %v is a zombie pod, uid: %v", name, uid)
+        }
+    }
+}
+
+func getQosClass(podList *corev1.PodList, uid string) QoSClass {
+    klog.Infof("len podlist items = %v", len(podList.Items))
+    for _, pod := range podList.Items {
+        if string(pod.UID) == uid {
+            return QoSClass(pod.Status.QOSClass)
+        }
+    }
+
+    return ""
+}
+
+func (r *Regulator) syncContainerInfo(containers []*api.Container) {
+    containerCpuSpecMap := r.getCPULimitsAndRequests()
+
+    for _, container := range containers {
+        name := container.Name
+        pid := container.Pid
+        id := container.Id
+        podid := container.PodSandboxId
+        cpushares := container.Linux.Resources.Cpu.Shares.GetValue()
+        state := container.State
+        resource := container.Linux.Resources
+        cgrouppath := container.Linux.CgroupsPath
+        klog.Infof("name = %v, pid = %v, id = %v podid = %v cpushares = %v, state = %v, resource = %v cgrouppath = %v",
+            name, pid, id, podid, cpushares, state, resource, cgrouppath)
+        r.containerInfo[id] = ContainerMeta{
+            Name:          name,
+            Pid:           pid,
+            ID:            id,
+            PodID:         podid,
+            CPUShares:     cpushares,
+            State:         state,
+            Resource:      resource,
+            CgroupsPath:   cgrouppath,
+            CPUAllocation: containerCpuSpecMap[name],
+        }
+    }
+}
+
+// syncNumaNodeMap populates nodemap, mapping pod and container info onto NUMA nodes.
+func (r *Regulator) syncNumaNodeMap() {
+    localCPUInfo, err := resources.GetLocalCPUInfo()
+    if err != nil {
+        klog.Errorf("localCPU info get failed, err: %v", err)
+    }
+    if localCPUInfo == nil {
+        klog.Error("-----Failed to Detect CPU Info -----------")
+    }
+
+    // if localCPUInfo.TotalInfo == nil {
+    //     klog.Error("-----Failed to Detect CPU Info -----------")
+    // }
+
+    cpuTopoInfo, err := resources.Discover(&localCPUInfo.TotalInfo)
+    if err != nil {
+        klog.Errorf("cpuTopoInfo get failed, err: %v", err)
+    }
+
+    r.nodemap.topo = *cpuTopoInfo
+    if r.podInfo == nil {
+        klog.Errorf("pod info is nil")
+    }
+
+    klog.Infof("PodInfo's length: %v", len(r.podInfo))
+
+    // Attach each pod to the NUMA node(s) it is placed on
+    for _, podmeta := range r.podInfo {
+        if podmeta.Resource == nil {
+            klog.Errorf("podmeta resource is nil")
+        }
+        cpulist := resources.GetCPUSetList(podmeta.Resource.Cpu.Cpus)
+        numaSet := map[int]int{}
+        for _, cpuid := range cpulist {
+            numaNodeId := cpuTopoInfo.CPUDetails[cpuid].NUMANodeID
+            numaSet[numaNodeId] = 1
+        }
+        for numaNodeId := range numaSet {
+            numaInfo := r.nodemap.numaNode[numaNodeId]
+            numaInfo.pods = append(numaInfo.pods, podmeta)
+            r.nodemap.numaNode[numaNodeId] = numaInfo
+            klog.Infof("Add pod into numaInfo --- nodeID: %v, PodMeta: %v", numaNodeId, podmeta)
+        }
+    }
+
+    klog.Infof("ContainerInfo's length: %v", len(r.containerInfo))
+
+    // Attach each container to the NUMA node(s) it is placed on
+    for _, conmeta := range r.containerInfo {
+        cpulist := resources.GetCPUSetList(conmeta.Resource.Cpu.Cpus)
+        numaSet := map[int]int{}
+        for _, cpuid := range cpulist {
+            numaNodeId := cpuTopoInfo.CPUDetails[cpuid].NUMANodeID
+            numaSet[numaNodeId] = 1
+        }
+        for numaNodeId := range numaSet {
+            numaInfo := r.nodemap.numaNode[numaNodeId]
+            numaInfo.containers = append(numaInfo.containers, conmeta)
+            r.nodemap.numaNode[numaNodeId] = numaInfo
+            klog.Infof("Add container into numaInfo --- nodeID: %v, ContainerMeta: %v", numaNodeId, conmeta)
+        }
+
+    }
+}
+
+func (r *Regulator) Regulate(pod *api.PodSandbox, container *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
+    r.Lock()
+    defer r.Unlock()
+
+    // Adjust cpuset and memset according to cpu Info
+
+    // Estimate the free resources of each single NUMA node
+    numaNodeStat, allocatableNode := r.singleNodeEstimate(pod, container)
+    if len(allocatableNode) == 0 {
+        klog.Info("Can't meet the demand, SuppressEstimate........")
+        _, allocatableNode = r.suppressEstimate(pod, container, numaNodeStat)
+
+    }
+
+    klog.Infof("After SingleNodeEstimate, allocatableNode: %v", len(allocatableNode))
+
+    // TODO: try active eviction; not implemented yet
+    // r.evictEstimate(pod, container, adjustment, updates)
+
+    // Pick the node with the most free CPU
+    maxNode := 0
+    maxUsage := 0
+    for i, usage := range allocatableNode {
+        if maxUsage < usage.cpuFree {
+            maxUsage = usage.cpuFree
+            maxNode = i
+        }
+    }
+
+    // Generate the resource adjustment for the new container
+    adjustment, err := r.genAdjustment(pod, container, allocatableNode[maxNode])
+    if err != nil {
+        klog.Info("failed to generate container adjustment")
+        return nil, nil, err
+    }
+
+    // Generate resource updates for the running containers being throttled
+    updates, err := r.genUpdates(pod, container, allocatableNode[maxNode])
+    if err != nil {
+        klog.Info("failed to generate container updates")
+        return nil, nil, err
+    }
+
+    return adjustment, updates, nil
+}
+
+func (r *Regulator) genUpdates(pod *api.PodSandbox, container *api.Container, nodeUsage NUMANodeUsage) ([]*api.ContainerUpdate, error) {
+    nodeId := nodeUsage.nodeId
+    conUpdates := []*api.ContainerUpdate{}
+
+    // Throttle as few containers as possible
+    cons := r.nodemap.numaNode[nodeId].containers
+
+    // Sort in descending order, primarily by the gap between cpu.limits and cpu.requests
+    sort.Slice(cons, func(i, j int) bool {
+        a, b := cons[i], cons[j]
+        aLimits, _ := strconv.ParseUint(a.CPUAllocation.CPULimits, 10, 64)
+        aReqs, _ := strconv.ParseUint(a.CPUAllocation.CPURequests, 10, 64)
+        bLimits, _ := strconv.ParseUint(b.CPUAllocation.CPULimits, 10, 64)
+        bReqs, _ := strconv.ParseUint(b.CPUAllocation.CPURequests, 10, 64)
+        // With no limits set, prefer throttling the container with the smaller requests
+        if aLimits == 0 && bLimits == 0 {
+            return aReqs < bReqs
+        } else if aLimits == 0 && bLimits != 0 {
+            // Prefer throttling containers without a limits value
+            return true
+        } else if aLimits != 0 && bLimits == 0 {
+            return false
+        }
+        // On a tie, fall back to comparing requests
+        if (aLimits - aReqs) == (bLimits - bReqs) {
+            return aReqs > bReqs
+        }
+        return (aLimits - aReqs) > (bLimits - bReqs)
+    })
+
+    // Walk the sorted containers and generate the corresponding ContainerUpdates
+    var suppressSpace uint64 = 0
+
+    conCpuSpec := r.getCPULimitsAndRequests()
+    cpuRequest, _ := strconv.ParseUint(conCpuSpec[container.Name].CPURequests, 10, 64)
+    // cpuLimit := conCpuSpec[containerName].CPULimits
+
+    for _, con := range r.nodemap.numaNode[nodeId].containers {
+        update := api.ContainerUpdate{}
+        // Stop once enough space has been reclaimed
+        if suppressSpace > cpuRequest {
+            break
+        }
+
+        // Lower the quota down to the request
+        cpuReq, _ := strconv.ParseUint(con.CPUAllocation.CPURequests, 10, 64)
+        cpuLimit, _ := strconv.ParseUint(con.CPUAllocation.CPULimits, 10, 64)
+        update.SetLinuxCPUQuota(int64(cpuReq * r.containerInfo[con.ID].Resource.Cpu.Period.GetValue()))
+        conUpdates = append(conUpdates, &update)
+        if cpuLimit != 0 {
+            suppressSpace += (cpuLimit - cpuReq)
+        }
+    }
+    return conUpdates, nil
+}
+
+// genAdjustment builds the cpuset/cpu adjustment for the new container.
+func (r *Regulator) genAdjustment(pod *api.PodSandbox, container *api.Container, nodeUsage NUMANodeUsage) (*api.ContainerAdjustment, error) {
+    nodeid := nodeUsage.nodeId
+    adjustment := &api.ContainerAdjustment{}
+
+    // Look up the pod info on this node
+    poduid := pod.Uid
+    podList, err := r.kubeCli.GetPodListOnNode(r.nodeName)
+    if err != nil {
+        klog.Errorf("Failed to get pods on %v", r.nodeName)
+    }
+    qos := getQosClass(podList, poduid)
+    // Constrain the cpuset to the chosen NUMA node
+    nodecpuset := resources.GetNumaNodeCPUSet(&r.nodemap.topo, nodeid)
+    adjustment.SetLinuxCPUSetCPUs(nodecpuset)
+    klog.Infof("node id: %v, poduid: %v, qos: %v", nodeid, poduid, qos)
+
+    // Adjust CPU parameters for Burstable pods
+    if qos == QoSBurstable {
+        containerName := container.Name
+        result := r.getCPULimitsAndRequests()
+        cpurequest := result[containerName].CPURequests
+        cpulimit := result[containerName].CPULimits
+
+        cpuspec := r.containerInfo[containerName].CPUAllocation
+        // Set cpu.shares and cpu.quota
+        share, _ := strconv.ParseUint(cpurequest, 10, 64)
+        quota, _ := strconv.ParseInt(cpulimit, 10, 64)
+        share = share * 1024
+        quota = quota * int64(container.Linux.Resources.Cpu.Period.Value)
+        klog.Infof("containername: %v, cpuspec: %v, cpurequest: %v, cpulimit: %v share: %v,quota : %v",
+            containerName, cpuspec, cpurequest, cpulimit, share, quota)
+
+        adjustment.SetLinuxCPUShares(share)
+        adjustment.SetLinuxCPUQuota(quota)
+    }
+
+    return adjustment, nil
+}
+
+// getCPULimitsAndRequests fetches the configured cpu limits and requests of every container on the node.
+func (r *Regulator) getCPULimitsAndRequests() map[string]CPUSpec {
+    // podList, err := r.kubeletStub.GetAllPods()
+    podList, err := r.kubeCli.GetPodListOnNode(r.nodeName)
+    if err != nil {
+        klog.Infof("podlist get failed, err: %v", err)
+    }
+
+    // klog.Info("Get Pod List from kubernetes --- PodList: %v", podList)
+
+    // Indexed by container name
+    result := make(map[string]CPUSpec)
+    for _, podInfo := range podList.Items {
+        containers := podInfo.Spec.Containers
+
+        for _, con := range containers {
+            conName := con.Name
+
+            cpuRequest := con.Resources.Requests["cpu"]
+            cpuLimit := con.Resources.Limits["cpu"]
+
+            klog.Infof("Get From Kubernetes --- Container: %v, requests.cpu = %v, limits.cpu = %v", conName, cpuRequest, cpuLimit)
+
+            result[conName] = CPUSpec{
+                CPURequests: cpuRequest.String(),
+                CPULimits:   cpuLimit.String(),
+            }
+        }
+    }
+    return result
+}
+
+// NUMANodeUsage captures per-NUMA-node CPU usage.
+type NUMANodeUsage struct {
+    nodeId  int
+    cpuUsed int
+    cpuFree int
+}
+
+func (r *Regulator) singleNodeEstimate(_ *api.PodSandbox, container *api.Container) ([]NUMANodeUsage, []NUMANodeUsage) {
+    // Rank NUMA nodes by free CPU, from most to least
+    numaNodeNum := r.nodemap.topo.NumNUMANodes
+    reqQuota := container.Linux.Resources.Cpu.Quota.GetValue()
+    reqPeriod := container.Linux.Resources.Cpu.Period.GetValue()
+    cpuNum := r.nodemap.topo.NumCPUs
+    reqNum := int(math.Ceil(float64(reqQuota) / float64(reqPeriod)))
+
+    nodeInfos := r.nodemap.numaNode
+    klog.Infof("numaNodeNum = %v, reqQuota = %v, reqPeriod = %v, cpuNum = %v, reqNum = %v, nodeInfos = %v",
+        numaNodeNum, reqQuota, reqPeriod, cpuNum, reqNum, nodeInfos)
+    nodeUsage := make([]int, numaNodeNum)
+    for i := 0; i < len(nodeUsage); i++ {
+        nodeUsage[i] = cpuNum / numaNodeNum
+    }
+    // Current usage of each NUMA node, computed from CPU core counts:
+    // tally the usage on every node to find the one with the most headroom.
+    // Iterate over the NUMA node set
+    for nodeId, nodeInfo := range nodeInfos {
+        cpuUsage := float64(0)
+
+        klog.Infof("NodeID: %v, nodeInfo: %v", nodeId, nodeInfo)
+
+        for _, containerInfo := range nodeInfo.containers {
+            quota := containerInfo.Resource.Cpu.Quota.GetValue()
+            period := containerInfo.Resource.Cpu.Period.GetValue()
+            if quota != -1 {
+                cpuUsage += float64(quota) / float64(period)
+            }
+        }
+        nodeUsage[nodeId] -= int(math.Ceil(cpuUsage))
+    }
+
+    allNode := []NUMANodeUsage{}
+    allocatableNode := []NUMANodeUsage{}
+    for i, v := range nodeUsage {
+        tmp := NUMANodeUsage{
+            cpuUsed: cpuNum/numaNodeNum - v,
+            nodeId:  i,
+            cpuFree: v,
+        }
+        allNode = append(allNode, tmp)
+        if v >= reqNum {
+            allocatableNode = append(allocatableNode, tmp)
+        }
+    }
+
+    klog.Infof("AllocatableNode: %v", allocatableNode)
+
+    return allNode, allocatableNode
+}
+
+// suppressEstimate returns the per-node stats after throttling, plus the nodes that can satisfy the request once throttled.
+func (r *Regulator) suppressEstimate(_ *api.PodSandbox, container *api.Container, numaUsage []NUMANodeUsage) ([]NUMANodeUsage, []NUMANodeUsage) {
+    // Rank NUMA nodes by (cpu.limits - cpu.requests gap + free cores)
+    numaStat := []NUMANodeUsage{}
+
+    for _, v := range numaUsage {
+        perNumaCpuNum := v.cpuFree + v.cpuUsed
+        numaId := v.nodeId
+        zipSpace := 0
+        // Throttle containers that belong to Burstable pods
+        for _, con := range r.nodemap.numaNode[numaId].containers {
+            podId := con.PodID
+            qos := r.podInfo[podId].QosClass
+
+            if qos == QoSBurstable {
+                cpuspec := con.CPUAllocation
+                limit, _ := strconv.ParseUint(cpuspec.CPULimits, 10, 64)
+                request, _ := strconv.ParseUint(cpuspec.CPURequests, 10, 64)
+                zipSpace += int(limit - request)
+            }
+        }
+        numaStat = append(numaStat, NUMANodeUsage{
+            cpuFree: zipSpace + v.cpuFree, // reclaimable space + originally free space
+            cpuUsed: perNumaCpuNum - v.cpuFree,
+            nodeId:  v.nodeId,
+        })
+    }
+    // Compute the allocatable nodes
+    reqquota := container.Linux.Resources.Cpu.Quota.GetValue()
+    reqperiod := container.Linux.Resources.Cpu.Period.GetValue()
+    reqNum := int(math.Ceil(float64(reqquota) / float64(reqperiod)))
+
+    allocatableNode := []NUMANodeUsage{}
+    for _, v := range numaStat {
+        // Allocatable if the free CPUs cover the requested CPUs
+        if v.cpuFree >= reqNum {
+            allocatableNode = append(allocatableNode, v)
+        }
+    }
+    return numaStat, allocatableNode
+}
+
+// TODO: eviction is not performed for now
+func (r *Regulator) evictEstimate(_ *api.PodSandbox, _ *api.Container, _ *api.ContainerAdjustment, _ []*api.ContainerUpdate) error {
+    return nil
+}
+
+type Ops int
+
+const (
+    CreateConOps Ops = 1
+    RemoveConOps Ops = 2
+    RemovePodOps Ops = 3
+    RunPodOps    Ops = 4
+)
+
+func (r *Regulator) Record(pod *api.PodSandbox, container *api.Container, ops Ops) error {
+    podList, err := r.kubeCli.GetPodListOnNode(r.nodeName)
+    if err != nil {
+        klog.Errorf("Failed to get pods on %v", r.nodeName)
+    }
+
+    switch ops {
+    case CreateConOps:
+        klog.Info("Record --- Container Creation......")
+        // Add the container to containerInfo
+        cpuSpecMap := r.getCPULimitsAndRequests()
+        containerMeta := ContainerMeta{
+            Name:          container.GetName(),
+            Pid:           container.GetPid(),
+            ID:            container.GetId(),
+            PodID:         container.GetPodSandboxId(),
+            CPUShares:     container.GetLinux().Resources.Cpu.Shares.GetValue(),
+            State:         container.GetState(),
+            Resource:      container.GetLinux().Resources,
+            CgroupsPath:   container.GetLinux().GetCgroupsPath(),
+            CPUAllocation: cpuSpecMap[container.GetName()],
+        }
+        r.containerInfo[container.Id] = containerMeta
+        // Sync the container info into the node map
+        numaSet := map[int]int{}
+        conCpulist := resources.GetCPUSetList(containerMeta.Resource.Cpu.Cpus)
+        for _, cpuid := range conCpulist {
+            numaNodeId := r.nodemap.topo.CPUDetails[cpuid].NUMANodeID
+            numaSet[numaNodeId] = 1
+        }
+        for numaNodeId := range numaSet {
+            numaInfo := r.nodemap.numaNode[numaNodeId]
+            numaInfo.containers = append(numaInfo.containers, containerMeta)
+            r.nodemap.numaNode[numaNodeId] = numaInfo
+        }
+    case RemoveConOps:
+        klog.Info("Record --- Remove Container......")
+        // Delete the container from containerInfo
+        delete(r.containerInfo, container.GetId())
+        // Remove the container from the node map
+        conCpulist := resources.GetCPUSetList(container.GetLinux().GetResources().Cpu.Cpus)
+        for _, cpuid := range conCpulist {
+            numaNodeId := r.nodemap.topo.CPUDetails[cpuid].NUMANodeID
+            numaInfo := r.nodemap.numaNode[numaNodeId]
+            // Locate the container entry to delete
+            idx := 0
+            for i, v := range numaInfo.containers {
+                if v.ID == container.Id {
+                    idx = i
+                }
+            }
+            numaInfo.containers = append(numaInfo.containers[:idx], numaInfo.containers[idx+1:]...)
+            r.nodemap.numaNode[numaNodeId] = numaInfo
+            klog.Infof("Removed container from numaInfo --- nodeID: %v, containers: %v", numaNodeId, numaInfo.containers)
+        }
+    case RemovePodOps:
+        // TODO: is the event ordering of RemovePod and RemoveContainer strictly pod before container?
+        // Delete the pod from podInfo
+        klog.Info("Record --- Remove Pod......")
+        uid := pod.GetUid()
+        delete(r.podInfo, uid)
+        // Remove the pod from the node map
+        numaSet := map[int]int{}
+        podCpulist := resources.GetCPUSetList(pod.GetLinux().GetResources().Cpu.Cpus)
+        for _, cpuid := range podCpulist {
+            numaNodeId := r.nodemap.topo.CPUDetails[cpuid].NUMANodeID
+            numaSet[numaNodeId] = 1
+        }
+        for numaNodeId := range numaSet {
+            numaInfo := r.nodemap.numaNode[numaNodeId]
+            // Locate the pod entry to delete
+            idx := 0
+            for i, v := range numaInfo.pods {
+                if v.Uid == pod.Uid {
+                    idx = i
+                }
+            }
+            numaInfo.pods = append(numaInfo.pods[:idx], numaInfo.pods[idx+1:]...)
+            r.nodemap.numaNode[numaNodeId] = numaInfo
+            klog.Infof("Removed pod from numaInfo --- nodeID: %v, PodMeta: %v", numaNodeId, numaInfo.pods)
+        }
+    case RunPodOps:
+        klog.Info("Record --- Run Pod......")
+        uid := pod.GetUid()
+        podMeta := PodMeta{
+            Name:      pod.GetName(),
+            Uid:       uid,
+            Namespace: pod.GetNamespace(),
+            Resource:  pod.GetLinux().GetResources(),
+            QosClass:  getQosClass(podList, uid),
+        }
+        r.podInfo[uid] = podMeta
+        // Sync the pod info into the node map
+        podCpulist := resources.GetCPUSetList(podMeta.Resource.Cpu.Cpus)
+        numaSet := map[int]int{}
+        for _, cpuid := range podCpulist {
+            numaNodeId := r.nodemap.topo.CPUDetails[cpuid].NUMANodeID
+            numaSet[numaNodeId] = 1
+        }
+        for numaNodeId := range numaSet {
+            numaInfo := r.nodemap.numaNode[numaNodeId]
+            numaInfo.pods = append(numaInfo.pods, podMeta)
+            r.nodemap.numaNode[numaNodeId] = numaInfo
+        }
+    }
+    return nil
+}
+
diff --git a/pkg/resmgr/resmgr.go b/pkg/resmgr/resmgr.go
new file mode 100644
index 0000000..7d1150c
--- /dev/null
+++ b/pkg/resmgr/resmgr.go
@@ -0,0 +1,121 @@
+package resmgr
+
+import (
+    "fmt"
+    "sync"
+
+    "github.com/containerd/nri/pkg/api"
+    "k8s.io/klog/v2"
+    "numaadj.huawei.com/pkg/agent"
+    "numaadj.huawei.com/pkg/policy"
+)
+
+type ResourceManager interface {
+    // Start starts the resource manager.
+    Start() error
+
+    // Stop stops the resource manager.
+    Stop() error
+
+    // SendEvent sends an event to be processed by the resource manager.
+    SendEvent(event interface{}) error
+}
+
+type resmgr struct {
+    sync.RWMutex
+    agent     *agent.Agent
+    nri       *nriPlugin // NRI plugin, if we're running as such
+    regulator *Regulator // resource regulator
+    running   bool
+}
+
+func NewResourceManager(agt *agent.Agent, backend interface{}) (*resmgr, error) {
+    m := &resmgr{
+        agent: agt,
+    }
+
+    regulator, err := NewRegulator()
+    if err != nil {
+        return nil, fmt.Errorf("failed to create regulator: %w", err)
+    }
+    m.regulator = regulator
+
+    klog.Info("running as an NRI plugin...")
+    nrip, err := newNRIPlugin(m)
+    if err != nil {
+        return nil, err
+    }
+    m.nri = nrip
+
+    // TODO: m set policy
+
+    return m, nil
+}
+
+func (m *resmgr) Start() error {
+    klog.Infof("starting agent, waiting for initial configuration...")
+    if err := m.regulator.Start(); err != nil {
+        return err
+    }
+
+    if err := m.agent.Start(m.updateConfig); err != nil {
+        return err
+    }
+
+    return nil
+}
+
+func (m *resmgr) Regulate(pod *api.PodSandbox, container *api.Container) (*api.ContainerAdjustment, []*api.ContainerUpdate, error) {
+    return m.regulator.Regulate(pod, container)
+}
+
+func (m *resmgr) updateConfig(newCfg interface{}) error {
+    if newCfg == nil {
+        return fmt.Errorf("cannot run without an effective configuration")
+    }
+
+    crdCfg, ok := newCfg.(*policy.Config)
+    if !ok {
+        return fmt.Errorf("unexpected configuration type %T", newCfg)
+    }
+
+    if !m.running {
+        klog.Infof("acquired initial configuration")
+
+        if err := m.start(newCfg); err != nil {
+            klog.Fatalf("failed to start with initial configuration: %v", err)
+        }
+        m.running = true
+    }
+
+    return m.reconfigCrd(crdCfg)
+}
+
+func (m *resmgr) reconfigCrd(crdCfg *policy.Config) error {
+    if err := crdCfg.Validate(); err != nil {
+        return fmt.Errorf("invalid crd configuration: %v", err)
+    }
+
+    klog.Infof("update size: %v, crdCfg: %v", len(crdCfg.GetUpdate()), crdCfg.GetUpdate())
+
+    //for _, upd := range crdCfg.GetUpdate() {
+    //    klog.Warningf("containerdId: %s, cpuset: %s, memset: %s", upd.ContainerId, upd.Linux.Resources.Cpu.Cpus, upd.Linux.Resources.Cpu.Mems)
+    //}
+
+    _, err := m.nri.stub.UpdateContainers(crdCfg.GetUpdate())
+
+    if err != nil {
+        return fmt.Errorf("failed to adjust numa node by nriPlugin, %v", err)
+    }
+    return nil
+}
+
+func (m *resmgr) start(cfg interface{}) error {
+    klog.Infof("starting resource manager...")
+    if cfg != nil {
+    }
+    if err := m.nri.start(); err != nil {
+        return err
+    }
+    return nil
+}
-- 
Gitee
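Reviewer note (not part of the patch): the series adds pkg/resmgr but no entry point that wires it up. Below is a minimal sketch of how the manager might be started from a plugin binary, assuming an agent.New() constructor in numaadj.huawei.com/pkg/agent; that constructor and the nil policy backend are hypothetical placeholders, not APIs confirmed by this patch.

// cmd/numaadj/main.go (hypothetical wiring sketch)
package main

import (
    "flag"

    "k8s.io/klog/v2"
    "numaadj.huawei.com/pkg/agent"
    "numaadj.huawei.com/pkg/resmgr"
)

func main() {
    // Importing pkg/resmgr registers -nri-plugin-name, -nri-plugin-index and -nri-socket.
    flag.Parse()

    // Assumption: the agent package exposes a constructor; adjust to the real signature.
    agt := agent.New()

    // The second argument (policy backend) is passed as nil in this sketch.
    mgr, err := resmgr.NewResourceManager(agt, nil)
    if err != nil {
        klog.Fatalf("failed to create resource manager: %v", err)
    }

    // Start() registers the NRI plugin stub and begins watching configuration.
    if err := mgr.Start(); err != nil {
        klog.Fatalf("failed to start resource manager: %v", err)
    }

    // Keep the process alive while the NRI stub serves events in the background.
    select {}
}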