diff --git a/component/ascend-common/api/ascend-operator/apis/batch/v1/constants.go b/component/ascend-common/api/ascend-operator/apis/batch/v1/constants.go
index 5e8097ca5286c5cea26a0823009badc7fb7c4410..7e1fc582fd451fea2f2a75ac589f9fada43c4d4e 100644
--- a/component/ascend-common/api/ascend-operator/apis/batch/v1/constants.go
+++ b/component/ascend-common/api/ascend-operator/apis/batch/v1/constants.go
@@ -35,6 +35,8 @@ const (
 	Singular = "ascendjob"
 
 	// DefaultContainerName the default container name for AscendJob.
+	// Modifying this name changes the path used to store the container snapshot.
+	// If it must be changed, do not append random values such as a UUID.
 	DefaultContainerName = "ascend"
 	// DefaultPortName is name of the port used to communicate between other process.
 	DefaultPortName = "ascendjob-port"
diff --git a/component/ascend-operator/pkg/controllers/v1/annotation.go b/component/ascend-operator/pkg/controllers/v1/annotation.go
index ea93e41e9b4bd93e71c5d9bd4d45d4e2a02f48de..369030eca41498ca2470f3ff05241b339e5ecbf3 100644
--- a/component/ascend-operator/pkg/controllers/v1/annotation.go
+++ b/component/ascend-operator/pkg/controllers/v1/annotation.go
@@ -20,7 +20,13 @@ import (
 
 func (r *ASJobReconciler) setPodAnnotation(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec, rtype,
 	index string) error {
-	return r.setHcclRankIndex(job, podTemplate, rtype, index)
+	if err := r.setHcclRankIndex(job, podTemplate, rtype, index); err != nil {
+		return err
+	}
+
+	r.setPodIP(job, podTemplate, rtype)
+
+	return nil
 }
 
 func (r *ASJobReconciler) setHcclRankIndex(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec, rtype,
@@ -51,3 +57,31 @@ func (r *ASJobReconciler) setHcclRankIndex(job *mindxdlv1.AscendJob, podTemplate
 	hwlog.RunLog.Debugf("set rank index<%d> to pod<%s>", rank, podTemplate.Name)
 	return nil
 }
+
+func (r *ASJobReconciler) setPodIP(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec, rtype string) {
+	if !enableResilience(job.GetLabels()) {
+		return
+	}
+
+	podIPMap, ok := r.podIPPool[job.GetUID()]
+	hwlog.RunLog.Debugf("job<%s> pod IP pool: %v", job.GetUID(), podIPMap)
+	if !ok {
+		r.podIPPool[job.GetUID()] = make(map[string][]string)
+		return
+	}
+	// get the IP pool for this replica type
+	typeIPs, ok := podIPMap[rtype]
+	if !ok || len(typeIPs) == 0 {
+		return
+	}
+	// pop the first recorded IP and pin the new pod to it
+	ip, newPool := removeFirst(typeIPs)
+	if podTemplate.Annotations == nil {
+		// writing to a nil map would panic
+		podTemplate.Annotations = make(map[string]string)
+	}
+	// the Calico annotation value is a JSON array of addresses
+	podTemplate.Annotations[fixedPodIPKey] = "[\"" + ip + "\"]"
+	r.podIPPool[job.GetUID()][rtype] = newPool
+	hwlog.RunLog.Infof("job<%s> created pod with fixed IP<%s>, remaining pool: %v", job.GetUID(), ip, newPool)
+}
diff --git a/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go b/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go
index 46bb79fdde12acd0908621fdff6a225836379263..597f7653913eeb85d8f88a7a327602295e4f57d1 100644
--- a/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go
+++ b/component/ascend-operator/pkg/controllers/v1/ascendjob_controller.go
@@ -34,7 +34,6 @@ import (
 	"golang.org/x/time/rate"
 	appv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
-	v1 "k8s.io/api/core/v1"
 	k8serr "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -78,6 +77,8 @@ func NewReconciler(mgr manager.Manager, enableGangScheduling bool) *ASJobReconci
 		versions:      make(map[types.UID]int32),
 		backoffLimits: make(map[types.UID]int32),
 		rtGenerators:  make(map[types.UID]generator.RankTableGenerator),
+		podIPPool:     make(map[types.UID]map[string][]string),
+		snapShotJobs:  make(map[types.UID]bool),
 	}
 
 	cfg := mgr.GetConfig()
@@ -118,6 +119,8 @@ type ASJobReconciler struct {
 	versions      map[types.UID]int32
 	backoffLimits map[types.UID]int32
 	rtGenerators  map[types.UID]generator.RankTableGenerator
+	podIPPool     map[types.UID]map[string][]string
+	snapShotJobs  map[types.UID]bool
 }
 
 // Reconcile is part of the main kubernetes reconciliation loop which aims to
@@ -152,6 +155,8 @@ func (r *ASJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl
 		hwlog.RunLog.Infof("reconcile cancelled,job<%s> has been deleted", req.NamespacedName)
 		delete(r.versions, ascendjob.UID)
 		delete(r.backoffLimits, ascendjob.UID)
+		delete(r.podIPPool, ascendjob.UID)
+		delete(r.snapShotJobs, ascendjob.UID)
 		return ctrl.Result{}, nil
 	}
 
@@ -344,6 +349,8 @@ func (r *ASJobReconciler) onOwnerCreateFunc() func(event.CreateEvent) bool {
 		}
 		r.versions[ascendJob.UID] = defaultPodVersion
 		r.backoffLimits[ascendJob.UID] = unsetBackoffLimits
+		r.podIPPool[ascendJob.UID] = make(map[string][]string)
+		r.SetJobSnapshot(ascendJob)
 		if ascendJob.Spec.RunPolicy.BackoffLimit != nil {
 			r.backoffLimits[ascendJob.UID] = *ascendJob.Spec.RunPolicy.BackoffLimit
 		} else if err = r.setFaultRetryTimesToBackoffLimits(ascendJob); err != nil {
@@ -390,6 +397,8 @@ func (r *ASJobReconciler) onOwnerDeleteFunc() func(deleteEvent event.DeleteEvent
 		hwlog.RunLog.Info(msg)
 		delete(r.versions, ascendJob.UID)
 		delete(r.backoffLimits, ascendJob.UID)
+		delete(r.podIPPool, ascendJob.UID)
+		delete(r.snapShotJobs, ascendJob.UID)
 		return true
 	}
 }
@@ -419,10 +428,72 @@ func (r *ASJobReconciler) onPodDeleteFunc() func(event.DeleteEvent) bool {
 		if ok && int32(versionNumber) == currentVersion {
 			r.versions[controllerRef.UID]++
 		}
+
+		pod, ok := isEnableResiliencePod(e)
+		if !ok || pod == nil {
+			return true
+		}
+		r.setPodIPPool(controllerRef.UID, e, pod)
 		return true
 	}
 }
+
+func isEnableResiliencePod(e event.DeleteEvent) (*corev1.Pod, bool) {
+	pod, ok := e.Object.(*corev1.Pod)
+	if !ok {
+		hwlog.RunLog.Debugf("deleted object<%s> is not a pod", e.Object.GetName())
+		return nil, false
+	}
+	if !enableResilience(pod.GetLabels()) {
+		return nil, false
+	}
+
+	return pod, true
+}
+
+func enableResilience(labels map[string]string) bool {
+	// indexing a nil map is safe in Go, so no explicit nil check is needed
+	return labels[enableResilienceKey] == trueString
+}
+
+// SetJobSnapshot marks the job as snapshot-enabled when resilience is requested.
+func (r *ASJobReconciler) SetJobSnapshot(job *mindxdlv1.AscendJob) {
+	if !enableResilience(job.GetLabels()) {
+		return
+	}
+	r.snapShotJobs[job.GetUID()] = true
+	hwlog.RunLog.Infof("acjob<%s> container snapshot enabled", job.GetUID())
+}
+
+// setPodIPPool records the IP of a deleted pod so that it can be reused
+// when a pod of the same replica type is recreated for this job.
+func (r *ASJobReconciler) setPodIPPool(jobUID types.UID, e event.DeleteEvent, pod *corev1.Pod) {
+	podIPMap, ok := r.podIPPool[jobUID]
+	if !ok {
+		podIPMap = make(map[string][]string)
+		r.podIPPool[jobUID] = podIPMap
+	}
+	hwlog.RunLog.Infof("recording IP<%s> of deleted pod<%s>", pod.Status.PodIP, pod.Name)
+	// look up the saved IP list for this pod's replica type
+	repType := e.Object.GetLabels()[commonv1.ReplicaTypeLabel]
+	typeIPs := addDeletePodInfo(podIPMap, pod.Status.PodIP, repType)
+	podIPMap[repType] = typeIPs
+	hwlog.RunLog.Infof("acjob<%s> replicaType<%s> deleted-pod IP pool: %v", jobUID, repType,
+		r.podIPPool[jobUID][repType])
+}
+
+// addDeletePodInfo appends newItem to the IP list for repType, skipping duplicates.
+func addDeletePodInfo(infoMap map[string][]string, newItem, repType string) []string {
+	typeArray := infoMap[repType]
+	for _, item := range typeArray {
+		if newItem == item {
+			return typeArray
+		}
+	}
+	typeArray = append(typeArray, newItem)
+	return typeArray
+}
 
 // ControllerName get controller name
 func (r *ASJobReconciler) ControllerName() string {
 	return controllerName
@@ -476,12 +547,12 @@ func (r *ASJobReconciler) DeleteJob(job interface{}) error {
 	}
 
 	if err := r.Delete(context.Background(), ascendjob); err != nil {
-		r.recorder.Eventf(ascendjob, v1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err)
+		r.recorder.Eventf(ascendjob, corev1.EventTypeWarning, FailedDeleteJobReason, "Error deleting: %v", err)
 		hwlog.RunLog.Errorf("failed to delete job<%s-%s>, err: %s", ascendjob.Namespace, ascendjob.Name, err)
 		return err
 	}
 
-	r.recorder.Eventf(ascendjob, v1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", ascendjob.Name)
+	r.recorder.Eventf(ascendjob, corev1.EventTypeNormal, SuccessfulDeleteJobReason, "Deleted job: %v", ascendjob.Name)
 	hwlog.RunLog.Infof("job<%s-%s> has been deleted", ascendjob.Namespace, ascendjob.Name)
 	return nil
 }
diff --git a/component/ascend-operator/pkg/controllers/v1/consts.go b/component/ascend-operator/pkg/controllers/v1/consts.go
index ac0ca61aaa3974853088ae3666a70e80e79c9a46..e39ccc793da27c39e948e7a9c01c75c9fe349046 100644
--- a/component/ascend-operator/pkg/controllers/v1/consts.go
+++ b/component/ascend-operator/pkg/controllers/v1/consts.go
@@ -49,6 +49,11 @@ const (
 	statusPodIPDownwardAPI = "status.podIP"
 
+	enableResilienceKey = "enableResilience"
+	trueString          = "true"
+	fixedPodIPKey       = "cni.projectcalico.org/ipAddrs"
+	snapshotRestoreID   = "SNAPSHOT_RESTORE_ID"
+
 	cmRetryTime     = 3
 	configmapPrefix = "rings-config-"
 	acjobKind       = "AscendJob"
@@ -135,4 +140,5 @@ const (
 	workQueueMaxDelay = 20 * time.Second
 	workQueueQps      = 10
 	workQueueBurst    = 100
+	noMatchedContainer = -1
 )
diff --git a/component/ascend-operator/pkg/controllers/v1/env.go b/component/ascend-operator/pkg/controllers/v1/env.go
index d9b4729bbd5adf0042dabcf358879df7813ef95f..74433cb6a67379c693444ddadc741391ff32b0c0 100644
--- a/component/ascend-operator/pkg/controllers/v1/env.go
+++ b/component/ascend-operator/pkg/controllers/v1/env.go
@@ -14,6 +14,7 @@ import (
 	"strings"
 
 	commonv1 "github.com/kubeflow/common/pkg/apis/common/v1"
+	corev1 "k8s.io/api/core/v1"
 
 	"ascend-common/api"
 
@@ -110,6 +111,19 @@ func (r *ASJobReconciler) setAscendVisibleDevicesEnv(container *corev1.Container
 	}
 }
 
+// addEnvFieldRef appends an env var to the indexed container whose value is
+// resolved at runtime from the given downward-API field path.
+func addEnvFieldRef(pod *corev1.PodTemplateSpec, envKey, fieldRef string, index int) {
+	pod.Spec.Containers[index].Env = append(pod.Spec.Containers[index].Env, corev1.EnvVar{
+		Name: envKey,
+		ValueFrom: &corev1.EnvVarSource{
+			FieldRef: &corev1.ObjectFieldSelector{
+				FieldPath: fieldRef,
+			},
+		},
+	})
+}
+
 func (r *ASJobReconciler) setMindSporeEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpec) {
 	msRoleMap := map[commonv1.ReplicaType]string{
 		mindxdlv1.MindSporeReplicaTypeScheduler: msSchedulerRole,
@@ -136,10 +150,13 @@ func (r *ASJobReconciler) setMindSporeEnv(pi *podInfo, podTemplate *corev1.PodTe
 			addEnvValue(podTemplate, api.MsLocalWorkerEnv, strconv.Itoa(pi.ctReq), i)
 			addEnvValue(podTemplate, api.MsWorkerNumEnv, strconv.Itoa(pi.ctReq*pi.npuReplicas), i)
 		}
+		addEnvValue(podTemplate, taskIDEnvKey, string(pi.job.UID), i)
+		addEnvValue(podTemplate, mindxServerIPEnv, pi.clusterdSvcIp, i)
 		addEnvValue(podTemplate, msNodeRank, strconv.Itoa(pi.rank), i)
 		addEnvValue(podTemplate, msSchedPort, pi.port, i)
 		addEnvValue(podTemplate, msServerNum, "0", i)
 		addEnvValue(podTemplate, msRole, msRoleMap[pi.rtype], i)
+		addEnvValue(podTemplate, hostNetwork, strconv.FormatBool(pi.spec.Template.Spec.HostNetwork), i)
 		addEnvValue(podTemplate, npuPod, strconv.FormatBool(checkNpuPod(pi)), i)
 
 		hwlog.RunLog.Debugf(logEnvPattern, podTemplate.Name, podTemplate.Spec.Containers[i].Env)
@@ -162,6 +179,9 @@ func (r *ASJobReconciler) setPytorchEnv(pi *podInfo, podTemplate *corev1.PodTemp
 		addEnvValue(podTemplate, ptMasterAddr, pi.ip, i)
 		addEnvValue(podTemplate, ptMasterPort, pi.port, i)
 		addEnvValue(podTemplate, ptRank, strconv.Itoa(pi.rank), i)
+		addEnvValue(podTemplate, taskIDEnvKey, string(pi.job.UID), i)
+		addEnvValue(podTemplate, mindxServerIPEnv, pi.clusterdSvcIp, i)
+		addEnvValue(podTemplate, hostNetwork, strconv.FormatBool(pi.spec.Template.Spec.HostNetwork), i)
 		hwlog.RunLog.Debugf(logEnvPattern, podTemplate.Name, podTemplate.Spec.Containers[i].Env)
 	}
 }
@@ -191,7 +211,10 @@ func (r *ASJobReconciler) setTensorflowEnv(pi *podInfo, podTemplate *corev1.PodT
 		}
 		addEnvValue(podTemplate, tfChiefPort, pi.port, i)
 		addEnvValue(podTemplate, tfRank, strconv.Itoa(pi.rank), i)
+		addEnvValue(podTemplate, taskIDEnvKey, string(pi.job.UID), i)
+		addEnvValue(podTemplate, mindxServerIPEnv, pi.clusterdSvcIp, i)
 		addEnvValue(podTemplate, tfChiefDevice, "0", i)
+		addEnvValue(podTemplate, hostNetwork, strconv.FormatBool(pi.spec.Template.Spec.HostNetwork), i)
 		podTemplate.Spec.Containers[i].Env = append(podTemplate.Spec.Containers[i].Env, corev1.EnvVar{
 			Name: tfWorkerIP,
 			ValueFrom: &corev1.EnvVarSource{
diff --git a/component/ascend-operator/pkg/controllers/v1/labels.go b/component/ascend-operator/pkg/controllers/v1/labels.go
index 2a7eaf5c5566b19ccd8feacf805aecd62ac07549..e605ea8b563ba08a05c72d519e85fa923b1a54ff 100644
--- a/component/ascend-operator/pkg/controllers/v1/labels.go
+++ b/component/ascend-operator/pkg/controllers/v1/labels.go
@@ -42,4 +42,9 @@ func (r *ASJobReconciler) setPodLabels(job *mindxdlv1.AscendJob, podTemplate *co
 	for key, value := range labelsMap {
 		podTemplate.Labels[key] = value
 	}
+
+	if enableResilience(job.GetLabels()) {
+		// mark the pod so that its IP and UID are saved when it is deleted
+		podTemplate.Labels[enableResilienceKey] = trueString
+	}
 }
diff --git a/component/ascend-operator/pkg/controllers/v1/pod.go b/component/ascend-operator/pkg/controllers/v1/pod.go
index 3c745d35b29bc5f2ff008be85c22e250a143cc9e..8c271b0d37ea42eddc06bc0f5669e162faab6460 100644
--- a/component/ascend-operator/pkg/controllers/v1/pod.go
+++ b/component/ascend-operator/pkg/controllers/v1/pod.go
@@ -439,6 +439,8 @@ func (r *ASJobReconciler) createPodSpec(pi *podInfo,
 		pi.rank = pi.index
 	}
 	// Set name for the template.
+	// Modifying the name affects the path used to store the container snapshot.
+	// If it must be changed, do not append random values such as a UUID.
 	podTemplate.Name = common.GenGeneralName(job.Name, strings.ToLower(string(pi.rtype)), indexStr)
 
 	err := r.setEnv(pi, podTemplate)
@@ -464,6 +466,11 @@ func (r *ASJobReconciler) createPodSpec(pi *podInfo,
 }
 
 func (r *ASJobReconciler) setEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpec) error {
+	index := getAscendContainerIndex(podTemplate)
+	if index == noMatchedContainer {
+		hwlog.RunLog.Debugf("pod<%s> has no container<%s>, skip env setup", podTemplate.Name, mindxdlv1.DefaultContainerName)
+		return nil
+	}
 	if mindxdlutils.IsMindIEEPJob(pi.job) {
 		hwlog.RunLog.Debugf("Set mindIEEP AscendJob<%s-%s> env", pi.job.Namespace, pi.job.Name)
 		r.setInferEnv(pi, podTemplate)
@@ -473,6 +480,9 @@ func (r *ASJobReconciler) setEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpe
 		pi.rtype == mindxdlv1.ReplicaTypeWorker {
 		return nil
 	}
+
+	setAscendPodCommonEnv(podTemplate, index)
+	r.setSnapShotEnv(pi, podTemplate, index)
 	hwlog.RunLog.Debugf("Set AscendJob<%s-%s> framework<%s> env start", pi.job.Namespace, pi.job.Name, pi.frame)
 	r.setCommonEnv(pi, podTemplate)
 	if pi.ctReq == 0 {
@@ -492,6 +502,39 @@ func (r *ASJobReconciler) setEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpe
 	return nil
 }
 
+// setAscendPodCommonEnv ensures the target container has a non-nil env slice.
+func setAscendPodCommonEnv(podTemplate *corev1.PodTemplateSpec, index int) {
+	if len(podTemplate.Spec.Containers[index].Env) == 0 {
+		podTemplate.Spec.Containers[index].Env = make([]corev1.EnvVar, 0)
+	}
+}
+
+// getAscendContainerIndex returns the index of the default Ascend container,
+// or noMatchedContainer if the pod does not define one.
+func getAscendContainerIndex(podTemplate *corev1.PodTemplateSpec) int {
+	for i, container := range podTemplate.Spec.Containers {
+		if container.Name == mindxdlv1.DefaultContainerName {
+			return i
+		}
+	}
+	return noMatchedContainer
+}
+
+func (r *ASJobReconciler) setSnapShotEnv(pi *podInfo, podTemplate *corev1.PodTemplateSpec, index int) {
+	// if the job enables resilience, set the container snapshot restore UID
+	if useSnapshot, ok := r.snapShotJobs[pi.job.GetUID()]; !ok || !useSnapshot {
+		return
+	}
+	r.setSnapShotRestoreID(pi, podTemplate, index)
+}
+
+func (r *ASJobReconciler) setSnapShotRestoreID(pi *podInfo, podTemplate *corev1.PodTemplateSpec, index int) {
+	restoreID := string(pi.job.GetUID()) + "_" + podTemplate.Name + "_" + mindxdlv1.DefaultContainerName
+	addEnvValue(podTemplate, snapshotRestoreID, restoreID, index)
+	hwlog.RunLog.Infof("acjob<%s> replicaType<%s> set pod snapshot restore id<%s>",
+		pi.job.GetUID(), pi.rtype, restoreID)
+}
+
 func (r *ASJobReconciler) setGangScheduleInfo(job *mindxdlv1.AscendJob, podTemplate *corev1.PodTemplateSpec,
 	replicas map[commonv1.ReplicaType]*commonv1.ReplicaSpec, rt string) {
 	jobSchedulerName := job.Spec.SchedulerName
diff --git a/component/ascend-operator/pkg/controllers/v1/utils.go b/component/ascend-operator/pkg/controllers/v1/utils.go
index 50401b8624f61f1a205b289c04dbdc3b095ac64b..46b5789402622cea77ea5fc7c32b3bd9a3ddd481 100644
--- a/component/ascend-operator/pkg/controllers/v1/utils.go
+++ b/component/ascend-operator/pkg/controllers/v1/utils.go
@@ -291,3 +291,14 @@ func getJobRequiredNpu(job *mindxdlv1.AscendJob) int {
 	}
 	return requiredNpu
 }
+
+// removeFirst pops the first element from list, returning it together with a
+// copy of the remaining elements; an empty list yields "" and an empty slice.
+func removeFirst(list []string) (string, []string) {
+	if len(list) == 0 {
+		return "", make([]string, 0)
+	}
+	newList := make([]string, len(list)-1)
+	copy(newList, list[1:])
+	return list[0], newList
+}
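
Review note: a test-style sketch (not part of the patch) of how the two pool helpers added above compose. The replica type and IP are invented for illustration; addDeletePodInfo records the IP of a deleted worker pod once, and removeFirst pops it again when the replacement pod is created.

```go
package v1

import "fmt"

// Hypothetical walkthrough of the delete/recreate round trip.
func ExampleIPPoolRoundTrip() {
	pool := make(map[string][]string)
	// a worker pod with IP 10.0.0.5 is deleted: record its IP
	pool["worker"] = addDeletePodInfo(pool, "10.0.0.5", "worker")
	// a second delete event for the same IP is deduplicated
	pool["worker"] = addDeletePodInfo(pool, "10.0.0.5", "worker")
	// the replacement worker pod pops the first recorded IP
	ip, rest := removeFirst(pool["worker"])
	fmt.Println(ip, len(rest))
	// Output: 10.0.0.5 0
}
```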
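Review note on fixedPodIPKey: setPodIP assembles the Calico cni.projectcalico.org/ipAddrs value by string concatenation. A sketch of an equivalent built with encoding/json, assuming the annotation stays a JSON array of addresses; the helper name is invented and not part of the patch:

```go
package v1

import "encoding/json"

// buildFixedIPAnnotation renders ["10.0.0.5"]-style values and stays correct
// if the pool ever hands back more than one address.
func buildFixedIPAnnotation(ips []string) (string, error) {
	b, err := json.Marshal(ips)
	if err != nil {
		return "", err
	}
	return string(b), nil
}
```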
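Review note on addEnvFieldRef: the helper is added in env.go without a call site in this diff. Presumably it pairs with the existing statusPodIPDownwardAPI constant ("status.podIP"); a hypothetical call site, with an invented POD_IP env name, for review context only:

```go
package v1

import corev1 "k8s.io/api/core/v1"

// setPodIPEnv would expose the pod's own IP to the indexed container through
// the downward API at runtime.
func setPodIPEnv(podTemplate *corev1.PodTemplateSpec, index int) {
	addEnvFieldRef(podTemplate, "POD_IP", statusPodIPDownwardAPI, index)
}
```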