From 6f21190f29605cbdb61b93aeba73c301f66665cd Mon Sep 17 00:00:00 2001 From: maybeclear <471843059@qq.com> Date: Mon, 18 Aug 2025 22:45:08 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E3=80=90volcano=E3=80=91=E3=80=90=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=AF=B4=E6=98=8E=E3=80=91=E6=94=AF=E6=8C=81openfuyao?= =?UTF-8?q?=E5=AE=9A=E5=88=B6k8s=E5=9C=BA=E6=99=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=B0=83=E5=BA=A6pod=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend-for-volcano/common/util/type.go | 2 + component/ascend-for-volcano/huawei_npu.go | 8 + .../ascend910/ascend910a3/superpod/frame.go | 95 +++++++++++ .../ascend910a3/superpod/frame_test.go | 156 +++++++++++++++++ .../ascend910/ascend910a3/superpod/type.go | 1 + component/ascend-for-volcano/plugin/const.go | 10 ++ .../ascend-for-volcano/plugin/factory.go | 57 +++++++ .../ascend-for-volcano/plugin/factory_test.go | 159 ++++++++++++++++++ 8 files changed, 488 insertions(+) diff --git a/component/ascend-for-volcano/common/util/type.go b/component/ascend-for-volcano/common/util/type.go index cbef09029..85da9c940 100644 --- a/component/ascend-for-volcano/common/util/type.go +++ b/component/ascend-for-volcano/common/util/type.go @@ -236,6 +236,8 @@ const ( NodeNotMeetTopologyWarning = "the npus on this node don't satisfy the schedulable topology" // ArgumentError argument nil error. ArgumentError = "invalid argument" + // RankIdNotExist rank id does not exist + RankIdNotExist = "rank id does not exist" // JobKindKey for define the Job kind:ascend-310P, ascend-910 JobKindKey = "ring-controller.atlas" // JobKind910Value in ring-controller.atlas. diff --git a/component/ascend-for-volcano/huawei_npu.go b/component/ascend-for-volcano/huawei_npu.go index e34b687f6..7c62c5b99 100644 --- a/component/ascend-for-volcano/huawei_npu.go +++ b/component/ascend-for-volcano/huawei_npu.go @@ -91,6 +91,8 @@ func (tp *huaweiNPUPlugin) OnSessionOpen(ssn *framework.Session) { addJobReadyFn(ssn, tp) addJobEnqueueableFn(ssn, tp) + + addTaskOrderFn(ssn, tp) // Register event handlers to update task info in PodLister & nodeMap // for support Concurrency addEventHandler(ssn, tp) @@ -237,6 +239,12 @@ func addJobEnqueueableFn(ssn *framework.Session, tp *huaweiNPUPlugin) { }) } +func addTaskOrderFn(ssn *framework.Session, tp *huaweiNPUPlugin) { + ssn.AddTaskOrderFn(tp.Name(), func(l interface{}, r interface{}) int { + return tp.Scheduler.TaskOrderFn(l, r) + }) +} + func getNpuNum(ssn *framework.Session, tp *huaweiNPUPlugin, npuName string) int { var tNpuNum int for _, node := range ssn.Nodes { diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go index 708f627a7..a3980d418 100644 --- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go +++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go @@ -177,6 +177,10 @@ func (tp *module910SuperPod) ScoreBestNPUNodes(task *api.TaskInfo, nodes []*api. defer func() { if *job.JobReadyTag { + if podGroupEnable, exist := job.Label[plugin.PodGroupScheduleKey]; exist && podGroupEnable == plugin.PodGroupScheduleValue { + tp.scoreNodeBatchForReadyJob(task, &job, sMap) + return + } tp.scoreNodeForReadyJob(task, job, sMap) } }() @@ -211,6 +215,97 @@ func (tp *module910SuperPod) ScoreBestNPUNodes(task *api.TaskInfo, nodes []*api. return nil } +func (tp *module910SuperPod) scoreNodeBatchForReadyJob(task *api.TaskInfo, job *plugin.SchedulerJob, + sMap map[string]float64) { + if task == nil || job == nil || len(sMap) == 0 { + klog.V(util.LogErrorLev).Infof("scoreNodeBatchForReadyJob %s", errors.New(util.ArgumentError)) + return + } + rankIdMap := tp.obtainBatchScoreRank(task, job) + if len(rankIdMap) == 0 { + klog.V(util.LogErrorLev).Infof("%s scoreNodeBatchForReadyJob %s: rankIdMap empty", + tp.GetPluginName(), task.Name) + job.JobReadyTag = new(bool) + return + } + for rankId := range rankIdMap { + superPodRank := rankId / tp.spBlock + localRank := rankId % tp.spBlock + klog.V(util.LogDebugLev).Infof("superPodRank: %d, localRank: %d", superPodRank, localRank) + superPodRankIndex := strconv.Itoa(superPodRank) + if localRank >= len(job.SuperPods[superPodRankIndex]) { + klog.V(util.LogErrorLev).Infof("superPodRank: %d, localRank: %d out of rank", superPodRank, localRank) + job.JobReadyTag = new(bool) + break + } + spn := job.SuperPods[superPodRankIndex][localRank] + if _, ok := sMap[spn.Name]; !ok { + klog.V(util.LogErrorLev).Infof("%s scoreNodeBatchForReadyJob %s: node<%s> not in sMap, select fail", + tp.GetPluginName(), task.Name, spn.Name) + job.JobReadyTag = new(bool) + break + } + klog.V(util.LogDebugLev).Infof("%s scoreNodeBatchForReadyJob %s: node<%s/%s> is exist in "+ + "SuperPodID: %d, select success", tp.GetPluginName(), task.Name, spn.Name, superPodRankIndex, + spn.SuperPodID) + sMap[spn.Name] = float64(scoreForNode - rankId) + } +} +func (tp *module910SuperPod) obtainBatchScoreRank(task *api.TaskInfo, job *plugin.SchedulerJob) map[int]struct{} { + if task == nil || job == nil { + klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank %s", errors.New(util.ArgumentError)) + return nil + } + spec, ok := task.Pod.Annotations[taskSpec] + if !ok { + klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank %s: (%s/%s) obtain taskSpec fail, skip", + tp.GetPluginName(), task.Namespace, task.Name) + return nil + } + rankIdMap := obtainOriginalRankIdMap(spec, job) + if len(rankIdMap) == 0 { + klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank %s: (%s/%s) rankIdMap empty, skip", + tp.GetPluginName(), task.Namespace, task.Name) + return nil + } + return rankIdMap +} + +func obtainOriginalRankIdMap(spec string, job *plugin.SchedulerJob) map[int]struct{} { + if job == nil { + klog.V(util.LogErrorLev).Infof("obtainOriginalRankIdMap %s", errors.New(util.ArgumentError)) + return nil + } + klog.V(util.LogInfoLev).Infof("obtainOriginalRankIdMap job (%s/%s), len(job.Tasks) %d", + job.NameSpace, job.Name, len(job.Tasks)) + m := make(map[int]struct{}, len(job.Tasks)) + for _, task := range job.Tasks { + if !task.IsNPUTask() || task.Annotation[taskSpec] != spec { + continue + } + if task.PodStatus != v1.PodPending { + continue + } + rankIndex, ok := task.Annotation[plugin.PodRankIndexKey] + if !ok { + klog.V(util.LogWarningLev).Infof("obtainOriginalRankIdMap (%s/%s): rankIndex is not exist", + task.NameSpace, task.Name) + m[task.Index] = struct{}{} + continue + } + rank, err := strconv.Atoi(rankIndex) + if err != nil { + klog.V(util.LogErrorLev).Infof("obtainOriginalRankIdMap (%s/%s): rankIndex is not int", + task.NameSpace, task.Name) + continue + } + m[rank] = struct{}{} + } + klog.V(util.LogInfoLev).Infof("obtainOriginalRankIdMap job (%s/%s), len(rankMap) %d", + job.NameSpace, job.Name, len(m)) + return m +} + func (tp *module910SuperPod) scoreNodeForReadyJob(task *api.TaskInfo, job plugin.SchedulerJob, sMap map[string]float64) { var rank int diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go index bfdc9dac4..bda43d7ca 100644 --- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go +++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go @@ -27,6 +27,7 @@ import ( "time" "github.com/agiledragon/gomonkey/v2" + "k8s.io/api/core/v1" "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/common/util" @@ -844,3 +845,158 @@ func TestIsDelayingJob(t *testing.T) { }) } } + +func new910SuperPod(name string) *module910SuperPod { + m := &module910SuperPod{} + m.SetPluginName(name) + m.SetAnnoName(util.NPU910CardName) + m.SetAnnoPreVal(util.NPU910CardNamePre) + m.SetMaxNodeNPUNum(ascend910a3.NodeNPUNumber) + m.SetMaxCardNPUNum(ascend910a3.DieNPUNumber) + m.SetIsNetworkFaultAttention(true) + m.NetUnhealthyKey = ascend910a3.NetworkUnhealthyNPU + m.nodeVPodId = map[string]string{} + return m +} + +// TestScoreNodeBatchForReadyJob test of scoreNodeBatchForReadyJob +func TestScoreNodeBatchForReadyJob(t *testing.T) { + plg := new910SuperPod(SchedulerName) + plg.Name = "job1" + plg.SchedulerJobAttr = util.SchedulerJobAttr{ + ComJob: util.ComJob{}, + NPUJob: &util.NPUJob{}, + } + plg.ScheduleEnv = plugin.ScheduleEnv{} + type args struct { + task *api.TaskInfo + job *plugin.SchedulerJob + sMap map[string]float64 + } + tests := []struct { + name string + args args + }{ + { + name: "01-scoreNodeBatchForReadyJob invalid argument", + args: args{}, + }, + { + name: "02-scoreNodeBatchForReadyJob rankIdMap empty", + args: args{ + task: test.FakeNormalTestTask("pod1", "node1", "acjob"), + job: &plugin.SchedulerJob{}, + sMap: map[string]float64{"node1": 0}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + plg.scoreNodeBatchForReadyJob(tt.args.task, tt.args.job, tt.args.sMap) + }) + } +} + +const ( + batchScoreNpuTask4 = 4 + batchScoreRank0 = 0 +) + +func createObtainBatchScoreRankTaskInfo(jobId, rankId, spec string) *api.TaskInfo { + task := test.FakeNormalTestTask("pod1", "node1", "acjob") + task.Job = api.JobID(jobId) + task.Pod.Annotations[plugin.PodRankIndexKey] = rankId + task.Pod.Annotations[taskSpec] = spec + return task +} + +func createBatchScoreNPUTasks(n int) map[api.TaskID]util.NPUTask { + tasks := make(map[api.TaskID]util.NPUTask, n) + for i := 0; i < n; i++ { + spec := workerSpec + if batchScoreRank0 == i { + spec = schedulerSpec + } + tasks[api.TaskID(strconv.Itoa(i))] = util.NPUTask{ + Name: "task" + strconv.Itoa(i), + ReqNPUName: util.NPU910CardName, + Annotation: map[string]string{ + plugin.PodRankIndexKey: strconv.Itoa(i), + taskSpec: spec, + }, + PodStatus: v1.PodPending, + } + } + return tasks +} + +func fakeSchedulerJobEmptyTask(jobName, namespace string) *plugin.SchedulerJob { + job := &plugin.SchedulerJob{ + SchedulerJobAttr: util.SchedulerJobAttr{ + ComJob: util.ComJob{ + Name: api.JobID(jobName), + NameSpace: namespace, + Selector: map[string]string{}, + Label: map[string]string{}, + }, + NPUJob: &util.NPUJob{ + ReqNPUName: util.NPU910CardName, + ReqNPUNum: 0, + Tasks: make(map[api.TaskID]util.NPUTask), + }, + }, + } + return job +} + +// TestObtainBatchScoreRank test of obtainBatchScoreRank +func TestObtainBatchScoreRank(t *testing.T) { + jobIdStr := "job1" + plg := new910SuperPod(SchedulerName) + plg.Name = api.JobID(jobIdStr) + plg.SchedulerJobAttr = util.SchedulerJobAttr{ + ComJob: util.ComJob{}, + NPUJob: &util.NPUJob{}, + } + plg.ScheduleEnv = plugin.ScheduleEnv{} + schedulerJob := fakeSchedulerJobEmptyTask(jobIdStr, "") + schedulerJob.Tasks = createBatchScoreNPUTasks(batchScoreNpuTask4) + type args struct { + task *api.TaskInfo + job *plugin.SchedulerJob + } + tests := []struct { + name string + args args + want map[int]struct{} + }{ + { + name: "01-obtainBatchScoreRank invalid argument", + args: args{}, + want: nil, + }, + { + name: "02-obtainBatchScoreRank spec " + schedulerSpec, + args: args{ + task: createObtainBatchScoreRankTaskInfo(jobIdStr, "1", schedulerSpec), + job: schedulerJob, + }, + want: map[int]struct{}{0: {}}, + }, + { + name: "02-obtainBatchScoreRank spec " + workerSpec, + args: args{ + task: createObtainBatchScoreRankTaskInfo(jobIdStr, "1", workerSpec), + job: schedulerJob, + }, + want: map[int]struct{}{1: {}, 2: {}, 3: {}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := plg.obtainBatchScoreRank(tt.args.task, tt.args.job); !reflect.DeepEqual(got, tt.want) { + t.Errorf("obtainBatchScoreRank() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/type.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/type.go index 40ec76335..5fe99c85e 100644 --- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/type.go +++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/type.go @@ -41,6 +41,7 @@ const ( scoreForNode = 100000000 taskSpec = "volcano.sh/task-spec" schedulerSpec = "scheduler" + workerSpec = "worker" delayingTime = 10 ) diff --git a/component/ascend-for-volcano/plugin/const.go b/component/ascend-for-volcano/plugin/const.go index a8b10b13f..442360bd8 100644 --- a/component/ascend-for-volcano/plugin/const.go +++ b/component/ascend-for-volcano/plugin/const.go @@ -55,6 +55,16 @@ const ( PodRankIndexKey = "hccl/rankIndex" // ReplicaSetType replicaset type ReplicaSetType = "ReplicaSet" + // PodGroupScheduleKey podgroup schedule the enable key + PodGroupScheduleKey = "podgroup-sched-enable" + // PodGroupScheduleValue podgroup schedule the enable value + PodGroupScheduleValue = "true" +) + +const ( + TaskOrderHighPriority = -1 + TaskOrderLowPriority = 1 + TaskOrderSamePriority = 0 ) const ( diff --git a/component/ascend-for-volcano/plugin/factory.go b/component/ascend-for-volcano/plugin/factory.go index 5d82694ff..b10578700 100644 --- a/component/ascend-for-volcano/plugin/factory.go +++ b/component/ascend-for-volcano/plugin/factory.go @@ -446,6 +446,63 @@ func isContain(target string, strArray []string) bool { return false } +// TaskOrderFn Sort the selected tasks. +func (sHandle *ScheduleHandler) TaskOrderFn(l interface{}, r interface{}) int { + lv, ok := l.(*api.TaskInfo) + if !ok { + klog.V(util.LogDebugLev).Info("TaskOrderFn failed, object is not a TaskInfo") + return TaskOrderSamePriority + } + job, ok := sHandle.Jobs[lv.Job] + if !ok { + klog.V(util.LogDebugLev).Infof("TaskOrderFn (%s/%s): job is not exist", lv.Namespace, lv.Name) + return TaskOrderSamePriority + } + podGroupEnable, exist := job.Label[PodGroupScheduleKey] + if !exist || podGroupEnable != PodGroupScheduleValue { + return TaskOrderSamePriority + } + rv, ok := r.(*api.TaskInfo) + if !ok { + klog.V(util.LogDebugLev).Info("TaskOrderFn failed, object is not a TaskInfo") + return TaskOrderSamePriority + } + rRankId, err := sHandle.obtainTaskRankId(rv, &job) + if err != nil { + return TaskOrderSamePriority + } + lRankId, err := sHandle.obtainTaskRankId(lv, &job) + if err != nil { + return TaskOrderSamePriority + } + if lRankId < rRankId { + return TaskOrderHighPriority + } + return TaskOrderLowPriority +} + +func (sHandle *ScheduleHandler) obtainTaskRankId(task *api.TaskInfo, job *SchedulerJob) (int, error) { + var rankId int + if task == nil || job == nil { + klog.V(util.LogDebugLev).Infof("obtainTaskRankId failed: %s.", util.ArgumentError) + return rankId, errors.New(util.ArgumentError) + } + if rankIndex, ok := task.Pod.Annotations[PodRankIndexKey]; ok { + rankId, err := strconv.Atoi(rankIndex) + if err != nil { + klog.V(util.LogDebugLev).Infof("obtainTaskRankId task(%s/%s): rankIndex is not int", + task.Namespace, task.Name) + } + return rankId, err + } + nTask, ok := job.Tasks[task.UID] + if !ok { + return rankId, errors.New(util.RankIdNotExist) + } + rankId = nTask.Index + return rankId, nil +} + // BatchNodeOrderFn Score the selected nodes. func (sHandle *ScheduleHandler) BatchNodeOrderFn(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) { diff --git a/component/ascend-for-volcano/plugin/factory_test.go b/component/ascend-for-volcano/plugin/factory_test.go index 18513c1f1..a141c5366 100644 --- a/component/ascend-for-volcano/plugin/factory_test.go +++ b/component/ascend-for-volcano/plugin/factory_test.go @@ -21,6 +21,7 @@ package plugin import ( "reflect" + "strconv" "strings" "sync" "testing" @@ -682,3 +683,161 @@ func TestGetGraceDeleteTime(t *testing.T) { }) } } + +type taskOrderFnArgs struct { + l interface{} + r interface{} +} +type taskOrderFnTest struct { + name string + fields fields + args taskOrderFnArgs + want int +} + +func createTaskOrderFnTask(jobId string, taskNum int) []*api.TaskInfo { + tTask := test.FakeNormalTestTasks(taskNum) + for index, task := range tTask { + task.Job = api.JobID(jobId) + task.Pod.Annotations[PodRankIndexKey] = strconv.Itoa(index) + } + return tTask +} + +func buildTaskOrderFnTest() []taskOrderFnTest { + jobIdStr := "job1" + tTask := createTaskOrderFnTask(jobIdStr, util.NPUIndex2) + var tests []taskOrderFnTest + tests = append(tests, createTaskOrderFnCasesOfTaskNil()...) + tests = append(tests, createTaskOrderFnCasesOfJobNotExist(tTask[util.NPUIndex0], tTask[util.NPUIndex1])...) + tests = append(tests, createTaskOrderFnCasesOfPodGroupLabelNotExist(jobIdStr, + tTask[util.NPUIndex0], tTask[util.NPUIndex1])...) + tests = append(tests, createTaskOrderFnCasesOfCompareTask(jobIdStr, + tTask[util.NPUIndex0], tTask[util.NPUIndex1])...) + return tests +} + +func createTaskOrderFnCasesOfTaskNil() []taskOrderFnTest { + return []taskOrderFnTest{ + { + name: "01-TaskOrderFn nil task", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: NewClusterCache(), + FrameAttr: VolcanoFrame{}, + }, + }, + args: taskOrderFnArgs{l: nil, r: nil}, + want: 0, + }, + } +} + +func createTaskOrderFnCasesOfJobNotExist(task1, task2 *api.TaskInfo) []taskOrderFnTest { + return []taskOrderFnTest{ + { + name: "03-TaskOrderFn job not exist", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: NewClusterCache(), + FrameAttr: VolcanoFrame{}, + }, + }, + args: taskOrderFnArgs{l: task1, r: task2}, + want: 0, + }, + } +} + +func createTaskOrderFnCasesOfPodGroupLabelNotExist(jobIdStr string, task1, task2 *api.TaskInfo) []taskOrderFnTest { + jobId := api.JobID(jobIdStr) + schedulerJob := fakeSchedulerJobEmptyTask(jobIdStr, "") + clusterCache := NewClusterCache() + clusterCache.Jobs = map[api.JobID]SchedulerJob{jobId: schedulerJob} + + return []taskOrderFnTest{ + { + name: "04-TaskOrderFn job podgroup label not exist", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: clusterCache, + FrameAttr: VolcanoFrame{}, + }, + }, + args: taskOrderFnArgs{l: task1, r: task2}, + want: 0, + }, + } +} + +func createTaskOrderFnCasesOfCompareTask(jobIdStr string, task1, task2 *api.TaskInfo) []taskOrderFnTest { + jobId := api.JobID(jobIdStr) + schedulerJob := fakeSchedulerJobEmptyTask(jobIdStr, "") + schedulerJob.Label[PodGroupScheduleKey] = PodGroupScheduleValue + clusterCache := NewClusterCache() + clusterCache.Jobs = map[api.JobID]SchedulerJob{jobId: schedulerJob} + return []taskOrderFnTest{ + { + name: "05-TaskOrderFn job l < r", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: clusterCache, + FrameAttr: VolcanoFrame{}, + }, + }, + args: taskOrderFnArgs{l: task1, r: task2}, + want: -1, + }, + { + name: "06-TaskOrderFn job l > r", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: clusterCache, + FrameAttr: VolcanoFrame{}, + }, + }, + args: taskOrderFnArgs{l: task2, r: task1}, + want: 1, + }, + } +} + +// TestTaskOrderFn test of TaskOrderFn +func TestTaskOrderFn(t *testing.T) { + tests := buildTaskOrderFnTest() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sHandle := &ScheduleHandler{ + NPUPlugins: tt.fields.NPUPlugins, + ScheduleEnv: tt.fields.ScheduleEnv, + } + if got := sHandle.TaskOrderFn(tt.args.l, tt.args.r); got != tt.want { + t.Errorf("TaskOrderFn() = %v, want %v", got, tt.want) + } + }) + } +} + +func fakeSchedulerJobEmptyTask(jobName, namespace string) SchedulerJob { + job := SchedulerJob{ + SchedulerJobAttr: util.SchedulerJobAttr{ + ComJob: util.ComJob{ + Name: api.JobID(jobName), + NameSpace: namespace, + Selector: map[string]string{}, + Label: map[string]string{}, + }, + NPUJob: &util.NPUJob{ + ReqNPUName: util.NPU910CardName, + ReqNPUNum: 0, + Tasks: make(map[api.TaskID]util.NPUTask), + }, + }, + } + return job +} -- Gitee From 4bb2f0f66178ec52eb0b1eb7dc71680852ea02ac Mon Sep 17 00:00:00 2001 From: maybeclear <471843059@qq.com> Date: Wed, 20 Aug 2025 20:50:50 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E3=80=90volcano=E3=80=91=E3=80=90=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=AF=B4=E6=98=8E=E3=80=91=E6=94=AF=E6=8C=81openfuyao?= =?UTF-8?q?=E5=AE=9A=E5=88=B6k8s=E5=9C=BA=E6=99=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=B0=83=E5=BA=A6pod=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend-for-volcano/common/util/type.go | 4 +- .../ascend910/ascend910a3/superpod/frame.go | 18 ++---- component/ascend-for-volcano/plugin/const.go | 6 +- .../ascend-for-volcano/plugin/factory.go | 62 +++++++++---------- 4 files changed, 42 insertions(+), 48 deletions(-) diff --git a/component/ascend-for-volcano/common/util/type.go b/component/ascend-for-volcano/common/util/type.go index 85da9c940..685d04d3a 100644 --- a/component/ascend-for-volcano/common/util/type.go +++ b/component/ascend-for-volcano/common/util/type.go @@ -236,8 +236,8 @@ const ( NodeNotMeetTopologyWarning = "the npus on this node don't satisfy the schedulable topology" // ArgumentError argument nil error. ArgumentError = "invalid argument" - // RankIdNotExist rank id does not exist - RankIdNotExist = "rank id does not exist" + // RankIdNotExistError rank id does not exist + RankIdNotExistError = "rank id does not exist" // JobKindKey for define the Job kind:ascend-310P, ascend-910 JobKindKey = "ring-controller.atlas" // JobKind910Value in ring-controller.atlas. diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go index a3980d418..af630d17b 100644 --- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go +++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go @@ -217,7 +217,7 @@ func (tp *module910SuperPod) ScoreBestNPUNodes(task *api.TaskInfo, nodes []*api. func (tp *module910SuperPod) scoreNodeBatchForReadyJob(task *api.TaskInfo, job *plugin.SchedulerJob, sMap map[string]float64) { - if task == nil || job == nil || len(sMap) == 0 { + if task == nil || job == nil || len(sMap) == 0 || tp.spBlock == 0 { klog.V(util.LogErrorLev).Infof("scoreNodeBatchForReadyJob %s", errors.New(util.ArgumentError)) return } @@ -225,7 +225,7 @@ func (tp *module910SuperPod) scoreNodeBatchForReadyJob(task *api.TaskInfo, job * if len(rankIdMap) == 0 { klog.V(util.LogErrorLev).Infof("%s scoreNodeBatchForReadyJob %s: rankIdMap empty", tp.GetPluginName(), task.Name) - job.JobReadyTag = new(bool) + *job.JobReadyTag = false return } for rankId := range rankIdMap { @@ -235,14 +235,14 @@ func (tp *module910SuperPod) scoreNodeBatchForReadyJob(task *api.TaskInfo, job * superPodRankIndex := strconv.Itoa(superPodRank) if localRank >= len(job.SuperPods[superPodRankIndex]) { klog.V(util.LogErrorLev).Infof("superPodRank: %d, localRank: %d out of rank", superPodRank, localRank) - job.JobReadyTag = new(bool) + *job.JobReadyTag = false break } spn := job.SuperPods[superPodRankIndex][localRank] if _, ok := sMap[spn.Name]; !ok { klog.V(util.LogErrorLev).Infof("%s scoreNodeBatchForReadyJob %s: node<%s> not in sMap, select fail", tp.GetPluginName(), task.Name, spn.Name) - job.JobReadyTag = new(bool) + *job.JobReadyTag = false break } klog.V(util.LogDebugLev).Infof("%s scoreNodeBatchForReadyJob %s: node<%s/%s> is exist in "+ @@ -251,6 +251,7 @@ func (tp *module910SuperPod) scoreNodeBatchForReadyJob(task *api.TaskInfo, job * sMap[spn.Name] = float64(scoreForNode - rankId) } } + func (tp *module910SuperPod) obtainBatchScoreRank(task *api.TaskInfo, job *plugin.SchedulerJob) map[int]struct{} { if task == nil || job == nil { klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank %s", errors.New(util.ArgumentError)) @@ -262,13 +263,7 @@ func (tp *module910SuperPod) obtainBatchScoreRank(task *api.TaskInfo, job *plugi tp.GetPluginName(), task.Namespace, task.Name) return nil } - rankIdMap := obtainOriginalRankIdMap(spec, job) - if len(rankIdMap) == 0 { - klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank %s: (%s/%s) rankIdMap empty, skip", - tp.GetPluginName(), task.Namespace, task.Name) - return nil - } - return rankIdMap + return obtainOriginalRankIdMap(spec, job) } func obtainOriginalRankIdMap(spec string, job *plugin.SchedulerJob) map[int]struct{} { @@ -290,7 +285,6 @@ func obtainOriginalRankIdMap(spec string, job *plugin.SchedulerJob) map[int]stru if !ok { klog.V(util.LogWarningLev).Infof("obtainOriginalRankIdMap (%s/%s): rankIndex is not exist", task.NameSpace, task.Name) - m[task.Index] = struct{}{} continue } rank, err := strconv.Atoi(rankIndex) diff --git a/component/ascend-for-volcano/plugin/const.go b/component/ascend-for-volcano/plugin/const.go index 442360bd8..a297d5308 100644 --- a/component/ascend-for-volcano/plugin/const.go +++ b/component/ascend-for-volcano/plugin/const.go @@ -62,9 +62,9 @@ const ( ) const ( - TaskOrderHighPriority = -1 - TaskOrderLowPriority = 1 - TaskOrderSamePriority = 0 + taskOrderHighPriority = -1 + taskOrderLowPriority = 1 + taskOrderSamePriority = 0 ) const ( diff --git a/component/ascend-for-volcano/plugin/factory.go b/component/ascend-for-volcano/plugin/factory.go index b10578700..587c2f20b 100644 --- a/component/ascend-for-volcano/plugin/factory.go +++ b/component/ascend-for-volcano/plugin/factory.go @@ -447,59 +447,59 @@ func isContain(target string, strArray []string) bool { } // TaskOrderFn Sort the selected tasks. -func (sHandle *ScheduleHandler) TaskOrderFn(l interface{}, r interface{}) int { - lv, ok := l.(*api.TaskInfo) +func (sHandle *ScheduleHandler) TaskOrderFn(InterfaceA interface{}, InterfaceB interface{}) int { + taskInfoA, ok := InterfaceA.(*api.TaskInfo) if !ok { klog.V(util.LogDebugLev).Info("TaskOrderFn failed, object is not a TaskInfo") - return TaskOrderSamePriority + return taskOrderSamePriority } - job, ok := sHandle.Jobs[lv.Job] + taskInfoB, ok := InterfaceB.(*api.TaskInfo) if !ok { - klog.V(util.LogDebugLev).Infof("TaskOrderFn (%s/%s): job is not exist", lv.Namespace, lv.Name) - return TaskOrderSamePriority + klog.V(util.LogDebugLev).Info("TaskOrderFn failed, object is not a TaskInfo") + return taskOrderSamePriority + } + + job, ok := sHandle.Jobs[taskInfoA.Job] + if !ok { + klog.V(util.LogDebugLev).Infof("TaskOrderFn (%s/%s): job is not exist", taskInfoA.Namespace, taskInfoA.Name) + return taskOrderSamePriority } podGroupEnable, exist := job.Label[PodGroupScheduleKey] if !exist || podGroupEnable != PodGroupScheduleValue { - return TaskOrderSamePriority - } - rv, ok := r.(*api.TaskInfo) - if !ok { - klog.V(util.LogDebugLev).Info("TaskOrderFn failed, object is not a TaskInfo") - return TaskOrderSamePriority + return taskOrderSamePriority } - rRankId, err := sHandle.obtainTaskRankId(rv, &job) + rRankId, err := sHandle.obtainTaskRankId(taskInfoB, &job) if err != nil { - return TaskOrderSamePriority + return taskOrderSamePriority } - lRankId, err := sHandle.obtainTaskRankId(lv, &job) + lRankId, err := sHandle.obtainTaskRankId(taskInfoA, &job) if err != nil { - return TaskOrderSamePriority + return taskOrderSamePriority } + if lRankId < rRankId { - return TaskOrderHighPriority + return taskOrderHighPriority } - return TaskOrderLowPriority + return taskOrderLowPriority } func (sHandle *ScheduleHandler) obtainTaskRankId(task *api.TaskInfo, job *SchedulerJob) (int, error) { - var rankId int if task == nil || job == nil { klog.V(util.LogDebugLev).Infof("obtainTaskRankId failed: %s.", util.ArgumentError) - return rankId, errors.New(util.ArgumentError) + return 0, errors.New(util.ArgumentError) } - if rankIndex, ok := task.Pod.Annotations[PodRankIndexKey]; ok { - rankId, err := strconv.Atoi(rankIndex) - if err != nil { - klog.V(util.LogDebugLev).Infof("obtainTaskRankId task(%s/%s): rankIndex is not int", - task.Namespace, task.Name) - } - return rankId, err + rankIndex, ok := task.Pod.Annotations[PodRankIndexKey] + if ok { + klog.V(util.LogDebugLev).Infof("obtainTaskRankId task(%s/%s): rankIndex not exist", + task.Namespace, task.Name) + return 0, errors.New(util.RankIdNotExistError) } - nTask, ok := job.Tasks[task.UID] - if !ok { - return rankId, errors.New(util.RankIdNotExist) + rankId, err := strconv.Atoi(rankIndex) + if err != nil { + klog.V(util.LogDebugLev).Infof("obtainTaskRankId task(%s/%s): rankIndex(%s) is not int", + task.Namespace, task.Name, rankIndex) + return 0, errors.New(util.ArgumentError) } - rankId = nTask.Index return rankId, nil } -- Gitee From edafadefb7043e6bfdc142af4b83383977529b7a Mon Sep 17 00:00:00 2001 From: maybeclear <471843059@qq.com> Date: Wed, 20 Aug 2025 21:31:57 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E3=80=90volcano=E3=80=91=E3=80=90=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=AF=B4=E6=98=8E=E3=80=91=E6=94=AF=E6=8C=81openfuyao?= =?UTF-8?q?=E5=AE=9A=E5=88=B6k8s=E5=9C=BA=E6=99=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=B0=83=E5=BA=A6pod=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-for-volcano/plugin/factory.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/ascend-for-volcano/plugin/factory.go b/component/ascend-for-volcano/plugin/factory.go index 587c2f20b..75e40632f 100644 --- a/component/ascend-for-volcano/plugin/factory.go +++ b/component/ascend-for-volcano/plugin/factory.go @@ -489,7 +489,7 @@ func (sHandle *ScheduleHandler) obtainTaskRankId(task *api.TaskInfo, job *Schedu return 0, errors.New(util.ArgumentError) } rankIndex, ok := task.Pod.Annotations[PodRankIndexKey] - if ok { + if !ok { klog.V(util.LogDebugLev).Infof("obtainTaskRankId task(%s/%s): rankIndex not exist", task.Namespace, task.Name) return 0, errors.New(util.RankIdNotExistError) -- Gitee From 4ecbc0174bf9014fdfe3dd4b4605249c1c91d993 Mon Sep 17 00:00:00 2001 From: maybeclear <471843059@qq.com> Date: Thu, 21 Aug 2025 22:23:41 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E3=80=90volcano=E3=80=91=E3=80=90=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=AF=B4=E6=98=8E=E3=80=91=E6=94=AF=E6=8C=81openfuyao?= =?UTF-8?q?=E5=AE=9A=E5=88=B6k8s=E5=9C=BA=E6=99=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=B0=83=E5=BA=A6pod=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/clusterd/pkg/common/constant/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/clusterd/pkg/common/constant/constants.go b/component/clusterd/pkg/common/constant/constants.go index 835ab116c..84fd8d6d5 100644 --- a/component/clusterd/pkg/common/constant/constants.go +++ b/component/clusterd/pkg/common/constant/constants.go @@ -49,7 +49,7 @@ const ( // AscendJobRefKind reference kind is AscendJob AscendJobRefKind = "AscendJob" // MaxSupportNodeNum max support node num - MaxSupportNodeNum = 5000 + MaxSupportNodeNum = 16000 // MaxSupportJobNum max support job num MaxSupportJobNum = 10000 // MaxCmQueueLen max cm queue len support -- Gitee From aaafafe637e4792e716d56a57043fcbf5c265410 Mon Sep 17 00:00:00 2001 From: maybeclear <471843059@qq.com> Date: Fri, 22 Aug 2025 11:07:54 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E3=80=90volcano=E3=80=91=E3=80=90=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=AF=B4=E6=98=8E=E3=80=91=E6=94=AF=E6=8C=81openfuyao?= =?UTF-8?q?=E5=AE=9A=E5=88=B6k8s=E5=9C=BA=E6=99=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=B0=83=E5=BA=A6pod=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend910/ascend910a3/superpod/frame.go | 18 +--- .../ascend910a3/superpod/frame_test.go | 15 ++- .../ascend-for-volcano/plugin/factory.go | 8 +- .../ascend-for-volcano/plugin/factory_test.go | 98 ++++++++++++------- 4 files changed, 83 insertions(+), 56 deletions(-) diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go index af630d17b..467c081ea 100644 --- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go +++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go @@ -231,7 +231,7 @@ func (tp *module910SuperPod) scoreNodeBatchForReadyJob(task *api.TaskInfo, job * for rankId := range rankIdMap { superPodRank := rankId / tp.spBlock localRank := rankId % tp.spBlock - klog.V(util.LogDebugLev).Infof("superPodRank: %d, localRank: %d", superPodRank, localRank) + klog.V(util.LogInfoLev).Infof("superPodRank: %d, localRank: %d", superPodRank, localRank) superPodRankIndex := strconv.Itoa(superPodRank) if localRank >= len(job.SuperPods[superPodRankIndex]) { klog.V(util.LogErrorLev).Infof("superPodRank: %d, localRank: %d out of rank", superPodRank, localRank) @@ -245,7 +245,7 @@ func (tp *module910SuperPod) scoreNodeBatchForReadyJob(task *api.TaskInfo, job * *job.JobReadyTag = false break } - klog.V(util.LogDebugLev).Infof("%s scoreNodeBatchForReadyJob %s: node<%s/%s> is exist in "+ + klog.V(util.LogInfoLev).Infof("%s scoreNodeBatchForReadyJob %s: node<%s/%s> is exist in "+ "SuperPodID: %d, select success", tp.GetPluginName(), task.Name, spn.Name, superPodRankIndex, spn.SuperPodID) sMap[spn.Name] = float64(scoreForNode - rankId) @@ -263,14 +263,6 @@ func (tp *module910SuperPod) obtainBatchScoreRank(task *api.TaskInfo, job *plugi tp.GetPluginName(), task.Namespace, task.Name) return nil } - return obtainOriginalRankIdMap(spec, job) -} - -func obtainOriginalRankIdMap(spec string, job *plugin.SchedulerJob) map[int]struct{} { - if job == nil { - klog.V(util.LogErrorLev).Infof("obtainOriginalRankIdMap %s", errors.New(util.ArgumentError)) - return nil - } klog.V(util.LogInfoLev).Infof("obtainOriginalRankIdMap job (%s/%s), len(job.Tasks) %d", job.NameSpace, job.Name, len(job.Tasks)) m := make(map[int]struct{}, len(job.Tasks)) @@ -283,19 +275,19 @@ func obtainOriginalRankIdMap(spec string, job *plugin.SchedulerJob) map[int]stru } rankIndex, ok := task.Annotation[plugin.PodRankIndexKey] if !ok { - klog.V(util.LogWarningLev).Infof("obtainOriginalRankIdMap (%s/%s): rankIndex is not exist", + klog.V(util.LogWarningLev).Infof("obtainBatchScoreRank (%s/%s): rankIndex is not exist", task.NameSpace, task.Name) continue } rank, err := strconv.Atoi(rankIndex) if err != nil { - klog.V(util.LogErrorLev).Infof("obtainOriginalRankIdMap (%s/%s): rankIndex is not int", + klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank (%s/%s): rankIndex is not int", task.NameSpace, task.Name) continue } m[rank] = struct{}{} } - klog.V(util.LogInfoLev).Infof("obtainOriginalRankIdMap job (%s/%s), len(rankMap) %d", + klog.V(util.LogInfoLev).Infof("obtainBatchScoreRank job (%s/%s), len(rankMap) %d", job.NameSpace, job.Name, len(m)) return m } diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go index bda43d7ca..c57c1d528 100644 --- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go +++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go @@ -965,6 +965,9 @@ func TestObtainBatchScoreRank(t *testing.T) { task *api.TaskInfo job *plugin.SchedulerJob } + + taskInfoWithoutSpecAnno := createObtainBatchScoreRankTaskInfo(jobIdStr, "1", schedulerSpec) + delete(taskInfoWithoutSpecAnno.Pod.Annotations, taskSpec) tests := []struct { name string args args @@ -976,7 +979,15 @@ func TestObtainBatchScoreRank(t *testing.T) { want: nil, }, { - name: "02-obtainBatchScoreRank spec " + schedulerSpec, + name: "02-obtainBatchScoreRank spec not exist", + args: args{ + task: taskInfoWithoutSpecAnno, + job: schedulerJob, + }, + want: nil, + }, + { + name: "03-obtainBatchScoreRank spec " + schedulerSpec, args: args{ task: createObtainBatchScoreRankTaskInfo(jobIdStr, "1", schedulerSpec), job: schedulerJob, @@ -984,7 +995,7 @@ func TestObtainBatchScoreRank(t *testing.T) { want: map[int]struct{}{0: {}}, }, { - name: "02-obtainBatchScoreRank spec " + workerSpec, + name: "04-obtainBatchScoreRank spec " + workerSpec, args: args{ task: createObtainBatchScoreRankTaskInfo(jobIdStr, "1", workerSpec), job: schedulerJob, diff --git a/component/ascend-for-volcano/plugin/factory.go b/component/ascend-for-volcano/plugin/factory.go index 75e40632f..f539d5673 100644 --- a/component/ascend-for-volcano/plugin/factory.go +++ b/component/ascend-for-volcano/plugin/factory.go @@ -468,11 +468,11 @@ func (sHandle *ScheduleHandler) TaskOrderFn(InterfaceA interface{}, InterfaceB i if !exist || podGroupEnable != PodGroupScheduleValue { return taskOrderSamePriority } - rRankId, err := sHandle.obtainTaskRankId(taskInfoB, &job) + rRankId, err := sHandle.obtainTaskRankId(taskInfoB) if err != nil { return taskOrderSamePriority } - lRankId, err := sHandle.obtainTaskRankId(taskInfoA, &job) + lRankId, err := sHandle.obtainTaskRankId(taskInfoA) if err != nil { return taskOrderSamePriority } @@ -483,8 +483,8 @@ func (sHandle *ScheduleHandler) TaskOrderFn(InterfaceA interface{}, InterfaceB i return taskOrderLowPriority } -func (sHandle *ScheduleHandler) obtainTaskRankId(task *api.TaskInfo, job *SchedulerJob) (int, error) { - if task == nil || job == nil { +func (sHandle *ScheduleHandler) obtainTaskRankId(task *api.TaskInfo) (int, error) { + if task == nil { klog.V(util.LogDebugLev).Infof("obtainTaskRankId failed: %s.", util.ArgumentError) return 0, errors.New(util.ArgumentError) } diff --git a/component/ascend-for-volcano/plugin/factory_test.go b/component/ascend-for-volcano/plugin/factory_test.go index a141c5366..804e50407 100644 --- a/component/ascend-for-volcano/plugin/factory_test.go +++ b/component/ascend-for-volcano/plugin/factory_test.go @@ -721,16 +721,14 @@ func createTaskOrderFnCasesOfTaskNil() []taskOrderFnTest { return []taskOrderFnTest{ { name: "01-TaskOrderFn nil task", - fields: fields{ - NPUPlugins: map[string]sets.Empty{}, - ScheduleEnv: ScheduleEnv{ - ClusterCache: NewClusterCache(), - FrameAttr: VolcanoFrame{}, - }, - }, args: taskOrderFnArgs{l: nil, r: nil}, want: 0, }, + { + name: "02-TaskOrderFn right value nil task", + args: taskOrderFnArgs{l: &api.TaskInfo{}, r: nil}, + want: 0, + }, } } @@ -738,13 +736,6 @@ func createTaskOrderFnCasesOfJobNotExist(task1, task2 *api.TaskInfo) []taskOrder return []taskOrderFnTest{ { name: "03-TaskOrderFn job not exist", - fields: fields{ - NPUPlugins: map[string]sets.Empty{}, - ScheduleEnv: ScheduleEnv{ - ClusterCache: NewClusterCache(), - FrameAttr: VolcanoFrame{}, - }, - }, args: taskOrderFnArgs{l: task1, r: task2}, want: 0, }, @@ -760,13 +751,6 @@ func createTaskOrderFnCasesOfPodGroupLabelNotExist(jobIdStr string, task1, task2 return []taskOrderFnTest{ { name: "04-TaskOrderFn job podgroup label not exist", - fields: fields{ - NPUPlugins: map[string]sets.Empty{}, - ScheduleEnv: ScheduleEnv{ - ClusterCache: clusterCache, - FrameAttr: VolcanoFrame{}, - }, - }, args: taskOrderFnArgs{l: task1, r: task2}, want: 0, }, @@ -782,25 +766,11 @@ func createTaskOrderFnCasesOfCompareTask(jobIdStr string, task1, task2 *api.Task return []taskOrderFnTest{ { name: "05-TaskOrderFn job l < r", - fields: fields{ - NPUPlugins: map[string]sets.Empty{}, - ScheduleEnv: ScheduleEnv{ - ClusterCache: clusterCache, - FrameAttr: VolcanoFrame{}, - }, - }, args: taskOrderFnArgs{l: task1, r: task2}, want: -1, }, { name: "06-TaskOrderFn job l > r", - fields: fields{ - NPUPlugins: map[string]sets.Empty{}, - ScheduleEnv: ScheduleEnv{ - ClusterCache: clusterCache, - FrameAttr: VolcanoFrame{}, - }, - }, args: taskOrderFnArgs{l: task2, r: task1}, want: 1, }, @@ -813,8 +783,8 @@ func TestTaskOrderFn(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sHandle := &ScheduleHandler{ - NPUPlugins: tt.fields.NPUPlugins, - ScheduleEnv: tt.fields.ScheduleEnv, + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ClusterCache: NewClusterCache(), FrameAttr: VolcanoFrame{}}, } if got := sHandle.TaskOrderFn(tt.args.l, tt.args.r); got != tt.want { t.Errorf("TaskOrderFn() = %v, want %v", got, tt.want) @@ -841,3 +811,57 @@ func fakeSchedulerJobEmptyTask(jobName, namespace string) SchedulerJob { } return job } + +type taskRankIDFnArgs struct { + task *api.TaskInfo +} +type taskRankIdFnTest struct { + name string + fields fields + args taskRankIDFnArgs + want int + wantErr bool +} + +func buildObtainTaskRankIdCases() []taskRankIdFnTest { + task1 := test.FakeNormalTestTask("pod1", "node1", "acjob") + delete(task1.Pod.Annotations, PodRankIndexKey) + task2 := test.FakeNormalTestTask("pod1", "node1", "acjob") + task2.Pod.Annotations[PodRankIndexKey] = "" + + return []taskRankIdFnTest{ + { + name: "01-obtainTaskRankId, task is nil", + args: taskRankIDFnArgs{task: nil}, + want: 0, + }, + { + name: "02-obtainTaskRankId, pod annotation not exist", + args: taskRankIDFnArgs{task: task1}, + want: 0, + }, + { + name: "03-obtainTaskRankId, pod annotation not int", + args: taskRankIDFnArgs{task: task2}, + want: 0, + }, + } +} + +// TestObtainTaskRankId test obtainTaskRankId +func TestObtainTaskRankId(t *testing.T) { + tests := buildObtainTaskRankIdCases() + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sHandle := &ScheduleHandler{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ClusterCache: NewClusterCache(), FrameAttr: VolcanoFrame{}}, + } + res, err := sHandle.obtainTaskRankId(tt.args.task) + if res != tt.want || (err != nil) != tt.wantErr { + t.Errorf("TaskOrderFn() = %v, want %v", res, tt.want) + } + }) + } + +} -- Gitee From 787238c2c0679bdde43f198ed284be3705eb4d03 Mon Sep 17 00:00:00 2001 From: maybeclear <471843059@qq.com> Date: Fri, 22 Aug 2025 11:54:23 +0800 Subject: [PATCH 6/8] =?UTF-8?q?=E3=80=90volcano=E3=80=91=E3=80=90=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=AF=B4=E6=98=8E=E3=80=91=E6=94=AF=E6=8C=81openfuyao?= =?UTF-8?q?=E5=AE=9A=E5=88=B6k8s=E5=9C=BA=E6=99=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=B0=83=E5=BA=A6pod=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend-for-volcano/plugin/factory_test.go | 67 ++++++++++++++++--- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/component/ascend-for-volcano/plugin/factory_test.go b/component/ascend-for-volcano/plugin/factory_test.go index 804e50407..0cfbe29a9 100644 --- a/component/ascend-for-volcano/plugin/factory_test.go +++ b/component/ascend-for-volcano/plugin/factory_test.go @@ -721,11 +721,25 @@ func createTaskOrderFnCasesOfTaskNil() []taskOrderFnTest { return []taskOrderFnTest{ { name: "01-TaskOrderFn nil task", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: NewClusterCache(), + FrameAttr: VolcanoFrame{}, + }, + }, args: taskOrderFnArgs{l: nil, r: nil}, want: 0, }, { name: "02-TaskOrderFn right value nil task", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: NewClusterCache(), + FrameAttr: VolcanoFrame{}, + }, + }, args: taskOrderFnArgs{l: &api.TaskInfo{}, r: nil}, want: 0, }, @@ -736,6 +750,13 @@ func createTaskOrderFnCasesOfJobNotExist(task1, task2 *api.TaskInfo) []taskOrder return []taskOrderFnTest{ { name: "03-TaskOrderFn job not exist", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: NewClusterCache(), + FrameAttr: VolcanoFrame{}, + }, + }, args: taskOrderFnArgs{l: task1, r: task2}, want: 0, }, @@ -751,6 +772,13 @@ func createTaskOrderFnCasesOfPodGroupLabelNotExist(jobIdStr string, task1, task2 return []taskOrderFnTest{ { name: "04-TaskOrderFn job podgroup label not exist", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: clusterCache, + FrameAttr: VolcanoFrame{}, + }, + }, args: taskOrderFnArgs{l: task1, r: task2}, want: 0, }, @@ -766,11 +794,25 @@ func createTaskOrderFnCasesOfCompareTask(jobIdStr string, task1, task2 *api.Task return []taskOrderFnTest{ { name: "05-TaskOrderFn job l < r", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: clusterCache, + FrameAttr: VolcanoFrame{}, + }, + }, args: taskOrderFnArgs{l: task1, r: task2}, want: -1, }, { name: "06-TaskOrderFn job l > r", + fields: fields{ + NPUPlugins: map[string]sets.Empty{}, + ScheduleEnv: ScheduleEnv{ + ClusterCache: clusterCache, + FrameAttr: VolcanoFrame{}, + }, + }, args: taskOrderFnArgs{l: task2, r: task1}, want: 1, }, @@ -783,8 +825,8 @@ func TestTaskOrderFn(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sHandle := &ScheduleHandler{ - NPUPlugins: map[string]sets.Empty{}, - ScheduleEnv: ScheduleEnv{ClusterCache: NewClusterCache(), FrameAttr: VolcanoFrame{}}, + NPUPlugins: tt.fields.NPUPlugins, + ScheduleEnv: tt.fields.ScheduleEnv, } if got := sHandle.TaskOrderFn(tt.args.l, tt.args.r); got != tt.want { t.Errorf("TaskOrderFn() = %v, want %v", got, tt.want) @@ -831,19 +873,22 @@ func buildObtainTaskRankIdCases() []taskRankIdFnTest { return []taskRankIdFnTest{ { - name: "01-obtainTaskRankId, task is nil", - args: taskRankIDFnArgs{task: nil}, - want: 0, + name: "01-obtainTaskRankId, task is nil", + args: taskRankIDFnArgs{task: nil}, + want: 0, + wantErr: true, }, { - name: "02-obtainTaskRankId, pod annotation not exist", - args: taskRankIDFnArgs{task: task1}, - want: 0, + name: "02-obtainTaskRankId, pod annotation not exist", + args: taskRankIDFnArgs{task: task1}, + want: 0, + wantErr: true, }, { - name: "03-obtainTaskRankId, pod annotation not int", - args: taskRankIDFnArgs{task: task2}, - want: 0, + name: "03-obtainTaskRankId, pod annotation not int", + args: taskRankIDFnArgs{task: task2}, + want: 0, + wantErr: true, }, } } -- Gitee From 9d3b81f166ee2605404b517ca06f88a33ad44dd1 Mon Sep 17 00:00:00 2001 From: maybeclear <471843059@qq.com> Date: Fri, 22 Aug 2025 14:09:36 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E3=80=90volcano=E3=80=91=E3=80=90=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=AF=B4=E6=98=8E=E3=80=91=E6=94=AF=E6=8C=81openfuyao?= =?UTF-8?q?=E5=AE=9A=E5=88=B6k8s=E5=9C=BA=E6=99=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=B0=83=E5=BA=A6pod=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/npu/ascend910/ascend910a3/superpod/frame_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go index c57c1d528..1902da72c 100644 --- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go +++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go @@ -965,7 +965,6 @@ func TestObtainBatchScoreRank(t *testing.T) { task *api.TaskInfo job *plugin.SchedulerJob } - taskInfoWithoutSpecAnno := createObtainBatchScoreRankTaskInfo(jobIdStr, "1", schedulerSpec) delete(taskInfoWithoutSpecAnno.Pod.Annotations, taskSpec) tests := []struct { -- Gitee From ba62f771c40eaceef39192ac4a2d52ac230e996b Mon Sep 17 00:00:00 2001 From: maybeclear <471843059@qq.com> Date: Fri, 22 Aug 2025 14:30:38 +0800 Subject: [PATCH 8/8] =?UTF-8?q?=E3=80=90volcano=E3=80=91=E3=80=90=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E8=AF=B4=E6=98=8E=E3=80=91=E6=94=AF=E6=8C=81openfuyao?= =?UTF-8?q?=E5=AE=9A=E5=88=B6k8s=E5=9C=BA=E6=99=AF=E8=B0=83=E7=94=A8?= =?UTF-8?q?=E6=89=B9=E9=87=8F=E8=B0=83=E5=BA=A6pod=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend910a3/superpod/frame_test.go | 63 ++++++++++--------- 1 file changed, 35 insertions(+), 28 deletions(-) diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go index 1902da72c..84190db34 100644 --- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go +++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go @@ -949,37 +949,31 @@ func fakeSchedulerJobEmptyTask(jobName, namespace string) *plugin.SchedulerJob { return job } -// TestObtainBatchScoreRank test of obtainBatchScoreRank -func TestObtainBatchScoreRank(t *testing.T) { - jobIdStr := "job1" - plg := new910SuperPod(SchedulerName) - plg.Name = api.JobID(jobIdStr) - plg.SchedulerJobAttr = util.SchedulerJobAttr{ - ComJob: util.ComJob{}, - NPUJob: &util.NPUJob{}, - } - plg.ScheduleEnv = plugin.ScheduleEnv{} - schedulerJob := fakeSchedulerJobEmptyTask(jobIdStr, "") +type obtainBatchScoreRankArgs struct { + task *api.TaskInfo + job *plugin.SchedulerJob +} + +type obtainBatchScoreRankTest struct { + name string + args obtainBatchScoreRankArgs + want map[int]struct{} +} + +func getObtainBatchScoreRankTestCases(jobId string) []obtainBatchScoreRankTest { + schedulerJob := fakeSchedulerJobEmptyTask(jobId, "") schedulerJob.Tasks = createBatchScoreNPUTasks(batchScoreNpuTask4) - type args struct { - task *api.TaskInfo - job *plugin.SchedulerJob - } - taskInfoWithoutSpecAnno := createObtainBatchScoreRankTaskInfo(jobIdStr, "1", schedulerSpec) + taskInfoWithoutSpecAnno := createObtainBatchScoreRankTaskInfo(jobId, "1", schedulerSpec) delete(taskInfoWithoutSpecAnno.Pod.Annotations, taskSpec) - tests := []struct { - name string - args args - want map[int]struct{} - }{ + tests := []obtainBatchScoreRankTest{ { name: "01-obtainBatchScoreRank invalid argument", - args: args{}, + args: obtainBatchScoreRankArgs{}, want: nil, }, { name: "02-obtainBatchScoreRank spec not exist", - args: args{ + args: obtainBatchScoreRankArgs{ task: taskInfoWithoutSpecAnno, job: schedulerJob, }, @@ -987,22 +981,35 @@ func TestObtainBatchScoreRank(t *testing.T) { }, { name: "03-obtainBatchScoreRank spec " + schedulerSpec, - args: args{ - task: createObtainBatchScoreRankTaskInfo(jobIdStr, "1", schedulerSpec), + args: obtainBatchScoreRankArgs{ + task: createObtainBatchScoreRankTaskInfo(jobId, "1", schedulerSpec), job: schedulerJob, }, want: map[int]struct{}{0: {}}, }, { name: "04-obtainBatchScoreRank spec " + workerSpec, - args: args{ - task: createObtainBatchScoreRankTaskInfo(jobIdStr, "1", workerSpec), + args: obtainBatchScoreRankArgs{ + task: createObtainBatchScoreRankTaskInfo(jobId, "1", workerSpec), job: schedulerJob, }, want: map[int]struct{}{1: {}, 2: {}, 3: {}}, }, } - for _, tt := range tests { + return tests +} + +// TestObtainBatchScoreRank test of obtainBatchScoreRank +func TestObtainBatchScoreRank(t *testing.T) { + jobId := "job1" + plg := new910SuperPod(SchedulerName) + plg.Name = api.JobID(jobId) + plg.SchedulerJobAttr = util.SchedulerJobAttr{ + ComJob: util.ComJob{}, + NPUJob: &util.NPUJob{}, + } + plg.ScheduleEnv = plugin.ScheduleEnv{} + for _, tt := range getObtainBatchScoreRankTestCases(jobId) { t.Run(tt.name, func(t *testing.T) { if got := plg.obtainBatchScoreRank(tt.args.task, tt.args.job); !reflect.DeepEqual(got, tt.want) { t.Errorf("obtainBatchScoreRank() = %v, want %v", got, tt.want) -- Gitee