From ba04ea57449de33245cddc60444bfd244e4a0306 Mon Sep 17 00:00:00 2001 From: likun Date: Sun, 28 Sep 2025 20:56:35 +0800 Subject: [PATCH] support fssr container policy for dynamic memory --- pkg/services/dynmemory/dynmemory.go | 8 +- pkg/services/dynmemory/fssr.go | 8 +- pkg/services/dynmemory/fssr_container.go | 269 +++++++++++++++++++++++ 3 files changed, 278 insertions(+), 7 deletions(-) create mode 100644 pkg/services/dynmemory/fssr_container.go diff --git a/pkg/services/dynmemory/dynmemory.go b/pkg/services/dynmemory/dynmemory.go index fa071f6..59100f6 100644 --- a/pkg/services/dynmemory/dynmemory.go +++ b/pkg/services/dynmemory/dynmemory.go @@ -18,7 +18,7 @@ type DynMemoryAdapter interface { preStart(api.Viewer) error getInterval() int dynamicAdjust() - setOfflinePod(path string) error + setOfflinePod(podInfo *typedef.PodInfo) error } type dynMemoryConfig struct { Policy string `json:"policy,omitempty"` @@ -91,7 +91,7 @@ func (dynMem *DynMemory) IsRunner() bool { // AddPod to deal the event of adding a pod. func (dynMem *DynMemory) AddPod(podInfo *typedef.PodInfo) error { if podInfo.Offline() { - return dynMem.dynMemoryAdapter.setOfflinePod(podInfo.Path) + return dynMem.dynMemoryAdapter.setOfflinePod(podInfo) } return nil } @@ -99,7 +99,7 @@ func (dynMem *DynMemory) AddPod(podInfo *typedef.PodInfo) error { // UpdatePod to deal the pod update event. func (dynMem *DynMemory) UpdatePod(old, new *typedef.PodInfo) error { if new.Offline() { - return dynMem.dynMemoryAdapter.setOfflinePod(new.Path) + return dynMem.dynMemoryAdapter.setOfflinePod(new) } return nil } @@ -109,6 +109,8 @@ func newAdapter(policy string) DynMemoryAdapter { switch policy { case "fssr": return initFssrDynMemAdapter() + case "fssr-container": + return initFssrContainerDynMemAdapter() default: log.Errorf("no matching policy[%v] is found", policy) } diff --git a/pkg/services/dynmemory/fssr.go b/pkg/services/dynmemory/fssr.go index b838cdd..35b8fa1 100644 --- a/pkg/services/dynmemory/fssr.go +++ b/pkg/services/dynmemory/fssr.go @@ -122,7 +122,7 @@ func (f *fssrDynMemAdapter) adjustOfflinePodHighMemory() { func (f *fssrDynMemAdapter) dealExistedPods() error { pods := listOfflinePods(f.viewer) for _, podInfo := range pods { - if err := f.setOfflinePod(podInfo.Path); err != nil { + if err := f.setOfflinePod(podInfo); err != nil { log.Errorf("failed to set fssr of offline pod[%v]:%v", podInfo.UID, err) } } @@ -138,11 +138,11 @@ func listOfflinePods(viewer api.Viewer) map[string]*typedef.PodInfo { } // setOfflinePod sets the offline pod for the given path. -func (f *fssrDynMemAdapter) setOfflinePod(path string) error { - if err := setOfflinePodHighAsyncRatio(path, highRatio); err != nil { +func (f *fssrDynMemAdapter) setOfflinePod(podInfo *typedef.PodInfo) error { + if err := setOfflinePodHighAsyncRatio(podInfo.Path, highRatio); err != nil { return err } - return setOfflinePodHighMemory(path, f.memHigh) + return setOfflinePodHighMemory(podInfo.Path, f.memHigh) } // setOfflinePodHighMemory sets the high memory limit for the specified pod in the diff --git a/pkg/services/dynmemory/fssr_container.go b/pkg/services/dynmemory/fssr_container.go new file mode 100644 index 0000000..154856c --- /dev/null +++ b/pkg/services/dynmemory/fssr_container.go @@ -0,0 +1,269 @@ +package dynmemory + +import ( + "fmt" + "math" + "strconv" + "sync" + + "isula.org/rubik/pkg/api" + "isula.org/rubik/pkg/common/log" + "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/core/typedef/cgroup" +) + +// fssr container structure for policy +type fssrContainerDynMemAdapter struct { + // total memory of the node + memTotal int64 + // free memory of the node + memFree int64 + viewer api.Viewer + // memory high of all containers + pods map[string]*fssrPod + lock sync.Mutex +} + +type fssrPod struct { + containers map[string]*fssrContainer +} + +type fssrContainer struct { + memHigh int64 +} + +// initFssrContainerDynMemAdapter initializes a new fssrContainerDynMemAdapter struct. +func initFssrContainerDynMemAdapter() *fssrContainerDynMemAdapter { + memTotal, err := getFieldMemory("MemTotal") + if err != nil { + log.Errorf("get field memory of MemTotal error: %v", err) + panic("get memory total failed") + } + if memTotal <= 0 { + log.Errorf("invalid MemTotal of %d", memTotal) + panic("invalid memory total") + } + + memFree, err := getFieldMemory("MemFree") + if err != nil { + log.Errorf("get field memory of MemFree error: %v", err) + panic("get memory free failed") + } + if memFree <= 0 { + log.Errorf("invalid MemFree of %d", memTotal) + panic("invalid memory free") + } + + return &fssrContainerDynMemAdapter{ + memTotal: memTotal, + memFree: memFree, + pods: map[string]*fssrPod{}, + } +} + +// preStart initializes the fssrContainerDynMemAdapter with the provided viewer and +// deals with any existing pods. +func (f *fssrContainerDynMemAdapter) preStart(viewer api.Viewer) error { + f.viewer = viewer + return f.dealExistedPods() +} + +// getInterval returns the fssrInterval value. +func (f *fssrContainerDynMemAdapter) getInterval() int { + return fssrInterval +} + +// dynamicAdjust adjusts the memory allocation of the fssrContainerDynMemAdapter by +// increasing or decreasing the amount of memory reserved for offline pods +// based on the current amount of free memory available on the system. +func (f *fssrContainerDynMemAdapter) dynamicAdjust() { + f.lock.Lock() + defer f.lock.Unlock() + + var freeMem int64 + var err error + if freeMem, err = getFieldMemory("MemFree"); err != nil { + log.Errorf(err.Error()) + return + } + if freeMem <= 0 { + log.Errorf("invalid MemFree of %d", freeMem) + return + } + f.memFree = freeMem + + pods := listOfflinePods(f.viewer) + f.trimContainerMemoryHigh(pods) + for _, podInfo := range pods { + if err = f.adjustMemoryHigh(podInfo); err != nil { + log.Errorf(err.Error()) + } + } +} + +// getContainerMemoryHigh return the memHigh of specified container +func (f *fssrContainerDynMemAdapter) getContainerMemoryHigh(podID, containerID string) int64 { + pod, ok := f.pods[podID] + if !ok || pod == nil { + pod = &fssrPod{ + containers: map[string]*fssrContainer{}, + } + f.pods[podID] = pod + } + + container, ok := pod.containers[containerID] + if !ok || container == nil { + container = &fssrContainer{ + memHigh: math.MaxInt64, + } + pod.containers[containerID] = container + } + + return container.memHigh +} + +// setContainerMemoryHigh set the memHigh of specified container +func (f *fssrContainerDynMemAdapter) setContainerMemoryHigh(podID, containerID string, memHigh int64) { + pod, ok := f.pods[podID] + if !ok || pod == nil { + pod = &fssrPod{ + containers: map[string]*fssrContainer{}, + } + f.pods[podID] = pod + } + + container, ok := pod.containers[containerID] + if !ok || container == nil { + container = &fssrContainer{} + pod.containers[containerID] = container + } + + container.memHigh = memHigh +} + +// trimContainerMemoryHigh will sync f.pods by podInfo map from podmanager +// it will delete pods and containers those not exists in podInfo map +func (f *fssrContainerDynMemAdapter) trimContainerMemoryHigh(pods map[string]*typedef.PodInfo) { + var podInfo *typedef.PodInfo + var ok bool + + for podUID, pod := range f.pods { + // trim pods + if podInfo, ok = pods[podUID]; !ok || podInfo == nil { + delete(f.pods, podUID) + continue + } + + // trim containers + for containerID := range pod.containers { + if _, ok := podInfo.IDContainersMap[containerID]; !ok { + delete(pod.containers, containerID) + } + } + } +} + +// adjust memory high for all containers of a pod +func (f *fssrContainerDynMemAdapter) adjustMemoryHigh(podInfo *typedef.PodInfo) error { + for _, containerInfo := range podInfo.IDContainersMap { + if containerInfo == nil { + continue + } + + // set memTotal as limits.memory for containers with limits.memory + // set memTotal as node memory total for containers without limits.memory + memTotal := f.memTotal + memLimit := containerInfo.LimitResources[typedef.ResourceMem] + if memLimit > 0 { + memTotal = int64(memLimit) + } + memHigh := f.getContainerMemoryHigh(podInfo.UID, containerInfo.ID) + + // compute new memory high by memTotal and current memHigh + newMemHigh, err := f.computeMemoryHigh(memTotal, memHigh) + if err != nil { + return err + } + + if newMemHigh != memHigh { + f.setContainerMemoryHigh(podInfo.UID, containerInfo.ID, newMemHigh) + if err := setHighMemory(containerInfo.Path, newMemHigh); err != nil { + return fmt.Errorf("failed to adjust high memory for container %s of offline pod %v: %v", containerInfo.Name, podInfo.UID, err) + } + } + } + + return nil +} + +// compute new memory high by memTotal and current memHigh +// the increase or decrease is based on the free memory of the node +func (f *fssrContainerDynMemAdapter) computeMemoryHigh(memTotal, curMemHigh int64) (int64, error) { + var newMemHigh int64 = curMemHigh + + if memTotal < 0 || curMemHigh < 0 { + return 0, fmt.Errorf("invalid memTotal=%d or memHigh=%d", memTotal, curMemHigh) + } + + // fast suppression and slow recovery + if f.memFree > f.memTotal/5 { + newMemHigh = curMemHigh + memTotal/1000 + if newMemHigh <= 0 { + newMemHigh = math.MaxInt64 + } + } else if f.memFree < f.memTotal/10 { + newMemHigh = curMemHigh - memTotal/10 + if newMemHigh < 0 { + newMemHigh = 0 + } + } + + // make sure newMemHigh is between 30% and 80% of memTotal + if newMemHigh < memTotal/10*3 { + newMemHigh = memTotal / 10 * 3 + } else if newMemHigh > memTotal/10*8 { + newMemHigh = memTotal / 10 * 8 + } + + return newMemHigh, nil +} + +// dealExistedPods handles offline pods by setting their memory.high and memory.high_async_ratio +func (f *fssrContainerDynMemAdapter) dealExistedPods() error { + pods := listOfflinePods(f.viewer) + for _, podInfo := range pods { + if err := f.setOfflinePod(podInfo); err != nil { + log.Errorf("failed to set fssr of offline pod[%v]:%v", podInfo.UID, err) + } + } + return nil +} + +// setOfflinePod sets the offline pod for the given pod. +func (f *fssrContainerDynMemAdapter) setOfflinePod(podInfo *typedef.PodInfo) error { + f.lock.Lock() + defer f.lock.Unlock() + + if err := setHighAsyncRatio(podInfo, highRatio); err != nil { + return err + } + return f.adjustMemoryHigh(podInfo) +} + +// setHighMemory sets the high memory limit for the specified container or pod in the cgroup memory +func setHighMemory(containerPath string, memHigh int64) error { + return cgroup.WriteCgroupFile(strconv.FormatUint(uint64(memHigh), scale), memcgRootDir, + containerPath, highMemFile) +} + +// setHighAsyncRatio sets the high memory async ratio for a pod in an offline state. +func setHighAsyncRatio(podInfo *typedef.PodInfo, ratio uint) error { + for _, containerInfo := range podInfo.IDContainersMap { + err := cgroup.WriteCgroupFile(strconv.FormatUint(uint64(ratio), scale), memcgRootDir, + containerInfo.Path, highMemAsyncRatioFile) + if err != nil { + return err + } + } + return nil +} -- Gitee