From 39c548885756ae9d4d202ed11cb1d2dee5765be5 Mon Sep 17 00:00:00 2001
From: Atlas_kang
Date: Fri, 13 Dec 2024 21:09:48 +0800
Subject: [PATCH 1/3] 【Defect ID】【Modification】Container snapshot PoC: keep the pod IP across pod recreation and inject the snapshot-restore environment variable 【Modifier】kangfuan k00666092 【Reviewer】liuyunchen 00405355
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../pkg/controllers/v1/annotation.go          | 42 +++++++++++++-
 .../controllers/v1/ascendjob_controller.go    | 58 +++++++++++++++++++
 .../pkg/controllers/v1/consts.go              |  5 ++
 3 files changed, 104 insertions(+), 1 deletion(-)

diff --git a/component/ascend-operator/pkg/controllers/v1/annotation.go b/component/ascend-operator/pkg/controllers/v1/annotation.go
index ea93e41e9..0e8421fe9 100644
--- a/component/ascend-operator/pkg/controllers/v1/annotation.go
+++ b/component/ascend-operator/pkg/controllers/v1/annotation.go
@@ -20,7 +20,12 @@ import (
 
 func (r *ASJobReconciler) setPodAnnotation(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec, rtype,
 	index string) error {
-	return r.setHcclRankIndex(job, podTemplate, rtype, index)
+	if err := r.setHcclRankIndex(job, podTemplate, rtype, index); err != nil {
+		return err
+	}
+
+	r.setPodIP(job, podTemplate)
+	return nil
 }
 
 func (r *ASJobReconciler) setHcclRankIndex(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec, rtype,
@@ -51,3 +56,38 @@ func (r *ASJobReconciler) setHcclRankIndex(job *mindxdlv1.AscendJob, podTemplate
 	hwlog.RunLog.Debugf("set rank index<%d> to pod<%s>", rank, podTemplate.Name)
 	return nil
 }
+
+func (r *ASJobReconciler) setPodIP(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec) {
+	anno := job.GetAnnotations()
+	if !enableResilience(anno) {
+		return
+	}
+	podTemplate.Annotations[enableResilienceKey] = trueString
+	// if job enable resilience, set container env
+	if useSnapshot, ok := r.snapShotJobs[job.GetUID()]; ok && useSnapshot {
+		for index, c := range podTemplate.Spec.Containers {
+			// set env to ascend container
+			if c.Name == mindxdlv1.DefaultContainerName {
+				addEnvValue(podTemplate, snapshotRestoreKey, trueString, index)
+				hwlog.RunLog.Infof("job (%s) with pod (%v) use snapshot=================", podTemplate.UID)
+				break
+			}
+		}
+	}
+
+	ipPools, ok := r.ipPools[job.GetUID()]
+	hwlog.RunLog.Infof("ipPools=================%v", ipPools)
+	if !ok || len(ipPools) == 0 {
+		return
+	}
+
+	podTemplate.Annotations[fixedPodIPKey] = "[\"" + ipPools[0] + "\"]"
+	newPool := make([]string, 0)
+	for index, val := range ipPools {
+		if index != 0 {
+			newPool = append(newPool, val)
+		}
+	}
+	r.ipPools[job.GetUID()] = newPool
+	hwlog.RunLog.Infof("job (%s) create pod and set ip, the rest ips is =================%v", job.GetUID(), newPool)
+}
diff --git a/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go b/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go
index 46bb79fdd..009eda758 100644
--- a/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go
+++ b/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go
@@ -78,6 +78,8 @@ func 
NewReconciler(mgr manager.Manager, enableGangScheduling bool) *ASJobReconci versions: make(map[types.UID]int32), backoffLimits: make(map[types.UID]int32), rtGenerators: make(map[types.UID]generator.RankTableGenerator), + ipPools: make(map[types.UID][]string), + snapShotJobs: make(map[types.UID]bool), } cfg := mgr.GetConfig() @@ -118,6 +120,8 @@ type ASJobReconciler struct { versions map[types.UID]int32 backoffLimits map[types.UID]int32 rtGenerators map[types.UID]generator.RankTableGenerator + ipPools map[types.UID][]string + snapShotJobs map[types.UID]bool } // Reconcile is part of the main kubernetes reconciliation loop which aims to @@ -152,6 +156,8 @@ func (r *ASJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl hwlog.RunLog.Infof("reconcile cancelled,job<%s> has been deleted", req.NamespacedName) delete(r.versions, ascendjob.UID) delete(r.backoffLimits, ascendjob.UID) + delete(r.ipPools, ascendjob.UID) + delete(r.snapShotJobs, ascendjob.UID) return ctrl.Result{}, nil } @@ -344,6 +350,8 @@ func (r *ASJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { } r.versions[ascendJob.UID] = defaultPodVersion r.backoffLimits[ascendJob.UID] = unsetBackoffLimits + r.ipPools[ascendJob.UID] = make([]string, 0) + r.SetJobSnapshot(ascendJob) if ascendJob.Spec.RunPolicy.BackoffLimit != nil { r.backoffLimits[ascendJob.UID] = *ascendJob.Spec.RunPolicy.BackoffLimit } else if err = r.setFaultRetryTimesToBackoffLimits(ascendJob); err != nil { @@ -390,6 +398,8 @@ func (r *ASJobReconciler) onOwnerDeleteFunc() func(deleteEvent event.DeleteEvent hwlog.RunLog.Info(msg) delete(r.versions, ascendJob.UID) delete(r.backoffLimits, ascendJob.UID) + delete(r.ipPools, ascendJob.UID) + delete(r.snapShotJobs, ascendJob.UID) return true } } @@ -419,10 +429,58 @@ func (r *ASJobReconciler) onPodDeleteFunc() func(event.DeleteEvent) bool { if ok && int32(versionNumber) == currentVersion { r.versions[controllerRef.UID]++ } + r.SetJobIPPool(controllerRef.UID, getPodIP(e)) return true } } +func enableResilience(annotations map[string]string) bool { + if annotations == nil { + return false + } + hwlog.RunLog.Infof("annotations[enableResilienceKey]====================%v", annotations[enableResilienceKey]) + if val, ok := annotations[enableResilienceKey]; !ok || val != trueString { + return false + } + return true +} + +func getPodIP(e event.DeleteEvent) string { + if !enableResilience(e.Object.GetAnnotations()) { + return "" + } + pod, ok := e.Object.(*v1.Pod) + if !ok { + hwlog.RunLog.Infof("not a pod==================") + return "" + } + hwlog.RunLog.Infof("get delete pod IP: %s==================", pod.Status.PodIP) + return pod.Status.PodIP +} + +func (r *ASJobReconciler) SetJobSnapshot(job *mindxdlv1.AscendJob) { + if !enableResilience(job.GetAnnotations()) { + return + } + r.snapShotJobs[job.GetUID()] = true + hwlog.RunLog.Infof("acjob (%s) set snapshot to true==================", job.GetUID()) +} + +func (r *ASJobReconciler) SetJobIPPool(jobUID types.UID, newIP string) { + if newIP == "" { + return + } + ips := r.ipPools[jobUID] + for _, ip := range ips { + if newIP == ip { + return + } + } + ips = append(ips, newIP) + r.ipPools[jobUID] = ips + hwlog.RunLog.Infof("acjob (%s) add delete pod IP pool: %v==================", jobUID, ips) +} + // ControllerName get controller name func (r *ASJobReconciler) ControllerName() string { return controllerName diff --git a/component/ascend-operator/pkg/controllers/v1/consts.go b/component/ascend-operator/pkg/controllers/v1/consts.go index 
ac0ca61aa..be8fb07f6 100644 --- a/component/ascend-operator/pkg/controllers/v1/consts.go +++ b/component/ascend-operator/pkg/controllers/v1/consts.go @@ -49,6 +49,11 @@ const ( statusPodIPDownwardAPI = "status.podIP" + enableResilienceKey = "enableResilience" + trueString = "true" + fixedPodIPKey = "cni.projectcalico.org/ipAddrs" + snapshotRestoreKey = "SNAPSHOT_RESTORE" + cmRetryTime = 3 configmapPrefix = "rings-config-" acjobKind = "AscendJob" -- Gitee From 9ffc7c78389317a284b4c34471296ca573b7a43d Mon Sep 17 00:00:00 2001 From: Atlas_kang Date: Sat, 14 Dec 2024 21:12:31 +0800 Subject: [PATCH 2/3] =?UTF-8?q?=E3=80=90=E5=8D=95=E5=8F=B7=20Defect?= =?UTF-8?q?=E3=80=91=20=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4=E6=98=8E=20Mod?= =?UTF-8?q?ification=E3=80=91=E5=AE=B9=E5=99=A8=E5=BF=AB=E7=85=A7Poc?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81pod=20ip=E4=BF=9D=E6=8C=81=EF=BC=8C?= =?UTF-8?q?=E6=B3=A8=E5=85=A5=E5=BF=AB=E7=85=A7=E6=81=A2=E5=A4=8D=E7=8E=AF?= =?UTF-8?q?=E5=A2=83=E5=8F=98=E9=87=8F=20=E3=80=90=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E4=BA=BA=20Modifier=E3=80=91=20kangfuan=20k00666092=20?= =?UTF-8?q?=E3=80=90=E8=AF=84=E5=AE=A1=E4=BA=BA=20Reviewer=E3=80=91=20liuy?= =?UTF-8?q?unchen=2000405355?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/controllers/v1/annotation.go | 43 +++---- .../controllers/v1/ascendjob_controller.go | 109 ++++++++++++------ .../pkg/controllers/v1/consts.go | 7 +- .../ascend-operator/pkg/controllers/v1/env.go | 21 ++++ .../pkg/controllers/v1/labels.go | 5 + .../ascend-operator/pkg/controllers/v1/pod.go | 71 ++++++++++++ .../pkg/controllers/v1/utils.go | 14 +++ 7 files changed, 205 insertions(+), 65 deletions(-) diff --git a/component/ascend-operator/pkg/controllers/v1/annotation.go b/component/ascend-operator/pkg/controllers/v1/annotation.go index 0e8421fe9..369030eca 100644 --- a/component/ascend-operator/pkg/controllers/v1/annotation.go +++ b/component/ascend-operator/pkg/controllers/v1/annotation.go @@ -24,7 +24,8 @@ func (r *ASJobReconciler) setPodAnnotation(job *mindxdlv1.AscendJob, podTemplate return err } - r.setPodIP(job, podTemplate) + r.setPodIP(job, podTemplate, rtype) + return nil } @@ -57,37 +58,25 @@ func (r *ASJobReconciler) setHcclRankIndex(job *mindxdlv1.AscendJob, podTemplate return nil } -func (r *ASJobReconciler) setPodIP(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec) { - anno := job.GetAnnotations() - if !enableResilience(anno) { +func (r *ASJobReconciler) setPodIP(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec, rtype string) { + if !enableResilience(job.GetLabels()) { return } - podTemplate.Annotations[enableResilienceKey] = trueString - // if job enable resilience, set container env - if useSnapshot, ok := r.snapShotJobs[job.GetUID()]; ok && useSnapshot { - for index, c := range podTemplate.Spec.Containers { - // set env to ascend container - if c.Name == mindxdlv1.DefaultContainerName { - addEnvValue(podTemplate, snapshotRestoreKey, trueString, index) - hwlog.RunLog.Infof("job (%s) with pod (%v) use snapshot=================", podTemplate.UID) - break - } - } - } - ipPools, ok := r.ipPools[job.GetUID()] - hwlog.RunLog.Infof("ipPools=================%v", ipPools) - if !ok || len(ipPools) == 0 { + podIPMap, ok := r.podIPPool[job.GetUID()] + hwlog.RunLog.Infof("podIPMap=================%v", podIPMap) + if !ok { + r.podIPPool[job.GetUID()] = make(map[string][]string) return } - - podTemplate.Annotations[fixedPodIPKey] = "[\"" + ipPools[0] + "\"]" - newPool := make([]string, 0) - 
for index, val := range ipPools { - if index != 0 { - newPool = append(newPool, val) - } + // get pod type ip pool + typeIPs, ok := podIPMap[rtype] + if !ok || len(typeIPs) == 0 { + return } - r.ipPools[job.GetUID()] = newPool + // get first item + ip, newPool := removeFirst(typeIPs) + podTemplate.Annotations[fixedPodIPKey] = "[\"" + ip + "\"]" + r.podIPPool[job.GetUID()][rtype] = newPool hwlog.RunLog.Infof("job (%s) create pod and set ip, the rest ips is =================%v", job.GetUID(), newPool) } diff --git a/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go b/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go index 009eda758..f45152765 100644 --- a/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go +++ b/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go @@ -34,7 +34,6 @@ import ( "golang.org/x/time/rate" appv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" k8serr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -78,8 +77,9 @@ func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *ASJobReconci versions: make(map[types.UID]int32), backoffLimits: make(map[types.UID]int32), rtGenerators: make(map[types.UID]generator.RankTableGenerator), - ipPools: make(map[types.UID][]string), + podIPPool: make(map[types.UID]map[string][]string), snapShotJobs: make(map[types.UID]bool), + podUIDPool: make(map[types.UID]map[string][]string), } cfg := mgr.GetConfig() @@ -120,8 +120,9 @@ type ASJobReconciler struct { versions map[types.UID]int32 backoffLimits map[types.UID]int32 rtGenerators map[types.UID]generator.RankTableGenerator - ipPools map[types.UID][]string + podIPPool map[types.UID]map[string][]string snapShotJobs map[types.UID]bool + podUIDPool map[types.UID]map[string][]string // {jobid: {master: [pod_uid], worker:[pod_ip, pod_uid]}} } // Reconcile is part of the main kubernetes reconciliation loop which aims to @@ -156,8 +157,9 @@ func (r *ASJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl hwlog.RunLog.Infof("reconcile cancelled,job<%s> has been deleted", req.NamespacedName) delete(r.versions, ascendjob.UID) delete(r.backoffLimits, ascendjob.UID) - delete(r.ipPools, ascendjob.UID) + delete(r.podIPPool, ascendjob.UID) delete(r.snapShotJobs, ascendjob.UID) + delete(r.podUIDPool, ascendjob.UID) return ctrl.Result{}, nil } @@ -350,7 +352,8 @@ func (r *ASJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { } r.versions[ascendJob.UID] = defaultPodVersion r.backoffLimits[ascendJob.UID] = unsetBackoffLimits - r.ipPools[ascendJob.UID] = make([]string, 0) + r.podIPPool[ascendJob.UID] = make(map[string][]string) + r.podUIDPool[ascendJob.UID] = make(map[string][]string) r.SetJobSnapshot(ascendJob) if ascendJob.Spec.RunPolicy.BackoffLimit != nil { r.backoffLimits[ascendJob.UID] = *ascendJob.Spec.RunPolicy.BackoffLimit @@ -398,8 +401,9 @@ func (r *ASJobReconciler) onOwnerDeleteFunc() func(deleteEvent event.DeleteEvent hwlog.RunLog.Info(msg) delete(r.versions, ascendJob.UID) delete(r.backoffLimits, ascendJob.UID) - delete(r.ipPools, ascendJob.UID) + delete(r.podIPPool, ascendJob.UID) delete(r.snapShotJobs, ascendJob.UID) + delete(r.podUIDPool, ascendJob.UID) return true } } @@ -429,56 +433,89 @@ func (r *ASJobReconciler) onPodDeleteFunc() func(event.DeleteEvent) bool { if ok && int32(versionNumber) == currentVersion { r.versions[controllerRef.UID]++ } - r.SetJobIPPool(controllerRef.UID, 
getPodIP(e)) + + pod, ok := isEnableResiliencePod(e) + if !ok || pod == nil { + return true + } + r.setPodIPPool(controllerRef.UID, e, pod) + r.setPodUIDPool(controllerRef.UID, e, pod) return true } } -func enableResilience(annotations map[string]string) bool { - if annotations == nil { - return false +func isEnableResiliencePod(e event.DeleteEvent) (*corev1.Pod, bool) { + pod, ok := e.Object.(*corev1.Pod) + if !ok { + hwlog.RunLog.Infof("not a pod==================") + return nil, false } - hwlog.RunLog.Infof("annotations[enableResilienceKey]====================%v", annotations[enableResilienceKey]) - if val, ok := annotations[enableResilienceKey]; !ok || val != trueString { - return false + if !enableResilience(pod.GetLabels()) { + return nil, false } - return true + + return pod, true } -func getPodIP(e event.DeleteEvent) string { - if !enableResilience(e.Object.GetAnnotations()) { - return "" +func enableResilience(labels map[string]string) bool { + if labels == nil { + return false } - pod, ok := e.Object.(*v1.Pod) - if !ok { - hwlog.RunLog.Infof("not a pod==================") - return "" + hwlog.RunLog.Infof("labels[enableResilienceKey]====================%v", labels[enableResilienceKey]) + if val, ok := labels[enableResilienceKey]; !ok || val != trueString { + return false } - hwlog.RunLog.Infof("get delete pod IP: %s==================", pod.Status.PodIP) - return pod.Status.PodIP + return true } func (r *ASJobReconciler) SetJobSnapshot(job *mindxdlv1.AscendJob) { - if !enableResilience(job.GetAnnotations()) { + if !enableResilience(job.GetLabels()) { return } r.snapShotJobs[job.GetUID()] = true hwlog.RunLog.Infof("acjob (%s) set snapshot to true==================", job.GetUID()) } -func (r *ASJobReconciler) SetJobIPPool(jobUID types.UID, newIP string) { - if newIP == "" { - return +// Collect the pod IP address for creating pods of this type of job. +func (r *ASJobReconciler) setPodIPPool(jobUID types.UID, e event.DeleteEvent, pod *corev1.Pod) { + podIPMap, ok := r.podIPPool[jobUID] + if !ok { + podIPMap = make(map[string][]string) + r.podIPPool[jobUID] = podIPMap } - ips := r.ipPools[jobUID] - for _, ip := range ips { - if newIP == ip { - return + hwlog.RunLog.Infof("get delete pod IP: %s==================", pod.Status.PodIP) + // Obtains the list of saved pod IP addresses based on the pod type. + repType := e.Object.GetLabels()[commonv1.ReplicaTypeLabel] + typeIPs := addDeletePodInfo(podIPMap, pod.Status.PodIP, repType) + podIPMap[repType] = typeIPs + hwlog.RunLog.Infof("acjob (%s) replicaType(%s) add delete pod IP pool: %v==================", jobUID, repType, + r.podIPPool[jobUID][repType]) +} + +func addDeletePodInfo(infoMap map[string][]string, newItem, repType string) []string { + typeArray := infoMap[repType] + for _, item := range typeArray { + if newItem == item { + return typeArray } } - ips = append(ips, newIP) - r.ipPools[jobUID] = ips - hwlog.RunLog.Infof("acjob (%s) add delete pod IP pool: %v==================", jobUID, ips) + typeArray = append(typeArray, newItem) + return typeArray +} + +func (r *ASJobReconciler) setPodUIDPool(jobUID types.UID, e event.DeleteEvent, pod *corev1.Pod) { + podUIDMap, ok := r.podUIDPool[jobUID] + if !ok { + podUIDMap = make(map[string][]string) + r.podUIDPool[jobUID] = podUIDMap + } + hwlog.RunLog.Infof("get delete pod uid: %s==================", pod.GetObjectMeta().GetUID()) + // Obtains the list of saved pod uid based on the pod type. 
+ repType := e.Object.GetLabels()[commonv1.ReplicaTypeLabel] + typeUIDs := addDeletePodInfo(podUIDMap, string(pod.GetObjectMeta().GetUID()), repType) + podUIDMap[repType] = typeUIDs + hwlog.RunLog.Infof("acjob (%s) replicaType(%s) add delete pod uid pool: %v==================", jobUID, repType, + r.podUIDPool[jobUID][repType]) } // ControllerName get controller name @@ -534,12 +571,12 @@ func (r *ASJobReconciler) DeleteJob(job interface{}) error { } if err := r.Delete(context.Background(), ascendjob); err != nil { - r.recorder.Eventf(ascendjob, v1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err) + r.recorder.Eventf(ascendjob, corev1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err) hwlog.RunLog.Errorf("failed to delete job<%s-%s>, err: %s", ascendjob.Namespace, ascendjob.Name, err) return err } - r.recorder.Eventf(ascendjob, v1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", ascendjob.Name) + r.recorder.Eventf(ascendjob, corev1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", ascendjob.Name) hwlog.RunLog.Infof("job<%s-%s> has been deleted", ascendjob.Namespace, ascendjob.Name) return nil } diff --git a/component/ascend-operator/pkg/controllers/v1/consts.go b/component/ascend-operator/pkg/controllers/v1/consts.go index be8fb07f6..ca450340a 100644 --- a/component/ascend-operator/pkg/controllers/v1/consts.go +++ b/component/ascend-operator/pkg/controllers/v1/consts.go @@ -47,12 +47,14 @@ const ( gangSchedulingPodGroupAnnotation = "scheduling.k8s.io/group-name" npuCoreName = "huawei.com/npu-core" - statusPodIPDownwardAPI = "status.podIP" + statusPodIPDownwardAPI = "status.podIP" + metadataPodUIDownwardAPI = "metadata.uid" enableResilienceKey = "enableResilience" trueString = "true" fixedPodIPKey = "cni.projectcalico.org/ipAddrs" - snapshotRestoreKey = "SNAPSHOT_RESTORE" + podUIDKey = "POD_UID" + snapshotRestoreID = "SNAPSHOT_RESTORE_ID" cmRetryTime = 3 configmapPrefix = "rings-config-" @@ -140,4 +142,5 @@ const ( workQueueMaxDelay = 20 * time.Second workQueueQps = 10 workQueueBurst = 100 + noMatchedContainer = -1 ) diff --git a/component/ascend-operator/pkg/controllers/v1/env.go b/component/ascend-operator/pkg/controllers/v1/env.go index d9b4729bb..74433cb6a 100644 --- a/component/ascend-operator/pkg/controllers/v1/env.go +++ b/component/ascend-operator/pkg/controllers/v1/env.go @@ -14,6 +14,7 @@ import ( "strings" commonv1 "github.com/kubeflow/common/pkg/apis/common/v1" + corev1 "k8s.io/api/core/v1" "ascend-common/api" @@ -110,6 +111,17 @@ func (r *ASJobReconciler) setAscendVisibleDevicesEnv(container *corev1.Container } } +func addEnvFieldRef(pod *corev1.PodTemplateSpec, envKey, fieldRef string, index int) { + pod.Spec.Containers[index].Env = append(pod.Spec.Containers[index].Env, corev1.EnvVar{ + Name: envKey, + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{ + FieldPath: fieldRef, + }, + }, + }) +} + func (r *ASJobReconciler) setMindSporeEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpec) { msRoleMap := map[commonv1.ReplicaType]string{ mindxdlv1.MindSporeReplicaTypeScheduler: msSchedulerRole, @@ -136,10 +148,13 @@ func (r *ASJobReconciler) setMindSporeEnv(pi *podInfo, podTemplate *corev1.PodTe addEnvValue(podTemplate, api.MsLocalWorkerEnv, strconv.Itoa(pi.ctReq), i) addEnvValue(podTemplate, api.MsWorkerNumEnv, strconv.Itoa(pi.ctReq*pi.npuReplicas), i) } + addEnvValue(podTemplate, taskIDEnvKey, string(pi.job.UID), i) + addEnvValue(podTemplate, mindxServerIPEnv, pi.clusterdSvcIp, i) 
addEnvValue(podTemplate, msNodeRank, strconv.Itoa(pi.rank), i) addEnvValue(podTemplate, msSchedPort, pi.port, i) addEnvValue(podTemplate, msServerNum, "0", i) addEnvValue(podTemplate, msRole, msRoleMap[pi.rtype], i) + addEnvValue(podTemplate, hostNetwork, strconv.FormatBool(pi.spec.Template.Spec.HostNetwork), i) addEnvValue(podTemplate, npuPod, strconv.FormatBool(checkNpuPod(pi)), i) hwlog.RunLog.Debugf(logEnvPattern, podTemplate.Name, podTemplate.Spec.Containers[i].Env) @@ -162,6 +177,9 @@ func (r *ASJobReconciler) setPytorchEnv(pi *podInfo, podTemplate *corev1.PodTemp addEnvValue(podTemplate, ptMasterAddr, pi.ip, i) addEnvValue(podTemplate, ptMasterPort, pi.port, i) addEnvValue(podTemplate, ptRank, strconv.Itoa(pi.rank), i) + addEnvValue(podTemplate, taskIDEnvKey, string(pi.job.UID), i) + addEnvValue(podTemplate, mindxServerIPEnv, pi.clusterdSvcIp, i) + addEnvValue(podTemplate, hostNetwork, strconv.FormatBool(pi.spec.Template.Spec.HostNetwork), i) hwlog.RunLog.Debugf(logEnvPattern, podTemplate.Name, podTemplate.Spec.Containers[i].Env) } } @@ -191,7 +209,10 @@ func (r *ASJobReconciler) setTensorflowEnv(pi *podInfo, podTemplate *corev1.PodT } addEnvValue(podTemplate, tfChiefPort, pi.port, i) addEnvValue(podTemplate, tfRank, strconv.Itoa(pi.rank), i) + addEnvValue(podTemplate, taskIDEnvKey, string(pi.job.UID), i) + addEnvValue(podTemplate, mindxServerIPEnv, pi.clusterdSvcIp, i) addEnvValue(podTemplate, tfChiefDevice, "0", i) + addEnvValue(podTemplate, hostNetwork, strconv.FormatBool(pi.spec.Template.Spec.HostNetwork), i) podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, corev1.EnvVar{ Name: tfWorkerIP, ValueFrom: &corev1.EnvVarSource{ diff --git a/component/ascend-operator/pkg/controllers/v1/labels.go b/component/ascend-operator/pkg/controllers/v1/labels.go index 2a7eaf5c5..e605ea8b5 100644 --- a/component/ascend-operator/pkg/controllers/v1/labels.go +++ b/component/ascend-operator/pkg/controllers/v1/labels.go @@ -42,4 +42,9 @@ func (r *ASJobReconciler) setPodLabels(job *mindxdlv1.AscendJob, podTemplate *co for key, value := range labelsMap { podTemplate.Labels[key] = value } + + if enableResilience(job.GetLabels()) { + // check whether the pod IP address and pod UID need to be saved when the pod is deleted. 
+ podTemplate.Labels[enableResilienceKey] = trueString + } } diff --git a/component/ascend-operator/pkg/controllers/v1/pod.go b/component/ascend-operator/pkg/controllers/v1/pod.go index 3c745d35b..56e67719a 100644 --- a/component/ascend-operator/pkg/controllers/v1/pod.go +++ b/component/ascend-operator/pkg/controllers/v1/pod.go @@ -464,6 +464,11 @@ func (r *ASJobReconciler) createPodSpec(pi *podInfo, } func (r *ASJobReconciler) setEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpec) error { + index := getAscendContainerIndex(podTemplate) + hwlog.RunLog.Infof("============================index:%v", index) + if index == noMatchedContainer { + return nil + } if mindxdlutils.IsMindIEEPJob(pi.job) { hwlog.RunLog.Debugf("Set mindIEEP AscendJob<%s-%s> env", pi.job.Namespace, pi.job.Name) r.setInferEnv(pi, podTemplate) @@ -473,6 +478,9 @@ func (r *ASJobReconciler) setEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpe pi.rtype == mindxdlv1.ReplicaTypeWorker { return nil } + + setAscendPodCommonEnv(podTemplate, index) + r.setSnapShotEnv(pi, podTemplate, index) hwlog.RunLog.Debugf("Set AscendJob<%s-%s> framework<%s> env start", pi.job.Namespace, pi.job.Name, pi.frame) r.setCommonEnv(pi, podTemplate) if pi.ctReq == 0 { @@ -492,6 +500,69 @@ func (r *ASJobReconciler) setEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpe return nil } +func setAscendPodCommonEnv(podTemplate *corev1.PodTemplateSpec, index int) { + if len(podTemplate.Spec.Containers[index].Env) == 0 { + podTemplate.Spec.Containers[index].Env = make([]corev1.EnvVar, 0) + } +} + +func getAscendContainerIndex(podTemplate *corev1.PodTemplateSpec) int { + for i, container := range podTemplate.Spec.Containers { + if container.Name == mindxdlv1.DefaultContainerName { + return i + } + } + return noMatchedContainer +} + +func (r *ASJobReconciler) setSnapShotEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpec, index int) { + // if job enable resilience, set container snapshot restore uid + if useSnapshot, ok := r.snapShotJobs[pi.job.GetUID()]; !ok || !useSnapshot { + return + } + hwlog.RunLog.Infof("============================addEnvFieldRef") + addEnvFieldRef(podTemplate, podUIDKey, metadataPodUIDownwardAPI, index) + // set pod snapshot restore id + r.setSnapShotRestoreID(pi, podTemplate, index) +} + +func (r *ASJobReconciler) setSnapShotRestoreID(pi *podInfo, podTemplate *corev1.PodTemplateSpec, index int) { + podUID := "" + repType := strings.ToLower(string(pi.rtype)) + jobUID := pi.job.GetUID() + + defer func() { + hwlog.RunLog.Infof("acjob (%s) replicaType(%s) set pod snapshot restore id: %v==================", jobUID, + repType, podUID) + + if podUID != "" { + // set pod snapshot restore id by pod related type + addEnvValue(podTemplate, snapshotRestoreID, podUID, index) + } else { + // the pod first start use itself uid + addEnvFieldRef(podTemplate, snapshotRestoreID, metadataPodUIDownwardAPI, index) + } + }() + + jobUIDMap, ok := r.podUIDPool[jobUID] + if !ok { + r.podUIDPool[jobUID] = make(map[string][]string) + return + } + typeUIDs, ok := jobUIDMap[repType] + if !ok || len(typeUIDs) == 0 { + return + } + // when pod deleting, we will append the pod ip and pod uid to different array tail in the same times + // Therefore, when we use the pod ip and pod uid to set a new pod, + // the pod ip and pod uid obtained from different arrays are in one-to-one correspondence, + // because the pods information is in sequence during the append operation. 
+ podUID, newUID := removeFirst(typeUIDs) + jobUIDMap[repType] = newUID + hwlog.RunLog.Infof("acjob (%s) replicaType(%s) set pod snapshot uid new pool: %v==================", jobUID, + repType, r.podUIDPool[jobUID][repType]) +} + func (r *ASJobReconciler) setGangScheduleInfo(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec, replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rt string) { jobSchedulerName := job.Spec.SchedulerName diff --git a/component/ascend-operator/pkg/controllers/v1/utils.go b/component/ascend-operator/pkg/controllers/v1/utils.go index 50401b862..46b578940 100644 --- a/component/ascend-operator/pkg/controllers/v1/utils.go +++ b/component/ascend-operator/pkg/controllers/v1/utils.go @@ -291,3 +291,17 @@ func getJobRequiredNpu(job *mindxdlv1.AscendJob) int { } return requiredNpu } + +func removeFirst(list []string) (string, []string) { + newList := make([]string, 0) + first := "" + for index, val := range list { + if index == 0 { + first = val + continue + } + newList = append(newList, val) + } + + return first, newList +} -- Gitee From 9012fca72a8c518ba0f445f8995af18d5ee6f980 Mon Sep 17 00:00:00 2001 From: Atlas_kang Date: Mon, 16 Dec 2024 22:17:42 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E3=80=90=E5=8D=95=E5=8F=B7=20Defect?= =?UTF-8?q?=E3=80=91=20=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4=E6=98=8E=20Mod?= =?UTF-8?q?ification=E3=80=91=E5=AE=B9=E5=99=A8=E5=BF=AB=E7=85=A7Poc?= =?UTF-8?q?=EF=BC=8C=E6=B3=A8=E5=85=A5=E5=BF=AB=E7=85=A7=E6=81=A2=E5=A4=8D?= =?UTF-8?q?=E7=8E=AF=E5=A2=83=E5=8F=98=E9=87=8F=E6=A0=BC=E5=BC=8F=E4=B8=BA?= =?UTF-8?q?jobuid=5Fpodname=5Fcontainername=20=E3=80=90=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E4=BA=BA=20Modifier=E3=80=91=20kangfuan=20k00666092=20?= =?UTF-8?q?=E3=80=90=E8=AF=84=E5=AE=A1=E4=BA=BA=20Reviewer=E3=80=91=20liuy?= =?UTF-8?q?unchen=2000405355?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../apis/batch/v1/constants.go | 2 + .../controllers/v1/ascendjob_controller.go | 21 ---------- .../pkg/controllers/v1/consts.go | 4 +- .../ascend-operator/pkg/controllers/v1/pod.go | 42 +++---------------- 4 files changed, 9 insertions(+), 60 deletions(-) diff --git a/component/ascend-common/api/ascend-operator/apis/batch/v1/constants.go b/component/ascend-common/api/ascend-operator/apis/batch/v1/constants.go index 5e8097ca5..7e1fc582f 100644 --- a/component/ascend-common/api/ascend-operator/apis/batch/v1/constants.go +++ b/component/ascend-common/api/ascend-operator/apis/batch/v1/constants.go @@ -35,6 +35,8 @@ const ( Singular = "ascendjob" // DefaultContainerName the default container name for AscendJob. + // The modification of the name affects the path for storing the container snapshot. + // If the modification is required, do not add the random number such as uuid. DefaultContainerName = "ascend" // DefaultPortName is name of the port used to communicate between other process. 
DefaultPortName = "ascendjob-port" diff --git a/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go b/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go index f45152765..597f76539 100644 --- a/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go +++ b/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go @@ -79,7 +79,6 @@ func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *ASJobReconci rtGenerators: make(map[types.UID]generator.RankTableGenerator), podIPPool: make(map[types.UID]map[string][]string), snapShotJobs: make(map[types.UID]bool), - podUIDPool: make(map[types.UID]map[string][]string), } cfg := mgr.GetConfig() @@ -122,7 +121,6 @@ type ASJobReconciler struct { rtGenerators map[types.UID]generator.RankTableGenerator podIPPool map[types.UID]map[string][]string snapShotJobs map[types.UID]bool - podUIDPool map[types.UID]map[string][]string // {jobid: {master: [pod_uid], worker:[pod_ip, pod_uid]}} } // Reconcile is part of the main kubernetes reconciliation loop which aims to @@ -159,7 +157,6 @@ func (r *ASJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl delete(r.backoffLimits, ascendjob.UID) delete(r.podIPPool, ascendjob.UID) delete(r.snapShotJobs, ascendjob.UID) - delete(r.podUIDPool, ascendjob.UID) return ctrl.Result{}, nil } @@ -353,7 +350,6 @@ func (r *ASJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool { r.versions[ascendJob.UID] = defaultPodVersion r.backoffLimits[ascendJob.UID] = unsetBackoffLimits r.podIPPool[ascendJob.UID] = make(map[string][]string) - r.podUIDPool[ascendJob.UID] = make(map[string][]string) r.SetJobSnapshot(ascendJob) if ascendJob.Spec.RunPolicy.BackoffLimit != nil { r.backoffLimits[ascendJob.UID] = *ascendJob.Spec.RunPolicy.BackoffLimit @@ -403,7 +399,6 @@ func (r *ASJobReconciler) onOwnerDeleteFunc() func(deleteEvent event.DeleteEvent delete(r.backoffLimits, ascendJob.UID) delete(r.podIPPool, ascendJob.UID) delete(r.snapShotJobs, ascendJob.UID) - delete(r.podUIDPool, ascendJob.UID) return true } } @@ -439,7 +434,6 @@ func (r *ASJobReconciler) onPodDeleteFunc() func(event.DeleteEvent) bool { return true } r.setPodIPPool(controllerRef.UID, e, pod) - r.setPodUIDPool(controllerRef.UID, e, pod) return true } } @@ -503,21 +497,6 @@ func addDeletePodInfo(infoMap map[string][]string, newItem, repType string) []st return typeArray } -func (r *ASJobReconciler) setPodUIDPool(jobUID types.UID, e event.DeleteEvent, pod *corev1.Pod) { - podUIDMap, ok := r.podUIDPool[jobUID] - if !ok { - podUIDMap = make(map[string][]string) - r.podUIDPool[jobUID] = podUIDMap - } - hwlog.RunLog.Infof("get delete pod uid: %s==================", pod.GetObjectMeta().GetUID()) - // Obtains the list of saved pod uid based on the pod type. 
- repType := e.Object.GetLabels()[commonv1.ReplicaTypeLabel] - typeUIDs := addDeletePodInfo(podUIDMap, string(pod.GetObjectMeta().GetUID()), repType) - podUIDMap[repType] = typeUIDs - hwlog.RunLog.Infof("acjob (%s) replicaType(%s) add delete pod uid pool: %v==================", jobUID, repType, - r.podUIDPool[jobUID][repType]) -} - // ControllerName get controller name func (r *ASJobReconciler) ControllerName() string { return controllerName diff --git a/component/ascend-operator/pkg/controllers/v1/consts.go b/component/ascend-operator/pkg/controllers/v1/consts.go index ca450340a..e39ccc793 100644 --- a/component/ascend-operator/pkg/controllers/v1/consts.go +++ b/component/ascend-operator/pkg/controllers/v1/consts.go @@ -47,13 +47,11 @@ const ( gangSchedulingPodGroupAnnotation = "scheduling.k8s.io/group-name" npuCoreName = "huawei.com/npu-core" - statusPodIPDownwardAPI = "status.podIP" - metadataPodUIDownwardAPI = "metadata.uid" + statusPodIPDownwardAPI = "status.podIP" enableResilienceKey = "enableResilience" trueString = "true" fixedPodIPKey = "cni.projectcalico.org/ipAddrs" - podUIDKey = "POD_UID" snapshotRestoreID = "SNAPSHOT_RESTORE_ID" cmRetryTime = 3 diff --git a/component/ascend-operator/pkg/controllers/v1/pod.go b/component/ascend-operator/pkg/controllers/v1/pod.go index 56e67719a..8c271b0d3 100644 --- a/component/ascend-operator/pkg/controllers/v1/pod.go +++ b/component/ascend-operator/pkg/controllers/v1/pod.go @@ -439,6 +439,8 @@ func (r *ASJobReconciler) createPodSpec(pi *podInfo, pi.rank = pi.index } // Set name for the template. + // The modification of the name affects the path for storing the container snapshot. + // If the modification is required, do not add the random number such as uuid. podTemplate.Name = common.GenGeneralName(job.Name, strings.ToLower(string(pi.rtype)), indexStr) err := r.setEnv(pi, podTemplate) @@ -520,47 +522,15 @@ func (r *ASJobReconciler) setSnapShotEnv(pi *podInfo, podTemplate *corev1.PodTem if useSnapshot, ok := r.snapShotJobs[pi.job.GetUID()]; !ok || !useSnapshot { return } - hwlog.RunLog.Infof("============================addEnvFieldRef") - addEnvFieldRef(podTemplate, podUIDKey, metadataPodUIDownwardAPI, index) // set pod snapshot restore id r.setSnapShotRestoreID(pi, podTemplate, index) } func (r *ASJobReconciler) setSnapShotRestoreID(pi *podInfo, podTemplate *corev1.PodTemplateSpec, index int) { - podUID := "" - repType := strings.ToLower(string(pi.rtype)) - jobUID := pi.job.GetUID() - - defer func() { - hwlog.RunLog.Infof("acjob (%s) replicaType(%s) set pod snapshot restore id: %v==================", jobUID, - repType, podUID) - - if podUID != "" { - // set pod snapshot restore id by pod related type - addEnvValue(podTemplate, snapshotRestoreID, podUID, index) - } else { - // the pod first start use itself uid - addEnvFieldRef(podTemplate, snapshotRestoreID, metadataPodUIDownwardAPI, index) - } - }() - - jobUIDMap, ok := r.podUIDPool[jobUID] - if !ok { - r.podUIDPool[jobUID] = make(map[string][]string) - return - } - typeUIDs, ok := jobUIDMap[repType] - if !ok || len(typeUIDs) == 0 { - return - } - // when pod deleting, we will append the pod ip and pod uid to different array tail in the same times - // Therefore, when we use the pod ip and pod uid to set a new pod, - // the pod ip and pod uid obtained from different arrays are in one-to-one correspondence, - // because the pods information is in sequence during the append operation. 
- podUID, newUID := removeFirst(typeUIDs) - jobUIDMap[repType] = newUID - hwlog.RunLog.Infof("acjob (%s) replicaType(%s) set pod snapshot uid new pool: %v==================", jobUID, - repType, r.podUIDPool[jobUID][repType]) + restoreID := string(pi.job.GetUID()) + "_" + podTemplate.Name + "_" + mindxdlv1.DefaultContainerName + addEnvValue(podTemplate, snapshotRestoreID, restoreID, index) + hwlog.RunLog.Infof("acjob (%s) replicaType(%s) set pod snapshot restore id: (%v)==================", + pi.job.GetUID(), pi.rtype, restoreID) } func (r *ASJobReconciler) setGangScheduleInfo(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec, -- Gitee
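
Taken together, the three patches make the operator record the IPs of deleted pods per replica type, pin the next pod of that type to the first recorded IP via the Calico ipAddrs annotation, and hand the ascend container a deterministic snapshot-restore ID in the jobuid_podname_containername format from PATCH 3/3. Below is a minimal standalone Go sketch of that composition; it is not the operator code itself, and the helper names (buildRestoreID, takeFirstIP) and the example UID, pod name, and IP values are illustrative only.

package main

import "fmt"

const (
	fixedPodIPKey     = "cni.projectcalico.org/ipAddrs" // Calico fixed-IP annotation set by setPodIP
	snapshotRestoreID = "SNAPSHOT_RESTORE_ID"           // env var injected by setSnapShotRestoreID
	containerName     = "ascend"                        // mindxdlv1.DefaultContainerName
)

// buildRestoreID composes the jobuid_podname_containername format introduced in PATCH 3/3.
func buildRestoreID(jobUID, podName string) string {
	return jobUID + "_" + podName + "_" + containerName
}

// takeFirstIP pops the first IP saved for a replica type, mirroring how setPodIP
// uses removeFirst: the popped IP is pinned on the new pod, the rest stay pooled.
func takeFirstIP(pool map[string][]string, replicaType string) (string, bool) {
	ips := pool[replicaType]
	if len(ips) == 0 {
		return "", false
	}
	pool[replicaType] = ips[1:]
	return ips[0], true
}

func main() {
	// IPs previously collected from deleted worker pods of one AscendJob (example values).
	pool := map[string][]string{"worker": {"172.16.3.21", "172.16.3.22"}}

	annotations := map[string]string{}
	if ip, ok := takeFirstIP(pool, "worker"); ok {
		annotations[fixedPodIPKey] = `["` + ip + `"]` // Calico expects a JSON array of addresses
	}
	env := map[string]string{
		snapshotRestoreID: buildRestoreID("8f2c1c7e-example-uid", "demo-job-worker-0"),
	}

	fmt.Println(annotations[fixedPodIPKey]) // ["172.16.3.21"]
	fmt.Println(env[snapshotRestoreID])     // 8f2c1c7e-example-uid_demo-job-worker-0_ascend
	fmt.Println(pool["worker"])             // [172.16.3.22]
}

Deriving the restore ID from the job UID, the generated pod name, and the fixed container name "ascend" keeps it stable across pod recreations without tracking deleted pod UIDs, which is why PATCH 3/3 drops the podUIDPool bookkeeping introduced in PATCH 2/3.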