diff --git a/housekeeper/operator/housekeeper-operator/controllers/update_controller.go b/housekeeper/operator/housekeeper-operator/controllers/update_controller.go index d202332d5acd4b50926539c4d6dc391784cb23a4..0cc7af811065c0d258daaa5816ee23096a9c8034 100644 --- a/housekeeper/operator/housekeeper-operator/controllers/update_controller.go +++ b/housekeeper/operator/housekeeper-operator/controllers/update_controller.go @@ -18,6 +18,7 @@ package controllers import ( "context" + "time" "github.com/sirupsen/logrus" housekeeperiov1alpha1 "housekeeper.io/operator/api/v1alpha1" @@ -29,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" @@ -60,59 +62,86 @@ func (r *UpdateReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr return common.NoRequeue, nil } ctx = context.Background() - err := setLabels(ctx, r, req) + //返回worker节点的数量 + upInstance, nodesNum, err := getUpgradeInstance(ctx, r, req.NamespacedName) if err != nil { + return common.RequeueNow, err + } + limit := min(upInstance.Spec.MaxUnavailable, nodesNum) + if requeueAfter, err := setLabels(ctx, r, req, limit, upInstance); err != nil { logrus.Errorf("unable set nodes label: %v", err) return common.RequeueNow, err + } else if requeueAfter { + return common.RequeueAfter, nil } - return common.RequeueAfter, nil + return common.RequeueNow, nil } -func setLabels(ctx context.Context, r common.ReadWriterClient, req ctrl.Request) error { +func getUpgradeInstance(ctx context.Context, r common.ReadWriterClient, name types.NamespacedName) ( + upInstance housekeeperiov1alpha1.Update, nodeNum int, err error) { + if err = r.Get(ctx, name, &upInstance); err != nil { + logrus.Errorf("unable to fetch upgrade instance: %v", err) + return + } + requirement, err := labels.NewRequirement(constants.LabelMaster, selection.DoesNotExist, nil) + if err != nil { + logrus.Errorf("unable to create requirement %s: %v"+constants.LabelMaster, err) + return + } + nodesItems, err := getNodes(ctx, r, 0, *requirement) + if err != nil { + logrus.Errorf("failed to get nodes list: %v", err) + return + } + nodeNum = len(nodesItems) + return +} + +func setLabels(ctx context.Context, r common.ReadWriterClient, req ctrl.Request, limit int, + upInstance housekeeperiov1alpha1.Update) (bool, error) { reqUpgrade, err := labels.NewRequirement(constants.LabelUpgrading, selection.DoesNotExist, nil) if err != nil { logrus.Errorf("unable to create upgrade label requirement: %v", err) - return err + return false, err } reqMaster, err := labels.NewRequirement(constants.LabelMaster, selection.Exists, nil) if err != nil { logrus.Errorf("unable to create master label requirement: %v", err) - return err + return false, err } reqNoMaster, err := labels.NewRequirement(constants.LabelMaster, selection.DoesNotExist, nil) if err != nil { logrus.Errorf("unable to create non-master label requirement: %v", err) - return err + return false, err } - masterNodes, err := getNodes(ctx, r, *reqUpgrade, *reqMaster) + masterNodes, err := getNodes(ctx, r, 1, *reqUpgrade, *reqMaster) if err != nil { logrus.Errorf("unable to get master node list: %v", err) - return err + return false, err } - noMasterNodes, err := getNodes(ctx, r, *reqUpgrade, *reqNoMaster) + //limit: 限制worker节点每次升级的数量 + noMasterNodes, err := getNodes(ctx, r, limit, *reqUpgrade, *reqNoMaster) if err != nil { logrus.Errorf("unable to get non-master node list: %v", err) - return err + return false, err } - upgradeCompleted, err := assignUpdated(ctx, r, masterNodes, req.NamespacedName) + needRequeue, err := assignUpdated(ctx, r, masterNodes, upInstance) if err != nil { - logrus.Errorf("unabel to add the label of the master nodes: %v", err) - return err - } - //Make sure the master upgrade is complete before start upgrading non-master nodes - if upgradeCompleted { - _, err := assignUpdated(ctx, r, noMasterNodes, req.NamespacedName) - if err != nil { - logrus.Errorf("unabel to add the label of non-master nodes: %v", err) - return err - } + logrus.Errorf("unabel to add upgrade label of the master nodes: %v", err) + return false, err + } else if needRequeue { + return true, nil } - return nil + if needRequeue, err = assignUpdated(ctx, r, noMasterNodes, upInstance); err != nil { + logrus.Errorf("unabel to add upgrade label of non-master nodes: %v", err) + return false, err + } + return needRequeue, nil } -func getNodes(ctx context.Context, r common.ReadWriterClient, reqs ...labels.Requirement) ([]corev1.Node, error) { +func getNodes(ctx context.Context, r common.ReadWriterClient, limit int, reqs ...labels.Requirement) ([]corev1.Node, error) { var nodeList corev1.NodeList - opts := client.ListOptions{LabelSelector: labels.NewSelector().Add(reqs...)} + opts := client.ListOptions{LabelSelector: labels.NewSelector().Add(reqs...), Limit: int64(limit)} if err := r.List(ctx, &nodeList, &opts); err != nil { logrus.Errorf("unable to list nodes with requirements: %v", err) return nil, err @@ -121,14 +150,9 @@ func getNodes(ctx context.Context, r common.ReadWriterClient, reqs ...labels.Req } // Add the label to nodes -func assignUpdated(ctx context.Context, r common.ReadWriterClient, nodeList []corev1.Node, name types.NamespacedName) (bool, error) { - var upInstance housekeeperiov1alpha1.Update - if err := r.Get(ctx, name, &upInstance); err != nil { - logrus.Errorf("unable to get update Instance %v", err) - return false, err - } +func assignUpdated(ctx context.Context, r common.ReadWriterClient, nodeList []corev1.Node, + upInstance housekeeperiov1alpha1.Update) (bool, error) { var ( - upgradeNum = -1 kubeVersionSpec = upInstance.Spec.KubeVersion osVersionSpec = upInstance.Spec.OSVersion ) @@ -136,33 +160,76 @@ func assignUpdated(ctx context.Context, r common.ReadWriterClient, nodeList []co logrus.Warning("os version is required") return false, nil } + if len(nodeList) == 0 { + return false, nil + } for _, node := range nodeList { - var ( - kubeProxyVersion = node.Status.NodeInfo.KubeProxyVersion - kubeletVersion = node.Status.NodeInfo.KubeletVersion - osVersion = node.Status.NodeInfo.OSImage - ) - if len(kubeVersionSpec) > 0 { - //If kube-proxy, kubelet are the same as the version to be upgraded k8s, then k8s is successfully upgraded - if kubeVersionSpec == kubeProxyVersion && kubeVersionSpec == kubeletVersion { - logrus.Infof("successfully upgraded the node: %s", node.Name) - upgradeNum++ - continue + if conditionMet(node, kubeVersionSpec, osVersionSpec) { + node.Labels[constants.LabelUpgrading] = "" + if err := r.Update(ctx, &node); err != nil { + logrus.Errorf("unable to add %s label:%v", node.Name, err) + return false, err } - } else { - if osVersionSpec == osVersion { - continue + if err := waitForUpgradeComplete(node, kubeVersionSpec, osVersionSpec); err != nil { + logrus.Errorf("failed to wait for node upgrade to complete: %v", err) + return false, err } + } else { + return false, nil } - node.Labels[constants.LabelUpgrading] = "" - if err := r.Update(ctx, &node); err != nil { - logrus.Errorf("unable to add %s label:%v", node.Name, err) + } + return true, nil +} + +func conditionMet(node corev1.Node, kubeVersionSpec string, osVersionSpec string) bool { + var ( + kubeProxyVersion = node.Status.NodeInfo.KubeProxyVersion + kubeletVersion = node.Status.NodeInfo.KubeletVersion + osVersion = node.Status.NodeInfo.OSImage + ) + if len(kubeVersionSpec) > 0 { + if kubeVersionSpec == kubeProxyVersion && kubeVersionSpec == kubeletVersion { + return false + } + } else { + if osVersionSpec == osVersion { + return false } } - if len(kubeVersionSpec) == 0 { - return true, nil + return true +} + +func waitForUpgradeComplete(node corev1.Node, kubeVersionSpec string, osVersionSpec string) error { + ctx, cancel := context.WithTimeout(context.Background(), constants.Timeout) + defer cancel() + done := make(chan struct{}) + + go func() { + wait.Until(func() { + if !conditionMet(node, kubeVersionSpec, osVersionSpec) { + close(done) + } + }, 10*time.Second, ctx.Done()) + }() + + select { + case <-done: + logrus.Infof("successful upgrade node: %s", node.Name) + case <-ctx.Done(): + // 上下文超时,跳出循环 + if ctx.Err() == context.DeadlineExceeded { + logrus.Errorf("failed to upgrade node: %s: %v", node.Name, ctx.Err()) + return ctx.Err() + } + } + return nil +} + +func min(a, b int) int { + if a < b { + return a } - return upgradeNum == len(nodeList), nil + return b } // SetupWithManager sets up the controller with the Manager. diff --git a/housekeeper/pkg/constants/constants.go b/housekeeper/pkg/constants/constants.go index 4528bda1120fc244432023e9724c80b205c17606..2a2a73ec7f7fa08b4e40a2e7205e76ebdd886be2 100644 --- a/housekeeper/pkg/constants/constants.go +++ b/housekeeper/pkg/constants/constants.go @@ -15,6 +15,8 @@ limitations under the License. */ package constants +import "time" + const ( // LabelUpgrading is the key of the upgrading label for nodes LabelUpgrading = "upgrade.housekeeper.io/upgrading" @@ -27,3 +29,5 @@ const ( SockDir = "/run/housekeeper-daemon" SockName = "housekeeper-daemon.sock" ) + +const Timeout = 3 * time.Minute