From 2784560d5d722ef17cb8c5edf227e3cf228de16f Mon Sep 17 00:00:00 2001 From: hanchao Date: Sun, 5 Mar 2023 15:28:02 +0800 Subject: [PATCH] feature: add feature components initialize use feature spec file to initialize feature. --- pkg/config/config.go | 12 +-- pkg/feature/feature.go | 9 ++ pkg/rubik/import.go | 2 +- pkg/rubik/rubik.go | 22 +++-- pkg/rubik/rubik_feature.go | 29 ++++++ pkg/{services => rubik}/servicemanager.go | 69 +++++--------- .../blkio.go => blkcg/blkio_throttle.go} | 66 +++++--------- pkg/services/cachelimit/cachelimit.go | 33 +++---- pkg/services/cachelimit/cachelimit_test.go | 6 +- pkg/services/component.go | 48 ++++++++++ pkg/services/helper/factory.go | 32 +++++++ pkg/services/helper/logger.go | 35 ++++++++ pkg/services/helper/template.go | 56 ++++++++++++ pkg/services/qos/qos.go | 44 +++++---- pkg/services/qos/qos_test.go | 10 --- pkg/services/quotaburst/quotaburst.go | 36 ++++---- pkg/services/quotaburst/quotaburst_test.go | 9 +- pkg/services/quotaturbo/quotaturbo.go | 35 ++++---- pkg/services/quotaturbo/quotaturbo_test.go | 3 - pkg/services/registry.go | 53 ----------- pkg/services/services.go | 89 +++++++++++++++++++ 21 files changed, 457 insertions(+), 241 deletions(-) create mode 100644 pkg/feature/feature.go create mode 100644 pkg/rubik/rubik_feature.go rename pkg/{services => rubik}/servicemanager.go (83%) rename pkg/services/{blkio/blkio.go => blkcg/blkio_throttle.go} (36%) create mode 100644 pkg/services/component.go create mode 100644 pkg/services/helper/factory.go create mode 100644 pkg/services/helper/logger.go create mode 100644 pkg/services/helper/template.go delete mode 100644 pkg/services/registry.go create mode 100644 pkg/services/services.go diff --git a/pkg/config/config.go b/pkg/config/config.go index c828f12..3b0458d 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -38,11 +38,12 @@ type Config struct { // AgentConfig is the configuration of rubik, including important basic configurations such as logs type AgentConfig struct { - LogDriver string `json:"logDriver,omitempty"` - LogLevel string `json:"logLevel,omitempty"` - LogSize int64 `json:"logSize,omitempty"` - LogDir string `json:"logDir,omitempty"` - CgroupRoot string `json:"cgroupRoot,omitempty"` + LogDriver string `json:"logDriver,omitempty"` + LogLevel string `json:"logLevel,omitempty"` + LogSize int64 `json:"logSize,omitempty"` + LogDir string `json:"logDir,omitempty"` + CgroupRoot string `json:"cgroupRoot,omitempty"` + Feature []string `json:"feature,omitempty"` } // NewConfig returns an config object pointer @@ -55,6 +56,7 @@ func NewConfig(pType parserType) *Config { LogLevel: constant.DefaultLogLevel, LogDir: constant.DefaultLogDir, CgroupRoot: constant.DefaultCgroupRoot, + Feature: []string{}, }, } return c diff --git a/pkg/feature/feature.go b/pkg/feature/feature.go new file mode 100644 index 0000000..ac9f423 --- /dev/null +++ b/pkg/feature/feature.go @@ -0,0 +1,9 @@ +package feature + +var ( + Feature_Blkcg_Blkio_Throttle = "blkcg.blkio_throttle" + Feature_Memcg_Cache_Limit = "memcg.cache_limit" + Feature_cpucg_quota_turbo = "cpucg.quota_turbo" + Feature_cpucg_quota_burst = "cpucg.quota_burst" + Feature_qos_level = "qos_level" +) diff --git a/pkg/rubik/import.go b/pkg/rubik/import.go index dbb9b19..31f4dbc 100644 --- a/pkg/rubik/import.go +++ b/pkg/rubik/import.go @@ -16,7 +16,7 @@ package rubik import ( // introduce packages to auto register service - _ "isula.org/rubik/pkg/services/blkio" + _ "isula.org/rubik/pkg/services/blkcg" _ "isula.org/rubik/pkg/services/cachelimit" _ "isula.org/rubik/pkg/services/qos" _ "isula.org/rubik/pkg/services/quotaburst" diff --git a/pkg/rubik/rubik.go b/pkg/rubik/rubik.go index 62fa40f..087f180 100644 --- a/pkg/rubik/rubik.go +++ b/pkg/rubik/rubik.go @@ -40,14 +40,14 @@ type Agent struct { config *config.Config podManager *podmanager.PodManager informer api.Informer - servicesManager *services.ServiceManager + servicesManager *ServiceManager } // NewAgent returns an agent for given configuration func NewAgent(cfg *config.Config) (*Agent, error) { publisher := publisher.GetPublisherFactory().GetPublisher(publisher.GENERIC) - serviceManager := services.NewServiceManager() - if err := serviceManager.InitServices(cfg.UnwarpServiceConfig(), cfg); err != nil { + serviceManager := NewServiceManager() + if err := serviceManager.InitServices(cfg.Agent.Feature, cfg.UnwarpServiceConfig(), cfg); err != nil { return nil, err } a := &Agent{ @@ -116,15 +116,23 @@ func runAgent(ctx context.Context) error { if err := c.LoadConfig(constant.ConfigFile); err != nil { return fmt.Errorf("error loading config: %v", err) } - // 2. enable log system + + // 2. enable cgroup system + cgroup.InitMountDir(c.Agent.CgroupRoot) + + // 3. enable log system if err := log.InitConfig(c.Agent.LogDriver, c.Agent.LogDir, c.Agent.LogLevel, c.Agent.LogSize); err != nil { return fmt.Errorf("error initializing log: %v", err) } - // 3. enable cgroup system - cgroup.InitMountDir(c.Agent.CgroupRoot) + // 4. init service logger + services.SetLogger(log.WithCtx(context.WithValue(context.Background(), + log.CtxKey(constant.LogEntryKey), "rubik/services"))) + + // 5. init service components + services.InitServiceComponents(defaultRubikFeature) - // 4. Create and run the agent + // 6. Create and run the agent agent, err := NewAgent(c) if err != nil { return fmt.Errorf("error new agent: %v", err) diff --git a/pkg/rubik/rubik_feature.go b/pkg/rubik/rubik_feature.go new file mode 100644 index 0000000..6b195dc --- /dev/null +++ b/pkg/rubik/rubik_feature.go @@ -0,0 +1,29 @@ +package rubik + +import ( + "isula.org/rubik/pkg/feature" + "isula.org/rubik/pkg/services" +) + +var defaultRubikFeature = []services.FeatureSpec{ + { + Name: feature.Feature_Blkcg_Blkio_Throttle, + Default: true, + }, + { + Name: feature.Feature_Memcg_Cache_Limit, + Default: true, + }, + { + Name: feature.Feature_cpucg_quota_turbo, + Default: true, + }, + { + Name: feature.Feature_cpucg_quota_burst, + Default: true, + }, + { + Name: feature.Feature_qos_level, + Default: true, + }, +} diff --git a/pkg/services/servicemanager.go b/pkg/rubik/servicemanager.go similarity index 83% rename from pkg/services/servicemanager.go rename to pkg/rubik/servicemanager.go index 09bf007..c91e818 100644 --- a/pkg/services/servicemanager.go +++ b/pkg/rubik/servicemanager.go @@ -12,7 +12,7 @@ // Description: This file defines ServiceManager to manage the lifecycle of services // Package services implements service registration, discovery and management functions -package services +package rubik import ( "context" @@ -24,11 +24,11 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "isula.org/rubik/pkg/api" - "isula.org/rubik/pkg/common/constant" "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/config" "isula.org/rubik/pkg/core/subscriber" "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/services" ) // serviceManagerName is the unique ID of the service manager @@ -39,9 +39,9 @@ type ServiceManager struct { api.Subscriber api.Viewer sync.RWMutex - RunningServices map[string]api.Service + RunningServices map[string]services.Service RunningPersistentServices map[string]api.PersistentService - TerminateFuncs map[string]Terminator + TerminateFuncs map[string]services.Terminator } // NewServiceManager creates a servicemanager object @@ -55,29 +55,22 @@ func NewServiceManager() *ServiceManager { } // InitServices parses the to-be-run services config and loads them to the ServiceManager -func (manager *ServiceManager) InitServices(serviceConfig map[string]interface{}, parser config.ConfigParser) error { - for name, config := range serviceConfig { - creator := GetServiceCreator(name) - if creator == nil { - return fmt.Errorf("no corresponding module %v, please check the module name", name) - } - service := creator() - if err := parser.UnmarshalSubConfig(config, service); err != nil { - return fmt.Errorf("error unmarshaling %s config: %v", name, err) - } - - if SetLoggerOnService(service, - log.WithCtx(context.WithValue(context.Background(), log.CtxKey(constant.LogEntryKey), name))) { - log.Debugf("set logger for service: %s", name) +func (manager *ServiceManager) InitServices(feature []string, serviceConfig map[string]interface{}, parser config.ConfigParser) error { + for _, f := range feature { + s, err := services.GetServiceComponent(f) + if err != nil { + return fmt.Errorf("get component failed %s: %v", f, err) } - // try to verify configuration - if validator, ok := service.(Validator); ok { - if err := validator.Validate(); err != nil { - return fmt.Errorf("error configuring service %s: %v", name, err) + if config := serviceConfig[s.ID()]; config != nil { + if err := parser.UnmarshalSubConfig(config, s); err != nil { + return fmt.Errorf("error unmarshaling %s config: %v", f, err) + } + if err := s.Validate(); err != nil { + return fmt.Errorf("error configuring service %s: %v", f, err) } } - if err := manager.AddRunningService(name, service); err != nil { + if err := manager.addRunningService(f, s); err != nil { return err } } @@ -85,14 +78,13 @@ func (manager *ServiceManager) InitServices(serviceConfig map[string]interface{} } // AddRunningService adds a to-be-run service -func (manager *ServiceManager) AddRunningService(name string, service interface{}) error { +func (manager *ServiceManager) addRunningService(name string, service services.Service) error { manager.RLock() - _, existed1 := manager.RunningServices[name] - _, existed2 := manager.RunningPersistentServices[name] - manager.RUnlock() - if existed1 || existed2 { + if _, existed := manager.RunningServices[name]; existed { return fmt.Errorf("service name conflict: %s", name) } + manager.RUnlock() + addService := manager.tryAddService(name, service) addPersistentService := manager.tryAddPersistentService(name, service) @@ -190,14 +182,14 @@ func (manager *ServiceManager) Setup(v api.Viewer) error { } preStarted := make(map[string]struct{}, 0) manager.Viewer = v - manager.TerminateFuncs = make(map[string]Terminator) + manager.TerminateFuncs = make(map[string]services.Terminator) setupFunc := func(id string, s interface{}) error { // 1. record the termination function of the service that has been setup - if t, ok := s.(Terminator); ok { + if t, ok := s.(services.Terminator); ok { manager.TerminateFuncs[id] = t } // 2. execute the pre-start function of the service - p, ok := s.(PreStarter) + p, ok := s.(services.PreStarter) if !ok { return nil } @@ -345,18 +337,3 @@ func (manager *ServiceManager) deleteFunc(event typedef.Event) { wg.Wait() manager.RUnlock() } - -// Terminator is an interface that calls a collection of methods when the service terminates -type Terminator interface { - Terminate(api.Viewer) error -} - -// PreStarter is an interface for calling a collection of methods when the service is pre-started -type PreStarter interface { - PreStart(api.Viewer) error -} - -// Validator is a function interface to verify whether the configuration is correct or not -type Validator interface { - Validate() error -} diff --git a/pkg/services/blkio/blkio.go b/pkg/services/blkcg/blkio_throttle.go similarity index 36% rename from pkg/services/blkio/blkio.go rename to pkg/services/blkcg/blkio_throttle.go index 522284a..774160b 100644 --- a/pkg/services/blkio/blkio.go +++ b/pkg/services/blkcg/blkio_throttle.go @@ -1,10 +1,20 @@ -package blkio +package blkcg -import ( - "isula.org/rubik/pkg/api" - "isula.org/rubik/pkg/core/typedef" - "isula.org/rubik/pkg/services" -) +import "isula.org/rubik/pkg/services/helper" + +type BlkioThrottleFactory struct { + ObjName string +} + +func (i BlkioThrottleFactory) Name() string { + return "BlkioThrottleFactory" +} + +func (i BlkioThrottleFactory) NewObj() interface{} { + return &blkioThrottle{name: i.ObjName} +} + +//--------------------------------------------- // DeviceConfig defines blkio device configurations type DeviceConfig struct { @@ -12,50 +22,18 @@ type DeviceConfig struct { DeviceValue string `json:"value,omitempty"` } -type BlkioConfig struct { +type BlkioAnnotationConfig struct { DeviceReadBps []DeviceConfig `json:"device_read_bps,omitempty"` DeviceWriteBps []DeviceConfig `json:"device_write_bps,omitempty"` DeviceReadIops []DeviceConfig `json:"device_read_iops,omitempty"` DeviceWriteIops []DeviceConfig `json:"device_write_iops,omitempty"` } -type Blkio struct { - Name string `json:"-"` - Log api.Logger -} - -func init() { - services.Register("blkio", func() interface{} { - return NewBlkio() - }) -} - -func NewBlkio() *Blkio { - return &Blkio{Name: "blkio"} -} - -func (b *Blkio) PreStart(viewer api.Viewer) error { - b.Log.Debugf("blkio prestart") - return nil -} - -func (b *Blkio) Terminate(viewer api.Viewer) error { - b.Log.Infof("blkio Terminate") - return nil -} - -func (b *Blkio) ID() string { - return b.Name -} - -func (b *Blkio) AddFunc(podInfo *typedef.PodInfo) error { - return nil -} - -func (b *Blkio) UpdateFunc(old, new *typedef.PodInfo) error { - return nil +type blkioThrottle struct { + helper.ServiceTemplate + name string } -func (b *Blkio) DeleteFunc(podInfo *typedef.PodInfo) error { - return nil +func (b *blkioThrottle) ID() string { + return b.name } diff --git a/pkg/services/cachelimit/cachelimit.go b/pkg/services/cachelimit/cachelimit.go index a66c6e9..50c643e 100644 --- a/pkg/services/cachelimit/cachelimit.go +++ b/pkg/services/cachelimit/cachelimit.go @@ -22,8 +22,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "isula.org/rubik/pkg/api" - Log "isula.org/rubik/pkg/common/log" - "isula.org/rubik/pkg/services" + "isula.org/rubik/pkg/services/helper" ) // default value @@ -38,7 +37,6 @@ const ( defaultHighMB = 50 defaultMaxMiss = 20 defaultMinMiss = 10 - moduleName = "cacheLimit" defaultResctrlDir = "/sys/fs/resctrl" defaultNumaNodeDir = "/sys/devices/system/node" defaultPidNameSpace = "/proc/self/ns/pid" @@ -89,10 +87,11 @@ type Config struct { // CacheLimit is cache limit service structure type CacheLimit struct { + helper.ServiceTemplate *Config Attr *Attr + Name string Viewer api.Viewer - Name string `json:"-"` } // Attr is cache limit attribute differ from config @@ -113,25 +112,29 @@ type Attr struct { MinMiss int } +// -------------------------------------- // log is global logger for cache limit service -var log api.Logger +var ( + log helper.Logger +) + +type CacheLimitFactory struct { + ObjName string +} -func init() { - log = &Log.EmptyLog{} - services.Register(moduleName, func() interface{} { - return NewCacheLimit() - }) +func (i CacheLimitFactory) Name() string { + return "CacheLimitFactory" } -// SetupLog initializes the log interface for the module -func (c *CacheLimit) SetupLog(logger api.Logger) { - log = logger +func (i CacheLimitFactory) NewObj() interface{} { + log = helper.GetLogger() + return NewCacheLimit(i.ObjName) } // NewCacheLimit return cache limit instance with default settings -func NewCacheLimit() *CacheLimit { +func NewCacheLimit(name string) *CacheLimit { return &CacheLimit{ - Name: moduleName, + Name: name, Attr: &Attr{ NumaNodeDir: defaultNumaNodeDir, MaxMiss: defaultMaxMiss, diff --git a/pkg/services/cachelimit/cachelimit_test.go b/pkg/services/cachelimit/cachelimit_test.go index 35e6783..909ecc6 100644 --- a/pkg/services/cachelimit/cachelimit_test.go +++ b/pkg/services/cachelimit/cachelimit_test.go @@ -23,6 +23,10 @@ import ( "isula.org/rubik/pkg/api" ) +var ( + moduleName = "CacheLimit" +) + func TestCacheLimit_Validate(t *testing.T) { type fields struct { Config *Config @@ -225,7 +229,7 @@ func TestNewCacheLimit(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := NewCacheLimit(); !reflect.DeepEqual(got, tt.want) { + if got := NewCacheLimit(moduleName); !reflect.DeepEqual(got, tt.want) { t.Errorf("NewCacheLimit() = %v, want %v", got, tt.want) } }) diff --git a/pkg/services/component.go b/pkg/services/component.go new file mode 100644 index 0000000..ecae078 --- /dev/null +++ b/pkg/services/component.go @@ -0,0 +1,48 @@ +package services + +import ( + "isula.org/rubik/pkg/feature" + "isula.org/rubik/pkg/services/blkcg" + "isula.org/rubik/pkg/services/cachelimit" + "isula.org/rubik/pkg/services/helper" + "isula.org/rubik/pkg/services/qos" + "isula.org/rubik/pkg/services/quotaburst" + "isula.org/rubik/pkg/services/quotaturbo" +) + +type ServiceComponent func(name string) error + +var ( + serviceComponents = map[string]ServiceComponent{ + feature.Feature_Blkcg_Blkio_Throttle: initBlkioThrottleFactory, + feature.Feature_Memcg_Cache_Limit: initCacheLimitFactory, + feature.Feature_cpucg_quota_turbo: initQuotaTurboFactory, + feature.Feature_cpucg_quota_burst: initQuotaBurstFactory, + feature.Feature_qos_level: initQoSFactory, + } +) + +func initBlkioThrottleFactory(name string) error { + helper.AddFactory(name, blkcg.BlkioThrottleFactory{ObjName: name}) + return nil +} + +func initCacheLimitFactory(name string) error { + helper.AddFactory(name, cachelimit.CacheLimitFactory{ObjName: name}) + return nil +} + +func initQuotaTurboFactory(name string) error { + helper.AddFactory(name, quotaturbo.QuotaTurboFactory{ObjName: name}) + return nil +} + +func initQuotaBurstFactory(name string) error { + helper.AddFactory(name, quotaburst.QuotaBurstFactory{ObjName: name}) + return nil +} + +func initQoSFactory(name string) error { + helper.AddFactory(name, qos.QoSFactory{ObjName: name}) + return nil +} diff --git a/pkg/services/helper/factory.go b/pkg/services/helper/factory.go new file mode 100644 index 0000000..6b6fcd1 --- /dev/null +++ b/pkg/services/helper/factory.go @@ -0,0 +1,32 @@ +package helper + +import ( + "errors" + "sync" +) + +type ServiceFactory interface { + Name() string + NewObj() interface{} +} + +var ( + rwlock sync.RWMutex + serviceFactories = map[string]ServiceFactory{} +) + +func AddFactory(name string, factory ServiceFactory) { + rwlock.Lock() + defer rwlock.Unlock() + serviceFactories[name] = factory +} + +func GetComponent(name string) (interface{}, error) { + rwlock.RLock() + defer rwlock.RUnlock() + if f, found := serviceFactories[name]; found { + return f.NewObj(), nil + } else { + return nil, errors.New("factory is not found") + } +} diff --git a/pkg/services/helper/logger.go b/pkg/services/helper/logger.go new file mode 100644 index 0000000..c2ad51e --- /dev/null +++ b/pkg/services/helper/logger.go @@ -0,0 +1,35 @@ +package helper + +import ( + "sync" +) + +// Logger is the handler to print the log +type Logger interface { + // Errorf logs bugs that affect normal functionality + Errorf(f string, args ...interface{}) + // Warnf logs produce unexpected results + Warnf(f string, args ...interface{}) + // Infof logs normal messages + Infof(f string, args ...interface{}) + // Debugf logs verbose messages + Debugf(f string, args ...interface{}) +} + +var ( + log Logger + once sync.Once +) + +func SetLogger(l Logger) { + once.Do(func() { + log = l + }) +} + +func GetLogger() Logger { + if log == nil { + panic("no logger") + } + return log +} diff --git a/pkg/services/helper/template.go b/pkg/services/helper/template.go new file mode 100644 index 0000000..d586d7b --- /dev/null +++ b/pkg/services/helper/template.go @@ -0,0 +1,56 @@ +package helper + +import ( + "fmt" + + "isula.org/rubik/pkg/api" + "isula.org/rubik/pkg/core/typedef" +) + +type ServiceTemplate struct{} + +// ID is the name of plugin, must be unique. +func (s *ServiceTemplate) ID() string { + panic("this interface must be implemented.") +} + +// PreStarter is an interface for calling a collection of methods when the service is pre-started +func (s *ServiceTemplate) PreStart(api.Viewer) error { + return nil +} + +// Terminator is an interface that calls a collection of methods when the service terminates +func (s *ServiceTemplate) Terminate(api.Viewer) error { + return nil +} + +// Confirm whether it is +func (s *ServiceTemplate) IsRunner() bool { + return false +} + +// Start runner +func (s *ServiceTemplate) Run() error { + return fmt.Errorf("i am not runner") +} + +// Stop runner +func (s *ServiceTemplate) Stop() error { + return fmt.Errorf("i am not runner") +} + +func (s *ServiceTemplate) AddPod(podInfo *typedef.PodInfo) error { + return nil +} + +func (S *ServiceTemplate) UpdatePod(old, new *typedef.PodInfo) error { + return nil +} + +func (s *ServiceTemplate) DeletePod(podInfo *typedef.PodInfo) error { + return nil +} + +func (s *ServiceTemplate) Validate() error { + return nil +} diff --git a/pkg/services/qos/qos.go b/pkg/services/qos/qos.go index 4bf9d81..1f9b22a 100644 --- a/pkg/services/qos/qos.go +++ b/pkg/services/qos/qos.go @@ -22,9 +22,28 @@ import ( "isula.org/rubik/pkg/common/constant" "isula.org/rubik/pkg/core/typedef" "isula.org/rubik/pkg/core/typedef/cgroup" - "isula.org/rubik/pkg/services" + "isula.org/rubik/pkg/services/helper" ) +type QoSFactory struct { + ObjName string +} + +var logger helper.Logger + +func (i QoSFactory) Name() string { + return "QoSFactory" +} + +func (i QoSFactory) NewObj() interface{} { + logger = helper.GetLogger() + return &QoS{ + Name: i.ObjName, + } +} + +//--------------------------------------------- + var supportCgroupTypes = map[string]*cgroup.Key{ "cpu": {SubSys: "cpu", FileName: constant.CPUCgroupFileName}, "memory": {SubSys: "memory", FileName: constant.MemoryCgroupFileName}, @@ -32,8 +51,8 @@ var supportCgroupTypes = map[string]*cgroup.Key{ // QoS define service which related to qos level setting type QoS struct { - Name string `json:"-"` - Log api.Logger + helper.ServiceTemplate + Name string Config } @@ -42,19 +61,6 @@ type Config struct { SubSys []string `json:"subSys"` } -func init() { - services.Register("qos", func() interface{} { - return NewQoS() - }) -} - -// NewQoS return qos instance -func NewQoS() *QoS { - return &QoS{ - Name: "qos", - } -} - // ID return qos service name func (q *QoS) ID() string { return q.Name @@ -64,7 +70,7 @@ func (q *QoS) ID() string { func (q *QoS) PreStart(viewer api.Viewer) error { for _, pod := range viewer.ListPodsWithOptions() { if err := q.SetQoS(pod); err != nil { - q.Log.Errorf("error prestart pod %v: %v", pod.Name, err) + logger.Errorf("error prestart pod %v: %v", pod.Name, err) } } return nil @@ -128,7 +134,7 @@ func (q *QoS) SetQoS(pod *typedef.PodInfo) error { } qosLevel := getQoSLevel(pod) if qosLevel == constant.Online { - q.Log.Debugf("pod %s already online", pod.Name) + logger.Debugf("pod %s already online", pod.Name) return nil } @@ -142,7 +148,7 @@ func (q *QoS) SetQoS(pod *typedef.PodInfo) error { } } } - q.Log.Debugf("set pod %s(%s) qos level %d ok", pod.Name, pod.UID, qosLevel) + logger.Debugf("set pod %s(%s) qos level %d ok", pod.Name, pod.UID, qosLevel) return nil } diff --git a/pkg/services/qos/qos_test.go b/pkg/services/qos/qos_test.go index 77899a1..f4dd395 100644 --- a/pkg/services/qos/qos_test.go +++ b/pkg/services/qos/qos_test.go @@ -15,12 +15,9 @@ package qos import ( - "context" "testing" - "isula.org/rubik/pkg/api" "isula.org/rubik/pkg/common/constant" - "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/core/typedef/cgroup" "isula.org/rubik/test/try" ) @@ -31,7 +28,6 @@ func init() { type fields struct { Name string - Log api.Logger Config Config } type args struct { @@ -50,7 +46,6 @@ type test struct { var getCommonField = func(subSys []string) fields { return fields{ Name: "qos", - Log: log.WithCtx(context.WithValue(context.Background(), log.CtxKey(constant.LogEntryKey), "qos")), Config: Config{SubSys: subSys}, } } @@ -112,7 +107,6 @@ func TestQoS_AddFunc(t *testing.T) { t.Run(tt.name, func(t *testing.T) { q := &QoS{ Name: tt.fields.Name, - Log: tt.fields.Log, Config: tt.fields.Config, } if tt.preHook != nil { @@ -171,7 +165,6 @@ func TestQoS_UpdateFunc(t *testing.T) { t.Run(tt.name, func(t *testing.T) { q := &QoS{ Name: tt.fields.Name, - Log: tt.fields.Log, Config: tt.fields.Config, } if tt.preHook != nil { @@ -192,7 +185,6 @@ func TestQoS_Validate(t *testing.T) { name: "TC1-normal config", fields: fields{ Name: "qos", - Log: log.WithCtx(context.WithValue(context.Background(), log.CtxKey(constant.LogEntryKey), "qos")), Config: Config{SubSys: []string{"cpu", "memory"}}, }, }, @@ -200,7 +192,6 @@ func TestQoS_Validate(t *testing.T) { name: "TC2-abnormal config", fields: fields{ Name: "undefine", - Log: log.WithCtx(context.WithValue(context.Background(), log.CtxKey(constant.LogEntryKey), "qos")), Config: Config{SubSys: []string{"undefine"}}, }, wantErr: true, @@ -215,7 +206,6 @@ func TestQoS_Validate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { q := &QoS{ Name: tt.fields.Name, - Log: tt.fields.Log, Config: tt.fields.Config, } if err := q.Validate(); (err != nil) != tt.wantErr { diff --git a/pkg/services/quotaburst/quotaburst.go b/pkg/services/quotaburst/quotaburst.go index 1d9258c..f8c6c63 100644 --- a/pkg/services/quotaburst/quotaburst.go +++ b/pkg/services/quotaburst/quotaburst.go @@ -24,46 +24,42 @@ import ( "isula.org/rubik/pkg/api" "isula.org/rubik/pkg/common/constant" - Log "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/common/util" "isula.org/rubik/pkg/core/typedef" "isula.org/rubik/pkg/core/typedef/cgroup" - "isula.org/rubik/pkg/services" + "isula.org/rubik/pkg/services/helper" ) const ( moduleName = "quotaburst" ) -var log api.Logger +var log helper.Logger -func init() { - log = &Log.EmptyLog{} - services.Register(moduleName, func() interface{} { - return NewBurst() - }) +type QuotaBurstFactory struct { + ObjName string } -// Burst is used to control cpu burst -type Burst struct { - Name string `json:"-"` +func (i QuotaBurstFactory) Name() string { + return "BurstFactory" } -// NewBurst return an new Burst pointer -func NewBurst() *Burst { - return &Burst{ - Name: moduleName, - } +func (i QuotaBurstFactory) NewObj() interface{} { + log = helper.GetLogger() + return &Burst{Name: i.ObjName} } -// SetupLog initializes the log interface for the module -func (b *Burst) SetupLog(logger api.Logger) { - log = logger +// -------------------- + +// Burst is used to control cpu burst +type Burst struct { + helper.ServiceTemplate + Name string } // ID returns the module name func (b *Burst) ID() string { - return moduleName + return b.Name } // AddFunc implement add function when pod is added in k8s diff --git a/pkg/services/quotaburst/quotaburst_test.go b/pkg/services/quotaburst/quotaburst_test.go index d311c33..6c29bff 100644 --- a/pkg/services/quotaburst/quotaburst_test.go +++ b/pkg/services/quotaburst/quotaburst_test.go @@ -21,7 +21,6 @@ import ( "isula.org/rubik/pkg/api" "isula.org/rubik/pkg/common/constant" - Log "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/core/typedef" "isula.org/rubik/pkg/core/typedef/cgroup" "isula.org/rubik/pkg/podmanager" @@ -34,6 +33,13 @@ var ( cfsPeriodUs = &cgroup.Key{SubSys: "cpu", FileName: "cpu.cfs_period_us"} ) +// NewBurst return an new Burst pointer +func NewBurst() *Burst { + return &Burst{ + Name: moduleName, + } +} + // TestBurst_AddFunc tests AddFunc func TestBurst_AddFunc(t *testing.T) { type args struct { @@ -170,7 +176,6 @@ func TestOther(t *testing.T) { const tcName = "TC1-test Other" t.Run(tcName, func(t *testing.T) { got := NewBurst() - got.SetupLog(&Log.EmptyLog{}) assert.NoError(t, got.DeleteFunc(&typedef.PodInfo{})) assert.Equal(t, moduleName, got.ID()) }) diff --git a/pkg/services/quotaturbo/quotaturbo.go b/pkg/services/quotaturbo/quotaturbo.go index fcb2702..bd1b152 100644 --- a/pkg/services/quotaturbo/quotaturbo.go +++ b/pkg/services/quotaturbo/quotaturbo.go @@ -23,25 +23,35 @@ import ( "isula.org/rubik/pkg/api" "isula.org/rubik/pkg/common/constant" - Log "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/common/util" "isula.org/rubik/pkg/core/typedef" - "isula.org/rubik/pkg/services" + "isula.org/rubik/pkg/services/helper" ) -const moduleName = "quotaturbo" +// --------------------------------- +var log helper.Logger -var log api.Logger +type QuotaTurboFactory struct { + ObjName string +} -func init() { - log = &Log.EmptyLog{} - services.Register(moduleName, func() interface{} { - return NewQuotaTurbo() - }) +func (i QuotaTurboFactory) Name() string { + return "QuotaTurboFactory" +} + +func (i QuotaTurboFactory) NewObj() interface{} { + log = helper.GetLogger() + return &QuotaTurbo{ + Name: i.ObjName, + NodeData: NewNodeData(), + Driver: &EventDriver{}, + } } // QuotaTurbo manages all container CPU quota data on the current node. type QuotaTurbo struct { + helper.ServiceTemplate + Name string // NodeData including the container data, CPU usage, and QuotaTurbo configuration of the local node *NodeData // interfaces with different policies @@ -58,14 +68,9 @@ func NewQuotaTurbo() *QuotaTurbo { } } -// SetupLog initializes the log interface for the module -func (qt *QuotaTurbo) SetupLog(logger api.Logger) { - log = logger -} - // ID returns the module name func (qt *QuotaTurbo) ID() string { - return moduleName + return qt.Name } // AdjustQuota adjusts the quota of a container at a time diff --git a/pkg/services/quotaturbo/quotaturbo_test.go b/pkg/services/quotaturbo/quotaturbo_test.go index 52ca780..fd134d6 100644 --- a/pkg/services/quotaturbo/quotaturbo_test.go +++ b/pkg/services/quotaturbo/quotaturbo_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" "isula.org/rubik/pkg/common/constant" - Log "isula.org/rubik/pkg/common/log" "isula.org/rubik/pkg/common/util" "isula.org/rubik/pkg/core/typedef" "isula.org/rubik/pkg/podmanager" @@ -339,8 +338,6 @@ func TestNewQuotaTurbo(t *testing.T) { testName := "TC1-test otherv functions" t.Run(testName, func(t *testing.T) { got := NewQuotaTurbo() - got.SetupLog(&Log.EmptyLog{}) - assert.Equal(t, moduleName, got.ID()) got.Viewer = &podmanager.PodManager{ Pods: &podmanager.PodCache{ Pods: map[string]*typedef.PodInfo{ diff --git a/pkg/services/registry.go b/pkg/services/registry.go deleted file mode 100644 index 28f2d99..0000000 --- a/pkg/services/registry.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright (c) Huawei Technologies Co., Ltd. 2023. All rights reserved. -// rubik licensed under the Mulan PSL v2. -// You can use this software according to the terms and conditions of the Mulan PSL v2. -// You may obtain a copy of Mulan PSL v2 at: -// http://license.coscl.org.cn/MulanPSL2 -// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR -// PURPOSE. -// See the Mulan PSL v2 for more details. -// Author: Jiaqi Yang -// Create: 2023-01-28 -// Description: This file defines registry for service registration - -// Package services implements service registration, discovery and management functions -package services - -import ( - "sync" -) - -type ( - // Creator creates Service objects - Creator func() interface{} - // registry is used for service registration - registry struct { - sync.RWMutex - // services is a collection of all registered service - services map[string]Creator - } -) - -// servicesRegistry is the globally unique registry -var servicesRegistry = ®istry{ - services: make(map[string]Creator, 0), -} - -// Register is used to register the service creators -func Register(name string, creator Creator) { - servicesRegistry.Lock() - servicesRegistry.services[name] = creator - servicesRegistry.Unlock() -} - -// GetServiceCreator returns the service creator based on the incoming service name -func GetServiceCreator(name string) Creator { - servicesRegistry.RLock() - creator, ok := servicesRegistry.services[name] - servicesRegistry.RUnlock() - if !ok { - return nil - } - return creator -} diff --git a/pkg/services/services.go b/pkg/services/services.go new file mode 100644 index 0000000..72af484 --- /dev/null +++ b/pkg/services/services.go @@ -0,0 +1,89 @@ +package services + +import ( + "fmt" + + "isula.org/rubik/pkg/api" + "isula.org/rubik/pkg/core/typedef" + "isula.org/rubik/pkg/services/helper" +) + +// PodEvent for listening to pod changes. +type PodEvent interface { + // Deal processing adding a pod. + AddPod(podInfo *typedef.PodInfo) error + + // Deal processing update a pod config. + UpdatePod(old, new *typedef.PodInfo) error + + // Deal processing delete a pod. + DeletePod(podInfo *typedef.PodInfo) error +} + +// Runner for background service process. +type Runner interface { + // Confirm whether it is + IsRunner() bool + + // Start runner + Run() error + + // Stop runner + Stop() error +} + +// Service interface contains methods which must be implemented by all services. +type Service interface { + Runner + + PodEvent + + // ID is the name of plugin, must be unique. + ID() string + + // PreStarter is an interface for calling a collection of methods when the service is pre-started + PreStart(api.Viewer) error + + // Terminator is an interface that calls a collection of methods when the service terminates + Terminate(api.Viewer) error + + Validate() error +} + +type FeatureSpec struct { + // feature name + Name string + + // Default is the default enablement state for the feature + Default bool +} + +func InitServiceComponents(specs []FeatureSpec) { + for _, spec := range specs { + if spec.Default { + if initFunc, found := serviceComponents[spec.Name]; found { + initFunc(spec.Name) + } else { + helper.GetLogger().Warnf("init service failed, name:%v", spec.Name) + continue + } + } else { + helper.GetLogger().Infof("disable feature:%v", spec.Name) + } + } +} + +func GetServiceComponent(name string) (Service, error) { + if s, err := helper.GetComponent(name); err == nil { + if si, ok := s.(Service); ok { + return si, nil + } + } + return nil, fmt.Errorf("get service failed, name:%v", name) +} + +// ------------------------- +// SetLogger for setting logger +func SetLogger(log helper.Logger) { + helper.SetLogger(log) +} -- Gitee