From fc52f11f22a713d87a2e0d2ce4f4422282745767 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9E=97=E7=8E=A0=E4=BA=BA?= Date: Tue, 9 Jul 2024 16:58:51 +0800 Subject: [PATCH] feat: hot load file MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: 林玠人 --- src/app/server/cmd/commands/root.go | 36 +- src/app/server/cmd/options/options.go | 179 ++++- src/app/server/network/httpserver.go | 14 +- src/app/server/network/socketserver.go | 44 +- src/app/server/network/websocket/client.go | 7 +- src/app/server/service/auth/casbin.go | 56 +- src/app/server/service/eventbus/eventbus.go | 13 +- src/app/server/service/plugin/heartbeat.go | 14 +- src/app/server/service/plugin/plugin.go | 2 +- src/config_server.yaml.templete | 2 +- src/dbmanager/redismanager/redismanager.go | 15 +- src/go.mod | 3 +- src/vendor/k8s.io/apimachinery/LICENSE | 202 +++++ .../apimachinery/pkg/util/clock/clock.go | 445 +++++++++++ .../apimachinery/pkg/util/runtime/runtime.go | 173 ++++ .../k8s.io/apimachinery/pkg/util/wait/doc.go | 19 + .../k8s.io/apimachinery/pkg/util/wait/wait.go | 752 ++++++++++++++++++ src/vendor/modules.txt | 5 + 18 files changed, 1893 insertions(+), 88 deletions(-) create mode 100644 src/vendor/k8s.io/apimachinery/LICENSE create mode 100644 src/vendor/k8s.io/apimachinery/pkg/util/clock/clock.go create mode 100644 src/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go create mode 100644 src/vendor/k8s.io/apimachinery/pkg/util/wait/doc.go create mode 100644 src/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go diff --git a/src/app/server/cmd/commands/root.go b/src/app/server/cmd/commands/root.go index 42863eab..293af5aa 100644 --- a/src/app/server/cmd/commands/root.go +++ b/src/app/server/cmd/commands/root.go @@ -3,6 +3,7 @@ package commands import ( "context" "fmt" + "sync/atomic" "github.com/pkg/errors" "k8s.io/klog/v2" @@ -22,13 +23,29 @@ import ( const flagconfig = "conf" +var conut int64 + func NewServerCommand() *cobra.Command { + s := options.NewServerOptions() + conf, err := options.TryLoadFromDisk() + if err == nil { + s.ServerConfig = conf + klog.InfoS("TryLoadFromDisk pilotgo config !", "HttpServer", *s.ServerConfig.HttpServer) + klog.InfoS("TryLoadFromDisk pilotgo config !", "SocketServer", *s.ServerConfig.SocketServer) + klog.InfoS("TryLoadFromDisk pilotgo config !", "JWT", *s.ServerConfig.JWT) + klog.InfoS("TryLoadFromDisk pilotgo config !", "Logopts", *s.ServerConfig.Logopts) + klog.InfoS("TryLoadFromDisk pilotgo config !", "RedisDBinfo", *s.ServerConfig.RedisDBinfo) + klog.InfoS("TryLoadFromDisk pilotgo config !", "MysqlDBinfo", *s.ServerConfig.MysqlDBinfo) + klog.InfoS("TryLoadFromDisk pilotgo config !", "Storage", *s.ServerConfig.Storage) + } else { + klog.Fatal("Failed to load configuration from disk", err) + } cmd := &cobra.Command{ Use: "pilotgo", Long: `Run the pilotgo API server`, RunE: func(cmd *cobra.Command, args []string) error { - return Run(s, signals.SetupSignalHandler(), cmd, nil) + return Run(s, signals.SetupSignalHandler(), cmd, options.WatchConfigChange()) }, SilenceUsage: true, FParseErrWhitelist: cobra.FParseErrWhitelist{ @@ -47,7 +64,13 @@ func NewServerCommand() *cobra.Command { cmd.AddCommand(versionCmd) return cmd } + func run(_ *options.ServerOptions, ctx context.Context, cmd *cobra.Command) error { + if atomic.LoadInt64(&conut) > 0 { + return nil + } + atomic.AddInt64(&conut, 1) + config_file, err := cmd.Flags().GetString(flagconfig) if err != nil { return errors.Wrapf(err, "error accessing flag %s for command %s", flagconfig, cmd.Name()) @@ -88,7 +111,7 @@ func run(_ *options.ServerOptions, ctx context.Context, cmd *cobra.Command) erro //此处启动前端及REST http server err = network.HttpServerInit(&config.Config().HttpServer, ctx.Done()) if err != nil { - logger.Error("socket server init failed, error:%v", err) + logger.Error("HttpServerInit socket server init failed, error:%v", err) return err } @@ -101,8 +124,8 @@ func run(_ *options.ServerOptions, ctx context.Context, cmd *cobra.Command) erro // 前端推送告警 go websocket.SendWarnMsgToWeb(ctx.Done()) - logger.Info("start to serve.") - + logger.Info("start to serve") + atomic.AddInt64(&conut, -1) // 信号监听 redis return nil @@ -120,7 +143,7 @@ func startServices(stopCh <-chan struct{}) error { return nil } -func Run(s *options.ServerOptions, ctx context.Context, cmd *cobra.Command, configCh <-chan string) error { +func Run(s *options.ServerOptions, ctx context.Context, cmd *cobra.Command, configCh <-chan options.ServerConfig) error { cctx, cancelFunc := context.WithCancel(context.TODO()) errCh := make(chan error) @@ -138,8 +161,9 @@ func Run(s *options.ServerOptions, ctx context.Context, cmd *cobra.Command, conf klog.Warningln("pilotgo exit bye") return nil case cfg := <-configCh: + klog.Warningln("config is change") cancelFunc() - s.Config = cfg + s.ServerConfig = &cfg cctx, cancelFunc = context.WithCancel(context.TODO()) go func() { if err := runer(s, cctx, cmd); err != nil { diff --git a/src/app/server/cmd/options/options.go b/src/app/server/cmd/options/options.go index bf015779..f7e8ede4 100644 --- a/src/app/server/cmd/options/options.go +++ b/src/app/server/cmd/options/options.go @@ -1,12 +1,187 @@ package options +import ( + "crypto/md5" + "encoding/hex" + "fmt" + "os" + "strings" + "sync" + "time" + + "gitee.com/openeuler/PilotGo/sdk/logger" + "github.com/fsnotify/fsnotify" + "github.com/spf13/viper" + "k8s.io/klog/v2" +) + +/****************************************************************************** + * Copyright (c) KylinSoft Co., Ltd.2021-2022. All rights reserved. + * PilotGo is licensed under the Mulan PSL v2. + * You can use this software accodring to the terms and conditions of the Mulan PSL v2. + * You may obtain a copy of Mulan PSL v2 at: + * http://license.coscl.org.cn/MulanPSL2 + * THIS SOFTWARE IS PROVIDED ON AN 'AS IS' BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PSL v2 for more details. + * Author: yangzhao1 + * Date: 2022-04-06 13:27:45 + * LastEditTime: 2023-09-04 16:16:36 + * Description: provide agent log manager of pilotgo + ******************************************************************************/ + +const ( + defaultConfigurationName = "config_server" + defaultConfigurationPath = "/opt/PilotGo/server" +) + +type HttpServer struct { + Addr string `yaml:"addr" mapstructure:"addr"` + SessionCount int `yaml:"session_count" mapstructure:"session_count"` + SessionMaxAge int `yaml:"session_max_age" mapstructure:"session_max_age"` + Debug bool `yaml:"debug" mapstructure:"debug"` + UseHttps bool `yaml:"use_https" mapstructure:"use_https"` + CertFile string `yaml:"cert_file" mapstructure:"cert_file"` + KeyFile string `yaml:"key_file" mapstructure:"key_file"` +} + +type SocketServer struct { + Addr string `yaml:"addr" mapstructure:"addr"` +} + +type JWTConfig struct { + SecretKey string `yaml:"secret_key" mapstructure:"secret_key"` +} + +type MysqlDBInfo struct { + HostName string `yaml:"host_name" mapstructure:"host_name"` + UserName string `yaml:"user_name" mapstructure:"user_name"` + Password string `yaml:"password" mapstructure:"password"` + DataBase string `yaml:"data_base" mapstructure:"data_base"` + Port int `yaml:"port" mapstructure:"port"` +} + +type RedisDBInfo struct { + RedisConn string `yaml:"redis_conn" mapstructure:"redis_conn"` + UseTLS bool `yaml:"use_tls" mapstructure:"use_tls"` + RedisPwd string `yaml:"redis_pwd" mapstructure:"redis_pwd"` + DefaultDB int `yaml:"defaultDB" mapstructure:"defaultDB"` + DialTimeout time.Duration `yaml:"dialTimeout" mapstructure:"dialTimeout"` + EnableRedis bool `yaml:"enableRedis" mapstructure:"enableRedis"` +} + +type Storage struct { + Path string `yaml:"path" mapstructure:"path"` +} + +type ServerConfig struct { + HttpServer *HttpServer `yaml:"http_server" mapstructure:"http_server"` + SocketServer *SocketServer `yaml:"socket_server" mapstructure:"socket_server"` + JWT *JWTConfig `api:"jwt" yaml:"jwt" mapstructure:"jwt"` + Logopts *logger.LogOpts `yaml:"log" mapstructure:"log"` + MysqlDBinfo *MysqlDBInfo `yaml:"mysql" mapstructure:"mysql"` + RedisDBinfo *RedisDBInfo `yaml:"redis" mapstructure:"redis"` + Storage *Storage `yaml:"storage" mapstructure:"storage"` +} type ServerOptions struct { - Config string + Config string + ServerConfig *ServerConfig } func NewServerOptions() *ServerOptions { s := &ServerOptions{ - Config: "./config_server.yaml", + Config: "./config_server.yaml", + ServerConfig: New(), } return s } +func (s *ServerOptions) NewAPIServer(stopCh <-chan struct{}) { +} + +var ( + _config = defaultConfig() +) + +type pilotgoConfig struct { + cfg *ServerConfig + cfgChangeCh chan ServerConfig + watchOnce sync.Once + loadOnce sync.Once +} + +func ReadFileMd5(sfile string) (string, error) { + ssconfig, err := os.ReadFile(sfile) + if err != nil { + return "", err + } + return getMD5(ssconfig), nil +} +func getMD5(s []byte) string { + m := md5.New() + m.Write([]byte(s)) + return hex.EncodeToString(m.Sum(nil)) +} +func WatchConfigChange() <-chan ServerConfig { + return _config.watchConfig() +} +func (c *pilotgoConfig) watchConfig() <-chan ServerConfig { + c.watchOnce.Do(func() { + viper.WatchConfig() + viper.OnConfigChange(func(in fsnotify.Event) { + cfg := New() + if err := viper.Unmarshal(cfg); err != nil { + klog.Errorf("config reload error", err) + } else { + klog.InfoS("watchConfig pilotgo config !", "HttpServer", cfg.HttpServer) + klog.InfoS("watchConfig pilotgo config !", "SocketServer", cfg.SocketServer) + klog.InfoS("watchConfig pilotgo config !", "JWT", cfg.JWT) + klog.InfoS("watchConfig pilotgo config !", "Logopts", cfg.Logopts) + klog.InfoS("watchConfig pilotgo config !", "RedisDBinfo", cfg.RedisDBinfo) + klog.InfoS("watchConfig pilotgo config !", "MysqlDBinfo", cfg.MysqlDBinfo) + klog.InfoS("watchConfig pilotgo config !", "Storage", cfg.Storage) + if in.Op&fsnotify.Write != 0 && len(viper.AllKeys()) > 0 { + c.cfgChangeCh <- *cfg + } + } + }) + }) + return c.cfgChangeCh +} + +func New() *ServerConfig { + return &ServerConfig{} +} +func TryLoadFromDisk() (*ServerConfig, error) { + return _config.loadFromDisk() +} + +func (c *pilotgoConfig) loadFromDisk() (*ServerConfig, error) { + var err error + c.loadOnce.Do(func() { + if err = viper.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); !ok { + err = fmt.Errorf("error parsing configuration file %s", err) + } + } + err = viper.Unmarshal(c.cfg) + if err != nil { + klog.Errorf("viper error: %v /n", err) + } + }) + return c.cfg, err +} +func defaultConfig() *pilotgoConfig { + viper.SetConfigName(defaultConfigurationName) + viper.AddConfigPath(".") + viper.AddConfigPath(defaultConfigurationPath) + viper.SetEnvPrefix("pilotgo") + viper.AutomaticEnv() + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + + return &pilotgoConfig{ + cfg: New(), + cfgChangeCh: make(chan ServerConfig), + watchOnce: sync.Once{}, + loadOnce: sync.Once{}, + } +} diff --git a/src/app/server/network/httpserver.go b/src/app/server/network/httpserver.go index 38873cc1..9ff153d1 100644 --- a/src/app/server/network/httpserver.go +++ b/src/app/server/network/httpserver.go @@ -28,6 +28,7 @@ import ( "gitee.com/openeuler/PilotGo/app/server/resource" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/gin-gonic/gin" + "k8s.io/klog/v2" ) func HttpServerInit(conf *sconfig.HttpServer, stopCh <-chan struct{}) error { @@ -48,8 +49,8 @@ func HttpServerInit(conf *sconfig.HttpServer, stopCh <-chan struct{}) error { } go func() { <-stopCh + klog.Warningln("httpserver prepare stop") _ = srv.Shutdown(shutdownCtx) - logger.Warn("httpserver close") }() // 启动http server服务 if conf.UseHttps { @@ -61,12 +62,19 @@ func HttpServerInit(conf *sconfig.HttpServer, stopCh <-chan struct{}) error { logger.Info("start http service on: https://%s", conf.Addr) if err := srv.ListenAndServeTLS(conf.CertFile, conf.KeyFile); err != nil { - logger.Error("start http server failed:%v", err) + if err != http.ErrServerClosed { + logger.Error("ListenAndServeTLS start http server failed:%v", err) + return + } } } else { logger.Info("start http service on: http://%s", conf.Addr) if err := srv.ListenAndServe(); err != nil { - logger.Error("start http server failed:%v", err) + if err != http.ErrServerClosed { + logger.Error("ListenAndServe start http server failed:%v", err) + + } + } } }() diff --git a/src/app/server/network/socketserver.go b/src/app/server/network/socketserver.go index 9e145795..78af7f2c 100644 --- a/src/app/server/network/socketserver.go +++ b/src/app/server/network/socketserver.go @@ -15,12 +15,13 @@ package network import ( - "fmt" "net" + "strings" "gitee.com/openeuler/PilotGo/app/server/agentmanager" sconfig "gitee.com/openeuler/PilotGo/app/server/config" "gitee.com/openeuler/PilotGo/sdk/logger" + "k8s.io/klog/v2" ) type SocketServer struct { @@ -35,40 +36,49 @@ func SocketServerInit(conf *sconfig.SocketServer, stopCh <-chan struct{}) error OnAccept: agentmanager.AddandRunAgent, OnStop: agentmanager.StopAgentManager, } - go func() { - <-stopCh - server.Stop() - logger.Warn("SocketServer close") - }() go func() { - if err := server.Run(conf.Addr); err != nil { - logger.Error("socket server init failed: %s", err.Error()) + if err := server.Run(conf.Addr, stopCh); err != nil { + logger.Error("socket server init run failed: %s", err.Error()) } }() return nil } -func (s *SocketServer) Run(addr string) error { +func (s *SocketServer) Run(addr string, stopCh <-chan struct{}) error { listener, err := net.Listen("tcp", addr) if err != nil { return err } + go func() { + <-stopCh + klog.Warningln("SocketServer prepare stop") + listener.Close() + }() logger.Debug("Waiting for agents") for { - conn, err := listener.Accept() - if err != nil { - fmt.Println("accept error:", err) - continue + select { + case <-stopCh: + klog.Warning("SocketServer prepare stop") + return nil + default: + conn, err := listener.Accept() + if err != nil { + if strings.Contains(err.Error(), "use of closed network connection") { + klog.Warningln("SocketServer success exit") + } else { + klog.Errorf("SocketServer run error:%v", err) + } + continue + } + if err := s.OnAccept(conn); err != nil { + klog.Errorf("failed to add and run agent: %v", err) + } } - if err := s.OnAccept(conn); err != nil { - logger.Error("failed to add and run agent: %s", err) - } } } func (s *SocketServer) Stop() { - } diff --git a/src/app/server/network/websocket/client.go b/src/app/server/network/websocket/client.go index 7b87110a..470bec7a 100644 --- a/src/app/server/network/websocket/client.go +++ b/src/app/server/network/websocket/client.go @@ -7,6 +7,7 @@ import ( "gitee.com/openeuler/PilotGo/app/server/agentmanager" "gitee.com/openeuler/PilotGo/sdk/logger" "github.com/gorilla/websocket" + "k8s.io/klog/v2" ) const ( @@ -91,11 +92,11 @@ func SendWarnMsgToWeb(stopCh <-chan struct{}) { for { select { case <-stopCh: - logger.Warn("SendWarnMsgToWeb close") + klog.Warningln("SendWarnMsgToWeb success exit") return - default: - data := <-agentmanager.WARN_MSG + case data := <-agentmanager.WARN_MSG: CliManager.Broadcast <- []byte(data.(string)) + } } diff --git a/src/app/server/service/auth/casbin.go b/src/app/server/service/auth/casbin.go index 18769a89..ec9d954f 100644 --- a/src/app/server/service/auth/casbin.go +++ b/src/app/server/service/auth/casbin.go @@ -63,38 +63,36 @@ func Casbin(conf *sconfig.MysqlDBInfo) { e = some(where (p.eft == allow)) ` - once.Do(func() { - // m := casbinmodel.Model{} - m, err := casbinmodel.NewModelFromString(text) - if err != nil { - logger.Error("casbin model create failed: %s", err) - return - } + // m := casbinmodel.Model{} + m, err := casbinmodel.NewModelFromString(text) + if err != nil { + logger.Error("casbin model create failed: %s", err) + return + } - url := fmt.Sprintf("%s:%s@tcp(%s:%d)/", - conf.UserName, - conf.Password, - conf.HostName, - conf.Port) - a, err := gormadapter.NewAdapter("mysql", url, conf.DataBase) - if err != nil { - logger.Error("casbin adapter create failed: %s", err) - return - } - enforcer, err := casbin.NewEnforcer(m, a) - if err != nil { - logger.Error("casbin enforcer create failed: %s", err) - return - } - if err := enforcer.LoadPolicy(); err != nil { - logger.Error("casbin load Policy failed: %s", err.Error()) - } + url := fmt.Sprintf("%s:%s@tcp(%s:%d)/", + conf.UserName, + conf.Password, + conf.HostName, + conf.Port) + a, err := gormadapter.NewAdapter("mysql", url, conf.DataBase) + if err != nil { + logger.Error("casbin adapter create failed: %s", err) + return + } + enforcer, err := casbin.NewEnforcer(m, a) + if err != nil { + logger.Error("casbin enforcer create failed: %s", err) + return + } + if err := enforcer.LoadPolicy(); err != nil { + logger.Error("casbin load Policy failed: %s", err.Error()) + } - G_Enfocer = enforcer + G_Enfocer = enforcer - // TODO: - initAdminPolicy() - }) + // TODO: + initAdminPolicy() } func addPolicy(role, resource, action, domain string) (bool, error) { diff --git a/src/app/server/service/eventbus/eventbus.go b/src/app/server/service/eventbus/eventbus.go index 115a676a..f36365a5 100644 --- a/src/app/server/service/eventbus/eventbus.go +++ b/src/app/server/service/eventbus/eventbus.go @@ -8,6 +8,7 @@ import ( "gitee.com/openeuler/PilotGo/sdk/common" "gitee.com/openeuler/PilotGo/sdk/logger" "gitee.com/openeuler/PilotGo/sdk/utils/httputils" + "k8s.io/klog/v2" ) type Listener struct { @@ -19,7 +20,6 @@ type EventBus struct { sync.Mutex listeners []*Listener stop chan struct{} - wait sync.WaitGroup event chan *common.EventMessage } @@ -90,26 +90,24 @@ func (e *EventBus) Run(stopCh <-chan struct{}) { go func() { <-stopCh e.Stop() - logger.Warn("EventBus stop") + klog.Warningln("EventBus prepare stop") }() - go func(e *EventBus) { for { select { case <-e.stop: - logger.Info("event bus exit") - e.wait.Done() + klog.Warningln("EventBus success exit ") + return case m := <-e.event: e.broadcast(m) } } }(e) + } func (e *EventBus) Stop() { - e.wait.Add(1) e.stop <- struct{}{} - e.wait.Wait() } func (e *EventBus) publish(m *common.EventMessage) { @@ -139,6 +137,7 @@ func Init(stopCh <-chan struct{}) { eventTypeMap = make(map[int][]Listener) globalEventBus = &EventBus{ event: make(chan *common.EventMessage, 20), + stop: make(chan struct{}), } globalEventBus.Run(stopCh) } diff --git a/src/app/server/service/plugin/heartbeat.go b/src/app/server/service/plugin/heartbeat.go index bc299ea6..cb50b0e8 100644 --- a/src/app/server/service/plugin/heartbeat.go +++ b/src/app/server/service/plugin/heartbeat.go @@ -6,19 +6,13 @@ import ( "gitee.com/openeuler/PilotGo/dbmanager/redismanager" "gitee.com/openeuler/PilotGo/sdk/logger" "gitee.com/openeuler/PilotGo/sdk/plugin/client" + "k8s.io/apimachinery/pkg/util/wait" ) func CheckPluginHeartbeats(stopCh <-chan struct{}) { - for { - select { - case <-stopCh: - logger.Warn("CheckPluginHeartbeats close") - return - default: - time.Sleep(client.HeartbeatInterval) - checkAndRebind() - } - } + go wait.Until(func() { + checkAndRebind() + }, client.HeartbeatInterval, stopCh) } func checkAndRebind() { diff --git a/src/app/server/service/plugin/plugin.go b/src/app/server/service/plugin/plugin.go index b4874bbb..5c1de0a1 100644 --- a/src/app/server/service/plugin/plugin.go +++ b/src/app/server/service/plugin/plugin.go @@ -38,7 +38,7 @@ func Init(stopCh <-chan struct{}) error { } // 检查插件状态,重新绑定plugin与pilotgo - go CheckPluginHeartbeats(stopCh) + CheckPluginHeartbeats(stopCh) return nil } diff --git a/src/config_server.yaml.templete b/src/config_server.yaml.templete index 0ffe8464..8a7d8f92 100644 --- a/src/config_server.yaml.templete +++ b/src/config_server.yaml.templete @@ -29,6 +29,6 @@ redis: redis_pwd: '' defaultDB: 0 dialTimeout: 5s #redis连接超时时间.默认5s - enableRedis: yes #是否启用redis + enableRedis: true #是否启用redis storage: #文件服务存储路径 path: "" \ No newline at end of file diff --git a/src/dbmanager/redismanager/redismanager.go b/src/dbmanager/redismanager/redismanager.go index 39559341..31234d49 100644 --- a/src/dbmanager/redismanager/redismanager.go +++ b/src/dbmanager/redismanager/redismanager.go @@ -15,7 +15,6 @@ package redismanager import ( - "context" "crypto/tls" "time" @@ -55,15 +54,15 @@ func RedisInit(redisConn, redisPwd string, defaultDB int, dialTimeout time.Durat go func() { <-stopCh global_redis.Close() - klog.Warning("global_redis release") + klog.Warning("global_redis success exit") }() - timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), dialTimeout) - defer cancelFunc() - _, err := global_redis.Ping(timeoutCtx).Result() - if err != nil { - return err - } + // timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), dialTimeout) + // defer cancelFunc() + // _, err := global_redis.Ping(timeoutCtx).Result() + // if err != nil { + // return err + // } DialTimeout = dialTimeout EnableRedis = enableRedis return nil diff --git a/src/go.mod b/src/go.mod index 0856d7ea..dc3a04c2 100644 --- a/src/go.mod +++ b/src/go.mod @@ -80,7 +80,7 @@ require ( github.com/spf13/afero v1.10.0 // indirect github.com/spf13/cast v1.5.1 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/spf13/viper v1.17.0 // indirect + github.com/spf13/viper v1.17.0 github.com/subosito/gotenv v1.6.0 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect @@ -100,6 +100,7 @@ require ( gorm.io/driver/postgres v1.5.2 // indirect gorm.io/driver/sqlserver v1.5.0 // indirect gorm.io/plugin/dbresolver v1.4.1 // indirect + k8s.io/apimachinery v0.22.1 modernc.org/libc v1.23.0 // indirect modernc.org/mathutil v1.5.0 // indirect modernc.org/memory v1.6.0 // indirect diff --git a/src/vendor/k8s.io/apimachinery/LICENSE b/src/vendor/k8s.io/apimachinery/LICENSE new file mode 100644 index 00000000..d6456956 --- /dev/null +++ b/src/vendor/k8s.io/apimachinery/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/src/vendor/k8s.io/apimachinery/pkg/util/clock/clock.go b/src/vendor/k8s.io/apimachinery/pkg/util/clock/clock.go new file mode 100644 index 00000000..1a544d3b --- /dev/null +++ b/src/vendor/k8s.io/apimachinery/pkg/util/clock/clock.go @@ -0,0 +1,445 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clock + +import ( + "sync" + "time" +) + +// PassiveClock allows for injecting fake or real clocks into code +// that needs to read the current time but does not support scheduling +// activity in the future. +type PassiveClock interface { + Now() time.Time + Since(time.Time) time.Duration +} + +// Clock allows for injecting fake or real clocks into code that +// needs to do arbitrary things based on time. +type Clock interface { + PassiveClock + After(time.Duration) <-chan time.Time + AfterFunc(time.Duration, func()) Timer + NewTimer(time.Duration) Timer + Sleep(time.Duration) + NewTicker(time.Duration) Ticker +} + +// RealClock really calls time.Now() +type RealClock struct{} + +// Now returns the current time. +func (RealClock) Now() time.Time { + return time.Now() +} + +// Since returns time since the specified timestamp. +func (RealClock) Since(ts time.Time) time.Duration { + return time.Since(ts) +} + +// After is the same as time.After(d). +func (RealClock) After(d time.Duration) <-chan time.Time { + return time.After(d) +} + +// AfterFunc is the same as time.AfterFunc(d, f). +func (RealClock) AfterFunc(d time.Duration, f func()) Timer { + return &realTimer{ + timer: time.AfterFunc(d, f), + } +} + +// NewTimer returns a new Timer. +func (RealClock) NewTimer(d time.Duration) Timer { + return &realTimer{ + timer: time.NewTimer(d), + } +} + +// NewTicker returns a new Ticker. +func (RealClock) NewTicker(d time.Duration) Ticker { + return &realTicker{ + ticker: time.NewTicker(d), + } +} + +// Sleep pauses the RealClock for duration d. +func (RealClock) Sleep(d time.Duration) { + time.Sleep(d) +} + +// FakePassiveClock implements PassiveClock, but returns an arbitrary time. +type FakePassiveClock struct { + lock sync.RWMutex + time time.Time +} + +// FakeClock implements Clock, but returns an arbitrary time. +type FakeClock struct { + FakePassiveClock + + // waiters are waiting for the fake time to pass their specified time + waiters []fakeClockWaiter +} + +type fakeClockWaiter struct { + targetTime time.Time + stepInterval time.Duration + skipIfBlocked bool + destChan chan time.Time + afterFunc func() +} + +// NewFakePassiveClock returns a new FakePassiveClock. +func NewFakePassiveClock(t time.Time) *FakePassiveClock { + return &FakePassiveClock{ + time: t, + } +} + +// NewFakeClock returns a new FakeClock +func NewFakeClock(t time.Time) *FakeClock { + return &FakeClock{ + FakePassiveClock: *NewFakePassiveClock(t), + } +} + +// Now returns f's time. +func (f *FakePassiveClock) Now() time.Time { + f.lock.RLock() + defer f.lock.RUnlock() + return f.time +} + +// Since returns time since the time in f. +func (f *FakePassiveClock) Since(ts time.Time) time.Duration { + f.lock.RLock() + defer f.lock.RUnlock() + return f.time.Sub(ts) +} + +// SetTime sets the time on the FakePassiveClock. +func (f *FakePassiveClock) SetTime(t time.Time) { + f.lock.Lock() + defer f.lock.Unlock() + f.time = t +} + +// After is the Fake version of time.After(d). +func (f *FakeClock) After(d time.Duration) <-chan time.Time { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + f.waiters = append(f.waiters, fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + }) + return ch +} + +// AfterFunc is the Fake version of time.AfterFunc(d, callback). +func (f *FakeClock) AfterFunc(d time.Duration, cb func()) Timer { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + + timer := &fakeTimer{ + fakeClock: f, + waiter: fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + afterFunc: cb, + }, + } + f.waiters = append(f.waiters, timer.waiter) + return timer +} + +// NewTimer is the Fake version of time.NewTimer(d). +func (f *FakeClock) NewTimer(d time.Duration) Timer { + f.lock.Lock() + defer f.lock.Unlock() + stopTime := f.time.Add(d) + ch := make(chan time.Time, 1) // Don't block! + timer := &fakeTimer{ + fakeClock: f, + waiter: fakeClockWaiter{ + targetTime: stopTime, + destChan: ch, + }, + } + f.waiters = append(f.waiters, timer.waiter) + return timer +} + +// NewTicker returns a new Ticker. +func (f *FakeClock) NewTicker(d time.Duration) Ticker { + f.lock.Lock() + defer f.lock.Unlock() + tickTime := f.time.Add(d) + ch := make(chan time.Time, 1) // hold one tick + f.waiters = append(f.waiters, fakeClockWaiter{ + targetTime: tickTime, + stepInterval: d, + skipIfBlocked: true, + destChan: ch, + }) + + return &fakeTicker{ + c: ch, + } +} + +// Step moves clock by Duration, notifies anyone that's called After, Tick, or NewTimer +func (f *FakeClock) Step(d time.Duration) { + f.lock.Lock() + defer f.lock.Unlock() + f.setTimeLocked(f.time.Add(d)) +} + +// SetTime sets the time on a FakeClock. +func (f *FakeClock) SetTime(t time.Time) { + f.lock.Lock() + defer f.lock.Unlock() + f.setTimeLocked(t) +} + +// Actually changes the time and checks any waiters. f must be write-locked. +func (f *FakeClock) setTimeLocked(t time.Time) { + f.time = t + newWaiters := make([]fakeClockWaiter, 0, len(f.waiters)) + for i := range f.waiters { + w := &f.waiters[i] + if !w.targetTime.After(t) { + + if w.skipIfBlocked { + select { + case w.destChan <- t: + default: + } + } else { + w.destChan <- t + } + + if w.afterFunc != nil { + w.afterFunc() + } + + if w.stepInterval > 0 { + for !w.targetTime.After(t) { + w.targetTime = w.targetTime.Add(w.stepInterval) + } + newWaiters = append(newWaiters, *w) + } + + } else { + newWaiters = append(newWaiters, f.waiters[i]) + } + } + f.waiters = newWaiters +} + +// HasWaiters returns true if After or AfterFunc has been called on f but not yet satisfied +// (so you can write race-free tests). +func (f *FakeClock) HasWaiters() bool { + f.lock.RLock() + defer f.lock.RUnlock() + return len(f.waiters) > 0 +} + +// Sleep pauses the FakeClock for duration d. +func (f *FakeClock) Sleep(d time.Duration) { + f.Step(d) +} + +// IntervalClock implements Clock, but each invocation of Now steps the clock forward the specified duration +type IntervalClock struct { + Time time.Time + Duration time.Duration +} + +// Now returns i's time. +func (i *IntervalClock) Now() time.Time { + i.Time = i.Time.Add(i.Duration) + return i.Time +} + +// Since returns time since the time in i. +func (i *IntervalClock) Since(ts time.Time) time.Duration { + return i.Time.Sub(ts) +} + +// After is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) After(d time.Duration) <-chan time.Time { + panic("IntervalClock doesn't implement After") +} + +// AfterFunc is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) AfterFunc(d time.Duration, cb func()) Timer { + panic("IntervalClock doesn't implement AfterFunc") +} + +// NewTimer is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) NewTimer(d time.Duration) Timer { + panic("IntervalClock doesn't implement NewTimer") +} + +// NewTicker is currently unimplemented, will panic. +// TODO: make interval clock use FakeClock so this can be implemented. +func (*IntervalClock) NewTicker(d time.Duration) Ticker { + panic("IntervalClock doesn't implement NewTicker") +} + +// Sleep is currently unimplemented; will panic. +func (*IntervalClock) Sleep(d time.Duration) { + panic("IntervalClock doesn't implement Sleep") +} + +// Timer allows for injecting fake or real timers into code that +// needs to do arbitrary things based on time. +type Timer interface { + C() <-chan time.Time + Stop() bool + Reset(d time.Duration) bool +} + +// realTimer is backed by an actual time.Timer. +type realTimer struct { + timer *time.Timer +} + +// C returns the underlying timer's channel. +func (r *realTimer) C() <-chan time.Time { + return r.timer.C +} + +// Stop calls Stop() on the underlying timer. +func (r *realTimer) Stop() bool { + return r.timer.Stop() +} + +// Reset calls Reset() on the underlying timer. +func (r *realTimer) Reset(d time.Duration) bool { + return r.timer.Reset(d) +} + +// fakeTimer implements Timer based on a FakeClock. +type fakeTimer struct { + fakeClock *FakeClock + waiter fakeClockWaiter +} + +// C returns the channel that notifies when this timer has fired. +func (f *fakeTimer) C() <-chan time.Time { + return f.waiter.destChan +} + +// Stop conditionally stops the timer. If the timer has neither fired +// nor been stopped then this call stops the timer and returns true, +// otherwise this call returns false. This is like time.Timer::Stop. +func (f *fakeTimer) Stop() bool { + f.fakeClock.lock.Lock() + defer f.fakeClock.lock.Unlock() + // The timer has already fired or been stopped, unless it is found + // among the clock's waiters. + stopped := false + oldWaiters := f.fakeClock.waiters + newWaiters := make([]fakeClockWaiter, 0, len(oldWaiters)) + seekChan := f.waiter.destChan + for i := range oldWaiters { + // Identify the timer's fakeClockWaiter by the identity of the + // destination channel, nothing else is necessarily unique and + // constant since the timer's creation. + if oldWaiters[i].destChan == seekChan { + stopped = true + } else { + newWaiters = append(newWaiters, oldWaiters[i]) + } + } + + f.fakeClock.waiters = newWaiters + + return stopped +} + +// Reset conditionally updates the firing time of the timer. If the +// timer has neither fired nor been stopped then this call resets the +// timer to the fake clock's "now" + d and returns true, otherwise +// it creates a new waiter, adds it to the clock, and returns true. +// +// It is not possible to return false, because a fake timer can be reset +// from any state (waiting to fire, already fired, and stopped). +// +// See the GoDoc for time.Timer::Reset for more context on why +// the return value of Reset() is not useful. +func (f *fakeTimer) Reset(d time.Duration) bool { + f.fakeClock.lock.Lock() + defer f.fakeClock.lock.Unlock() + waiters := f.fakeClock.waiters + seekChan := f.waiter.destChan + for i := range waiters { + if waiters[i].destChan == seekChan { + waiters[i].targetTime = f.fakeClock.time.Add(d) + return true + } + } + // No existing waiter, timer has already fired or been reset. + // We should still enable Reset() to succeed by creating a + // new waiter and adding it to the clock's waiters. + newWaiter := fakeClockWaiter{ + targetTime: f.fakeClock.time.Add(d), + destChan: seekChan, + } + f.fakeClock.waiters = append(f.fakeClock.waiters, newWaiter) + return true +} + +// Ticker defines the Ticker interface +type Ticker interface { + C() <-chan time.Time + Stop() +} + +type realTicker struct { + ticker *time.Ticker +} + +func (t *realTicker) C() <-chan time.Time { + return t.ticker.C +} + +func (t *realTicker) Stop() { + t.ticker.Stop() +} + +type fakeTicker struct { + c <-chan time.Time +} + +func (t *fakeTicker) C() <-chan time.Time { + return t.c +} + +func (t *fakeTicker) Stop() { +} diff --git a/src/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go b/src/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go new file mode 100644 index 00000000..035c5281 --- /dev/null +++ b/src/vendor/k8s.io/apimachinery/pkg/util/runtime/runtime.go @@ -0,0 +1,173 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package runtime + +import ( + "fmt" + "net/http" + "runtime" + "sync" + "time" + + "k8s.io/klog/v2" +) + +var ( + // ReallyCrash controls the behavior of HandleCrash and now defaults + // true. It's still exposed so components can optionally set to false + // to restore prior behavior. + ReallyCrash = true +) + +// PanicHandlers is a list of functions which will be invoked when a panic happens. +var PanicHandlers = []func(interface{}){logPanic} + +// HandleCrash simply catches a crash and logs an error. Meant to be called via +// defer. Additional context-specific handlers can be provided, and will be +// called in case of panic. HandleCrash actually crashes, after calling the +// handlers and logging the panic message. +// +// E.g., you can provide one or more additional handlers for something like shutting down go routines gracefully. +func HandleCrash(additionalHandlers ...func(interface{})) { + if r := recover(); r != nil { + for _, fn := range PanicHandlers { + fn(r) + } + for _, fn := range additionalHandlers { + fn(r) + } + if ReallyCrash { + // Actually proceed to panic. + panic(r) + } + } +} + +// logPanic logs the caller tree when a panic occurs (except in the special case of http.ErrAbortHandler). +func logPanic(r interface{}) { + if r == http.ErrAbortHandler { + // honor the http.ErrAbortHandler sentinel panic value: + // ErrAbortHandler is a sentinel panic value to abort a handler. + // While any panic from ServeHTTP aborts the response to the client, + // panicking with ErrAbortHandler also suppresses logging of a stack trace to the server's error log. + return + } + + // Same as stdlib http server code. Manually allocate stack trace buffer size + // to prevent excessively large logs + const size = 64 << 10 + stacktrace := make([]byte, size) + stacktrace = stacktrace[:runtime.Stack(stacktrace, false)] + if _, ok := r.(string); ok { + klog.Errorf("Observed a panic: %s\n%s", r, stacktrace) + } else { + klog.Errorf("Observed a panic: %#v (%v)\n%s", r, r, stacktrace) + } +} + +// ErrorHandlers is a list of functions which will be invoked when a nonreturnable +// error occurs. +// TODO(lavalamp): for testability, this and the below HandleError function +// should be packaged up into a testable and reusable object. +var ErrorHandlers = []func(error){ + logError, + (&rudimentaryErrorBackoff{ + lastErrorTime: time.Now(), + // 1ms was the number folks were able to stomach as a global rate limit. + // If you need to log errors more than 1000 times a second you + // should probably consider fixing your code instead. :) + minPeriod: time.Millisecond, + }).OnError, +} + +// HandlerError is a method to invoke when a non-user facing piece of code cannot +// return an error and needs to indicate it has been ignored. Invoking this method +// is preferable to logging the error - the default behavior is to log but the +// errors may be sent to a remote server for analysis. +func HandleError(err error) { + // this is sometimes called with a nil error. We probably shouldn't fail and should do nothing instead + if err == nil { + return + } + + for _, fn := range ErrorHandlers { + fn(err) + } +} + +// logError prints an error with the call stack of the location it was reported +func logError(err error) { + klog.ErrorDepth(2, err) +} + +type rudimentaryErrorBackoff struct { + minPeriod time.Duration // immutable + // TODO(lavalamp): use the clock for testability. Need to move that + // package for that to be accessible here. + lastErrorTimeLock sync.Mutex + lastErrorTime time.Time +} + +// OnError will block if it is called more often than the embedded period time. +// This will prevent overly tight hot error loops. +func (r *rudimentaryErrorBackoff) OnError(error) { + r.lastErrorTimeLock.Lock() + defer r.lastErrorTimeLock.Unlock() + d := time.Since(r.lastErrorTime) + if d < r.minPeriod { + // If the time moves backwards for any reason, do nothing + time.Sleep(r.minPeriod - d) + } + r.lastErrorTime = time.Now() +} + +// GetCaller returns the caller of the function that calls it. +func GetCaller() string { + var pc [1]uintptr + runtime.Callers(3, pc[:]) + f := runtime.FuncForPC(pc[0]) + if f == nil { + return fmt.Sprintf("Unable to find caller") + } + return f.Name() +} + +// RecoverFromPanic replaces the specified error with an error containing the +// original error, and the call tree when a panic occurs. This enables error +// handlers to handle errors and panics the same way. +func RecoverFromPanic(err *error) { + if r := recover(); r != nil { + // Same as stdlib http server code. Manually allocate stack trace buffer size + // to prevent excessively large logs + const size = 64 << 10 + stacktrace := make([]byte, size) + stacktrace = stacktrace[:runtime.Stack(stacktrace, false)] + + *err = fmt.Errorf( + "recovered from panic %q. (err=%v) Call stack:\n%s", + r, + *err, + stacktrace) + } +} + +// Must panics on non-nil errors. Useful to handling programmer level errors. +func Must(err error) { + if err != nil { + panic(err) + } +} diff --git a/src/vendor/k8s.io/apimachinery/pkg/util/wait/doc.go b/src/vendor/k8s.io/apimachinery/pkg/util/wait/doc.go new file mode 100644 index 00000000..3f0c968e --- /dev/null +++ b/src/vendor/k8s.io/apimachinery/pkg/util/wait/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package wait provides tools for polling or listening for changes +// to a condition. +package wait // import "k8s.io/apimachinery/pkg/util/wait" diff --git a/src/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go b/src/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go new file mode 100644 index 00000000..afb24876 --- /dev/null +++ b/src/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go @@ -0,0 +1,752 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wait + +import ( + "context" + "errors" + "math" + "math/rand" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/runtime" +) + +// For any test of the style: +// ... +// <- time.After(timeout): +// t.Errorf("Timed out") +// The value for timeout should effectively be "forever." Obviously we don't want our tests to truly lock up forever, but 30s +// is long enough that it is effectively forever for the things that can slow down a run on a heavily contended machine +// (GC, seeks, etc), but not so long as to make a developer ctrl-c a test run if they do happen to break that test. +var ForeverTestTimeout = time.Second * 30 + +// NeverStop may be passed to Until to make it never stop. +var NeverStop <-chan struct{} = make(chan struct{}) + +// Group allows to start a group of goroutines and wait for their completion. +type Group struct { + wg sync.WaitGroup +} + +func (g *Group) Wait() { + g.wg.Wait() +} + +// StartWithChannel starts f in a new goroutine in the group. +// stopCh is passed to f as an argument. f should stop when stopCh is available. +func (g *Group) StartWithChannel(stopCh <-chan struct{}, f func(stopCh <-chan struct{})) { + g.Start(func() { + f(stopCh) + }) +} + +// StartWithContext starts f in a new goroutine in the group. +// ctx is passed to f as an argument. f should stop when ctx.Done() is available. +func (g *Group) StartWithContext(ctx context.Context, f func(context.Context)) { + g.Start(func() { + f(ctx) + }) +} + +// Start starts f in a new goroutine in the group. +func (g *Group) Start(f func()) { + g.wg.Add(1) + go func() { + defer g.wg.Done() + f() + }() +} + +// Forever calls f every period for ever. +// +// Forever is syntactic sugar on top of Until. +func Forever(f func(), period time.Duration) { + Until(f, period, NeverStop) +} + +// Until loops until stop channel is closed, running f every period. +// +// Until is syntactic sugar on top of JitterUntil with zero jitter factor and +// with sliding = true (which means the timer for period starts after the f +// completes). +func Until(f func(), period time.Duration, stopCh <-chan struct{}) { + JitterUntil(f, period, 0.0, true, stopCh) +} + +// UntilWithContext loops until context is done, running f every period. +// +// UntilWithContext is syntactic sugar on top of JitterUntilWithContext +// with zero jitter factor and with sliding = true (which means the timer +// for period starts after the f completes). +func UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) { + JitterUntilWithContext(ctx, f, period, 0.0, true) +} + +// NonSlidingUntil loops until stop channel is closed, running f every +// period. +// +// NonSlidingUntil is syntactic sugar on top of JitterUntil with zero jitter +// factor, with sliding = false (meaning the timer for period starts at the same +// time as the function starts). +func NonSlidingUntil(f func(), period time.Duration, stopCh <-chan struct{}) { + JitterUntil(f, period, 0.0, false, stopCh) +} + +// NonSlidingUntilWithContext loops until context is done, running f every +// period. +// +// NonSlidingUntilWithContext is syntactic sugar on top of JitterUntilWithContext +// with zero jitter factor, with sliding = false (meaning the timer for period +// starts at the same time as the function starts). +func NonSlidingUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) { + JitterUntilWithContext(ctx, f, period, 0.0, false) +} + +// JitterUntil loops until stop channel is closed, running f every period. +// +// If jitterFactor is positive, the period is jittered before every run of f. +// If jitterFactor is not positive, the period is unchanged and not jittered. +// +// If sliding is true, the period is computed after f runs. If it is false then +// period includes the runtime for f. +// +// Close stopCh to stop. f may not be invoked if stop channel is already +// closed. Pass NeverStop to if you don't want it stop. +func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) { + BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh) +} + +// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager. +// +// If sliding is true, the period is computed after f runs. If it is false then +// period includes the runtime for f. +func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) { + var t clock.Timer + for { + select { + case <-stopCh: + return + default: + } + + if !sliding { + t = backoff.Backoff() + } + + func() { + defer runtime.HandleCrash() + f() + }() + + if sliding { + t = backoff.Backoff() + } + + // NOTE: b/c there is no priority selection in golang + // it is possible for this to race, meaning we could + // trigger t.C and stopCh, and t.C select falls through. + // In order to mitigate we re-check stopCh at the beginning + // of every loop to prevent extra executions of f(). + select { + case <-stopCh: + return + case <-t.C(): + } + } +} + +// JitterUntilWithContext loops until context is done, running f every period. +// +// If jitterFactor is positive, the period is jittered before every run of f. +// If jitterFactor is not positive, the period is unchanged and not jittered. +// +// If sliding is true, the period is computed after f runs. If it is false then +// period includes the runtime for f. +// +// Cancel context to stop. f may not be invoked if context is already expired. +func JitterUntilWithContext(ctx context.Context, f func(context.Context), period time.Duration, jitterFactor float64, sliding bool) { + JitterUntil(func() { f(ctx) }, period, jitterFactor, sliding, ctx.Done()) +} + +// Jitter returns a time.Duration between duration and duration + maxFactor * +// duration. +// +// This allows clients to avoid converging on periodic behavior. If maxFactor +// is 0.0, a suggested default value will be chosen. +func Jitter(duration time.Duration, maxFactor float64) time.Duration { + if maxFactor <= 0.0 { + maxFactor = 1.0 + } + wait := duration + time.Duration(rand.Float64()*maxFactor*float64(duration)) + return wait +} + +// ErrWaitTimeout is returned when the condition exited without success. +var ErrWaitTimeout = errors.New("timed out waiting for the condition") + +// ConditionFunc returns true if the condition is satisfied, or an error +// if the loop should be aborted. +type ConditionFunc func() (done bool, err error) + +// ConditionWithContextFunc returns true if the condition is satisfied, or an error +// if the loop should be aborted. +// +// The caller passes along a context that can be used by the condition function. +type ConditionWithContextFunc func(context.Context) (done bool, err error) + +// WithContext converts a ConditionFunc into a ConditionWithContextFunc +func (cf ConditionFunc) WithContext() ConditionWithContextFunc { + return func(context.Context) (done bool, err error) { + return cf() + } +} + +// runConditionWithCrashProtection runs a ConditionFunc with crash protection +func runConditionWithCrashProtection(condition ConditionFunc) (bool, error) { + return runConditionWithCrashProtectionWithContext(context.TODO(), condition.WithContext()) +} + +// runConditionWithCrashProtectionWithContext runs a +// ConditionWithContextFunc with crash protection. +func runConditionWithCrashProtectionWithContext(ctx context.Context, condition ConditionWithContextFunc) (bool, error) { + defer runtime.HandleCrash() + return condition(ctx) +} + +// Backoff holds parameters applied to a Backoff function. +type Backoff struct { + // The initial duration. + Duration time.Duration + // Duration is multiplied by factor each iteration, if factor is not zero + // and the limits imposed by Steps and Cap have not been reached. + // Should not be negative. + // The jitter does not contribute to the updates to the duration parameter. + Factor float64 + // The sleep at each iteration is the duration plus an additional + // amount chosen uniformly at random from the interval between + // zero and `jitter*duration`. + Jitter float64 + // The remaining number of iterations in which the duration + // parameter may change (but progress can be stopped earlier by + // hitting the cap). If not positive, the duration is not + // changed. Used for exponential backoff in combination with + // Factor and Cap. + Steps int + // A limit on revised values of the duration parameter. If a + // multiplication by the factor parameter would make the duration + // exceed the cap then the duration is set to the cap and the + // steps parameter is set to zero. + Cap time.Duration +} + +// Step (1) returns an amount of time to sleep determined by the +// original Duration and Jitter and (2) mutates the provided Backoff +// to update its Steps and Duration. +func (b *Backoff) Step() time.Duration { + if b.Steps < 1 { + if b.Jitter > 0 { + return Jitter(b.Duration, b.Jitter) + } + return b.Duration + } + b.Steps-- + + duration := b.Duration + + // calculate the next step + if b.Factor != 0 { + b.Duration = time.Duration(float64(b.Duration) * b.Factor) + if b.Cap > 0 && b.Duration > b.Cap { + b.Duration = b.Cap + b.Steps = 0 + } + } + + if b.Jitter > 0 { + duration = Jitter(duration, b.Jitter) + } + return duration +} + +// contextForChannel derives a child context from a parent channel. +// +// The derived context's Done channel is closed when the returned cancel function +// is called or when the parent channel is closed, whichever happens first. +// +// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked. +func contextForChannel(parentCh <-chan struct{}) (context.Context, context.CancelFunc) { + ctx, cancel := context.WithCancel(context.Background()) + + go func() { + select { + case <-parentCh: + cancel() + case <-ctx.Done(): + } + }() + return ctx, cancel +} + +// BackoffManager manages backoff with a particular scheme based on its underlying implementation. It provides +// an interface to return a timer for backoff, and caller shall backoff until Timer.C() drains. If the second Backoff() +// is called before the timer from the first Backoff() call finishes, the first timer will NOT be drained and result in +// undetermined behavior. +// The BackoffManager is supposed to be called in a single-threaded environment. +type BackoffManager interface { + Backoff() clock.Timer +} + +type exponentialBackoffManagerImpl struct { + backoff *Backoff + backoffTimer clock.Timer + lastBackoffStart time.Time + initialBackoff time.Duration + backoffResetDuration time.Duration + clock clock.Clock +} + +// NewExponentialBackoffManager returns a manager for managing exponential backoff. Each backoff is jittered and +// backoff will not exceed the given max. If the backoff is not called within resetDuration, the backoff is reset. +// This backoff manager is used to reduce load during upstream unhealthiness. +func NewExponentialBackoffManager(initBackoff, maxBackoff, resetDuration time.Duration, backoffFactor, jitter float64, c clock.Clock) BackoffManager { + return &exponentialBackoffManagerImpl{ + backoff: &Backoff{ + Duration: initBackoff, + Factor: backoffFactor, + Jitter: jitter, + + // the current impl of wait.Backoff returns Backoff.Duration once steps are used up, which is not + // what we ideally need here, we set it to max int and assume we will never use up the steps + Steps: math.MaxInt32, + Cap: maxBackoff, + }, + backoffTimer: nil, + initialBackoff: initBackoff, + lastBackoffStart: c.Now(), + backoffResetDuration: resetDuration, + clock: c, + } +} + +func (b *exponentialBackoffManagerImpl) getNextBackoff() time.Duration { + if b.clock.Now().Sub(b.lastBackoffStart) > b.backoffResetDuration { + b.backoff.Steps = math.MaxInt32 + b.backoff.Duration = b.initialBackoff + } + b.lastBackoffStart = b.clock.Now() + return b.backoff.Step() +} + +// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for exponential backoff. +// The returned timer must be drained before calling Backoff() the second time +func (b *exponentialBackoffManagerImpl) Backoff() clock.Timer { + if b.backoffTimer == nil { + b.backoffTimer = b.clock.NewTimer(b.getNextBackoff()) + } else { + b.backoffTimer.Reset(b.getNextBackoff()) + } + return b.backoffTimer +} + +type jitteredBackoffManagerImpl struct { + clock clock.Clock + duration time.Duration + jitter float64 + backoffTimer clock.Timer +} + +// NewJitteredBackoffManager returns a BackoffManager that backoffs with given duration plus given jitter. If the jitter +// is negative, backoff will not be jittered. +func NewJitteredBackoffManager(duration time.Duration, jitter float64, c clock.Clock) BackoffManager { + return &jitteredBackoffManagerImpl{ + clock: c, + duration: duration, + jitter: jitter, + backoffTimer: nil, + } +} + +func (j *jitteredBackoffManagerImpl) getNextBackoff() time.Duration { + jitteredPeriod := j.duration + if j.jitter > 0.0 { + jitteredPeriod = Jitter(j.duration, j.jitter) + } + return jitteredPeriod +} + +// Backoff implements BackoffManager.Backoff, it returns a timer so caller can block on the timer for jittered backoff. +// The returned timer must be drained before calling Backoff() the second time +func (j *jitteredBackoffManagerImpl) Backoff() clock.Timer { + backoff := j.getNextBackoff() + if j.backoffTimer == nil { + j.backoffTimer = j.clock.NewTimer(backoff) + } else { + j.backoffTimer.Reset(backoff) + } + return j.backoffTimer +} + +// ExponentialBackoff repeats a condition check with exponential backoff. +// +// It repeatedly checks the condition and then sleeps, using `backoff.Step()` +// to determine the length of the sleep and adjust Duration and Steps. +// Stops and returns as soon as: +// 1. the condition check returns true or an error, +// 2. `backoff.Steps` checks of the condition have been done, or +// 3. a sleep truncated by the cap on duration has been completed. +// In case (1) the returned error is what the condition function returned. +// In all other cases, ErrWaitTimeout is returned. +func ExponentialBackoff(backoff Backoff, condition ConditionFunc) error { + for backoff.Steps > 0 { + if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { + return err + } + if backoff.Steps == 1 { + break + } + time.Sleep(backoff.Step()) + } + return ErrWaitTimeout +} + +// Poll tries a condition func until it returns true, an error, or the timeout +// is reached. +// +// Poll always waits the interval before the run of 'condition'. +// 'condition' will always be invoked at least once. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +// +// If you want to Poll something forever, see PollInfinite. +func Poll(interval, timeout time.Duration, condition ConditionFunc) error { + return PollWithContext(context.Background(), interval, timeout, condition.WithContext()) +} + +// PollWithContext tries a condition func until it returns true, an error, +// or when the context expires or the timeout is reached, whichever +// happens first. +// +// PollWithContext always waits the interval before the run of 'condition'. +// 'condition' will always be invoked at least once. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +// +// If you want to Poll something forever, see PollInfinite. +func PollWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error { + return poll(ctx, false, poller(interval, timeout), condition) +} + +// PollUntil tries a condition func until it returns true, an error or stopCh is +// closed. +// +// PollUntil always waits interval before the first run of 'condition'. +// 'condition' will always be invoked at least once. +func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { + ctx, cancel := contextForChannel(stopCh) + defer cancel() + return PollUntilWithContext(ctx, interval, condition.WithContext()) +} + +// PollUntilWithContext tries a condition func until it returns true, +// an error or the specified context is cancelled or expired. +// +// PollUntilWithContext always waits interval before the first run of 'condition'. +// 'condition' will always be invoked at least once. +func PollUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error { + return poll(ctx, false, poller(interval, 0), condition) +} + +// PollInfinite tries a condition func until it returns true or an error +// +// PollInfinite always waits the interval before the run of 'condition'. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +func PollInfinite(interval time.Duration, condition ConditionFunc) error { + return PollInfiniteWithContext(context.Background(), interval, condition.WithContext()) +} + +// PollInfiniteWithContext tries a condition func until it returns true or an error +// +// PollInfiniteWithContext always waits the interval before the run of 'condition'. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +func PollInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error { + return poll(ctx, false, poller(interval, 0), condition) +} + +// PollImmediate tries a condition func until it returns true, an error, or the timeout +// is reached. +// +// PollImmediate always checks 'condition' before waiting for the interval. 'condition' +// will always be invoked at least once. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +// +// If you want to immediately Poll something forever, see PollImmediateInfinite. +func PollImmediate(interval, timeout time.Duration, condition ConditionFunc) error { + return PollImmediateWithContext(context.Background(), interval, timeout, condition.WithContext()) +} + +// PollImmediateWithContext tries a condition func until it returns true, an error, +// or the timeout is reached or the specified context expires, whichever happens first. +// +// PollImmediateWithContext always checks 'condition' before waiting for the interval. +// 'condition' will always be invoked at least once. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +// +// If you want to immediately Poll something forever, see PollImmediateInfinite. +func PollImmediateWithContext(ctx context.Context, interval, timeout time.Duration, condition ConditionWithContextFunc) error { + return poll(ctx, true, poller(interval, timeout), condition) +} + +// PollImmediateUntil tries a condition func until it returns true, an error or stopCh is closed. +// +// PollImmediateUntil runs the 'condition' before waiting for the interval. +// 'condition' will always be invoked at least once. +func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error { + ctx, cancel := contextForChannel(stopCh) + defer cancel() + return PollImmediateUntilWithContext(ctx, interval, condition.WithContext()) +} + +// PollImmediateUntilWithContext tries a condition func until it returns true, +// an error or the specified context is cancelled or expired. +// +// PollImmediateUntilWithContext runs the 'condition' before waiting for the interval. +// 'condition' will always be invoked at least once. +func PollImmediateUntilWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error { + return poll(ctx, true, poller(interval, 0), condition) +} + +// PollImmediateInfinite tries a condition func until it returns true or an error +// +// PollImmediateInfinite runs the 'condition' before waiting for the interval. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) error { + return PollImmediateInfiniteWithContext(context.Background(), interval, condition.WithContext()) +} + +// PollImmediateInfiniteWithContext tries a condition func until it returns true +// or an error or the specified context gets cancelled or expired. +// +// PollImmediateInfiniteWithContext runs the 'condition' before waiting for the interval. +// +// Some intervals may be missed if the condition takes too long or the time +// window is too short. +func PollImmediateInfiniteWithContext(ctx context.Context, interval time.Duration, condition ConditionWithContextFunc) error { + return poll(ctx, true, poller(interval, 0), condition) +} + +// Internally used, each of the the public 'Poll*' function defined in this +// package should invoke this internal function with appropriate parameters. +// ctx: the context specified by the caller, for infinite polling pass +// a context that never gets cancelled or expired. +// immediate: if true, the 'condition' will be invoked before waiting for the interval, +// in this case 'condition' will always be invoked at least once. +// wait: user specified WaitFunc function that controls at what interval the condition +// function should be invoked periodically and whether it is bound by a timeout. +// condition: user specified ConditionWithContextFunc function. +func poll(ctx context.Context, immediate bool, wait WaitWithContextFunc, condition ConditionWithContextFunc) error { + if immediate { + done, err := runConditionWithCrashProtectionWithContext(ctx, condition) + if err != nil { + return err + } + if done { + return nil + } + } + + select { + case <-ctx.Done(): + // returning ctx.Err() will break backward compatibility + return ErrWaitTimeout + default: + return WaitForWithContext(ctx, wait, condition) + } +} + +// WaitFunc creates a channel that receives an item every time a test +// should be executed and is closed when the last test should be invoked. +type WaitFunc func(done <-chan struct{}) <-chan struct{} + +// WithContext converts the WaitFunc to an equivalent WaitWithContextFunc +func (w WaitFunc) WithContext() WaitWithContextFunc { + return func(ctx context.Context) <-chan struct{} { + return w(ctx.Done()) + } +} + +// WaitWithContextFunc creates a channel that receives an item every time a test +// should be executed and is closed when the last test should be invoked. +// +// When the specified context gets cancelled or expires the function +// stops sending item and returns immediately. +type WaitWithContextFunc func(ctx context.Context) <-chan struct{} + +// WaitFor continually checks 'fn' as driven by 'wait'. +// +// WaitFor gets a channel from 'wait()'', and then invokes 'fn' once for every value +// placed on the channel and once more when the channel is closed. If the channel is closed +// and 'fn' returns false without error, WaitFor returns ErrWaitTimeout. +// +// If 'fn' returns an error the loop ends and that error is returned. If +// 'fn' returns true the loop ends and nil is returned. +// +// ErrWaitTimeout will be returned if the 'done' channel is closed without fn ever +// returning true. +// +// When the done channel is closed, because the golang `select` statement is +// "uniform pseudo-random", the `fn` might still run one or multiple time, +// though eventually `WaitFor` will return. +func WaitFor(wait WaitFunc, fn ConditionFunc, done <-chan struct{}) error { + ctx, cancel := contextForChannel(done) + defer cancel() + return WaitForWithContext(ctx, wait.WithContext(), fn.WithContext()) +} + +// WaitForWithContext continually checks 'fn' as driven by 'wait'. +// +// WaitForWithContext gets a channel from 'wait()'', and then invokes 'fn' +// once for every value placed on the channel and once more when the +// channel is closed. If the channel is closed and 'fn' +// returns false without error, WaitForWithContext returns ErrWaitTimeout. +// +// If 'fn' returns an error the loop ends and that error is returned. If +// 'fn' returns true the loop ends and nil is returned. +// +// context.Canceled will be returned if the ctx.Done() channel is closed +// without fn ever returning true. +// +// When the ctx.Done() channel is closed, because the golang `select` statement is +// "uniform pseudo-random", the `fn` might still run one or multiple times, +// though eventually `WaitForWithContext` will return. +func WaitForWithContext(ctx context.Context, wait WaitWithContextFunc, fn ConditionWithContextFunc) error { + waitCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := wait(waitCtx) + for { + select { + case _, open := <-c: + ok, err := runConditionWithCrashProtectionWithContext(ctx, fn) + if err != nil { + return err + } + if ok { + return nil + } + if !open { + return ErrWaitTimeout + } + case <-ctx.Done(): + // returning ctx.Err() will break backward compatibility + return ErrWaitTimeout + } + } +} + +// poller returns a WaitFunc that will send to the channel every interval until +// timeout has elapsed and then closes the channel. +// +// Over very short intervals you may receive no ticks before the channel is +// closed. A timeout of 0 is interpreted as an infinity, and in such a case +// it would be the caller's responsibility to close the done channel. +// Failure to do so would result in a leaked goroutine. +// +// Output ticks are not buffered. If the channel is not ready to receive an +// item, the tick is skipped. +func poller(interval, timeout time.Duration) WaitWithContextFunc { + return WaitWithContextFunc(func(ctx context.Context) <-chan struct{} { + ch := make(chan struct{}) + + go func() { + defer close(ch) + + tick := time.NewTicker(interval) + defer tick.Stop() + + var after <-chan time.Time + if timeout != 0 { + // time.After is more convenient, but it + // potentially leaves timers around much longer + // than necessary if we exit early. + timer := time.NewTimer(timeout) + after = timer.C + defer timer.Stop() + } + + for { + select { + case <-tick.C: + // If the consumer isn't ready for this signal drop it and + // check the other channels. + select { + case ch <- struct{}{}: + default: + } + case <-after: + return + case <-ctx.Done(): + return + } + } + }() + + return ch + }) +} + +// ExponentialBackoffWithContext works with a request context and a Backoff. It ensures that the retry wait never +// exceeds the deadline specified by the request context. +func ExponentialBackoffWithContext(ctx context.Context, backoff Backoff, condition ConditionFunc) error { + for backoff.Steps > 0 { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if ok, err := runConditionWithCrashProtection(condition); err != nil || ok { + return err + } + + if backoff.Steps == 1 { + break + } + + waitBeforeRetry := backoff.Step() + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(waitBeforeRetry): + } + } + + return ErrWaitTimeout +} diff --git a/src/vendor/modules.txt b/src/vendor/modules.txt index 0eefef78..be669467 100644 --- a/src/vendor/modules.txt +++ b/src/vendor/modules.txt @@ -463,6 +463,11 @@ gorm.io/gorm/utils # gorm.io/plugin/dbresolver v1.4.1 ## explicit; go 1.14 gorm.io/plugin/dbresolver +# k8s.io/apimachinery v0.22.1 +## explicit; go 1.16 +k8s.io/apimachinery/pkg/util/clock +k8s.io/apimachinery/pkg/util/runtime +k8s.io/apimachinery/pkg/util/wait # k8s.io/klog/v2 v2.9.0 ## explicit; go 1.13 k8s.io/klog/v2 -- Gitee