From 9643380578bb60ac3d6ea31b6e58897d04e6073a Mon Sep 17 00:00:00 2001 From: lauk Date: Wed, 26 Jul 2023 17:28:14 +0800 Subject: [PATCH] housekeeper-operator code optimization --- cmd/nkd/upgrade.go | 3 +- housekeeper/Makefile | 4 +- .../controllers/update_controller.go | 198 +++++++++--------- housekeeper/pkg/constants/constants.go | 7 +- 4 files changed, 113 insertions(+), 99 deletions(-) diff --git a/cmd/nkd/upgrade.go b/cmd/nkd/upgrade.go index 5550b32..8493cac 100755 --- a/cmd/nkd/upgrade.go +++ b/cmd/nkd/upgrade.go @@ -37,9 +37,10 @@ func runUpgradeCmd(command *cobra.Command, args []string) error { evictPodForce = false maxUnavailable = 2 loopTimeout = 2 * time.Minute + kubeconfig = "" ) // Get the kubeconfig configuration - config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { config, err = rest.InClusterConfig() if err != nil { diff --git a/housekeeper/Makefile b/housekeeper/Makefile index 590e7f6..6972971 100644 --- a/housekeeper/Makefile +++ b/housekeeper/Makefile @@ -35,12 +35,14 @@ SHELL = /usr/bin/env bash -o pipefail ##@ Build .PHONY: all -all: housekeeper-operator-manager housekeeper-controller-manager +all: housekeeper-operator-manager housekeeper-controller-manager housekeeper-daemon # Build binary housekeeper-operator-manager: go build -o bin/housekeeper-operator-manager operator/housekeeper-operator/main.go housekeeper-controller-manager: go build -o bin/housekeeper-controller-manager operator/housekeeper-controller/main.go +housekeeper-daemon: + go build -o bin/housekeeper-daemon daemon/main.go # Build the docker image .PHONY: docker-build diff --git a/housekeeper/operator/housekeeper-operator/controllers/update_controller.go b/housekeeper/operator/housekeeper-operator/controllers/update_controller.go index 0cc7af8..266bc61 100644 --- a/housekeeper/operator/housekeeper-operator/controllers/update_controller.go +++ b/housekeeper/operator/housekeeper-operator/controllers/update_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "sync" "time" "github.com/sirupsen/logrus" @@ -29,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -57,91 +57,95 @@ type UpdateReconciler struct { // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.13.0/pkg/reconcile func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { _ = log.FromContext(ctx) - if r.Client == nil { return common.NoRequeue, nil } + var crMutex sync.Mutex + crMutex.Lock() + defer crMutex.Unlock() ctx = context.Background() - //返回worker节点的数量 - upInstance, nodesNum, err := getUpgradeInstance(ctx, r, req.NamespacedName) + return reconcile(ctx, r, req) +} + +// SetupWithManager sets up the controller with the Manager. +func (r *UpdateReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&housekeeperiov1alpha1.Update{}). + Complete(r) +} + +func reconcile(ctx context.Context, r common.ReadWriterClient, req ctrl.Request) (ctrl.Result, error) { + var update housekeeperiov1alpha1.Update + if err := r.Get(ctx, req.NamespacedName, &update); err != nil { + logrus.Errorf("unable to fetch update instance: %v", err) + return common.NoRequeue, err + } + if len(update.Spec.OSVersion) == 0 { + logrus.Warning("os version is required") + return common.RequeueAfter, nil + } + masterNodesItems, err := getMasterNodesItems(ctx, r) if err != nil { return common.RequeueNow, err } - limit := min(upInstance.Spec.MaxUnavailable, nodesNum) - if requeueAfter, err := setLabels(ctx, r, req, limit, upInstance); err != nil { - logrus.Errorf("unable set nodes label: %v", err) + workerNodesItems, err := getWorkerNodesItems(ctx, r) + if err != nil { return common.RequeueNow, err - } else if requeueAfter { - return common.RequeueAfter, nil } - return common.RequeueNow, nil + if assignUpdated(ctx, r, masterNodesItems, 1, update); err != nil { + return common.RequeueNow, err + } + maxUnavailable := min(update.Spec.MaxUnavailable, len(workerNodesItems)) + if assignUpdated(ctx, r, masterNodesItems, maxUnavailable, update); err != nil { + return common.RequeueNow, err + } + + return common.NoRequeue, nil } -func getUpgradeInstance(ctx context.Context, r common.ReadWriterClient, name types.NamespacedName) ( - upInstance housekeeperiov1alpha1.Update, nodeNum int, err error) { - if err = r.Get(ctx, name, &upInstance); err != nil { - logrus.Errorf("unable to fetch upgrade instance: %v", err) +func getMasterNodesItems(ctx context.Context, r common.ReadWriterClient) ( + nodesItems []corev1.Node, err error) { + reqUpgrade, err := labels.NewRequirement(constants.LabelUpgrading, selection.DoesNotExist, nil) + if err != nil { + logrus.Errorf("unable to create requirement %s: %v", reqUpgrade, err) return } - requirement, err := labels.NewRequirement(constants.LabelMaster, selection.DoesNotExist, nil) + reqMaster, err := labels.NewRequirement(constants.LabelMaster, selection.Exists, nil) if err != nil { - logrus.Errorf("unable to create requirement %s: %v"+constants.LabelMaster, err) + logrus.Errorf("unable to create requirement %s: %v", constants.LabelMaster, err) return } - nodesItems, err := getNodes(ctx, r, 0, *requirement) + nodesItems, err = getNodes(ctx, r, *reqUpgrade, *reqMaster) if err != nil { - logrus.Errorf("failed to get nodes list: %v", err) + logrus.Errorf("failed to get master nodes list: %v", err) return } - nodeNum = len(nodesItems) return } -func setLabels(ctx context.Context, r common.ReadWriterClient, req ctrl.Request, limit int, - upInstance housekeeperiov1alpha1.Update) (bool, error) { +func getWorkerNodesItems(ctx context.Context, r common.ReadWriterClient) ( + nodesItems []corev1.Node, err error) { reqUpgrade, err := labels.NewRequirement(constants.LabelUpgrading, selection.DoesNotExist, nil) if err != nil { - logrus.Errorf("unable to create upgrade label requirement: %v", err) - return false, err - } - reqMaster, err := labels.NewRequirement(constants.LabelMaster, selection.Exists, nil) - if err != nil { - logrus.Errorf("unable to create master label requirement: %v", err) - return false, err - } - reqNoMaster, err := labels.NewRequirement(constants.LabelMaster, selection.DoesNotExist, nil) - if err != nil { - logrus.Errorf("unable to create non-master label requirement: %v", err) - return false, err - } - masterNodes, err := getNodes(ctx, r, 1, *reqUpgrade, *reqMaster) - if err != nil { - logrus.Errorf("unable to get master node list: %v", err) - return false, err + logrus.Errorf("unable to create requirement %s: %v", reqUpgrade, err) + return } - //limit: 限制worker节点每次升级的数量 - noMasterNodes, err := getNodes(ctx, r, limit, *reqUpgrade, *reqNoMaster) + reqWorker, err := labels.NewRequirement(constants.LabelMaster, selection.DoesNotExist, nil) if err != nil { - logrus.Errorf("unable to get non-master node list: %v", err) - return false, err + logrus.Errorf("unable to create requirement %s: %v"+constants.LabelMaster, err) + return } - needRequeue, err := assignUpdated(ctx, r, masterNodes, upInstance) + nodesItems, err = getNodes(ctx, r, *reqUpgrade, *reqWorker) if err != nil { - logrus.Errorf("unabel to add upgrade label of the master nodes: %v", err) - return false, err - } else if needRequeue { - return true, nil - } - if needRequeue, err = assignUpdated(ctx, r, noMasterNodes, upInstance); err != nil { - logrus.Errorf("unabel to add upgrade label of non-master nodes: %v", err) - return false, err + logrus.Errorf("failed to get worker nodes list: %v", err) + return } - return needRequeue, nil + return } -func getNodes(ctx context.Context, r common.ReadWriterClient, limit int, reqs ...labels.Requirement) ([]corev1.Node, error) { +func getNodes(ctx context.Context, r common.ReadWriterClient, reqs ...labels.Requirement) ([]corev1.Node, error) { var nodeList corev1.NodeList - opts := client.ListOptions{LabelSelector: labels.NewSelector().Add(reqs...), Limit: int64(limit)} + opts := client.ListOptions{LabelSelector: labels.NewSelector().Add(reqs...)} if err := r.List(ctx, &nodeList, &opts); err != nil { logrus.Errorf("unable to list nodes with requirements: %v", err) return nil, err @@ -151,56 +155,55 @@ func getNodes(ctx context.Context, r common.ReadWriterClient, limit int, reqs .. // Add the label to nodes func assignUpdated(ctx context.Context, r common.ReadWriterClient, nodeList []corev1.Node, - upInstance housekeeperiov1alpha1.Update) (bool, error) { + maxUnavailable int, upInstance housekeeperiov1alpha1.Update) error { var ( kubeVersionSpec = upInstance.Spec.KubeVersion osVersionSpec = upInstance.Spec.OSVersion + count = 0 + wg sync.WaitGroup ) - if len(osVersionSpec) == 0 { - logrus.Warning("os version is required") - return false, nil - } - if len(nodeList) == 0 { - return false, nil - } + + // 创建一个通道来接收任务结果 + resultChan := make(chan error) + for _, node := range nodeList { + if count >= maxUnavailable { + count = 0 + //为了控制升级任务的并发数,每处理 maxUnavailable 个节点后,休眠 2 分钟 + time.Sleep(constants.NodeSleepTime) + } if conditionMet(node, kubeVersionSpec, osVersionSpec) { node.Labels[constants.LabelUpgrading] = "" if err := r.Update(ctx, &node); err != nil { logrus.Errorf("unable to add %s label:%v", node.Name, err) - return false, err + return err } - if err := waitForUpgradeComplete(node, kubeVersionSpec, osVersionSpec); err != nil { - logrus.Errorf("failed to wait for node upgrade to complete: %v", err) - return false, err - } - } else { - return false, nil + count++ + wg.Add(1) // 增加 WaitGroup 的计数器 + go func(node corev1.Node) { + waitForUpgradeComplete(node, kubeVersionSpec, osVersionSpec, resultChan, &wg) + }(node) } } - return true, nil -} + //等待所有任务完成 + wg.Wait() -func conditionMet(node corev1.Node, kubeVersionSpec string, osVersionSpec string) bool { - var ( - kubeProxyVersion = node.Status.NodeInfo.KubeProxyVersion - kubeletVersion = node.Status.NodeInfo.KubeletVersion - osVersion = node.Status.NodeInfo.OSImage - ) - if len(kubeVersionSpec) > 0 { - if kubeVersionSpec == kubeProxyVersion && kubeVersionSpec == kubeletVersion { - return false - } - } else { - if osVersionSpec == osVersion { - return false + //关闭结果通道 + close(resultChan) + // 遍历结果通道,处理每个任务的结果 + for err := range resultChan { + if err != nil { + return err } } - return true + return nil } -func waitForUpgradeComplete(node corev1.Node, kubeVersionSpec string, osVersionSpec string) error { - ctx, cancel := context.WithTimeout(context.Background(), constants.Timeout) +func waitForUpgradeComplete(node corev1.Node, kubeVersionSpec string, osVersionSpec string, + resultChan chan<- error, wg *sync.WaitGroup) { + defer wg.Done() // goroutine 执行完成后减少 WaitGroup 的计数器 + + ctx, cancel := context.WithTimeout(context.Background(), constants.NodeTimeout) defer cancel() done := make(chan struct{}) @@ -215,14 +218,24 @@ func waitForUpgradeComplete(node corev1.Node, kubeVersionSpec string, osVersionS select { case <-done: logrus.Infof("successful upgrade node: %s", node.Name) + resultChan <- nil case <-ctx.Done(): // 上下文超时,跳出循环 if ctx.Err() == context.DeadlineExceeded { logrus.Errorf("failed to upgrade node: %s: %v", node.Name, ctx.Err()) - return ctx.Err() + resultChan <- ctx.Err() } } - return nil + //确保在任务完成后关闭done通道 + close(done) +} + +func conditionMet(node corev1.Node, kubeVersionSpec string, osVersionSpec string) bool { + nodeInfo := node.Status.NodeInfo + if kubeVersionSpec != "" { + return kubeVersionSpec != nodeInfo.KubeProxyVersion && kubeVersionSpec != nodeInfo.KubeletVersion + } + return osVersionSpec != nodeInfo.OSImage } func min(a, b int) int { @@ -231,10 +244,3 @@ func min(a, b int) int { } return b } - -// SetupWithManager sets up the controller with the Manager. -func (r *UpdateReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&housekeeperiov1alpha1.Update{}). - Complete(r) -} diff --git a/housekeeper/pkg/constants/constants.go b/housekeeper/pkg/constants/constants.go index 2a2a73e..05c167e 100644 --- a/housekeeper/pkg/constants/constants.go +++ b/housekeeper/pkg/constants/constants.go @@ -30,4 +30,9 @@ const ( SockName = "housekeeper-daemon.sock" ) -const Timeout = 3 * time.Minute +const ( + // node upgrade timeout + NodeTimeout = 5 * time.Minute + // time to sleep after processing maxUnavailable nodes + NodeSleepTime = 2 * time.Minute +) -- Gitee