From ec5708e4915fa63818715bb9f50e63b10eff9ab8 Mon Sep 17 00:00:00 2001 From: vegbir Date: Fri, 1 Nov 2024 02:58:27 +0000 Subject: [PATCH] rubik: optimize triggers & resource manager & psi 1. Abstract resource manager. `cadvisor` is one of the implementations. We provide a factory class method to return the generator of `Manager` and the configuration options of the corresponding manager. Users can customize the configuration options and pass them into the corresponding `Manager` generator. 2. Abstract resource analysis class, which provides different types of analyzer generators. 3. Abstract base trigger template `BaseTemplate`. `BaseTemplate` further abstracts the `Executor` interface into `Transformation` and `Action`. The `Transformation` type indicates that the input is calculated and a specific output is returned (such as `resourceanalysisTrigger`), while the `Action` type operates on the input and does not return data (such as `evictionTrigger`). Parameters are passed uniformly using context variables, and standard keys are defined for passing data. 4. Provide common `Transformation` (Eviction) and `Action` (MaxResource). 5. Adapt PSI. The PSI service manages the start and stop of Resource Manager. The declaration of Triggers for executing different types of operations is defined by the service. 6. Provide a kubernetes clientSet singleton. For common variables in multiple modules, define a unified global singleton to avoid repeated links and resource consumption. Signed-off-by: vegbir --- pkg/core/metric/metric.go | 16 +- .../{base.go => common/treetrigger.go} | 138 +++--------- .../trigger/{factor.go => common/type.go} | 47 ++-- .../{expulsion.go => executor/eviction.go} | 85 ++------ pkg/core/trigger/executor/resource.go | 54 +++++ pkg/core/trigger/resourceanalysis.go | 206 ------------------ pkg/core/trigger/template/base.go | 109 +++++++++ pkg/informer/apiserverinformer.go | 22 +- pkg/lib/kubernetes/client.go | 65 ++++++ pkg/resource/analyze/analyzer.go | 87 ++++++++ pkg/resource/manager/builder.go | 52 +++++ .../manager}/cadvisor/cadvisor.go | 49 +++-- pkg/resource/manager/cadvisor/config.go | 90 ++++++++ pkg/resource/manager/common/common.go | 36 +++ pkg/rubik/rubik.go | 8 - pkg/services/psi/metric.go | 19 +- pkg/services/psi/psi.go | 106 +++++++-- pkg/services/psi/psi_test.go | 126 ----------- 18 files changed, 722 insertions(+), 593 deletions(-) rename pkg/core/trigger/{base.go => common/treetrigger.go} (34%) rename pkg/core/trigger/{factor.go => common/type.go} (45%) rename pkg/core/trigger/{expulsion.go => executor/eviction.go} (43%) create mode 100644 pkg/core/trigger/executor/resource.go delete mode 100644 pkg/core/trigger/resourceanalysis.go create mode 100644 pkg/core/trigger/template/base.go create mode 100644 pkg/lib/kubernetes/client.go create mode 100644 pkg/resource/analyze/analyzer.go create mode 100644 pkg/resource/manager/builder.go rename pkg/{resourcemanager => resource/manager}/cadvisor/cadvisor.go (68%) create mode 100644 pkg/resource/manager/cadvisor/config.go create mode 100644 pkg/resource/manager/common/common.go delete mode 100644 pkg/services/psi/psi_test.go diff --git a/pkg/core/metric/metric.go b/pkg/core/metric/metric.go index 1bae684..3c51b9b 100644 --- a/pkg/core/metric/metric.go +++ b/pkg/core/metric/metric.go @@ -14,29 +14,33 @@ // Package metric define metric interface package metric -import "isula.org/rubik/pkg/core/trigger" +import "isula.org/rubik/pkg/core/trigger/common" // Metric interface defines a series of rubik observation indicator methods type Metric interface { Update() error - AddTrigger(...trigger.Trigger) Metric + AddTrigger(string, ...common.Trigger) Metric } // BaseMetric is the basic Metric implementation type BaseMetric struct { - Triggers []trigger.Trigger + Triggers map[string][]common.Trigger } // NewBaseMetric returns a BaseMetric object func NewBaseMetric() *BaseMetric { return &BaseMetric{ - Triggers: make([]trigger.Trigger, 0), + Triggers: make(map[string][]common.Trigger, 0), } } // AddTrigger adds trigger methods for metric -func (m *BaseMetric) AddTrigger(triggers ...trigger.Trigger) Metric { - m.Triggers = append(m.Triggers, triggers...) +func (m *BaseMetric) AddTrigger(name string, triggers ...common.Trigger) Metric { + if _, ok := m.Triggers[name]; !ok { + m.Triggers[name] = make([]common.Trigger, 0) + } + + m.Triggers[name] = append(m.Triggers[name], triggers...) return m } diff --git a/pkg/core/trigger/base.go b/pkg/core/trigger/common/treetrigger.go similarity index 34% rename from pkg/core/trigger/base.go rename to pkg/core/trigger/common/treetrigger.go index 20168f1..65a5c61 100644 --- a/pkg/core/trigger/base.go +++ b/pkg/core/trigger/common/treetrigger.go @@ -1,4 +1,4 @@ -// Copyright (c) Huawei Technologies Co., Ltd. 2021-2023. All rights reserved. +// Copyright (c) Huawei Technologies Co., Ltd. 2021-2024. All rights reserved. // rubik licensed under the Mulan PSL v2. // You can use this software according to the terms and conditions of the Mulan PSL v2. // You may obtain a copy of Mulan PSL v2 at: @@ -9,60 +9,18 @@ // See the Mulan PSL v2 for more details. // Author: Jiaqi Yang // Date: 2023-05-16 -// Description: This file is used for diverse triggers +// Description: This file is used for common triggers -// Package trigger defines diverse triggers -package trigger +// Package common defines trigger interface and tree trigger +package common import ( + "context" "fmt" - "sync" - "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/common/util" ) -// Typ is the type of trigger -type Typ int8 - -const ( - // ExpulsionAnno is the key of expulsion trigger - ExpulsionAnno = "expulsion" - // ResourceAnalysisAnno is the key of resource analysis trigger - ResourceAnalysisAnno = "resourceAnalysis" -) -const ( - // EXPULSION is the key of expulsion trigger - EXPULSION Typ = iota - // RESOURCEANALYSIS is the key of resource analysis trigger - RESOURCEANALYSIS -) - -type triggerCreator func() Trigger - -var triggerCreatorMap = map[Typ]triggerCreator{ - EXPULSION: expulsionCreator, - RESOURCEANALYSIS: analyzerCreator, -} - -// Descriptor defines methods for describing triggers -type Descriptor interface { - Name() string -} - -// Executor is the trigger execution function interface -type Executor interface { - Execute(Factor) (Factor, error) - Stop() error -} - -// Trigger interface defines the trigger methods -type Trigger interface { - Descriptor - Execute(Factor) error - SetNext(...Trigger) Trigger -} - // TreeTrigger organizes Triggers in a tree format and executes sub-triggers in a chain of responsibility mode type TreeTrigger struct { name string @@ -70,92 +28,66 @@ type TreeTrigger struct { subTriggers []Trigger } -// withTreeTrigger returns a BaseMetric object -func withTreeTrigger(name string, exec Executor) *TreeTrigger { +// NewTreeTrigger returns a BaseMetric object +func NewTreeTrigger(name string) *TreeTrigger { return &TreeTrigger{ name: name, - exec: exec, subTriggers: make([]Trigger, 0)} } -// SetNext sets the trigger that needs to be checked next -func (t *TreeTrigger) SetNext(triggers ...Trigger) Trigger { - t.subTriggers = append(t.subTriggers, triggers...) - return t -} - // Name returns the name of trigger func (t *TreeTrigger) Name() string { return t.name } -// Execute executes the sub-triggers of the current trigger -func (t *TreeTrigger) Execute(f Factor) error { +// Activate executes the sub-triggers of the current trigger +func (t *TreeTrigger) Activate(ctx context.Context) error { if t.exec == nil { return fmt.Errorf("trigger can not execute: %v", t.name) } var errs error - res, err := t.exec.Execute(f) + res, err := t.exec.Execute(ctx) if err != nil { return fmt.Errorf("failed to execute %v: %v", t.name, err) } for _, next := range t.subTriggers { - if err := next.Execute(res); err != nil { + if err := next.Activate(res); err != nil { errs = util.AppendErr(errs, util.AddErrorPrfix(err, t.name)) } } return errs } -// NewTrigger returns a trigger singleton according to type -func NewTrigger(t Typ) Trigger { - if _, ok := triggerCreatorMap[t]; !ok { - log.Warnf("undefine trigger: %v", t) +// Halt stops the sub-triggers of the current trigger and current trigger +func (t *TreeTrigger) Halt() error { + if t.exec == nil { return nil } - return triggerCreatorMap[t]() -} - -var ( - runningExecutors = make(map[string]Executor, 0) - execLock sync.Mutex -) + var errs error -func appendUsedExecutors(name string, exec Executor) { - if exec == nil { - log.Errorf("invalid executor: %v", name) - return + for _, next := range t.subTriggers { + errs = util.AppendErr(errs, next.Halt()) } - execLock.Lock() - defer execLock.Unlock() - if _, existed := runningExecutors[name]; existed { - log.Errorf("the executor already exist: %v", name) - return + errs = util.AppendErr(errs, t.exec.Stop()) + if errs != nil { + return fmt.Errorf("failed to stop trigger %v: %v", t.name, errs) } - log.Infof("append executor successfully: %v", name) - runningExecutors[name] = exec + return errs } -// StopUsedExecutors stops running executors -func StopUsedExecutors() error { - execLock.Lock() - defer execLock.Unlock() - var errs error - // stop the executors one by one - for name, exec := range runningExecutors { - log.Infof("stopping executor %v", name) - if exec == nil { - log.Infof("executor has already stopped: %v", name) - continue - } - if err := exec.Stop(); err != nil { - errs = util.AppendErr(errs, util.AddErrorPrfix(err, name)) - } - } - // clear executors - for k := range runningExecutors { - delete(runningExecutors, k) - } - return errs +// SetName sets the trigger name +func (t *TreeTrigger) SetName(name string) { + t.name = name +} + +// SetExecutor sets the executor of the trigger +func (t *TreeTrigger) SetExecutor(exec Executor) { + t.exec = exec +} + +// SetNext sets the triggers that need to be checked next +func (t *TreeTrigger) SetNext(triggers ...Trigger) Trigger { + t.subTriggers = append(t.subTriggers, triggers...) + return t } diff --git a/pkg/core/trigger/factor.go b/pkg/core/trigger/common/type.go similarity index 45% rename from pkg/core/trigger/factor.go rename to pkg/core/trigger/common/type.go index e1bfe99..901ca0d 100644 --- a/pkg/core/trigger/factor.go +++ b/pkg/core/trigger/common/type.go @@ -1,4 +1,4 @@ -// Copyright (c) Huawei Technologies Co., Ltd. 2021-2023. All rights reserved. +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. // rubik licensed under the Mulan PSL v2. // You can use this software according to the terms and conditions of the Mulan PSL v2. // You may obtain a copy of Mulan PSL v2 at: @@ -8,31 +8,42 @@ // PURPOSE. // See the Mulan PSL v2 for more details. // Author: Jiaqi Yang -// Date: 2023-05-19 +// Date: 2024-10-31 // Description: This file is used for defining trigger factor -package trigger +package common -import "isula.org/rubik/pkg/core/typedef" +import "context" -// Factor represents the event trigger factor -type Factor interface { - Message() string - TargetPods() map[string]*typedef.PodInfo +type ( + // Factor is used to trigger various events + Factor uint8 +) + +const ( + TARGETPODS Factor = iota + DEPORTPOD +) + +// Descriptor defines methods for describing triggers +type Descriptor interface { + Name() string } -// FactorImpl is the basic implementation of the trigger factor -type FactorImpl struct { - Msg string - Pods map[string]*typedef.PodInfo +// Executor is the trigger execution function interface +type Executor interface { + Execute(context.Context) (context.Context, error) + Stop() error } -// Message returns the string information carried by Factor -func (impl *FactorImpl) Message() string { - return impl.Msg +type Setter interface { + SetNext(...Trigger) Trigger } -// TargetPods returns the pods that need to be processed -func (impl *FactorImpl) TargetPods() map[string]*typedef.PodInfo { - return impl.Pods +// Trigger interface defines the trigger methods +type Trigger interface { + Descriptor + Setter + Activate(context.Context) error + Halt() error } diff --git a/pkg/core/trigger/expulsion.go b/pkg/core/trigger/executor/eviction.go similarity index 43% rename from pkg/core/trigger/expulsion.go rename to pkg/core/trigger/executor/eviction.go index a8d790f..d561d73 100644 --- a/pkg/core/trigger/expulsion.go +++ b/pkg/core/trigger/executor/eviction.go @@ -1,4 +1,4 @@ -// Copyright (c) Huawei Technologies Co., Ltd. 2021-2023. All rights reserved. +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. // rubik licensed under the Mulan PSL v2. // You can use this software according to the terms and conditions of the Mulan PSL v2. // You may obtain a copy of Mulan PSL v2 at: @@ -8,93 +8,52 @@ // PURPOSE. // See the Mulan PSL v2 for more details. // Author: Jiaqi Yang -// Date: 2023-05-16 -// Description: This file is used for expulsion trigger +// Date: 2024-10-31 +// Description: This file is used for expulsion action -package trigger +package executor import ( "context" "fmt" - "sync" policyv1beta1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/common/util" + "isula.org/rubik/pkg/core/trigger/common" "isula.org/rubik/pkg/core/typedef" - "isula.org/rubik/pkg/informer" + "isula.org/rubik/pkg/lib/kubernetes" ) -// expulsionExec is the singleton of Expulsion executor implementation -var expulsionExec *Expulsion - -// expulsionCreator creates Expulsion trigger -var expulsionCreator = func() Trigger { - if expulsionExec == nil { - c := newKubeClient() - if c != nil { - log.Infof("initialize expulsionExec") - expulsionExec = &Expulsion{client: c} - appendUsedExecutors(ExpulsionAnno, expulsionExec) - } - } - return withTreeTrigger(ExpulsionAnno, expulsionExec) -} - -// Expulsion is the trigger to evict pods -type Expulsion struct { - sync.RWMutex - client *kubernetes.Clientset -} - -// newKubeClient returns a kubernetes.Clientset object -func newKubeClient() *kubernetes.Clientset { - client, err := informer.InitKubeClient() +func EvictPod(ctx context.Context) error { + var errs error + client, err := kubernetes.GetClient() if err != nil { - log.Errorf("failed to connect kubernetes: %v", err) - return nil + return fmt.Errorf("failed to get kubernetes client: %v", err) } - return client -} - -// Execute evicts pods based on the id of the given pod -func (e *Expulsion) Execute(f Factor) (Factor, error) { - e.RLock() - defer e.RUnlock() - if e.client == nil { - return nil, fmt.Errorf("failed to use kubernetes client, please check") + pods, ok := ctx.Value(common.TARGETPODS).(map[string]*typedef.PodInfo) + if !ok { + return fmt.Errorf("failed to get target pods") } - var errs error - for name, pod := range f.TargetPods() { + eviction := &policyv1beta1.Eviction{ + ObjectMeta: metav1.ObjectMeta{}, + DeleteOptions: &metav1.DeleteOptions{}, + } + for name, pod := range pods { log.Infof("evicting pod \"%v\"", name) if err := inevictable(pod); err != nil { errs = util.AppendErr(errs, fmt.Errorf("failed to evict pod \"%v\": %v", pod.Name, err)) continue } - eviction := &policyv1beta1.Eviction{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - }, - DeleteOptions: &metav1.DeleteOptions{}, - } - if err := e.client.CoreV1().Pods(pod.Namespace).Evict(context.TODO(), eviction); err != nil { + eviction.ObjectMeta.Name = pod.Name + eviction.ObjectMeta.Namespace = pod.Namespace + if err := client.CoreV1().Pods(pod.Namespace).Evict(context.TODO(), eviction); err != nil { errs = util.AppendErr(errs, fmt.Errorf("failed to evict pod \"%v\": %v", pod.Name, err)) - continue } } - return nil, errs -} - -// Stop stops the expulsion trigger -func (e *Expulsion) Stop() error { - e.Lock() - defer e.Unlock() - e.client = nil - return nil + return errs } func inevictable(pod *typedef.PodInfo) error { diff --git a/pkg/core/trigger/executor/resource.go b/pkg/core/trigger/executor/resource.go new file mode 100644 index 0000000..be4bd04 --- /dev/null +++ b/pkg/core/trigger/executor/resource.go @@ -0,0 +1,54 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Jiaqi Yang +// Date: 2024-10-31 +// Description: This file is used for maxValue transfomation + +package executor + +import ( + "context" + "fmt" + + "isula.org/rubik/pkg/common/log" + "isula.org/rubik/pkg/core/trigger/common" + "isula.org/rubik/pkg/core/trigger/template" + "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/resource/analyze" +) + +// MaxValueTransformer returns a function that conforms to the Transformation format to filter for maximum utilization +func MaxValueTransformer(cal analyze.Calculator) template.Transformation { + return func(ctx context.Context) (context.Context, error) { + var ( + chosen *typedef.PodInfo + maxValue float64 = 0 + ) + + pods, ok := ctx.Value(common.TARGETPODS).(map[string]*typedef.PodInfo) + if !ok { + return ctx, fmt.Errorf("failed to get target pods") + } + + for _, pod := range pods { + value := cal(pod) + if maxValue < value { + maxValue = value + chosen = pod + } + } + + if chosen != nil { + log.Infof("find the pod(%v) with the highest utilization(%v)", chosen.Name, maxValue) + return context.WithValue(ctx, common.TARGETPODS, map[string]*typedef.PodInfo{chosen.Name: chosen}), nil + } + return context.Background(), fmt.Errorf("failed to find target pod") + } +} diff --git a/pkg/core/trigger/resourceanalysis.go b/pkg/core/trigger/resourceanalysis.go deleted file mode 100644 index 2298bd9..0000000 --- a/pkg/core/trigger/resourceanalysis.go +++ /dev/null @@ -1,206 +0,0 @@ -// Copyright (c) Huawei Technologies Co., Ltd. 2021-2023. All rights reserved. -// rubik licensed under the Mulan PSL v2. -// You can use this software according to the terms and conditions of the Mulan PSL v2. -// You may obtain a copy of Mulan PSL v2 at: -// http://license.coscl.org.cn/MulanPSL2 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -// PURPOSE. -// See the Mulan PSL v2 for more details. -// Author: Jiaqi Yang -// Date: 2023-05-18 -// Description: This file is used for resource Analyzer - -package trigger - -import ( - "fmt" - "sync" - "time" - - "github.com/google/cadvisor/cache/memory" - "github.com/google/cadvisor/container" - v2 "github.com/google/cadvisor/info/v2" - "github.com/google/cadvisor/manager" - "github.com/google/cadvisor/utils/sysfs" - - "isula.org/rubik/pkg/common/log" - "isula.org/rubik/pkg/common/util" - "isula.org/rubik/pkg/core/typedef" - "isula.org/rubik/pkg/resourcemanager/cadvisor" -) - -const ( - miniNum = 2 - nanoToMicro float64 = 1000 - percentageRate float64 = 100 -) - -// resourceAnalysisExec is the singleton of Analyzer executor implementation -var resourceAnalysisExec *Analyzer - -// analyzerCreator creates Analyzer trigger -var analyzerCreator = func() Trigger { - if resourceAnalysisExec == nil { - m := NewCadvisorManager() - if m != nil { - log.Infof("initialize resourceAnalysisExec") - resourceAnalysisExec = &Analyzer{cadvisorManager: m} - appendUsedExecutors(ResourceAnalysisAnno, resourceAnalysisExec) - } - } - return withTreeTrigger(ResourceAnalysisAnno, resourceAnalysisExec) -} - -// rreqOpt is the option to get information from cadvisor -var reqOpt = v2.RequestOptions{ - IdType: v2.TypeName, - Count: 2, - Recursive: false, -} - -// Analyzer is the resource analysis trigger -type Analyzer struct { - sync.RWMutex - cadvisorManager *cadvisor.Manager -} - -// NewCadvisorManager returns an cadvisor.Manager object -func NewCadvisorManager() *cadvisor.Manager { - const ( - cacheMinutes = 10 - keepingIntervalSec = 10 - ) - var ( - allowDynamic = true - maxHousekeepingInterval = keepingIntervalSec * time.Second - cacheAge = cacheMinutes * time.Minute - ) - args := cadvisor.StartArgs{ - MemCache: memory.New(cacheAge, nil), - SysFs: sysfs.NewRealSysFs(), - IncludeMetrics: container.MetricSet{ - container.CpuUsageMetrics: struct{}{}, - container.MemoryUsageMetrics: struct{}{}, - container.DiskUsageMetrics: struct{}{}, - container.DiskIOMetrics: struct{}{}, - }, - MaxHousekeepingConfig: manager.HouskeepingConfig{ - Interval: &maxHousekeepingInterval, - AllowDynamic: &allowDynamic, - }, - } - return cadvisor.WithStartArgs(args) -} - -// Execute filters the corresponding Pod according to the operation type and triggers it on demand -func (a *Analyzer) Execute(f Factor) (Factor, error) { - a.RLock() - defer a.RUnlock() - if a.cadvisorManager == nil { - return nil, fmt.Errorf("failed to use cadvisor, please check") - } - var ( - target *typedef.PodInfo - opTyp = f.Message() - errMsg string - alarm = func(target *typedef.PodInfo, errMsg string) (Factor, error) { - if target == nil { - return nil, fmt.Errorf(errMsg) - } - return &FactorImpl{Pods: map[string]*typedef.PodInfo{target.Name: target}}, nil - } - ) - log.Debugf("receive operation: %v", opTyp) - switch opTyp { - case "max_cpu": - errMsg = "unable to find pod with maximum CPU utilization" - target = a.maxCPUUtil(f.TargetPods()) - case "max_memory": - errMsg = "unable to find pod with maximum memory utilization" - target = a.maxMemoryUtil(f.TargetPods()) - case "max_io": - errMsg = "unable to find pod with maximum I/O bandwidth" - target = a.maxCPUUtil(f.TargetPods()) - default: - errMsg = "undefined operation: " + opTyp - } - return alarm(target, errMsg) -} - -func (a *Analyzer) maxCPUUtil(pods map[string]*typedef.PodInfo) *typedef.PodInfo { - var ( - chosen *typedef.PodInfo - maxUtil float64 - ) - for name, pod := range pods { - podStats, err := a.cgroupCadvisorInfo("/"+pod.Path, reqOpt) - if err != nil { - log.Errorf("failed to get cgroup information %v: %v", pod.Path, err) - continue - } - if len(podStats) < miniNum { - log.Errorf("pod %v has no enough cpu stats collected, skip it", name) - continue - } - last := podStats[len(podStats)-1] - penultimate := podStats[len(podStats)-2] - cpuUsageUs := float64(last.Cpu.Usage.Total-penultimate.Cpu.Usage.Total) / nanoToMicro - timeDeltaUs := float64(last.Timestamp.Sub(penultimate.Timestamp).Microseconds()) - cpuUtil := util.Div(cpuUsageUs, timeDeltaUs) * percentageRate - log.Debugf("pod %v cpu util %v%%=%v/%v(us)", name, cpuUtil, cpuUsageUs, timeDeltaUs) - if maxUtil < cpuUtil { - maxUtil = cpuUtil - chosen = pod - } - } - if chosen != nil { - log.Infof("find the pod(%v) with the highest cpu utilization(%v)", chosen.Name, maxUtil) - } - return chosen -} - -func (a *Analyzer) maxMemoryUtil(pods map[string]*typedef.PodInfo) *typedef.PodInfo { - var ( - chosen *typedef.PodInfo - maxUtil uint64 - ) - for name, pod := range pods { - podStats, err := a.cgroupCadvisorInfo("/"+pod.Path, reqOpt) - if err != nil { - log.Errorf("failed to get cgroup information %v: %v", pod.Path, err) - continue - } - last := podStats[len(podStats)-1].Memory.Usage - log.Debugf("pod %v memory usage %vB", name, last) - if maxUtil < last { - maxUtil = last - chosen = pod - } - } - if chosen != nil { - log.Infof("find the pod(%v) with the highest memory utilization(%v)", chosen.Name, maxUtil) - } - return chosen -} - -func (a *Analyzer) cgroupCadvisorInfo(cgroupPath string, opts v2.RequestOptions) ([]*v2.ContainerStats, error) { - infoMap, err := a.cadvisorManager.ContainerInfoV2(cgroupPath, opts) - if err != nil { - return nil, fmt.Errorf("failed to get cgroup information %v: %v", cgroupPath, err) - } - info, existed := infoMap[cgroupPath] - if !existed { - return nil, fmt.Errorf("failed to get cgroup info from cadvisor") - } - return info.Stats, nil -} - -// Stop stops Analyzer -func (a *Analyzer) Stop() error { - a.Lock() - defer a.Unlock() - m := a.cadvisorManager - a.cadvisorManager = nil - return m.Stop() -} diff --git a/pkg/core/trigger/template/base.go b/pkg/core/trigger/template/base.go new file mode 100644 index 0000000..f627884 --- /dev/null +++ b/pkg/core/trigger/template/base.go @@ -0,0 +1,109 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Jiaqi Yang +// Date: 2024-10-30 +// Description: This file is used for It provides trigger templates +// Further, just provide the name and the operation to be performed + +package template + +import ( + "context" + "fmt" + + "isula.org/rubik/pkg/core/trigger/common" +) + +// Transformation returns the pods after convertion +type Transformation func(context.Context) (context.Context, error) + +// Action acts on Pods +type Action func(context.Context) error + +// ResourceAnalyzer is the resource analysis trigger +type BaseTemplate struct { + common.TreeTrigger + transfomer Transformation + actor Action +} + +// Execute filters the corresponding Pod according to the operation type and triggers it on demand +func (t *BaseTemplate) Execute(ctx context.Context) (context.Context, error) { + if t.transfomer != nil { + return transform(ctx, t.transfomer) + } + + if t.actor != nil { + return ctx, act(ctx, t.actor) + } + return ctx, nil +} + +// Stop stops the executor +func (t *BaseTemplate) Stop() error { + return nil +} + +func transform(ctx context.Context, f Transformation) (context.Context, error) { + if f == nil { + return nil, fmt.Errorf("podFilter method is not implemented") + } + // pods, ok := ctx.Value(common.TARGETPODS).(map[string]*typedef.PodInfo) + // if !ok { + // return ctx, fmt.Errorf("failed to get target pods") + // } + ctx, err := f(ctx) + if err != nil { + return ctx, fmt.Errorf("failed to transform pod: %v", err) + } + return ctx, nil +} + +func act(ctx context.Context, f Action) error { + if f == nil { + return fmt.Errorf("podAction method is not implemented") + } + // pods, ok := ctx.Value(common.TARGETPODS).(map[string]*typedef.PodInfo) + // if !ok { + // return nil + // } + return f(ctx) +} + +type opt func(t *BaseTemplate) + +func WithName(name string) opt { + return func(t *BaseTemplate) { + t.SetName(name) + } +} + +func WithPodTransformation(f Transformation) opt { + return func(t *BaseTemplate) { + t.transfomer = f + } +} + +func WithPodAction(f Action) opt { + return func(t *BaseTemplate) { + t.actor = f + } +} + +func FromBaseTemplate(opts ...opt) common.Trigger { + t := &BaseTemplate{ + TreeTrigger: *common.NewTreeTrigger("base template"), + } + for _, opt := range opts { + opt(t) + } + t.SetExecutor(t) + return t +} diff --git a/pkg/informer/apiserverinformer.go b/pkg/informer/apiserverinformer.go index d5917f3..b2867c1 100644 --- a/pkg/informer/apiserverinformer.go +++ b/pkg/informer/apiserverinformer.go @@ -23,20 +23,19 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "isula.org/rubik/pkg/api" "isula.org/rubik/pkg/common/constant" "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/lib/kubernetes" ) // APIServerInformer interacts with k8s api server and forward data to the internal type APIServerInformer struct { api.Publisher - client *kubernetes.Clientset + client *kubernetes.Client nodeName string } @@ -47,7 +46,7 @@ func NewAPIServerInformer(publisher api.Publisher) (api.Informer, error) { } // create apiserver client - client, err := InitKubeClient() + client, err := kubernetes.GetClient() if err != nil { return nil, fmt.Errorf("failed to init kubenetes client: %v", err) } @@ -63,21 +62,6 @@ func NewAPIServerInformer(publisher api.Publisher) (api.Informer, error) { return informer, nil } -// InitKubeClient initializes kubeClient -func InitKubeClient() (*kubernetes.Clientset, error) { - conf, err := rest.InClusterConfig() - if err != nil { - return nil, err - } - - kubeClient, err := kubernetes.NewForConfig(conf) - if err != nil { - return nil, err - } - - return kubeClient, nil -} - // Start starts and enables PIServerInformer func (informer *APIServerInformer) Start(ctx context.Context) error { const specNodeNameField = "spec.nodeName" diff --git a/pkg/lib/kubernetes/client.go b/pkg/lib/kubernetes/client.go new file mode 100644 index 0000000..11e9ed1 --- /dev/null +++ b/pkg/lib/kubernetes/client.go @@ -0,0 +1,65 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// +// http://license.coscl.org.cn/MulanPSL2 +// +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Jiaqi Yang +// Date: 2024-10-31 +// Description: This file is used for kubernetes client + +package kubernetes + +import ( + "fmt" + "sync" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type Client struct { + kubernetes.Clientset +} + +var ( + defaultClient *Client + clientSync sync.RWMutex +) + +// initKubeClient initializes kubeClient +func initClient() (*Client, error) { + conf, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + + kubeClient, err := kubernetes.NewForConfig(conf) + if err != nil { + return nil, err + } + + return &Client{Clientset: *kubeClient}, nil +} + +// GetClient gets the globally unique default kubernetes client +func GetClient() (*Client, error) { + // prevent multiple initializations + clientSync.Lock() + defer clientSync.Unlock() + + if defaultClient != nil { + return defaultClient, nil + } + c, err := initClient() + if err != nil { + return nil, fmt.Errorf("failed to init client: %v", err) + } + defaultClient = c + return defaultClient, nil +} diff --git a/pkg/resource/analyze/analyzer.go b/pkg/resource/analyze/analyzer.go new file mode 100644 index 0000000..5c85897 --- /dev/null +++ b/pkg/resource/analyze/analyzer.go @@ -0,0 +1,87 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Jiaqi Yang +// Date: 2024-10-31 +// Description: This file is used for resource Analyzer + +package analyze + +import ( + v2 "github.com/google/cadvisor/info/v2" + + "isula.org/rubik/pkg/common/log" + "isula.org/rubik/pkg/common/util" + "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/resource/manager/common" +) + +type Calculator func(*typedef.PodInfo) float64 + +type Analyzer struct { + common.Manager +} + +func NewResourceAnalyzer(manager common.Manager) *Analyzer { + return &Analyzer{ + Manager: manager, + } +} + +func (a *Analyzer) CPUCalculatorBuilder(reqOpt *common.GetOption) Calculator { + return func(pi *typedef.PodInfo) float64 { + const ( + miniNum int = 2 + nanoToMicro float64 = 1000 + percentageRate float64 = 100 + ) + + podStats := a.getPodStats("/"+pi.Path, reqOpt) + if len(podStats) < miniNum { + log.Errorf("pod %v has no enough cpu stats collected, skip it", pi.Name) + return -1 + } + var ( + last = podStats[len(podStats)-1] + penultimate = podStats[len(podStats)-2] + cpuUsageUs = float64(last.Cpu.Usage.Total-penultimate.Cpu.Usage.Total) / nanoToMicro + timeDeltaUs = float64(last.Timestamp.Sub(penultimate.Timestamp).Microseconds()) + ) + return util.Div(cpuUsageUs, timeDeltaUs) * percentageRate + } +} + +func (a *Analyzer) MemoryCalculatorBuilder(reqOpt *common.GetOption) Calculator { + return func(pi *typedef.PodInfo) float64 { + const ( + bytesToMb float64 = 1000000.0 + miniNum int = 1 + ) + podStats := a.getPodStats("/"+pi.Path, reqOpt) + if len(podStats) < miniNum { + log.Errorf("pod %v has no enough memory stats collected, skip it", pi.Name) + return -1 + } + return float64(podStats[len(podStats)-1].Memory.Usage) / bytesToMb + } +} + +func (a *Analyzer) getPodStats(cgroupPath string, reqOpt *common.GetOption) []*v2.ContainerStats { + infoMap, err := a.GetPodStats(cgroupPath, *reqOpt) + if err != nil { + log.Errorf("failed to get cgroup information %v: %v", cgroupPath, err) + return nil + } + info, existed := infoMap[cgroupPath] + if !existed { + log.Errorf("failed to get cgroup %v from cadvisor", cgroupPath) + return nil + } + return info.Stats +} diff --git a/pkg/resource/manager/builder.go b/pkg/resource/manager/builder.go new file mode 100644 index 0000000..88b54d6 --- /dev/null +++ b/pkg/resource/manager/builder.go @@ -0,0 +1,52 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Jiaqi Yang +// Date: 2024-10-31 +// Description: This file defines the builder of resource managers + +// Package manager implements manager builer +package manager + +import ( + "fmt" + + "isula.org/rubik/pkg/resource/manager/cadvisor" + "isula.org/rubik/pkg/resource/manager/common" +) + +type managerTyp uint8 + +const ( + CADVISOR managerTyp = iota +) + +type CadvisorConfig interface { + Config() *cadvisor.Config +} + +type Builder func(interface{}) (common.Manager, error) + +func GetManagerBuilder(typ managerTyp) Builder { + switch typ { + case CADVISOR: + return newCasvisorManagerBuilder() + } + return nil +} + +func newCasvisorManagerBuilder() Builder { + return func(args interface{}) (common.Manager, error) { + conf, ok := args.(CadvisorConfig) + if !ok { + return nil, fmt.Errorf("failed to get cadvisor config") + } + return cadvisor.New(conf.Config()), nil + } +} diff --git a/pkg/resourcemanager/cadvisor/cadvisor.go b/pkg/resource/manager/cadvisor/cadvisor.go similarity index 68% rename from pkg/resourcemanager/cadvisor/cadvisor.go rename to pkg/resource/manager/cadvisor/cadvisor.go index 31cf196..c13312e 100644 --- a/pkg/resourcemanager/cadvisor/cadvisor.go +++ b/pkg/resource/manager/cadvisor/cadvisor.go @@ -1,7 +1,7 @@ //go:build linux // +build linux -// Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. +// Copyright (c) Huawei Technologies Co., Ltd. 2023-2024. All rights reserved. // rubik licensed under the Mulan PSL v2. // You can use this software according to the terms and conditions of the Mulan PSL v2. // You may obtain a copy of Mulan PSL v2 at: @@ -18,32 +18,24 @@ package cadvisor import ( "net/http" + "sync" "time" - "github.com/google/cadvisor/cache/memory" "github.com/google/cadvisor/container" - v2 "github.com/google/cadvisor/info/v2" "github.com/google/cadvisor/manager" - "github.com/google/cadvisor/utils/sysfs" "isula.org/rubik/pkg/common/log" + "isula.org/rubik/pkg/resource/manager/common" ) // Manager is the cadvisor manager type Manager struct { manager.Manager + sync.RWMutex } -// StartArgs is a set of parameters that control the startup of cadvisor -type StartArgs struct { - MemCache *memory.InMemoryCache - SysFs sysfs.SysFs - IncludeMetrics container.MetricSet - MaxHousekeepingConfig manager.HouskeepingConfig -} - -// WithStartArgs creates cadvisor.Manager object -func WithStartArgs(args StartArgs) *Manager { +// New creates cadvisor.Manager object +func New(args *Config) *Manager { const ( perfEventsFile = "/sys/kernel/debug/tracing/events/raw_syscalls/sys_enter" ) @@ -69,13 +61,17 @@ func WithStartArgs(args StartArgs) *Manager { } // Start starts cadvisor manager -func (c *Manager) Start() error { - return c.Manager.Start() +func (m *Manager) Start() error { + m.Lock() + defer m.Unlock() + return m.Manager.Start() } // Stop stops cadvisor and clear existing factory -func (c *Manager) Stop() error { - err := c.Manager.Stop() +func (m *Manager) Stop() error { + m.Lock() + defer m.Unlock() + err := m.Manager.Stop() if err != nil { return err } @@ -85,7 +81,18 @@ func (c *Manager) Stop() error { } // ContainerInfo gets container infos v2 -func (c *Manager) ContainerInfoV2(name string, - options v2.RequestOptions) (map[string]v2.ContainerInfo, error) { - return c.GetContainerInfoV2(name, options) +func (m *Manager) GetPodStats(name string, options common.GetOption) (map[string]common.PodStat, error) { + m.RLock() + defer m.RUnlock() + contInfo, err := m.GetContainerInfoV2(name, options.CadvisorV2RequestOptions) + if err != nil { + return nil, err + } + var podStats = make(map[string]common.PodStat, len(contInfo)) + for name, info := range contInfo { + podStats[name] = common.PodStat{ + ContainerInfo: info, + } + } + return podStats, nil } diff --git a/pkg/resource/manager/cadvisor/config.go b/pkg/resource/manager/cadvisor/config.go new file mode 100644 index 0000000..d007ae7 --- /dev/null +++ b/pkg/resource/manager/cadvisor/config.go @@ -0,0 +1,90 @@ +//go:build linux +// +build linux + +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Jiaqi Yang +// Create: 2024-10-31 +// Description: This file defines cadvisor config + +package cadvisor + +import ( + "time" + + "github.com/google/cadvisor/cache/memory" + "github.com/google/cadvisor/container" + "github.com/google/cadvisor/manager" + "github.com/google/cadvisor/utils/sysfs" +) + +// Config is a set of parameters that control the startup of cadvisor +type Config struct { + MemCache *memory.InMemoryCache + SysFs sysfs.SysFs + IncludeMetrics container.MetricSet + MaxHousekeepingConfig manager.HouskeepingConfig +} + +func (c *Config) Config() *Config { + return c +} + +type ConfigOpt func(args *Config) + +func WithCacheAge(cacheAge time.Duration) ConfigOpt { + return func(args *Config) { + args.MemCache = memory.New(cacheAge, nil) + } +} + +func WithFs(fs sysfs.SysFs) ConfigOpt { + return func(args *Config) { + args.SysFs = fs + } +} + +func WithMetrics(metrics string) ConfigOpt { + return func(args *Config) { + var ms container.MetricSet + ms.Set(metrics) + args.IncludeMetrics = ms + } +} + +func WithHouseKeepingInterval(interval time.Duration) ConfigOpt { + return func(args *Config) { + args.MaxHousekeepingConfig.Interval = &interval + } +} + +func WithHouseKeepingDynamic(allowDynamic bool) ConfigOpt { + return func(args *Config) { + args.MaxHousekeepingConfig.AllowDynamic = &allowDynamic + } +} + +func NewConfig(opts ...ConfigOpt) *Config { + var ( + allowDynamic = true + interval = time.Second + ) + var conf = &Config{ + MaxHousekeepingConfig: manager.HouskeepingConfig{ + AllowDynamic: &allowDynamic, + Interval: &interval, + }, + SysFs: sysfs.NewRealSysFs(), + } + for _, opt := range opts { + opt(conf) + } + return conf +} diff --git a/pkg/resource/manager/common/common.go b/pkg/resource/manager/common/common.go new file mode 100644 index 0000000..65f8b64 --- /dev/null +++ b/pkg/resource/manager/common/common.go @@ -0,0 +1,36 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2024. All rights reserved. +// rubik licensed under the Mulan PSL v2. +// You can use this software according to the terms and conditions of the Mulan PSL v2. +// You may obtain a copy of Mulan PSL v2 at: +// http://license.coscl.org.cn/MulanPSL2 +// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR +// PURPOSE. +// See the Mulan PSL v2 for more details. +// Author: Jiaqi Yang +// Date: 2024-10-31 +// Description: This file defines the commonlib for resource manager + +// Package common defines the commonlib for resource manager +package common + +import ( + v2 "github.com/google/cadvisor/info/v2" +) + +// PodStat is the status of pod +type PodStat struct { + v2.ContainerInfo +} + +// GetOption is the option to get podStat +type GetOption struct { + CadvisorV2RequestOptions v2.RequestOptions +} + +// Manager is the function set of Resource Manager +type Manager interface { + Start() error + Stop() error + GetPodStats(name string, opt GetOption) (map[string]PodStat, error) +} diff --git a/pkg/rubik/rubik.go b/pkg/rubik/rubik.go index ddc7ff2..22ff924 100644 --- a/pkg/rubik/rubik.go +++ b/pkg/rubik/rubik.go @@ -29,7 +29,6 @@ import ( "isula.org/rubik/pkg/common/util" "isula.org/rubik/pkg/config" "isula.org/rubik/pkg/core/publisher" - "isula.org/rubik/pkg/core/trigger" "isula.org/rubik/pkg/core/typedef/cgroup" "isula.org/rubik/pkg/informer" "isula.org/rubik/pkg/podmanager" @@ -167,7 +166,6 @@ func Run() int { go handleSignals(cancel) // 3. run rubik-agent - defer cleanDepecdencies() if err := runAgent(ctx); err != nil { log.Errorf("failed to run rubik agent: %v", err) return constant.ErrorExitCode @@ -185,9 +183,3 @@ func handleSignals(cancel context.CancelFunc) { } } } - -func cleanDepecdencies() { - if err := trigger.StopUsedExecutors(); err != nil { - log.Errorf("failed to stop dependencies: %v", err) - } -} diff --git a/pkg/services/psi/metric.go b/pkg/services/psi/metric.go index b47fffb..892596b 100644 --- a/pkg/services/psi/metric.go +++ b/pkg/services/psi/metric.go @@ -14,11 +14,13 @@ package psi import ( + "context" + "isula.org/rubik/pkg/common/constant" "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/common/util" "isula.org/rubik/pkg/core/metric" - "isula.org/rubik/pkg/core/trigger" + "isula.org/rubik/pkg/core/trigger/common" "isula.org/rubik/pkg/core/typedef" "isula.org/rubik/pkg/core/typedef/cgroup" ) @@ -57,7 +59,7 @@ func (m *BasePSIMetric) Update() error { } for _, typ := range m.resources { if detectPSiMetric(typ, m.conservation, m.avg10Threshold) { - if err := alarm(typ, m.Triggers, m.suspicion); err != nil { + if err := alarm(typ, m.Triggers[typ], m.suspicion); err != nil { return err } } @@ -89,14 +91,13 @@ func detectPSiMetric(resTyp string, conservation map[string]*typedef.PodInfo, av return false } -func alarm(resTyp string, triggers []trigger.Trigger, suspicion map[string]*typedef.PodInfo) error { - var errs error - const prefix = "max_" +func alarm(resTyp string, triggers []common.Trigger, suspicion map[string]*typedef.PodInfo) error { + var ( + errs error + ctx = context.WithValue(context.Background(), common.TARGETPODS, suspicion) + ) for _, t := range triggers { - log.Infof("trigger %v", t.Name()) - if err := t.Execute(&trigger.FactorImpl{Msg: prefix + resTyp, Pods: suspicion}); err != nil { - errs = util.AppendErr(errs, err) - } + errs = util.AppendErr(errs, t.Activate(ctx)) } return errs } diff --git a/pkg/services/psi/psi.go b/pkg/services/psi/psi.go index 45d5faf..946adc0 100644 --- a/pkg/services/psi/psi.go +++ b/pkg/services/psi/psi.go @@ -21,14 +21,21 @@ import ( "fmt" "time" + v2 "github.com/google/cadvisor/info/v2" "k8s.io/apimachinery/pkg/util/wait" "isula.org/rubik/pkg/api" "isula.org/rubik/pkg/common/constant" "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/core/metric" - "isula.org/rubik/pkg/core/trigger" + "isula.org/rubik/pkg/core/trigger/common" + "isula.org/rubik/pkg/core/trigger/executor" + "isula.org/rubik/pkg/core/trigger/template" "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/resource/analyze" + "isula.org/rubik/pkg/resource/manager" + "isula.org/rubik/pkg/resource/manager/cadvisor" + resource "isula.org/rubik/pkg/resource/manager/common" "isula.org/rubik/pkg/services/helper" ) @@ -37,8 +44,18 @@ const ( maxInterval = 30 maxThreshold float64 = 100.0 minThreshold float64 = 5.0 + factoryName string = "PSIFactory" ) +// requestOptions is the option to get information from cadvisor +var requestOptions = resource.GetOption{ + CadvisorV2RequestOptions: v2.RequestOptions{ + IdType: v2.TypeName, + Count: 2, + Recursive: false, + }, +} + // Factory is the PSI Manager factory class type Factory struct { ObjName string @@ -46,12 +63,12 @@ type Factory struct { // Name returns the factory class name func (f Factory) Name() string { - return "PSIFactory" + return factoryName } // NewObj returns a Manager object func (f Factory) NewObj() (interface{}, error) { - return NewManager(f.ObjName), nil + return NewManager(f.ObjName) } // Config is PSI service configuration @@ -91,23 +108,62 @@ func (conf *Config) Validate() error { // Manager is used to manage PSI services type Manager struct { + helper.ServiceBase conf *Config Viewer api.Viewer - helper.ServiceBase + // analyzer is used to assist in analyzing Pod data + analyzer *analyze.Analyzer + // met is used to implement condition-triggered + met *metric.BaseMetric } -// NewManager returns psi manager objects -func NewManager(n string) *Manager { +// NewManager returns psi manager +func NewManager(name string) (*Manager, error) { + // 1. Use cadvisor as a manager of Pod data + cm, err := newCadvisorManager() + if err != nil { + return nil, err + } + + // 2. Analyze Pod resources through cadvisor data + analyzer := analyze.NewResourceAnalyzer(cm) + + // 3。 Define different kinds of triggers, including sorting, eviction + var ( + evictTrigger = template.FromBaseTemplate( + template.WithName("psi_eviction"), + template.WithPodAction(executor.EvictPod), + ) + cpuTrigger = template.FromBaseTemplate( + template.WithName("psi_cpu_trigger"), + template.WithPodTransformation(executor.MaxValueTransformer(analyzer.CPUCalculatorBuilder(&requestOptions))), + ).SetNext(evictTrigger) + memoryTrigger = template.FromBaseTemplate( + template.WithName("psi_memory_trigger"), + template.WithPodTransformation(executor.MaxValueTransformer(analyzer.MemoryCalculatorBuilder(&requestOptions))), + ).SetNext(evictTrigger) + ) + return &Manager{ ServiceBase: helper.ServiceBase{ - Name: n, + Name: name, }, conf: NewConfig(), - } + met: &metric.BaseMetric{ + Triggers: map[string][]common.Trigger{ + cpuRes: {cpuTrigger}, + memoryRes: {memoryTrigger}, + ioRes: {cpuTrigger}, + }, + }, + analyzer: analyzer}, nil } // Run checks psi metrics cyclically. func (m *Manager) Run(ctx context.Context) { + // 1. Start Resource Manager to collect data + m.analyzer.Start() + // 2。Loop to determine the PSI of the online pod and execute the trigger wait.Until( func() { if err := m.monitor(); err != nil { @@ -169,15 +225,37 @@ func priority(online bool) api.ListOption { // monitor gets metrics and fire triggers when satisfied func (m *Manager) monitor() error { - metric := &BasePSIMetric{conservation: m.Viewer.ListPodsWithOptions(priority(true)), + metric := &BasePSIMetric{ + conservation: m.Viewer.ListPodsWithOptions(priority(true)), suspicion: m.Viewer.ListPodsWithOptions(priority(false)), - BaseMetric: metric.NewBaseMetric(), + BaseMetric: m.met, resources: m.conf.Resource, avg10Threshold: m.conf.Avg10Threshold, } - metric.AddTrigger( - trigger.NewTrigger(trigger.RESOURCEANALYSIS). - SetNext(trigger.NewTrigger(trigger.EXPULSION)), - ) return metric.Update() } + +// Terminate clean the resource +func (m *Manager) Terminate(api.Viewer) error { + return m.analyzer.Stop() +} + +// newCadvisorManager returns cadvisor manager instance +func newCadvisorManager() (resource.Manager, error) { + const ( + cacheMinutes = 10 + keepingIntervalSec = 10 + ) + var ( + allowDynamic = true + maxHousekeepingInterval = keepingIntervalSec * time.Second + ) + builder := manager.GetManagerBuilder(manager.CADVISOR) + conf := cadvisor.NewConfig( + cadvisor.WithCacheAge(cacheMinutes*time.Minute), + cadvisor.WithMetrics("cpu,memory,disk,diskIO"), + cadvisor.WithHouseKeepingInterval(maxHousekeepingInterval), + cadvisor.WithHouseKeepingDynamic(allowDynamic), + ) + return builder(conf) +} diff --git a/pkg/services/psi/psi_test.go b/pkg/services/psi/psi_test.go deleted file mode 100644 index 2036aa1..0000000 --- a/pkg/services/psi/psi_test.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. -// rubik licensed under the Mulan PSL v2. -// You can use this software according to the terms and conditions of the Mulan PSL v2. -// You may obtain a copy of Mulan PSL v2 at: -// http://license.coscl.org.cn/MulanPSL2 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -// PURPOSE. -// See the Mulan PSL v2 for more details. -// Author: Jingxiao Lu -// Date: 2023-06-12 -// Description: This file is used for testing psi.go - -package psi - -import ( - "context" - "fmt" - "testing" - "time" - - "isula.org/rubik/pkg/api" - "isula.org/rubik/pkg/core/typedef" -) - -// TestNewManagerObj tests NewObj() for Factory -func TestNewManagerObj(t *testing.T) { - var fact = Factory{ - ObjName: "psi", - } - nm, err := fact.NewObj() - if err != nil { - t.Fatalf("New PSI Manager failed: %v", err) - return - } - fmt.Printf("New PSI Manager %s is %#v", fact.Name(), nm) -} - -// TestConfigValidate tests Config Validate -func TestConfigValidate(t *testing.T) { - var tests = []struct { - name string - conf *Config - wantErr bool - }{ - { - name: "TC1 - Default Config", - conf: NewConfig(), - wantErr: true, - }, - { - name: "TC2 - Wrong Interval value", - conf: &Config{ - Interval: minInterval - 1, - }, - wantErr: true, - }, - { - name: "TC3 - Wrong Threshold value", - conf: &Config{ - Interval: minInterval, - Avg10Threshold: minThreshold - 1, - }, - wantErr: true, - }, - { - name: "TC4 - No resource type specified", - conf: &Config{ - Interval: minInterval, - Avg10Threshold: minThreshold, - }, - wantErr: true, - }, - { - name: "TC5 - Wrong resource type cpuacct - cpuacct is for psi subsystem, not for resource type", - conf: &Config{ - Interval: minInterval, - Avg10Threshold: minThreshold, - Resource: []string{"cpu", "memory", "io", "cpuacct"}, - }, - wantErr: true, - }, - { - name: "TC6 - Success case - trully end", - conf: &Config{ - Interval: minInterval, - Avg10Threshold: minThreshold, - Resource: []string{"cpu", "memory", "io"}, - }, - wantErr: false, - }, - } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - if err := tc.conf.Validate(); (err != nil) != tc.wantErr { - t.Errorf("Config.Validate() error = %v, wantErr %v", err, tc.wantErr) - } - }) - } -} - -type FakeManager struct{} - -func (m *FakeManager) ListContainersWithOptions(options ...api.ListOption) map[string]*typedef.ContainerInfo { - return make(map[string]*typedef.ContainerInfo) -} -func (m *FakeManager) ListPodsWithOptions(options ...api.ListOption) map[string]*typedef.PodInfo { - return make(map[string]*typedef.PodInfo, 1) -} - -// TestManagerRun creates a fake manager and runs it -func TestManagerRun(t *testing.T) { - nm := NewManager("psi") - nm.conf.Interval = 1 - nm.PreStart(&FakeManager{}) - nm.SetConfig(func(configName string, d interface{}) error { return nil }) - if !nm.IsRunner() { - t.Fatalf("FakeManager is not a runner!") - return - } - - ctx, cancel := context.WithCancel(context.Background()) - go nm.Run(ctx) - time.Sleep(time.Second) - cancel() -} -- Gitee