From ada41eb8d2739c96b514c73478ae4174e7e5631a Mon Sep 17 00:00:00 2001
From: vegbir
Date: Mon, 22 May 2023 20:18:52 +0800
Subject: [PATCH] rubik: use cadvisor to get pods with max cpu&memory

We implemented an eviction method based on cadvisor statistics.
First, the PSI indicators (cpu/memory/io) of online services are
sampled according to the configuration parameters. If any online
service's PSI indicator exceeds the configured threshold, resource
analysis of the offline services is triggered, and the offline pod
with the highest resource usage is evicted.
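For reference, the new avg10threshold option sits alongside interval and
resource in the psi service configuration. The sketch below shows the JSON
shape implied by the struct tags added in this patch; the Config mirror
copies those tags, while the concrete values are illustrative only:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Config mirrors the psi service configuration in pkg/services/psi.
    type Config struct {
        Interval       int      `json:"interval,omitempty"`
        Avg10Threshold float64  `json:"avg10threshold,omitempty"`
        Resource       []string `json:"resource,omitempty"`
    }

    func main() {
        // Hypothetical config fragment; Validate() accepts
        // interval in [10, 30] and avg10threshold in [5, 100].
        raw := `{"interval": 10, "avg10threshold": 5.0, "resource": ["cpu", "memory", "io"]}`
        var c Config
        if err := json.Unmarshal([]byte(raw), &c); err != nil {
            panic(err)
        }
        fmt.Printf("%+v\n", c) // {Interval:10 Avg10Threshold:5 Resource:[cpu memory io]}
    }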
Signed-off-by: vegbir
---
 .gitignore                               |   1 +
 go.mod                                   |   2 +-
 hack/rubik-daemonset.yaml                |   3 +
 pkg/core/trigger/base.go                 |  63 +++++++-
 pkg/core/trigger/expulsion.go            |  93 +++++++++++-
 pkg/core/trigger/resourceanalysis.go     | 184 ++++++++++++++++++++---
 pkg/informer/apiserverinformer.go        |   6 +-
 pkg/resourcemanager/cadvisor/cadvisor.go |  76 +++++++---
 pkg/rubik/rubik.go                       |   8 +
 pkg/services/psi/metric.go               |  20 +--
 pkg/services/psi/psi.go                  |  42 ++++--
 11 files changed, 411 insertions(+), 87 deletions(-)

diff --git a/.gitignore b/.gitignore
index 1fcd509..ce1ce3a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,4 @@
 /coverage
 /gocov.json
 /testcasesresult.json
+/compose-dev.yaml
diff --git a/go.mod b/go.mod
index f08a940..fe03728 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,6 @@ require (
 	k8s.io/api v0.20.2
 	k8s.io/apimachinery v0.20.2
 	k8s.io/client-go v0.20.2
-	k8s.io/klog/v2 v2.80.1
 )
 
 require (
@@ -61,6 +60,7 @@ require (
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
+	k8s.io/klog/v2 v2.80.1 // indirect
 	k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
 	sigs.k8s.io/yaml v1.2.0 // indirect
diff --git a/hack/rubik-daemonset.yaml b/hack/rubik-daemonset.yaml
index 4975b41..246d45e 100644
--- a/hack/rubik-daemonset.yaml
+++ b/hack/rubik-daemonset.yaml
@@ -6,6 +6,9 @@ rules:
   - apiGroups: [""]
     resources: ["pods"]
     verbs: ["list", "watch"]
+  - apiGroups: [""]
+    resources: ["pods/eviction"]
+    verbs: ["create"]
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/pkg/core/trigger/base.go b/pkg/core/trigger/base.go
index 82af66a..7f1fbe9 100644
--- a/pkg/core/trigger/base.go
+++ b/pkg/core/trigger/base.go
@@ -16,6 +16,7 @@ package trigger
 
 import (
 	"fmt"
+	"sync"
 
 	"isula.org/rubik/pkg/common/log"
 	"isula.org/rubik/pkg/common/util"
@@ -39,7 +40,7 @@ const (
 
 type triggerCreator func() Trigger
 
-var triggerMap map[Typ]triggerCreator = map[Typ]triggerCreator{
+var triggerCreatorMap map[Typ]triggerCreator = map[Typ]triggerCreator{
 	EXPULSION:        expulsionCreator,
 	RESOURCEANALYSIS: analyzerCreator,
 }
@@ -52,6 +53,7 @@ type Descriptor interface {
 // Executor is the trigger execution function interface
 type Executor interface {
 	Execute(Factor) (Factor, error)
+	Stop() error
 }
 
 // Trigger interface defines the trigger methods
@@ -68,10 +70,11 @@ type TreeTrigger struct {
 	subTriggers []Trigger
 }
 
-// NewTreeTirggerNode returns a BaseMetric object
-func NewTreeTirggerNode(name string) *TreeTrigger {
+// withTreeTrigger returns a TreeTrigger object
+func withTreeTrigger(name string, exec Executor) *TreeTrigger {
 	return &TreeTrigger{
 		name:        name,
+		exec:        exec,
 		subTriggers: make([]Trigger, 0)}
 }
 
@@ -88,6 +91,9 @@ func (t *TreeTrigger) Name() string {
 
 // Execute executes the sub-triggers of the current trigger
 func (t *TreeTrigger) Execute(f Factor) error {
+	if t.exec == nil {
+		return fmt.Errorf("trigger %v can not execute", t.name)
+	}
 	var errs error
 	res, err := t.exec.Execute(f)
 	if err != nil {
@@ -103,11 +109,54 @@ func (t *TreeTrigger) Execute(f Factor) error {
 	return errs
 }
 
-// GetTrigger returns a trigger singleton according to type
-func GetTrigger(t Typ) Trigger {
-	if _, ok := triggerMap[t]; !ok {
+// NewTrigger returns a trigger of the given type (its executor is a singleton)
+func NewTrigger(t Typ) Trigger {
+	if _, ok := triggerCreatorMap[t]; !ok {
 		log.Warnf("undefine trigger: %v", t)
 		return nil
 	}
-	return triggerMap[t]()
+	return triggerCreatorMap[t]()
+}
+
+var (
+	runningExecutors = make(map[string]Executor)
+	execLock         sync.Mutex
+)
+
+func appendUsedExecutors(name string, exec Executor) {
+	if exec == nil {
+		log.Errorf("nil executor %v can not be used", name)
+		return
+	}
+	execLock.Lock()
+	defer execLock.Unlock()
+	if _, existed := runningExecutors[name]; existed {
+		log.Errorf("conflict executor %v", name)
+		return
+	}
+	log.Infof("using executor: %v", name)
+	runningExecutors[name] = exec
+}
+
+// StopUsedExecutors stops running executors
+func StopUsedExecutors() error {
+	execLock.Lock()
+	defer execLock.Unlock()
+	var errs error
+	// stop the executors one by one
+	for name, exec := range runningExecutors {
+		log.Infof("stopping executor %v", name)
+		if exec == nil {
+			log.Infof("executor %v has stopped", name)
+			continue
+		}
+		if err := exec.Stop(); err != nil {
+			errs = util.AppendErr(errs, util.AddErrorPrfix(err, name))
+		}
+	}
+	// clear executors
+	for k := range runningExecutors {
+		delete(runningExecutors, k)
+	}
+	return errs
 }
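For reference, the chaining contract above is that a TreeTrigger first runs
its own executor and then hands the produced Factor to each sub-trigger. The
stand-alone sketch below mirrors that flow with simplified stand-ins (none
of these names are rubik APIs, and it stops at the first error, whereas
TreeTrigger accumulates errors):

    package main

    import "fmt"

    // executor mirrors the Executor contract: transform an input or fail.
    type executor func(msg string) (string, error)

    // node mirrors TreeTrigger: run its own executor, then pass the result on.
    type node struct {
        exec executor
        next []*node
    }

    func (n *node) setNext(c *node) *node { n.next = append(n.next, c); return n }

    func (n *node) execute(msg string) error {
        out, err := n.exec(msg)
        if err != nil {
            return err
        }
        for _, c := range n.next {
            if err := c.execute(out); err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        analyze := &node{exec: func(m string) (string, error) { return "pod-max-" + m, nil }}
        evict := &node{exec: func(m string) (string, error) { fmt.Println("evict", m); return "", nil }}
        analyze.setNext(evict)
        _ = analyze.execute("cpu") // prints: evict pod-max-cpu
    }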
fmt.Errorf("fail to use kubernetes client, please check") + } + var errs error + for name, pod := range f.TargetPods() { + log.Infof("evicting pod \"%v\"", name) + if err := inevictable(pod); err != nil { + errs = util.AppendErr(errs, fmt.Errorf("fail to evict pod \"%v\": %v", pod.Name, err)) + continue + } + eviction := &policyv1beta1.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + DeleteOptions: &metav1.DeleteOptions{}, + } + if err := e.client.CoreV1().Pods(pod.Namespace).Evict(context.TODO(), eviction); err != nil { + errs = util.AppendErr(errs, fmt.Errorf("fail to evict pod \"%v\": %v", pod.Name, err)) + continue + } + } + return nil, errs +} + +// Stop stops the expulsion trigger +func (e *Expulsion) Stop() error { + e.Lock() + defer e.Unlock() + e.client = nil + return nil +} + +func inevictable(pod *typedef.PodInfo) error { + var forbidden = map[string]struct{}{ + "kube-system": {}, + } + if _, existed := forbidden[pod.Namespace]; existed { + return fmt.Errorf("it is forbidden to delete the pod whose namespace is %v", pod.Namespace) + } + return nil } diff --git a/pkg/core/trigger/resourceanalysis.go b/pkg/core/trigger/resourceanalysis.go index 260b384..a3d99e5 100644 --- a/pkg/core/trigger/resourceanalysis.go +++ b/pkg/core/trigger/resourceanalysis.go @@ -9,40 +9,109 @@ // See the Mulan PSL v2 for more details. // Author: Jiaqi Yang // Date: 2023-05-18 -// Description: This file is used for * +// Description: This file is used for resource Analyzer package trigger import ( "fmt" + "sync" + "time" + + "github.com/google/cadvisor/cache/memory" + "github.com/google/cadvisor/container" + v2 "github.com/google/cadvisor/info/v2" + "github.com/google/cadvisor/manager" + "github.com/google/cadvisor/utils/sysfs" "isula.org/rubik/pkg/common/log" + "isula.org/rubik/pkg/common/util" "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/resourcemanager/cadvisor" +) + +const ( + miniNum = 2 + nanoToMicro float64 = 1000 + percentageRate float64 = 100 ) -// resourceAnalysisExec is the singleton of Analyzer triggers -var resourceAnalysisExec = &Analyzer{} +// resourceAnalysisExec is the singleton of Analyzer executor implementation +var resourceAnalysisExec *Analyzer + +// analyzerCreator creates Analyzer trigger var analyzerCreator = func() Trigger { - return &TreeTrigger{name: ResourceAnalysisAnno, exec: resourceAnalysisExec} + if resourceAnalysisExec == nil { + m := NewCadvisorManager() + if m != nil { + log.Infof("initialize resourceAnalysisExec") + resourceAnalysisExec = &Analyzer{cadvisorManager: m} + appendUsedExecutors(ResourceAnalysisAnno, resourceAnalysisExec) + } + } + return withTreeTirgger(ResourceAnalysisAnno, resourceAnalysisExec) +} + +// rreqOpt is the option to get information from cadvisor +var reqOpt = v2.RequestOptions{ + IdType: v2.TypeName, + Count: 2, + Recursive: false, } // Analyzer is the resource analysis trigger -type Analyzer struct{} +type Analyzer struct { + sync.RWMutex + cadvisorManager *cadvisor.Manager +} + +// NewCadvisorManager returns an cadvisor.Manager object +func NewCadvisorManager() *cadvisor.Manager { + const ( + cacheMinutes = 10 + keepingIntervalSec = 10 + ) + var ( + allowDynamic = true + maxHousekeepingInterval = time.Duration(keepingIntervalSec * time.Second) + cacheAge = time.Duration(cacheMinutes * time.Minute) + ) + args := cadvisor.StartArgs{ + MemCache: memory.New(cacheAge, nil), + SysFs: sysfs.NewRealSysFs(), + IncludeMetrics: container.MetricSet{ + container.CpuUsageMetrics: struct{}{}, + 
diff --git a/pkg/core/trigger/resourceanalysis.go b/pkg/core/trigger/resourceanalysis.go
index 260b384..a3d99e5 100644
--- a/pkg/core/trigger/resourceanalysis.go
+++ b/pkg/core/trigger/resourceanalysis.go
@@ -9,40 +9,109 @@
 // See the Mulan PSL v2 for more details.
 // Author: Jiaqi Yang
 // Date: 2023-05-18
-// Description: This file is used for *
+// Description: This file is used for the resource analyzer
 
 package trigger
 
 import (
 	"fmt"
+	"sync"
+	"time"
+
+	"github.com/google/cadvisor/cache/memory"
+	"github.com/google/cadvisor/container"
+	v2 "github.com/google/cadvisor/info/v2"
+	"github.com/google/cadvisor/manager"
+	"github.com/google/cadvisor/utils/sysfs"
 
 	"isula.org/rubik/pkg/common/log"
+	"isula.org/rubik/pkg/common/util"
 	"isula.org/rubik/pkg/core/typedef"
+	"isula.org/rubik/pkg/resourcemanager/cadvisor"
+)
+
+const (
+	// miniNum is the minimum number of samples needed to compute a usage delta
+	miniNum                = 2
+	nanoToMicro    float64 = 1000
+	percentageRate float64 = 100
 )
 
-// resourceAnalysisExec is the singleton of Analyzer triggers
-var resourceAnalysisExec = &Analyzer{}
+// resourceAnalysisExec is the singleton of the Analyzer executor implementation
+var resourceAnalysisExec *Analyzer
+
+// analyzerCreator creates the Analyzer trigger
 var analyzerCreator = func() Trigger {
-	return &TreeTrigger{name: ResourceAnalysisAnno, exec: resourceAnalysisExec}
+	if resourceAnalysisExec == nil {
+		m := NewCadvisorManager()
+		if m != nil {
+			log.Infof("initialize resourceAnalysisExec")
+			resourceAnalysisExec = &Analyzer{cadvisorManager: m}
+			appendUsedExecutors(ResourceAnalysisAnno, resourceAnalysisExec)
+		}
+	}
+	return withTreeTrigger(ResourceAnalysisAnno, resourceAnalysisExec)
+}
+
+// reqOpt is the option to get information from cadvisor
+var reqOpt = v2.RequestOptions{
+	IdType:    v2.TypeName,
+	Count:     2,
+	Recursive: false,
 }
 
 // Analyzer is the resource analysis trigger
-type Analyzer struct{}
+type Analyzer struct {
+	sync.RWMutex
+	cadvisorManager *cadvisor.Manager
+}
+
+// NewCadvisorManager returns a cadvisor.Manager object
+func NewCadvisorManager() *cadvisor.Manager {
+	const (
+		cacheMinutes       = 10
+		keepingIntervalSec = 10
+	)
+	var (
+		allowDynamic            = true
+		maxHousekeepingInterval = keepingIntervalSec * time.Second
+		cacheAge                = cacheMinutes * time.Minute
+	)
+	args := cadvisor.StartArgs{
+		MemCache: memory.New(cacheAge, nil),
+		SysFs:    sysfs.NewRealSysFs(),
+		IncludeMetrics: container.MetricSet{
+			container.CpuUsageMetrics:    struct{}{},
+			container.MemoryUsageMetrics: struct{}{},
+			container.DiskUsageMetrics:   struct{}{},
+			container.DiskIOMetrics:      struct{}{},
+		},
+		MaxHousekeepingConfig: manager.HouskeepingConfig{
+			Interval:     &maxHousekeepingInterval,
+			AllowDynamic: &allowDynamic,
+		},
+	}
+	return cadvisor.WithStartArgs(args)
+}
 
 // Execute filters the corresponding Pod according to the operation type and triggers it on demand
 func (a *Analyzer) Execute(f Factor) (Factor, error) {
+	a.RLock()
+	defer a.RUnlock()
+	if a.cadvisorManager == nil {
+		return nil, fmt.Errorf("cadvisor manager is not initialized, please check")
+	}
 	var (
-		target string
+		target *typedef.PodInfo
 		opTyp  = f.Message()
 		errMsg string
-	)
-	log.Infof("receive operation: %v", opTyp)
-	alarm := func(target, errMsg string) (Factor, error) {
-		if len(target) == 0 {
-			return nil, fmt.Errorf(errMsg)
+		alarm  = func(target *typedef.PodInfo, errMsg string) (Factor, error) {
+			if target == nil {
+				return nil, fmt.Errorf(errMsg)
+			}
+			return &FactorImpl{Pods: map[string]*typedef.PodInfo{target.Name: target}}, nil
 		}
-		return &FactorImpl{Msg: target}, nil
-	}
+	)
+	log.Debugf("receive operation: %v", opTyp)
 	switch opTyp {
 	case "max_cpu":
 		errMsg = "unable to find pod with maximum CPU utilization"
@@ -51,22 +120,97 @@ func (a *Analyzer) Execute(f Factor) (Factor, error) {
 		errMsg = "unable to find pod with maximum memory utilization"
 		target = a.maxMemoryUtil(f.TargetPods())
 	case "max_io":
+		/*
+			TODO: find a better way to reduce io bandwidth.
+			cadvisor's io statistics are not well suited for direct
+			comparison, so while io pressure is high we temporarily
+			evict the pod with the maximum CPU utilization instead.
+		*/
 		errMsg = "unable to find pod with maximum I/O bandwidth"
-		target = a.maxIOBandwidth(f.TargetPods())
+		target = a.maxCPUUtil(f.TargetPods())
 	default:
 		errMsg = "undefined operation: " + opTyp
 	}
 	return alarm(target, errMsg)
 }
 
-func (a *Analyzer) maxCPUUtil(pods map[string]*typedef.PodInfo) string {
-	return "testtest"
+func (a *Analyzer) maxCPUUtil(pods map[string]*typedef.PodInfo) *typedef.PodInfo {
+	var (
+		chosen  *typedef.PodInfo
+		maxUtil float64 = 0
+	)
+	for name, pod := range pods {
+		podStats, err := a.cgroupCadvisorInfo("/"+pod.Path, reqOpt)
+		if err != nil {
+			log.Errorf("fail to get cgroup information %v: %v", pod.Path, err)
+			continue
+		}
+		if len(podStats) < miniNum {
+			log.Errorf("pod %v does not have enough cpu stats collected, skip it", name)
+			continue
+		}
+		last := podStats[len(podStats)-1]
+		penultimate := podStats[len(podStats)-2]
+		cpuUsageUs := float64(last.Cpu.Usage.Total-penultimate.Cpu.Usage.Total) / nanoToMicro
+		timeDeltaUs := float64(last.Timestamp.Sub(penultimate.Timestamp).Microseconds())
+		cpuUtil := util.Div(cpuUsageUs, timeDeltaUs) * percentageRate
+		log.Debugf("pod %v cpu util %v%%=%v/%v(us)", name, cpuUtil, cpuUsageUs, timeDeltaUs)
+		if maxUtil < cpuUtil {
+			maxUtil = cpuUtil
+			chosen = pod
+		}
+	}
+	if chosen != nil {
+		log.Infof("find the max cpu util pod \"%v\": %v", chosen.Name, maxUtil)
+	}
+	return chosen
 }
 
-func (a *Analyzer) maxMemoryUtil(pods map[string]*typedef.PodInfo) string {
-	return ""
+func (a *Analyzer) maxMemoryUtil(pods map[string]*typedef.PodInfo) *typedef.PodInfo {
+	var (
+		chosen  *typedef.PodInfo
+		maxUtil uint64 = 0
+	)
+	for name, pod := range pods {
+		podStats, err := a.cgroupCadvisorInfo("/"+pod.Path, reqOpt)
+		if err != nil {
+			log.Errorf("fail to get cgroup information %v: %v", pod.Path, err)
+			continue
+		}
+		if len(podStats) == 0 {
+			log.Errorf("pod %v has no memory stats collected, skip it", name)
+			continue
+		}
+		last := podStats[len(podStats)-1].Memory.Usage
+		log.Debugf("pod %v memory usage %vB", name, last)
+		if maxUtil < last {
+			maxUtil = last
+			chosen = pod
+		}
+	}
+	if chosen != nil {
+		log.Infof("find the max memory usage pod \"%v\": %v", chosen.Name, maxUtil)
+	}
+	return chosen
+}
+
+// maxIOBandwidth is not implemented yet, see the TODO in Execute
+func (a *Analyzer) maxIOBandwidth(pods map[string]*typedef.PodInfo) *typedef.PodInfo {
+	return nil
+}
+
+func (a *Analyzer) cgroupCadvisorInfo(cgroupPath string, opts v2.RequestOptions) ([]*v2.ContainerStats, error) {
+	infoMap, err := a.cadvisorManager.ContainerInfoV2(cgroupPath, opts)
+	if err != nil {
+		return nil, fmt.Errorf("fail to get cgroup information %v: %v", cgroupPath, err)
+	}
+	info, existed := infoMap[cgroupPath]
+	if !existed {
+		return nil, fmt.Errorf("no information of cgroup %v in cadvisor response", cgroupPath)
+	}
+	return info.Stats, nil
 }
 
-func (a *Analyzer) maxIOBandwidth(pods map[string]*typedef.PodInfo) string {
-	return ""
+// Stop stops the Analyzer and releases the cadvisor manager
+func (a *Analyzer) Stop() error {
+	a.Lock()
+	defer a.Unlock()
+	m := a.cadvisorManager
+	a.cadvisorManager = nil
+	if m == nil {
+		return nil
+	}
+	return m.Stop()
 }
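For reference, the utilization formula in maxCPUUtil reduces to
(delta cpu time / delta wall time) * 100, with both deltas in microseconds.
A self-contained check of the arithmetic, using made-up sample values:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        // Two cadvisor samples 10s apart (the housekeeping interval);
        // cumulative CPU usage is reported in nanoseconds.
        prevCPUNs, lastCPUNs := uint64(4_000_000_000), uint64(9_000_000_000)
        delta := 10 * time.Second

        cpuUsageUs := float64(lastCPUNs-prevCPUNs) / 1000 // ns -> us
        timeDeltaUs := float64(delta.Microseconds())
        cpuUtil := cpuUsageUs / timeDeltaUs * 100

        fmt.Printf("%.1f%%\n", cpuUtil) // 50.0%: the pod burned 5 CPU-seconds in 10s
    }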
diff --git a/pkg/informer/apiserverinformer.go b/pkg/informer/apiserverinformer.go
index 0f8daaa..cbba1e2 100644
--- a/pkg/informer/apiserverinformer.go
+++ b/pkg/informer/apiserverinformer.go
@@ -47,7 +47,7 @@ func NewAPIServerInformer(publisher api.Publisher) (api.Informer, error) {
 	}
 
 	// create apiserver client
-	client, err := initKubeClient()
+	client, err := InitKubeClient()
 	if err != nil {
 		return nil, fmt.Errorf("fail to init kubenetes client: %v", err)
 	}
@@ -63,8 +63,8 @@ func NewAPIServerInformer(publisher api.Publisher) (api.Informer, error) {
 	return informer, nil
 }
 
-// initKubeClient initializes kubeClient
-func initKubeClient() (*kubernetes.Clientset, error) {
+// InitKubeClient initializes kubeClient
+func InitKubeClient() (*kubernetes.Clientset, error) {
 	conf, err := rest.InClusterConfig()
 	if err != nil {
 		return nil, err
diff --git a/pkg/resourcemanager/cadvisor/cadvisor.go b/pkg/resourcemanager/cadvisor/cadvisor.go
index 96475bc..7978b34 100644
--- a/pkg/resourcemanager/cadvisor/cadvisor.go
+++ b/pkg/resourcemanager/cadvisor/cadvisor.go
@@ -20,42 +20,72 @@ import (
 	"net/http"
 	"time"
 
-	cmemory "github.com/google/cadvisor/cache/memory"
-	cadvisorcontainer "github.com/google/cadvisor/container"
+	"github.com/google/cadvisor/cache/memory"
+	"github.com/google/cadvisor/container"
+	v2 "github.com/google/cadvisor/info/v2"
 	"github.com/google/cadvisor/manager"
-	csysfs "github.com/google/cadvisor/utils/sysfs"
-	"k8s.io/klog/v2"
+	"github.com/google/cadvisor/utils/sysfs"
+
+	"isula.org/rubik/pkg/common/log"
 )
 
-type CadvisorManager struct {
-	cgroupDriver string
+// Manager is the cadvisor manager
+type Manager struct {
 	manager.Manager
 }
 
-func NewCadvidorManager() *CadvisorManager {
-	var includedMetrics = cadvisorcontainer.MetricSet{
-		cadvisorcontainer.CpuUsageMetrics:         struct{}{},
-		cadvisorcontainer.ProcessSchedulerMetrics: struct{}{},
-	}
-
-	allowDynamic := true
-	maxHousekeepingInterval := 10 * time.Second
-	memCache := cmemory.New(10*time.Minute, nil)
-	sysfs := csysfs.NewRealSysFs()
-	maxHousekeepingConfig := manager.HouskeepingConfig{Interval: &maxHousekeepingInterval, AllowDynamic: &allowDynamic}
+// StartArgs is a set of parameters that control the startup of cadvisor
+type StartArgs struct {
+	MemCache              *memory.InMemoryCache
+	SysFs                 sysfs.SysFs
+	IncludeMetrics        container.MetricSet
+	MaxHousekeepingConfig manager.HouskeepingConfig
+}
 
-	m, err := manager.New(memCache, sysfs, maxHousekeepingConfig, includedMetrics, http.DefaultClient, []string{"/kubepods"}, []string{""}, "", time.Duration(1))
+// WithStartArgs creates a cadvisor.Manager object
+func WithStartArgs(args StartArgs) *Manager {
+	const (
+		perfEventsFile = "/sys/kernel/debug/tracing/events/raw_syscalls/sys_enter"
+	)
+	var (
+		rawContainerCgroupPathPrefixWhiteList = []string{"/kubepods"}
+		containerEnvMetadataWhiteList         = []string{}
+		resctrlInterval                       = time.Second
+	)
+	m, err := manager.New(args.MemCache, args.SysFs, args.MaxHousekeepingConfig,
+		args.IncludeMetrics, http.DefaultClient, rawContainerCgroupPathPrefixWhiteList,
+		containerEnvMetadataWhiteList, perfEventsFile, resctrlInterval)
 	if err != nil {
-		klog.Errorf("Failed to create cadvisor manager start: %v", err)
+		log.Errorf("Failed to create cadvisor manager: %v", err)
 		return nil
 	}
-
 	if err := m.Start(); err != nil {
-		klog.Errorf("Failed to start cadvisor manager: %v", err)
+		log.Errorf("Failed to start cadvisor manager: %v", err)
 		return nil
 	}
-
-	return &CadvisorManager{
+	return &Manager{
 		Manager: m,
 	}
 }
+
+// Start starts the cadvisor manager
+func (c *Manager) Start() error {
+	return c.Manager.Start()
+}
+
+// Stop stops cadvisor and clears the existing container handler factories
+func (c *Manager) Stop() error {
+	err := c.Manager.Stop()
+	if err != nil {
+		return err
+	}
+	// clear existing factories so a new manager can be started later
+	container.ClearContainerHandlerFactories()
+	return nil
+}
+
+// ContainerInfoV2 gets container info via the cadvisor v2 API
+func (c *Manager) ContainerInfoV2(name string,
+	options v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
+	return c.GetContainerInfoV2(name, options)
+}
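For reference, the wrapper above is driven as follows; a minimal sketch of
creating the manager and sampling one cgroup, mirroring NewCadvisorManager
and cgroupCadvisorInfo in resourceanalysis.go. The cgroup path and option
values are illustrative, and this only works on a node whose pod cgroups
live under /kubepods:

    package main

    import (
        "fmt"
        "time"

        "github.com/google/cadvisor/cache/memory"
        "github.com/google/cadvisor/container"
        v2 "github.com/google/cadvisor/info/v2"
        "github.com/google/cadvisor/manager"
        "github.com/google/cadvisor/utils/sysfs"

        "isula.org/rubik/pkg/resourcemanager/cadvisor"
    )

    func main() {
        interval := 10 * time.Second
        allowDynamic := true
        // WithStartArgs also starts the manager, so no extra Start call is needed.
        m := cadvisor.WithStartArgs(cadvisor.StartArgs{
            MemCache:       memory.New(10*time.Minute, nil),
            SysFs:          sysfs.NewRealSysFs(),
            IncludeMetrics: container.MetricSet{container.CpuUsageMetrics: struct{}{}},
            MaxHousekeepingConfig: manager.HouskeepingConfig{
                Interval:     &interval,
                AllowDynamic: &allowDynamic,
            },
        })
        if m == nil {
            panic("cadvisor manager not available")
        }
        defer m.Stop()

        // Ask for the last two samples of one pod cgroup (hypothetical path).
        opts := v2.RequestOptions{IdType: v2.TypeName, Count: 2}
        infos, err := m.ContainerInfoV2("/kubepods/pod1234", opts)
        if err != nil {
            panic(err)
        }
        for name, info := range infos {
            fmt.Println(name, len(info.Stats), "samples")
        }
    }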
diff --git a/pkg/rubik/rubik.go b/pkg/rubik/rubik.go
index 94dc373..3864956 100644
--- a/pkg/rubik/rubik.go
+++ b/pkg/rubik/rubik.go
@@ -29,6 +29,7 @@ import (
 	"isula.org/rubik/pkg/common/util"
 	"isula.org/rubik/pkg/config"
 	"isula.org/rubik/pkg/core/publisher"
+	"isula.org/rubik/pkg/core/trigger"
 	"isula.org/rubik/pkg/core/typedef/cgroup"
 	"isula.org/rubik/pkg/informer"
 	"isula.org/rubik/pkg/podmanager"
@@ -165,6 +166,7 @@ func Run() int {
 	go handleSignals(cancel)
 
 	// 3. run rubik-agent
+	defer cleanDependencies()
 	if err := runAgent(ctx); err != nil {
 		log.Errorf("error running rubik agent: %v", err)
 		return constant.ErrorExitCode
@@ -182,3 +184,9 @@ func handleSignals(cancel context.CancelFunc) {
 		}
 	}
 }
+
+func cleanDependencies() {
+	if err := trigger.StopUsedExecutors(); err != nil {
+		log.Errorf("fail to stop dependencies: %v", err)
+	}
+}
diff --git a/pkg/services/psi/metric.go b/pkg/services/psi/metric.go
index 3f05564..b3d4f5f 100644
--- a/pkg/services/psi/metric.go
+++ b/pkg/services/psi/metric.go
@@ -9,7 +9,7 @@
 // See the Mulan PSL v2 for more details.
 // Author: Jiaqi Yang
 // Date: 2023-05-16
-// Description: This file is used for *
+// Description: This file defines the metrics used by the psi service
 
 package psi
 
@@ -24,11 +24,11 @@
 )
 
 const (
-	cpuRes         = "cpu"
-	memoryRes      = "memory"
-	ioRes          = "io"
-	psiSubSys      = "cpuacct"
-	avg10Threshold = 5.0
+	cpuRes                = "cpu"
+	memoryRes             = "memory"
+	ioRes                 = "io"
+	psiSubSys             = "cpuacct"
+	defaultAvg10Threshold = 5.0
 )
 
 // supportResources is the supported resource type
@@ -41,7 +41,8 @@ var supportResources map[string]*cgroup.Key = map[string]*cgroup.Key{
 // BasePSIMetric is the basic PSI indicator
 type BasePSIMetric struct {
 	*metric.BaseMetric
-	resources []string
+	avg10Threshold float64
+	resources      []string
 	// conservation is the Pod object that needs to guarantee resources
 	conservation map[string]*typedef.PodInfo
 	// Suspicion is the pod object that needs to be suspected of eviction
@@ -55,7 +56,7 @@ func (m *BasePSIMetric) Update() error {
 		return nil
 	}
 	for _, typ := range m.resources {
-		if detectPSiMetric(typ, m.conservation) {
+		if detectPSiMetric(typ, m.conservation, m.avg10Threshold) {
 			if err := alarm(typ, m.Triggers, m.suspicion); err != nil {
 				return err
 			}
@@ -64,7 +65,7 @@ func (m *BasePSIMetric) Update() error {
 	return nil
 }
 
-func detectPSiMetric(resTyp string, conservation map[string]*typedef.PodInfo) bool {
+func detectPSiMetric(resTyp string, conservation map[string]*typedef.PodInfo, avg10Threshold float64) bool {
 	var key *cgroup.Key
 	key, supported := supportResources[resTyp]
 	if !supported {
@@ -73,6 +74,7 @@ func detectPSiMetric(resTyp string, conservation map[string]*typedef.PodInfo) bo
 	}
 
 	for _, pod := range conservation {
+		log.Debugf("check psi of online pod: %v", pod.Name)
 		pressure, err := pod.GetCgroupAttr(key).PSI()
 		if err != nil {
 			log.Warnf("fail to get file %v: %v", key.FileName, err)
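For reference, the avg10 value that detectPSiMetric compares against the
threshold comes from the kernel PSI interface. Below is a stand-alone sketch
of parsing a pressure-file body and applying the threshold; the parser is
illustrative and is not rubik's PSI() implementation:

    package main

    import (
        "fmt"
        "strconv"
        "strings"
    )

    // parseAvg10 extracts the "some avg10" value from a cgroup pressure file,
    // e.g. cpu.pressure:
    //   some avg10=6.12 avg60=2.31 avg300=0.80 total=123456
    func parseAvg10(pressure string) (float64, error) {
        for _, line := range strings.Split(pressure, "\n") {
            fields := strings.Fields(line)
            if len(fields) == 0 || fields[0] != "some" {
                continue
            }
            for _, f := range fields[1:] {
                if strings.HasPrefix(f, "avg10=") {
                    return strconv.ParseFloat(strings.TrimPrefix(f, "avg10="), 64)
                }
            }
        }
        return 0, fmt.Errorf("no \"some avg10\" record found")
    }

    func main() {
        body := "some avg10=6.12 avg60=2.31 avg300=0.80 total=123456\n" +
            "full avg10=0.00 avg60=0.00 avg300=0.00 total=0"
        avg10, err := parseAvg10(body)
        if err != nil {
            panic(err)
        }
        const avg10Threshold = 5.0 // matches defaultAvg10Threshold in this patch
        fmt.Println(avg10 > avg10Threshold) // true: pressure exceeds the threshold
    }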
diff --git a/pkg/services/psi/psi.go b/pkg/services/psi/psi.go
index 75793a8..1c70255 100644
--- a/pkg/services/psi/psi.go
+++ b/pkg/services/psi/psi.go
@@ -2,14 +2,16 @@
 // rubik licensed under the Mulan PSL v2.
 // You can use this software according to the terms and conditions of the Mulan PSL v2.
 // You may obtain a copy of Mulan PSL v2 at:
-// http://license.coscl.org.cn/MulanPSL2
+//
+// http://license.coscl.org.cn/MulanPSL2
+//
 // THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
 // IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
 // PURPOSE.
 // See the Mulan PSL v2 for more details.
 // Author: Jiaqi Yang
 // Date: 2023-05-16
-// Description: This file is used for *
+// Description: This file is used for the psi service
 
 package psi
 
 import (
@@ -29,8 +31,10 @@ import (
 )
 
 const (
-	minInterval = 10
-	maxInterval = 30
+	minInterval          = 10
+	maxInterval          = 30
+	maxThreshold float64 = 100.0
+	minThreshold float64 = 5.0
 )
 
 // Factory is the QuotaTurbo factory class
@@ -50,15 +54,17 @@ func (i Factory) NewObj() (interface{}, error) {
 
 // Config is PSI service configuration
 type Config struct {
-	Interval int      `json:"interval,omitempty"`
-	Resource []string `json:"resource,omitempty"`
+	Interval       int      `json:"interval,omitempty"`
+	Avg10Threshold float64  `json:"avg10threshold,omitempty"`
+	Resource       []string `json:"resource,omitempty"`
 }
 
 // NewConfig returns default psi configuration
 func NewConfig() *Config {
 	return &Config{
-		Interval: minInterval,
-		Resource: make([]string, 0),
+		Interval:       minInterval,
+		Resource:       make([]string, 0),
+		Avg10Threshold: defaultAvg10Threshold,
 	}
 }
 
@@ -67,6 +73,9 @@ func (conf *Config) Validate() error {
 	if conf.Interval < minInterval || conf.Interval > maxInterval {
 		return fmt.Errorf("interval should in the range [%v, %v]", minInterval, maxInterval)
 	}
+	if conf.Avg10Threshold < minThreshold || conf.Avg10Threshold > maxThreshold {
+		return fmt.Errorf("avg10 threshold should be in the range [%v, %v]", minThreshold, maxThreshold)
+	}
 	if len(conf.Resource) == 0 {
 		return fmt.Errorf("specify at least one type resource")
 	}
@@ -85,7 +94,7 @@ type Manager struct {
 	helper.ServiceBase
 }
 
-// NewManager returns psi manager objects
+// NewManager returns a psi manager object
 func NewManager(n string) *Manager {
 	return &Manager{
 		ServiceBase: helper.ServiceBase{
@@ -103,7 +112,7 @@ func (m *Manager) Run(ctx context.Context) {
 			log.Errorf("fail to monitor PSI metrics: %v", err)
 		}
 	},
-		time.Millisecond*time.Duration(m.conf.Interval),
+		time.Second*time.Duration(m.conf.Interval),
 		ctx.Done())
 }
 
@@ -152,13 +161,14 @@ func priority(online bool) api.ListOption {
 // monitor gets metrics and fire triggers when satisfied
 func (m *Manager) monitor() error {
 	metric := &BasePSIMetric{conservation: m.Viewer.ListPodsWithOptions(priority(true)),
-		suspicion:  m.Viewer.ListPodsWithOptions(priority(false)),
-		BaseMetric: metric.NewBaseMetric(),
-		resources:  m.conf.Resource}
-
+		suspicion:      m.Viewer.ListPodsWithOptions(priority(false)),
+		BaseMetric:     metric.NewBaseMetric(),
+		resources:      m.conf.Resource,
+		avg10Threshold: m.conf.Avg10Threshold,
+	}
 	metric.AddTrigger(
-		trigger.GetTrigger(trigger.RESOURCEANALYSIS).
-			SetNext(trigger.GetTrigger(trigger.EXPULSION)),
+		trigger.NewTrigger(trigger.RESOURCEANALYSIS).
+			SetNext(trigger.NewTrigger(trigger.EXPULSION)),
 	)
 	if err := metric.Update(); err != nil {
 		return err
-- 
Gitee