diff --git a/.gitignore b/.gitignore
index 1fcd5090c6c01e88b3d4162de2b5d63c2666a35b..ce1ce3a97a712f773de95011d51334070306044d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -12,3 +12,4 @@
 /coverage
 /gocov.json
 /testcasesresult.json
+/compose-dev.yaml
diff --git a/go.mod b/go.mod
index f08a94019b7ae0ba5d26f9f57743cf934a4992c2..fe0372810a71ccf91110f2c0e9a0e16e9d9d3ba6 100644
--- a/go.mod
+++ b/go.mod
@@ -12,7 +12,6 @@ require (
 	k8s.io/api v0.20.2
 	k8s.io/apimachinery v0.20.2
 	k8s.io/client-go v0.20.2
-	k8s.io/klog/v2 v2.80.1
 )
 
 require (
@@ -61,6 +60,7 @@ require (
 	gopkg.in/inf.v0 v0.9.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
+	k8s.io/klog/v2 v2.80.1 // indirect
 	k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
 	sigs.k8s.io/structured-merge-diff/v4 v4.0.2 // indirect
 	sigs.k8s.io/yaml v1.2.0 // indirect
diff --git a/hack/rubik-daemonset.yaml b/hack/rubik-daemonset.yaml
index 4975b41db260415a2ee3343f6744c7948f3608d9..246d45e8ee166701ac19336009fda8ab72868326 100644
--- a/hack/rubik-daemonset.yaml
+++ b/hack/rubik-daemonset.yaml
@@ -6,6 +6,9 @@ rules:
   - apiGroups: [""]
     resources: ["pods"]
     verbs: ["list", "watch"]
+  - apiGroups: [""]
+    resources: ["pods/eviction"]
+    verbs: ["create"]
 ---
 kind: ClusterRoleBinding
 apiVersion: rbac.authorization.k8s.io/v1
diff --git a/pkg/core/trigger/base.go b/pkg/core/trigger/base.go
index 82af66a5074cdf7364cf89196c8de967bd38c958..7f1fbe9ffd9a19b38f3e9f56b655ab8f844d1a79 100644
--- a/pkg/core/trigger/base.go
+++ b/pkg/core/trigger/base.go
@@ -16,6 +16,7 @@ package trigger
 
 import (
 	"fmt"
+	"sync"
 
 	"isula.org/rubik/pkg/common/log"
 	"isula.org/rubik/pkg/common/util"
@@ -39,7 +40,7 @@ const (
 
 type triggerCreator func() Trigger
 
-var triggerMap map[Typ]triggerCreator = map[Typ]triggerCreator{
+var triggerCreatorMap map[Typ]triggerCreator = map[Typ]triggerCreator{
 	EXPULSION:        expulsionCreator,
 	RESOURCEANALYSIS: analyzerCreator,
 }
@@ -52,6 +53,7 @@ type Descriptor interface {
 // Executor is the trigger execution function interface
 type Executor interface {
 	Execute(Factor) (Factor, error)
+	Stop() error
 }
 
 // Trigger interface defines the trigger methods
@@ -68,10 +70,11 @@ type TreeTrigger struct {
 	subTriggers []Trigger
 }
 
-// NewTreeTirggerNode returns a BaseMetric object
-func NewTreeTirggerNode(name string) *TreeTrigger {
+// withTreeTrigger returns a TreeTrigger node wrapping the given executor
+func withTreeTrigger(name string, exec Executor) *TreeTrigger {
 	return &TreeTrigger{
 		name:        name,
+		exec:        exec,
 		subTriggers: make([]Trigger, 0)}
 }
@@ -88,6 +91,9 @@ func (t *TreeTrigger) Name() string {
 
 // Execute executes the sub-triggers of the current trigger
 func (t *TreeTrigger) Execute(f Factor) error {
+	if t.exec == nil {
+		return fmt.Errorf("trigger %v cannot execute: executor is missing", t.name)
+	}
 	var errs error
 	res, err := t.exec.Execute(f)
 	if err != nil {
@@ -103,11 +109,54 @@
 	return errs
 }
 
-// GetTrigger returns a trigger singleton according to type
-func GetTrigger(t Typ) Trigger {
-	if _, ok := triggerMap[t]; !ok {
+// NewTrigger returns a trigger of the given type; its executor is a singleton
+func NewTrigger(t Typ) Trigger {
+	if _, ok := triggerCreatorMap[t]; !ok {
 		log.Warnf("undefine trigger: %v", t)
 		return nil
 	}
-	return triggerMap[t]()
+	return triggerCreatorMap[t]()
+}
+
+var (
+	runningExecutors = make(map[string]Executor)
+	execLock         sync.Mutex
+)
+
+func appendUsedExecutors(name string, exec Executor) {
+	if exec == nil {
+		log.Errorf("nil executor %v cannot be used", name)
+		return
+	}
+	execLock.Lock()
+	defer execLock.Unlock()
+	if _, existed := runningExecutors[name]; existed {
+		log.Errorf("conflicting executor %v already exists", name)
+		return
+	}
+	log.Infof("using executor: %v", name)
+	runningExecutors[name] = exec
+}
+
+// StopUsedExecutors stops running executors
+func StopUsedExecutors() error {
+	execLock.Lock()
+	defer execLock.Unlock()
+	var errs error
+	// stop the executors one by one
+	for name, exec := range runningExecutors {
+		log.Infof("stopping executor %v", name)
+		if exec == nil {
+			log.Infof("executor %v has already stopped", name)
+			continue
+		}
+		if err := exec.Stop(); err != nil {
+			errs = util.AppendErr(errs, util.AddErrorPrfix(err, name))
+		}
+	}
+	// clear executors
+	for k := range runningExecutors {
+		delete(runningExecutors, k)
+	}
+	return errs
+}
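
For orientation, here is a minimal sketch (not part of the patch) of how the new trigger API is meant to be wired, mirroring the psi service change at the end of this diff. The FactorImpl literal with Msg and Pods fields and the assumption that SetNext returns the head of the chain are inferred from the surrounding code; nil checks on NewTrigger are elided.

package main

import (
	"isula.org/rubik/pkg/common/log"
	"isula.org/rubik/pkg/core/trigger"
	"isula.org/rubik/pkg/core/typedef"
)

func main() {
	// Analyze resource usage first, then evict the chosen pod.
	chain := trigger.NewTrigger(trigger.RESOURCEANALYSIS).
		SetNext(trigger.NewTrigger(trigger.EXPULSION))

	// Hypothetical factor: "max_cpu" asks the analyzer to pick the
	// candidate pod with the highest CPU utilization.
	factor := &trigger.FactorImpl{
		Msg:  "max_cpu",
		Pods: map[string]*typedef.PodInfo{ /* candidate pods */ },
	}
	if err := chain.Execute(factor); err != nil {
		log.Errorf("trigger chain failed: %v", err)
	}

	// On shutdown, release the singleton executors registered by the creators.
	if err := trigger.StopUsedExecutors(); err != nil {
		log.Errorf("fail to stop executors: %v", err)
	}
}
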
diff --git a/pkg/core/trigger/expulsion.go b/pkg/core/trigger/expulsion.go
index dc039575ab4ad04e34f29663d5102a0382049ebb..87dd484f18ce647d8c5a658678b29e05205ccf80 100644
--- a/pkg/core/trigger/expulsion.go
+++ b/pkg/core/trigger/expulsion.go
@@ -9,23 +9,101 @@
 // See the Mulan PSL v2 for more details.
 // Author: Jiaqi Yang
 // Date: 2023-05-16
-// Description: This file is used for *
+// Description: This file is used for the expulsion trigger
 
 package trigger
 
-import "isula.org/rubik/pkg/common/log"
+import (
+	"context"
+	"fmt"
+	"sync"
 
-// expulsionExec is the singleton of eviction triggers
-var expulsionExec = &Expulsion{}
+	policyv1beta1 "k8s.io/api/policy/v1beta1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+
+	"isula.org/rubik/pkg/common/log"
+	"isula.org/rubik/pkg/common/util"
+	"isula.org/rubik/pkg/core/typedef"
+	"isula.org/rubik/pkg/informer"
+)
+
+// expulsionExec is the singleton of the Expulsion executor implementation
+var expulsionExec *Expulsion
+
+// expulsionCreator creates the Expulsion trigger
 var expulsionCreator = func() Trigger {
-	return &TreeTrigger{name: ExpulsionAnno, exec: expulsionExec}
+	if expulsionExec == nil {
+		c := newKubeClient()
+		if c != nil {
+			log.Infof("initialize expulsionExec")
+			expulsionExec = &Expulsion{client: c}
+			appendUsedExecutors(ExpulsionAnno, expulsionExec)
+		}
+	}
+	return withTreeTrigger(ExpulsionAnno, expulsionExec)
 }
 
 // Expulsion is the trigger to evict pods
-type Expulsion struct{}
+type Expulsion struct {
+	sync.RWMutex
+	client *kubernetes.Clientset
+}
+
+// newKubeClient returns a kubernetes.Clientset object
+func newKubeClient() *kubernetes.Clientset {
+	client, err := informer.InitKubeClient()
+	if err != nil {
+		log.Errorf("fail to connect to k8s: %v", err)
+		return nil
+	}
+	return client
+}
 
 // Execute evicts pods based on the id of the given pod
 func (e *Expulsion) Execute(f Factor) (Factor, error) {
-	log.Infof("need to evict pod %v, real operation is TO BE CONTINUE", f.Message())
-	return nil, nil
+	e.RLock()
+	defer e.RUnlock()
+	if e.client == nil {
+		return nil, fmt.Errorf("kubernetes client is not initialized, please check")
+	}
+	var errs error
+	for name, pod := range f.TargetPods() {
+		log.Infof("evicting pod \"%v\"", name)
+		if err := unevictable(pod); err != nil {
+			errs = util.AppendErr(errs, fmt.Errorf("fail to evict pod \"%v\": %v", pod.Name, err))
+			continue
+		}
+		eviction := &policyv1beta1.Eviction{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      pod.Name,
+				Namespace: pod.Namespace,
+			},
+			DeleteOptions: &metav1.DeleteOptions{},
+		}
+		if err := e.client.CoreV1().Pods(pod.Namespace).Evict(context.TODO(), eviction); err != nil {
+			errs = util.AppendErr(errs, fmt.Errorf("fail to evict pod \"%v\": %v", pod.Name, err))
+			continue
+		}
+	}
+	return nil, errs
+}
+
+// Stop stops the expulsion executor
+func (e *Expulsion) Stop() error {
+	e.Lock()
+	defer e.Unlock()
+	e.client = nil
+	return nil
+}
+
+// unevictable returns an error if the pod is forbidden to be evicted
+func unevictable(pod *typedef.PodInfo) error {
+	var forbidden = map[string]struct{}{
+		"kube-system": {},
+	}
+	if _, existed := forbidden[pod.Namespace]; existed {
+		return fmt.Errorf("it is forbidden to evict pods in namespace %v", pod.Namespace)
+	}
+	return nil
+}
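
The executor goes through the Eviction subresource rather than a plain delete, which is why the ClusterRole above gains create on pods/eviction: the apiserver refuses an eviction that would violate a PodDisruptionBudget. A self-contained sketch of the same client-go call, assuming an already-initialized Clientset; the helper name evictPod is made up for illustration.

package eviction

import (
	"context"
	"fmt"

	policyv1beta1 "k8s.io/api/policy/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// evictPod submits an Eviction for namespace/name through the pods/eviction
// subresource, respecting PodDisruptionBudgets unlike a direct delete.
func evictPod(client *kubernetes.Clientset, namespace, name string) error {
	eviction := &policyv1beta1.Eviction{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
	}
	if err := client.CoreV1().Pods(namespace).Evict(context.TODO(), eviction); err != nil {
		return fmt.Errorf("fail to evict pod %q: %v", name, err)
	}
	return nil
}
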
diff --git a/pkg/core/trigger/resourceanalysis.go b/pkg/core/trigger/resourceanalysis.go
index 260b384ee415c68e1c58457bf0e32ffd2a04a7cf..a3d99e51a1f7ddb2ad12718701ba24cd0a52aa72 100644
--- a/pkg/core/trigger/resourceanalysis.go
+++ b/pkg/core/trigger/resourceanalysis.go
@@ -9,40 +9,109 @@
 // See the Mulan PSL v2 for more details.
 // Author: Jiaqi Yang
 // Date: 2023-05-18
-// Description: This file is used for *
+// Description: This file is used for the resource analyzer
 
 package trigger
 
 import (
 	"fmt"
+	"sync"
+	"time"
+
+	"github.com/google/cadvisor/cache/memory"
+	"github.com/google/cadvisor/container"
+	v2 "github.com/google/cadvisor/info/v2"
+	"github.com/google/cadvisor/manager"
+	"github.com/google/cadvisor/utils/sysfs"
 
 	"isula.org/rubik/pkg/common/log"
+	"isula.org/rubik/pkg/common/util"
 	"isula.org/rubik/pkg/core/typedef"
+	"isula.org/rubik/pkg/resourcemanager/cadvisor"
+)
+
+const (
+	minStatsNum            = 2
+	nanoToMicro    float64 = 1000
+	percentageRate float64 = 100
 )
 
-// resourceAnalysisExec is the singleton of Analyzer triggers
-var resourceAnalysisExec = &Analyzer{}
+// resourceAnalysisExec is the singleton of the Analyzer executor implementation
+var resourceAnalysisExec *Analyzer
+
+// analyzerCreator creates the Analyzer trigger
 var analyzerCreator = func() Trigger {
-	return &TreeTrigger{name: ResourceAnalysisAnno, exec: resourceAnalysisExec}
+	if resourceAnalysisExec == nil {
+		m := NewCadvisorManager()
+		if m != nil {
+			log.Infof("initialize resourceAnalysisExec")
+			resourceAnalysisExec = &Analyzer{cadvisorManager: m}
+			appendUsedExecutors(ResourceAnalysisAnno, resourceAnalysisExec)
+		}
+	}
+	return withTreeTrigger(ResourceAnalysisAnno, resourceAnalysisExec)
+}
+
+// reqOpt is the option to get information from cadvisor
+var reqOpt = v2.RequestOptions{
+	IdType:    v2.TypeName,
+	Count:     minStatsNum,
+	Recursive: false,
 }
 
 // Analyzer is the resource analysis trigger
-type Analyzer struct{}
+type Analyzer struct {
+	sync.RWMutex
+	cadvisorManager *cadvisor.Manager
+}
+
+// NewCadvisorManager returns a cadvisor.Manager object
+func NewCadvisorManager() *cadvisor.Manager {
+	const (
+		cacheMinutes       = 10
+		keepingIntervalSec = 10
+	)
+	var (
+		allowDynamic            = true
+		maxHousekeepingInterval = keepingIntervalSec * time.Second
+		cacheAge                = cacheMinutes * time.Minute
+	)
+	args := cadvisor.StartArgs{
+		MemCache: memory.New(cacheAge, nil),
+		SysFs:    sysfs.NewRealSysFs(),
+		IncludeMetrics: container.MetricSet{
+			container.CpuUsageMetrics:    struct{}{},
+			container.MemoryUsageMetrics: struct{}{},
+			container.DiskUsageMetrics:   struct{}{},
+			container.DiskIOMetrics:      struct{}{},
+		},
+		MaxHousekeepingConfig: manager.HouskeepingConfig{
+			Interval:     &maxHousekeepingInterval,
+			AllowDynamic: &allowDynamic,
+		},
+	}
+	return cadvisor.WithStartArgs(args)
+}
 
 // Execute filters the corresponding Pod according to the operation type and triggers it on demand
 func (a *Analyzer) Execute(f Factor) (Factor, error) {
+	a.RLock()
+	defer a.RUnlock()
+	if a.cadvisorManager == nil {
+		return nil, fmt.Errorf("cadvisor manager is not initialized, please check")
+	}
 	var (
-		target string
+		target *typedef.PodInfo
 		opTyp  = f.Message()
 		errMsg string
-	)
-	log.Infof("receive operation: %v", opTyp)
-	alarm := func(target, errMsg string) (Factor, error) {
-		if len(target) == 0 {
-			return nil, fmt.Errorf(errMsg)
+		alarm  = func(target *typedef.PodInfo, errMsg string) (Factor, error) {
+			if target == nil {
+				return nil, fmt.Errorf(errMsg)
+			}
+			return &FactorImpl{Pods: map[string]*typedef.PodInfo{target.Name: target}}, nil
 		}
-		return &FactorImpl{Msg: target}, nil
-	}
+	)
+	log.Debugf("receive operation: %v", opTyp)
 	switch opTyp {
 	case "max_cpu":
 		errMsg = "unable to find pod with maximum CPU utilization"
@@ -51,22 +120,105 @@ func (a *Analyzer) Execute(f Factor) (Factor, error) {
 		errMsg = "unable to find pod with maximum memory utilization"
 		target = a.maxMemoryUtil(f.TargetPods())
 	case "max_io":
+		/*
+			TODO: find a better way to handle high I/O bandwidth.
+			Cadvisor's I/O bandwidth statistics are not well suited for
+			direct comparison, so while I/O pressure is high we temporarily
+			fall back to evicting the pod with the maximum CPU utilization.
+		*/
 		errMsg = "unable to find pod with maximum I/O bandwidth"
-		target = a.maxIOBandwidth(f.TargetPods())
+		target = a.maxCPUUtil(f.TargetPods())
 	default:
 		errMsg = "undefined operation: " + opTyp
 	}
 	return alarm(target, errMsg)
 }
 
-func (a *Analyzer) maxCPUUtil(pods map[string]*typedef.PodInfo) string {
-	return "testtest"
+func (a *Analyzer) maxCPUUtil(pods map[string]*typedef.PodInfo) *typedef.PodInfo {
+	var (
+		chosen  *typedef.PodInfo
+		maxUtil float64
+	)
+	for name, pod := range pods {
+		podStats, err := a.cgroupCadvisorInfo("/"+pod.Path, reqOpt)
+		if err != nil {
+			log.Errorf("fail to get cgroup information %v: %v", pod.Path, err)
+			continue
+		}
+		if len(podStats) < minStatsNum {
+			log.Errorf("pod %v does not have enough cpu stats collected, skip it", name)
+			continue
+		}
+		last := podStats[len(podStats)-1]
+		penultimate := podStats[len(podStats)-2]
+		cpuUsageUs := float64(last.Cpu.Usage.Total-penultimate.Cpu.Usage.Total) / nanoToMicro
+		timeDeltaUs := float64(last.Timestamp.Sub(penultimate.Timestamp).Microseconds())
+		cpuUtil := util.Div(cpuUsageUs, timeDeltaUs) * percentageRate
+		log.Debugf("pod %v cpu util %v%%=%v/%v(us)", name, cpuUtil, cpuUsageUs, timeDeltaUs)
+		if maxUtil < cpuUtil {
+			maxUtil = cpuUtil
+			chosen = pod
+		}
+	}
+	if chosen != nil {
+		log.Infof("found the max cpu util pod \"%v\": %v", chosen.Name, maxUtil)
+	}
+	return chosen
 }
 
-func (a *Analyzer) maxMemoryUtil(pods map[string]*typedef.PodInfo) string {
-	return ""
+func (a *Analyzer) maxMemoryUtil(pods map[string]*typedef.PodInfo) *typedef.PodInfo {
+	var (
+		chosen   *typedef.PodInfo
+		maxUsage uint64
+	)
+	for name, pod := range pods {
+		podStats, err := a.cgroupCadvisorInfo("/"+pod.Path, reqOpt)
+		if err != nil {
+			log.Errorf("fail to get cgroup information %v: %v", pod.Path, err)
+			continue
+		}
+		if len(podStats) == 0 {
+			log.Errorf("pod %v has no memory stats collected, skip it", name)
+			continue
+		}
+		last := podStats[len(podStats)-1].Memory.Usage
+		log.Debugf("pod %v memory usage %vB", name, last)
+		if maxUsage < last {
+			maxUsage = last
+			chosen = pod
+		}
+	}
+	if chosen != nil {
+		log.Infof("found the max memory usage pod \"%v\": %vB", chosen.Name, maxUsage)
+	}
+	return chosen
+}
+
+// maxIOBandwidth is a placeholder; see the TODO in Execute
+func (a *Analyzer) maxIOBandwidth(pods map[string]*typedef.PodInfo) *typedef.PodInfo {
+	return nil
+}
+
+func (a *Analyzer) cgroupCadvisorInfo(cgroupPath string, opts v2.RequestOptions) ([]*v2.ContainerStats, error) {
+	infoMap, err := a.cadvisorManager.ContainerInfoV2(cgroupPath, opts)
+	if err != nil {
+		return nil, fmt.Errorf("fail to get cgroup information %v: %v", cgroupPath, err)
+	}
+	info, existed := infoMap[cgroupPath]
+	if !existed {
+		return nil, fmt.Errorf("no cadvisor info for cgroup %v", cgroupPath)
+	}
+	return info.Stats, nil
 }
 
-func (a *Analyzer) maxIOBandwidth(pods map[string]*typedef.PodInfo) string {
-	return ""
+// Stop stops the Analyzer executor
+func (a *Analyzer) Stop() error {
+	a.Lock()
+	defer a.Unlock()
+	m := a.cadvisorManager
+	a.cadvisorManager = nil
+	if m == nil {
+		return nil
+	}
+	return m.Stop()
 }
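
The pod selection in maxCPUUtil above reduces to counter arithmetic over the two most recent cadvisor samples: Cpu.Usage.Total is a cumulative counter in nanoseconds, so utilization is the usage delta divided by the wall-clock delta. A standalone restatement under the same cadvisor v2 types; cpuUtilPercent is a hypothetical helper, and the zero-delta guard stands in for the patch's util.Div.

package analysis

import (
	v2 "github.com/google/cadvisor/info/v2"
)

// cpuUtilPercent returns the CPU utilization in percent between two samples.
func cpuUtilPercent(penultimate, last *v2.ContainerStats) float64 {
	const nanoToMicro, percentageRate = 1000.0, 100.0
	// Cumulative CPU time delta, converted from nanoseconds to microseconds.
	usageUs := float64(last.Cpu.Usage.Total-penultimate.Cpu.Usage.Total) / nanoToMicro
	// Wall-clock time between the two samples, in microseconds.
	deltaUs := float64(last.Timestamp.Sub(penultimate.Timestamp).Microseconds())
	if deltaUs == 0 { // util.Div guards this division in the patch
		return 0
	}
	return usageUs / deltaUs * percentageRate
}
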
diff --git a/pkg/informer/apiserverinformer.go b/pkg/informer/apiserverinformer.go
index 0f8daaa1be5df808fe45ff1e3d77ac6934d994d4..cbba1e2fc2b5681f50b1e2912e9de1729d12e1a1 100644
--- a/pkg/informer/apiserverinformer.go
+++ b/pkg/informer/apiserverinformer.go
@@ -47,7 +47,7 @@ func NewAPIServerInformer(publisher api.Publisher) (api.Informer, error) {
 	}
 
 	// create apiserver client
-	client, err := initKubeClient()
+	client, err := InitKubeClient()
 	if err != nil {
 		return nil, fmt.Errorf("fail to init kubenetes client: %v", err)
 	}
@@ -63,8 +63,8 @@ func NewAPIServerInformer(publisher api.Publisher) (api.Informer, error) {
 	return informer, nil
 }
 
-// initKubeClient initializes kubeClient
-func initKubeClient() (*kubernetes.Clientset, error) {
+// InitKubeClient initializes the kubernetes client
+func InitKubeClient() (*kubernetes.Clientset, error) {
 	conf, err := rest.InClusterConfig()
 	if err != nil {
 		return nil, err
diff --git a/pkg/resourcemanager/cadvisor/cadvisor.go b/pkg/resourcemanager/cadvisor/cadvisor.go
index 96475bc423086e36823277335723ac5434682fd3..7978b34f2a6e9923df1b1b3e56993bc08496e719 100644
--- a/pkg/resourcemanager/cadvisor/cadvisor.go
+++ b/pkg/resourcemanager/cadvisor/cadvisor.go
@@ -20,42 +20,72 @@ import (
 	"net/http"
 	"time"
 
-	cmemory "github.com/google/cadvisor/cache/memory"
-	cadvisorcontainer "github.com/google/cadvisor/container"
+	"github.com/google/cadvisor/cache/memory"
+	"github.com/google/cadvisor/container"
+	v2 "github.com/google/cadvisor/info/v2"
 	"github.com/google/cadvisor/manager"
-	csysfs "github.com/google/cadvisor/utils/sysfs"
-	"k8s.io/klog/v2"
+	"github.com/google/cadvisor/utils/sysfs"
+
+	"isula.org/rubik/pkg/common/log"
 )
 
-type CadvisorManager struct {
-	cgroupDriver string
+// Manager is the cadvisor manager
+type Manager struct {
 	manager.Manager
 }
 
-func NewCadvidorManager() *CadvisorManager {
-	var includedMetrics = cadvisorcontainer.MetricSet{
-		cadvisorcontainer.CpuUsageMetrics:         struct{}{},
-		cadvisorcontainer.ProcessSchedulerMetrics: struct{}{},
-	}
-
-	allowDynamic := true
-	maxHousekeepingInterval := 10 * time.Second
-	memCache := cmemory.New(10*time.Minute, nil)
-	sysfs := csysfs.NewRealSysFs()
-	maxHousekeepingConfig := manager.HouskeepingConfig{Interval: &maxHousekeepingInterval, AllowDynamic: &allowDynamic}
+// StartArgs is a set of parameters that control the startup of cadvisor
+type StartArgs struct {
+	MemCache              *memory.InMemoryCache
+	SysFs                 sysfs.SysFs
+	IncludeMetrics        container.MetricSet
+	MaxHousekeepingConfig manager.HouskeepingConfig
+}
 
-	m, err := manager.New(memCache, sysfs, maxHousekeepingConfig, includedMetrics, http.DefaultClient, []string{"/kubepods"}, []string{""}, "", time.Duration(1))
+// WithStartArgs creates a cadvisor.Manager object
+func WithStartArgs(args StartArgs) *Manager {
+	const (
+		perfEventsFile = "/sys/kernel/debug/tracing/events/raw_syscalls/sys_enter"
+	)
+	var (
+		rawContainerCgroupPathPrefixWhiteList = []string{"/kubepods"}
+		containerEnvMetadataWhiteList         = []string{}
+		resctrlInterval                       = time.Second
+	)
+	m, err := manager.New(args.MemCache, args.SysFs, args.MaxHousekeepingConfig,
+		args.IncludeMetrics, http.DefaultClient, rawContainerCgroupPathPrefixWhiteList,
+		containerEnvMetadataWhiteList, perfEventsFile, resctrlInterval)
 	if err != nil {
-		klog.Errorf("Failed to create cadvisor manager start: %v", err)
+		log.Errorf("Failed to create cadvisor manager: %v", err)
 		return nil
 	}
-
 	if err := m.Start(); err != nil {
-		klog.Errorf("Failed to start cadvisor manager: %v", err)
+		log.Errorf("Failed to start cadvisor manager: %v", err)
 		return nil
 	}
-
-	return &CadvisorManager{
+	return &Manager{
 		Manager: m,
 	}
 }
+
+// Start starts the cadvisor manager
+func (c *Manager) Start() error {
+	return c.Manager.Start()
+}
+
+// Stop stops cadvisor and clears the existing container handler factories
+func (c *Manager) Stop() error {
+	err := c.Manager.Stop()
+	if err != nil {
+		return err
+	}
+	// clear existing factories so that a new manager can register them again
+	container.ClearContainerHandlerFactories()
+	return nil
+}
+
+// ContainerInfoV2 gets v2 container info from the cadvisor manager
+func (c *Manager) ContainerInfoV2(name string,
+	options v2.RequestOptions) (map[string]v2.ContainerInfo, error) {
+	return c.GetContainerInfoV2(name, options)
+}
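
A sketch of driving the wrapper end to end: NewCadvisorManager (defined in resourceanalysis.go above) already starts housekeeping inside WithStartArgs, and Stop clears the container handler factories so a later manager can register them again. The query path "/kubepods" is the raw-cgroup prefix white-listed above; the error handling is illustrative, not prescribed by the patch.

package main

import (
	"fmt"

	v2 "github.com/google/cadvisor/info/v2"

	"isula.org/rubik/pkg/core/trigger"
)

func main() {
	m := trigger.NewCadvisorManager() // housekeeping starts inside WithStartArgs
	if m == nil {
		return
	}
	// Stop also clears container handler factories for a clean restart.
	defer m.Stop()

	// Ask for the last two samples of the raw cgroup, mirroring reqOpt.
	opts := v2.RequestOptions{IdType: v2.TypeName, Count: 2, Recursive: false}
	infos, err := m.ContainerInfoV2("/kubepods", opts)
	if err != nil {
		fmt.Printf("query failed: %v\n", err)
		return
	}
	for name, info := range infos {
		fmt.Printf("%v: %d stats samples\n", name, len(info.Stats))
	}
}
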
diff --git a/pkg/rubik/rubik.go b/pkg/rubik/rubik.go
index 94dc37380007db70d5545c65b01890cb9712fd83..38649560233f36202f0d1e569690da3f777b537b 100644
--- a/pkg/rubik/rubik.go
+++ b/pkg/rubik/rubik.go
@@ -29,6 +29,7 @@ import (
 	"isula.org/rubik/pkg/common/util"
 	"isula.org/rubik/pkg/config"
 	"isula.org/rubik/pkg/core/publisher"
+	"isula.org/rubik/pkg/core/trigger"
 	"isula.org/rubik/pkg/core/typedef/cgroup"
 	"isula.org/rubik/pkg/informer"
 	"isula.org/rubik/pkg/podmanager"
@@ -165,6 +166,7 @@ func Run() int {
 	go handleSignals(cancel)
 
 	// 3. run rubik-agent
+	defer cleanDependencies()
 	if err := runAgent(ctx); err != nil {
 		log.Errorf("error running rubik agent: %v", err)
 		return constant.ErrorExitCode
@@ -182,3 +184,9 @@ func handleSignals(cancel context.CancelFunc) {
 		}
 	}
 }
+
+func cleanDependencies() {
+	if err := trigger.StopUsedExecutors(); err != nil {
+		log.Errorf("fail to stop dependencies: %v", err)
+	}
+}
diff --git a/pkg/services/psi/metric.go b/pkg/services/psi/metric.go
index 3f0556492619a10bf80798b361b1e93ec9ee2c0d..b3d4f5f7befb1ad77bd7e1f84d7a94423fbdd245 100644
--- a/pkg/services/psi/metric.go
+++ b/pkg/services/psi/metric.go
@@ -9,7 +9,7 @@
 // See the Mulan PSL v2 for more details.
 // Author: Jiaqi Yang
 // Date: 2023-05-16
-// Description: This file is used for *
+// Description: This file defines metrics used for the psi service
 
 package psi
 
@@ -24,11 +24,11 @@ import (
 )
 
 const (
-	cpuRes         = "cpu"
-	memoryRes      = "memory"
-	ioRes          = "io"
-	psiSubSys      = "cpuacct"
-	avg10Threshold = 5.0
+	cpuRes                = "cpu"
+	memoryRes             = "memory"
+	ioRes                 = "io"
+	psiSubSys             = "cpuacct"
+	defaultAvg10Threshold = 5.0
 )
 
 // supportResources is the supported resource type
@@ -41,7 +41,8 @@ var supportResources map[string]*cgroup.Key = map[string]*cgroup.Key{
 // BasePSIMetric is the basic PSI indicator
 type BasePSIMetric struct {
 	*metric.BaseMetric
-	resources []string
+	avg10Threshold float64
+	resources      []string
 	// conservation is the Pod object that needs to guarantee resources
 	conservation map[string]*typedef.PodInfo
 	// Suspicion is the pod object that needs to be suspected of eviction
@@ -55,7 +56,7 @@ func (m *BasePSIMetric) Update() error {
 		return nil
 	}
 	for _, typ := range m.resources {
-		if detectPSiMetric(typ, m.conservation) {
+		if detectPSiMetric(typ, m.conservation, m.avg10Threshold) {
 			if err := alarm(typ, m.Triggers, m.suspicion); err != nil {
 				return err
 			}
@@ -64,7 +65,7 @@ func (m *BasePSIMetric) Update() error {
 	return nil
 }
 
-func detectPSiMetric(resTyp string, conservation map[string]*typedef.PodInfo) bool {
+func detectPSiMetric(resTyp string, conservation map[string]*typedef.PodInfo, avg10Threshold float64) bool {
 	var key *cgroup.Key
 	key, supported := supportResources[resTyp]
 	if !supported {
@@ -73,6 +74,7 @@ func detectPSiMetric(resTyp string, conservation map[string]*typedef.PodInfo) bo
 	}
 
 	for _, pod := range conservation {
+		log.Debugf("check psi of online pod: %v", pod.Name)
 		pressure, err := pod.GetCgroupAttr(key).PSI()
 		if err != nil {
 			log.Warnf("fail to get file %v: %v", key.FileName, err)
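
Before the psi.go changes below, a sketch of the new avg10threshold knob from the configuration side. The JSON keys follow the struct tags introduced on Config below; the literal values are examples only, not defaults mandated by the patch.

package main

import (
	"encoding/json"
	"fmt"

	"isula.org/rubik/pkg/services/psi"
)

func main() {
	raw := []byte(`{"interval": 15, "avg10threshold": 10.0, "resource": ["cpu", "memory"]}`)
	conf := psi.NewConfig() // defaults: interval=10, avg10threshold=5.0
	if err := json.Unmarshal(raw, conf); err != nil {
		fmt.Println(err)
		return
	}
	// Validate rejects intervals outside [10, 30] and thresholds outside [5.0, 100.0].
	if err := conf.Validate(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("psi config: %+v\n", *conf)
}
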
diff --git a/pkg/services/psi/psi.go b/pkg/services/psi/psi.go
index 75793a8929b474d008356b39ebcb61e6d776ffbb..1c7025549a240c3d9507e6a52506973962e4da07 100644
--- a/pkg/services/psi/psi.go
+++ b/pkg/services/psi/psi.go
@@ -2,14 +2,16 @@
 // rubik licensed under the Mulan PSL v2.
 // You can use this software according to the terms and conditions of the Mulan PSL v2.
 // You may obtain a copy of Mulan PSL v2 at:
-//          http://license.coscl.org.cn/MulanPSL2
+//
+//	http://license.coscl.org.cn/MulanPSL2
+//
 // THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR
 // IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR
 // PURPOSE.
 // See the Mulan PSL v2 for more details.
 // Author: Jiaqi Yang
 // Date: 2023-05-16
-// Description: This file is used for *
+// Description: This file is used for the psi service
 
 package psi
 
 import (
@@ -29,8 +31,10 @@ import (
 )
 
 const (
-	minInterval = 10
-	maxInterval = 30
+	minInterval          = 10 // seconds
+	maxInterval          = 30 // seconds
+	maxThreshold float64 = 100.0
+	minThreshold float64 = 5.0
 )
 
 // Factory is the QuotaTurbo factory class
@@ -50,15 +54,17 @@ func (i Factory) NewObj() (interface{}, error) {
 
 // Config is PSI service configuration
 type Config struct {
-	Interval int      `json:"interval,omitempty"`
-	Resource []string `json:"resource,omitempty"`
+	Interval       int      `json:"interval,omitempty"`
+	Avg10Threshold float64  `json:"avg10threshold,omitempty"`
+	Resource       []string `json:"resource,omitempty"`
 }
 
 // NewConfig returns default psi configuration
 func NewConfig() *Config {
 	return &Config{
-		Interval: minInterval,
-		Resource: make([]string, 0),
+		Interval:       minInterval,
+		Resource:       make([]string, 0),
+		Avg10Threshold: defaultAvg10Threshold,
 	}
 }
 
@@ -67,6 +73,9 @@ func (conf *Config) Validate() error {
 	if conf.Interval < minInterval || conf.Interval > maxInterval {
 		return fmt.Errorf("interval should in the range [%v, %v]", minInterval, maxInterval)
 	}
+	if conf.Avg10Threshold < minThreshold || conf.Avg10Threshold > maxThreshold {
+		return fmt.Errorf("avg10 threshold should be in the range [%v, %v]", minThreshold, maxThreshold)
+	}
 	if len(conf.Resource) == 0 {
 		return fmt.Errorf("specify at least one type resource")
 	}
@@ -85,7 +94,7 @@ type Manager struct {
 	helper.ServiceBase
 }
 
-// NewManager returns psi manager objects
+// NewManager returns a psi manager object
 func NewManager(n string) *Manager {
 	return &Manager{
 		ServiceBase: helper.ServiceBase{
@@ -103,7 +112,7 @@ func (m *Manager) Run(ctx context.Context) {
 			log.Errorf("fail to monitor PSI metrics: %v", err)
 		}
 	},
-		time.Millisecond*time.Duration(m.conf.Interval),
+		time.Second*time.Duration(m.conf.Interval),
 		ctx.Done())
 }
 
@@ -152,13 +161,14 @@ func priority(online bool) api.ListOption {
 // monitor gets metrics and fire triggers when satisfied
 func (m *Manager) monitor() error {
 	metric := &BasePSIMetric{conservation: m.Viewer.ListPodsWithOptions(priority(true)),
-		suspicion:  m.Viewer.ListPodsWithOptions(priority(false)),
-		BaseMetric: metric.NewBaseMetric(),
-		resources:  m.conf.Resource}
-
+		suspicion:      m.Viewer.ListPodsWithOptions(priority(false)),
+		BaseMetric:     metric.NewBaseMetric(),
+		resources:      m.conf.Resource,
+		avg10Threshold: m.conf.Avg10Threshold,
+	}
 	metric.AddTrigger(
-		trigger.GetTrigger(trigger.RESOURCEANALYSIS).
-			SetNext(trigger.GetTrigger(trigger.EXPULSION)),
+		trigger.NewTrigger(trigger.RESOURCEANALYSIS).
+			SetNext(trigger.NewTrigger(trigger.EXPULSION)),
 	)
 	if err := metric.Update(); err != nil {
 		return err