diff --git a/component/ascend-for-volcano/common/util/type.go b/component/ascend-for-volcano/common/util/type.go
index cbef0902967945f4dea9b6383165261c7d0e9042..685d04d3a91765894db5be6f0dc3fb5cb11f692c 100644
--- a/component/ascend-for-volcano/common/util/type.go
+++ b/component/ascend-for-volcano/common/util/type.go
@@ -236,6 +236,8 @@ const (
     NodeNotMeetTopologyWarning = "the npus on this node don't satisfy the schedulable topology"
     // ArgumentError argument nil error.
     ArgumentError = "invalid argument"
+    // RankIdNotExistError rank id does not exist
+    RankIdNotExistError = "rank id does not exist"
     // JobKindKey for define the Job kind:ascend-310P, ascend-910
     JobKindKey = "ring-controller.atlas"
     // JobKind910Value in ring-controller.atlas.
diff --git a/component/ascend-for-volcano/huawei_npu.go b/component/ascend-for-volcano/huawei_npu.go
index e34b687f670942e2a9ef6aea1021e98ade232efc..7c62c5b99aa1eb986af152db67db265b10f663d1 100644
--- a/component/ascend-for-volcano/huawei_npu.go
+++ b/component/ascend-for-volcano/huawei_npu.go
@@ -91,6 +91,8 @@ func (tp *huaweiNPUPlugin) OnSessionOpen(ssn *framework.Session) {
     addJobReadyFn(ssn, tp)

     addJobEnqueueableFn(ssn, tp)
+
+    addTaskOrderFn(ssn, tp)
     // Register event handlers to update task info in PodLister & nodeMap
     // for support Concurrency
     addEventHandler(ssn, tp)
@@ -237,6 +239,12 @@ func addJobEnqueueableFn(ssn *framework.Session, tp *huaweiNPUPlugin) {
     })
 }

+func addTaskOrderFn(ssn *framework.Session, tp *huaweiNPUPlugin) {
+    ssn.AddTaskOrderFn(tp.Name(), func(l interface{}, r interface{}) int {
+        return tp.Scheduler.TaskOrderFn(l, r)
+    })
+}
+
 func getNpuNum(ssn *framework.Session, tp *huaweiNPUPlugin, npuName string) int {
     var tNpuNum int
     for _, node := range ssn.Nodes {
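For context on the hunk above: ssn.AddTaskOrderFn registers a comparator that Volcano consults when ordering a job's pending tasks, and the plugin simply forwards to TaskOrderFn (added later in plugin/factory.go). The contract is a three-way compare, matching the constants this change introduces: a negative result schedules the left task first, a positive result schedules it later, and zero defers to other order functions. A minimal standalone sketch of that contract, using a hypothetical task type rather than the plugin's real structures:

    package main

    import (
        "fmt"
        "sort"
    )

    // fakeTask is a hypothetical stand-in for *api.TaskInfo.
    type fakeTask struct {
        name string
        rank int // parsed from the hccl/rankIndex annotation in the real plugin
    }

    // orderByRank mirrors the -1/0/1 shape of a Volcano task-order function.
    func orderByRank(l, r fakeTask) int {
        if l.rank < r.rank {
            return -1 // left task schedules first
        }
        if l.rank > r.rank {
            return 1 // left task schedules later
        }
        return 0 // no preference
    }

    func main() {
        tasks := []fakeTask{{"worker-2", 2}, {"worker-0", 0}, {"worker-1", 1}}
        sort.Slice(tasks, func(i, j int) bool { return orderByRank(tasks[i], tasks[j]) < 0 })
        fmt.Println(tasks) // [{worker-0 0} {worker-1 1} {worker-2 2}]
    }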
diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go
index 708f627a7d9a73ce99941c4fb7a903180a43ef1c..467c081eada3ae1a71cbedcbe567b3a49a07afec 100644
--- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go
+++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame.go
@@ -177,6 +177,10 @@ func (tp *module910SuperPod) ScoreBestNPUNodes(task *api.TaskInfo, nodes []*api.NodeInfo,
     defer func() {
         if *job.JobReadyTag {
+            if podGroupEnable, exist := job.Label[plugin.PodGroupScheduleKey]; exist && podGroupEnable == plugin.PodGroupScheduleValue {
+                tp.scoreNodeBatchForReadyJob(task, &job, sMap)
+                return
+            }
             tp.scoreNodeForReadyJob(task, job, sMap)
         }
     }()

@@ -211,6 +215,83 @@ func (tp *module910SuperPod) ScoreBestNPUNodes(task *api.TaskInfo, nodes []*api.NodeInfo,
     return nil
 }

+func (tp *module910SuperPod) scoreNodeBatchForReadyJob(task *api.TaskInfo, job *plugin.SchedulerJob,
+    sMap map[string]float64) {
+    if task == nil || job == nil || len(sMap) == 0 || tp.spBlock == 0 {
+        klog.V(util.LogErrorLev).Infof("scoreNodeBatchForReadyJob %s", errors.New(util.ArgumentError))
+        return
+    }
+    rankIdMap := tp.obtainBatchScoreRank(task, job)
+    if len(rankIdMap) == 0 {
+        klog.V(util.LogErrorLev).Infof("%s scoreNodeBatchForReadyJob %s: rankIdMap empty",
+            tp.GetPluginName(), task.Name)
+        *job.JobReadyTag = false
+        return
+    }
+    for rankId := range rankIdMap {
+        superPodRank := rankId / tp.spBlock
+        localRank := rankId % tp.spBlock
+        klog.V(util.LogInfoLev).Infof("superPodRank: %d, localRank: %d", superPodRank, localRank)
+        superPodRankIndex := strconv.Itoa(superPodRank)
+        if localRank >= len(job.SuperPods[superPodRankIndex]) {
+            klog.V(util.LogErrorLev).Infof("superPodRank: %d, localRank: %d out of range", superPodRank, localRank)
+            *job.JobReadyTag = false
+            break
+        }
+        spn := job.SuperPods[superPodRankIndex][localRank]
+        if _, ok := sMap[spn.Name]; !ok {
+            klog.V(util.LogErrorLev).Infof("%s scoreNodeBatchForReadyJob %s: node<%s> not in sMap, select fail",
+                tp.GetPluginName(), task.Name, spn.Name)
+            *job.JobReadyTag = false
+            break
+        }
+        klog.V(util.LogInfoLev).Infof("%s scoreNodeBatchForReadyJob %s: node<%s/%s> exists in "+
+            "SuperPodID: %d, select success", tp.GetPluginName(), task.Name, spn.Name, superPodRankIndex,
+            spn.SuperPodID)
+        sMap[spn.Name] = float64(scoreForNode - rankId)
+    }
+}
+
+func (tp *module910SuperPod) obtainBatchScoreRank(task *api.TaskInfo, job *plugin.SchedulerJob) map[int]struct{} {
+    if task == nil || job == nil {
+        klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank %s", errors.New(util.ArgumentError))
+        return nil
+    }
+    spec, ok := task.Pod.Annotations[taskSpec]
+    if !ok {
+        klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank %s: (%s/%s) obtain taskSpec fail, skip",
+            tp.GetPluginName(), task.Namespace, task.Name)
+        return nil
+    }
+    klog.V(util.LogInfoLev).Infof("obtainBatchScoreRank job (%s/%s), len(job.Tasks) %d",
+        job.NameSpace, job.Name, len(job.Tasks))
+    m := make(map[int]struct{}, len(job.Tasks))
+    for _, task := range job.Tasks {
+        if !task.IsNPUTask() || task.Annotation[taskSpec] != spec {
+            continue
+        }
+        if task.PodStatus != v1.PodPending {
+            continue
+        }
+        rankIndex, ok := task.Annotation[plugin.PodRankIndexKey]
+        if !ok {
+            klog.V(util.LogWarningLev).Infof("obtainBatchScoreRank (%s/%s): rankIndex does not exist",
+                task.NameSpace, task.Name)
+            continue
+        }
+        rank, err := strconv.Atoi(rankIndex)
+        if err != nil {
+            klog.V(util.LogErrorLev).Infof("obtainBatchScoreRank (%s/%s): rankIndex is not an int",
+                task.NameSpace, task.Name)
+            continue
+        }
+        m[rank] = struct{}{}
+    }
+    klog.V(util.LogInfoLev).Infof("obtainBatchScoreRank job (%s/%s), len(rankMap) %d",
+        job.NameSpace, job.Name, len(m))
+    return m
+}
+
 func (tp *module910SuperPod) scoreNodeForReadyJob(task *api.TaskInfo, job plugin.SchedulerJob,
     sMap map[string]float64) {
     var rank int
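scoreNodeBatchForReadyJob maps each pending task's global rank onto a concrete node slot: superPodRank = rankId / spBlock picks the super pod, localRank = rankId % spBlock picks the node inside job.SuperPods for that pod, and the chosen node is scored as scoreForNode - rankId so that lower ranks win earlier nodes. A self-contained sketch of that arithmetic; spBlock and the sample ranks are illustrative values, not ones taken from the plugin:

    package main

    import "fmt"

    const (
        spBlock      = 8         // hypothetical node count per super pod
        scoreForNode = 100000000 // same base score the plugin uses
    )

    func main() {
        for _, rankId := range []int{0, 7, 8, 19} {
            superPodRank := rankId / spBlock // index into job.SuperPods
            localRank := rankId % spBlock    // node index inside that super pod
            score := scoreForNode - rankId   // lower rank => higher score
            fmt.Printf("rank %2d -> superPod %d, local %d, score %d\n",
                rankId, superPodRank, localRank, score)
        }
    }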
diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go
index bfdc9dac4d566eeda5b2ebedef44e81ab468b12d..84190db340d435ac27ed82b2f8918b72376a19cc 100644
--- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go
+++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/frame_test.go
@@ -27,6 +27,7 @@ import (
     "time"

     "github.com/agiledragon/gomonkey/v2"
+    "k8s.io/api/core/v1"

     "volcano.sh/volcano/pkg/scheduler/api"
     "volcano.sh/volcano/pkg/scheduler/plugins/ascend-volcano-plugin/common/util"
@@ -844,3 +845,175 @@ func TestIsDelayingJob(t *testing.T) {
         })
     }
 }
+
+func new910SuperPod(name string) *module910SuperPod {
+    m := &module910SuperPod{}
+    m.SetPluginName(name)
+    m.SetAnnoName(util.NPU910CardName)
+    m.SetAnnoPreVal(util.NPU910CardNamePre)
+    m.SetMaxNodeNPUNum(ascend910a3.NodeNPUNumber)
+    m.SetMaxCardNPUNum(ascend910a3.DieNPUNumber)
+    m.SetIsNetworkFaultAttention(true)
+    m.NetUnhealthyKey = ascend910a3.NetworkUnhealthyNPU
+    m.nodeVPodId = map[string]string{}
+    return m
+}
+
+// TestScoreNodeBatchForReadyJob test of scoreNodeBatchForReadyJob
+func TestScoreNodeBatchForReadyJob(t *testing.T) {
+    plg := new910SuperPod(SchedulerName)
+    plg.Name = "job1"
+    plg.SchedulerJobAttr = util.SchedulerJobAttr{
+        ComJob: util.ComJob{},
+        NPUJob: &util.NPUJob{},
+    }
+    plg.ScheduleEnv = plugin.ScheduleEnv{}
+    type args struct {
+        task *api.TaskInfo
+        job  *plugin.SchedulerJob
+        sMap map[string]float64
+    }
+    tests := []struct {
+        name string
+        args args
+    }{
+        {
+            name: "01-scoreNodeBatchForReadyJob invalid argument",
+            args: args{},
+        },
+        {
+            name: "02-scoreNodeBatchForReadyJob rankIdMap empty",
+            args: args{
+                task: test.FakeNormalTestTask("pod1", "node1", "acjob"),
+                job:  &plugin.SchedulerJob{},
+                sMap: map[string]float64{"node1": 0},
+            },
+        },
+    }
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            plg.scoreNodeBatchForReadyJob(tt.args.task, tt.args.job, tt.args.sMap)
+        })
+    }
+}
+
+const (
+    batchScoreNpuTask4 = 4
+    batchScoreRank0    = 0
+)
+
+func createObtainBatchScoreRankTaskInfo(jobId, rankId, spec string) *api.TaskInfo {
+    task := test.FakeNormalTestTask("pod1", "node1", "acjob")
+    task.Job = api.JobID(jobId)
+    task.Pod.Annotations[plugin.PodRankIndexKey] = rankId
+    task.Pod.Annotations[taskSpec] = spec
+    return task
+}
+
+func createBatchScoreNPUTasks(n int) map[api.TaskID]util.NPUTask {
+    tasks := make(map[api.TaskID]util.NPUTask, n)
+    for i := 0; i < n; i++ {
+        spec := workerSpec
+        if batchScoreRank0 == i {
+            spec = schedulerSpec
+        }
+        tasks[api.TaskID(strconv.Itoa(i))] = util.NPUTask{
+            Name:       "task" + strconv.Itoa(i),
+            ReqNPUName: util.NPU910CardName,
+            Annotation: map[string]string{
+                plugin.PodRankIndexKey: strconv.Itoa(i),
+                taskSpec:               spec,
+            },
+            PodStatus: v1.PodPending,
+        }
+    }
+    return tasks
+}
+
+func fakeSchedulerJobEmptyTask(jobName, namespace string) *plugin.SchedulerJob {
+    job := &plugin.SchedulerJob{
+        SchedulerJobAttr: util.SchedulerJobAttr{
+            ComJob: util.ComJob{
+                Name:      api.JobID(jobName),
+                NameSpace: namespace,
+                Selector:  map[string]string{},
+                Label:     map[string]string{},
+            },
+            NPUJob: &util.NPUJob{
+                ReqNPUName: util.NPU910CardName,
+                ReqNPUNum:  0,
+                Tasks:      make(map[api.TaskID]util.NPUTask),
+            },
+        },
+    }
+    return job
+}
+
+type obtainBatchScoreRankArgs struct {
+    task *api.TaskInfo
+    job  *plugin.SchedulerJob
+}
+
+type obtainBatchScoreRankTest struct {
+    name string
+    args obtainBatchScoreRankArgs
+    want map[int]struct{}
+}
+
+func getObtainBatchScoreRankTestCases(jobId string) []obtainBatchScoreRankTest {
+    schedulerJob := fakeSchedulerJobEmptyTask(jobId, "")
+    schedulerJob.Tasks = createBatchScoreNPUTasks(batchScoreNpuTask4)
+    taskInfoWithoutSpecAnno := createObtainBatchScoreRankTaskInfo(jobId, "1", schedulerSpec)
+    delete(taskInfoWithoutSpecAnno.Pod.Annotations, taskSpec)
+    tests := []obtainBatchScoreRankTest{
+        {
+            name: "01-obtainBatchScoreRank invalid argument",
+            args: obtainBatchScoreRankArgs{},
+            want: nil,
+        },
+        {
+            name: "02-obtainBatchScoreRank spec not exist",
+            args: obtainBatchScoreRankArgs{
+                task: taskInfoWithoutSpecAnno,
+                job:  schedulerJob,
+            },
+            want: nil,
+        },
+        {
+            name: "03-obtainBatchScoreRank spec " + schedulerSpec,
+            args: obtainBatchScoreRankArgs{
+                task: createObtainBatchScoreRankTaskInfo(jobId, "1", schedulerSpec),
+                job:  schedulerJob,
+            },
+            want: map[int]struct{}{0: {}},
+        },
+        {
+            name: "04-obtainBatchScoreRank spec " + workerSpec,
+            args: obtainBatchScoreRankArgs{
+                task: createObtainBatchScoreRankTaskInfo(jobId, "1", workerSpec),
+                job:  schedulerJob,
+            },
+            want: map[int]struct{}{1: {}, 2: {}, 3: {}},
+        },
+    }
+    return tests
+}
+
+// TestObtainBatchScoreRank test of obtainBatchScoreRank
+func TestObtainBatchScoreRank(t *testing.T) {
+    jobId := "job1"
+    plg := new910SuperPod(SchedulerName)
+    plg.Name = api.JobID(jobId)
+    plg.SchedulerJobAttr = util.SchedulerJobAttr{
+        ComJob: util.ComJob{},
+        NPUJob: &util.NPUJob{},
+    }
+    plg.ScheduleEnv = plugin.ScheduleEnv{}
+    for _, tt := range getObtainBatchScoreRankTestCases(jobId) {
+        t.Run(tt.name, func(t *testing.T) {
+            if got := plg.obtainBatchScoreRank(tt.args.task, tt.args.job); !reflect.DeepEqual(got, tt.want) {
+                t.Errorf("obtainBatchScoreRank() = %v, want %v", got, tt.want)
+            }
+        })
+    }
+}
diff --git a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/type.go b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/type.go
index 40ec76335b3a2a6db721373fb2a088133294961b..5fe99c85e54c328127823e598217db296689a42f 100644
--- a/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/type.go
+++ b/component/ascend-for-volcano/internal/npu/ascend910/ascend910a3/superpod/type.go
@@ -41,6 +41,7 @@ const (
     scoreForNode  = 100000000
     taskSpec      = "volcano.sh/task-spec"
     schedulerSpec = "scheduler"
+    workerSpec    = "worker"

     delayingTime = 10
 )
diff --git a/component/ascend-for-volcano/plugin/const.go b/component/ascend-for-volcano/plugin/const.go
index a8b10b13f0cd8392df10c09fbefe495538472126..a297d530830242da99adf9de9ace2f482374d938 100644
--- a/component/ascend-for-volcano/plugin/const.go
+++ b/component/ascend-for-volcano/plugin/const.go
@@ -55,6 +55,16 @@ const (
     PodRankIndexKey = "hccl/rankIndex"
     // ReplicaSetType replicaset type
     ReplicaSetType = "ReplicaSet"
+    // PodGroupScheduleKey the job label key that enables podgroup scheduling
+    PodGroupScheduleKey = "podgroup-sched-enable"
+    // PodGroupScheduleValue the label value that enables podgroup scheduling
+    PodGroupScheduleValue = "true"
+)
+
+const (
+    taskOrderHighPriority = -1
+    taskOrderLowPriority  = 1
+    taskOrderSamePriority = 0
 )

 const (
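The first new const block above is the opt-in switch for this whole change: both ScoreBestNPUNodes and TaskOrderFn take the rank-ordered path only when the job carries exactly this label. A minimal sketch of the gate, assuming nothing beyond the two constants above (the label map here is hypothetical):

    package main

    import "fmt"

    const (
        podGroupScheduleKey   = "podgroup-sched-enable" // mirrors plugin.PodGroupScheduleKey
        podGroupScheduleValue = "true"                  // mirrors plugin.PodGroupScheduleValue
    )

    func main() {
        // Hypothetical job labels; only this exact key/value enables the batch path.
        jobLabels := map[string]string{podGroupScheduleKey: "true"}
        v, ok := jobLabels[podGroupScheduleKey]
        fmt.Println("podgroup scheduling enabled:", ok && v == podGroupScheduleValue)
    }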
diff --git a/component/ascend-for-volcano/plugin/factory.go b/component/ascend-for-volcano/plugin/factory.go
index 5d82694ff9a19fb706fb765a1a2fe51d41f17451..f539d5673ea2c4a480070b6be992db1740aa0080 100644
--- a/component/ascend-for-volcano/plugin/factory.go
+++ b/component/ascend-for-volcano/plugin/factory.go
@@ -446,6 +446,63 @@ func isContain(target string, strArray []string) bool {
     return false
 }

+// TaskOrderFn Sort the selected tasks.
+func (sHandle *ScheduleHandler) TaskOrderFn(interfaceA interface{}, interfaceB interface{}) int {
+    taskInfoA, ok := interfaceA.(*api.TaskInfo)
+    if !ok {
+        klog.V(util.LogDebugLev).Info("TaskOrderFn failed, left object is not a TaskInfo")
+        return taskOrderSamePriority
+    }
+    taskInfoB, ok := interfaceB.(*api.TaskInfo)
+    if !ok {
+        klog.V(util.LogDebugLev).Info("TaskOrderFn failed, right object is not a TaskInfo")
+        return taskOrderSamePriority
+    }
+
+    job, ok := sHandle.Jobs[taskInfoA.Job]
+    if !ok {
+        klog.V(util.LogDebugLev).Infof("TaskOrderFn (%s/%s): job does not exist", taskInfoA.Namespace, taskInfoA.Name)
+        return taskOrderSamePriority
+    }
+    podGroupEnable, exist := job.Label[PodGroupScheduleKey]
+    if !exist || podGroupEnable != PodGroupScheduleValue {
+        return taskOrderSamePriority
+    }
+    rRankId, err := sHandle.obtainTaskRankId(taskInfoB)
+    if err != nil {
+        return taskOrderSamePriority
+    }
+    lRankId, err := sHandle.obtainTaskRankId(taskInfoA)
+    if err != nil {
+        return taskOrderSamePriority
+    }
+
+    if lRankId < rRankId {
+        return taskOrderHighPriority
+    }
+    return taskOrderLowPriority
+}
+
+func (sHandle *ScheduleHandler) obtainTaskRankId(task *api.TaskInfo) (int, error) {
+    if task == nil {
+        klog.V(util.LogDebugLev).Infof("obtainTaskRankId failed: %s.", util.ArgumentError)
+        return 0, errors.New(util.ArgumentError)
+    }
+    rankIndex, ok := task.Pod.Annotations[PodRankIndexKey]
+    if !ok {
+        klog.V(util.LogDebugLev).Infof("obtainTaskRankId task(%s/%s): rankIndex does not exist",
+            task.Namespace, task.Name)
+        return 0, errors.New(util.RankIdNotExistError)
+    }
+    rankId, err := strconv.Atoi(rankIndex)
+    if err != nil {
+        klog.V(util.LogDebugLev).Infof("obtainTaskRankId task(%s/%s): rankIndex(%s) is not an int",
+            task.Namespace, task.Name, rankIndex)
+        return 0, errors.New(util.ArgumentError)
+    }
+    return rankId, nil
+}
+
 // BatchNodeOrderFn Score the selected nodes.
 func (sHandle *ScheduleHandler) BatchNodeOrderFn(task *api.TaskInfo,
     nodes []*api.NodeInfo) (map[string]float64, error) {
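obtainTaskRankId is the single source of rank information for the comparator: the rank must be present as the pod's hccl/rankIndex annotation and must parse as an integer, and either failure makes TaskOrderFn fall back to "same priority" rather than guessing. A standalone sketch of that lookup under the same two failure modes; the helper name is hypothetical and the error strings simply mirror the constants added in this change:

    package main

    import (
        "errors"
        "fmt"
        "strconv"
    )

    const podRankIndexKey = "hccl/rankIndex" // mirrors plugin.PodRankIndexKey

    // rankFromAnnotations mirrors obtainTaskRankId's behavior on a bare map.
    func rankFromAnnotations(annotations map[string]string) (int, error) {
        rankIndex, ok := annotations[podRankIndexKey]
        if !ok {
            return 0, errors.New("rank id does not exist")
        }
        rankId, err := strconv.Atoi(rankIndex)
        if err != nil {
            return 0, errors.New("invalid argument")
        }
        return rankId, nil
    }

    func main() {
        fmt.Println(rankFromAnnotations(map[string]string{podRankIndexKey: "3"})) // 3 <nil>
        fmt.Println(rankFromAnnotations(map[string]string{}))                     // 0 rank id does not exist
        fmt.Println(rankFromAnnotations(map[string]string{podRankIndexKey: "x"})) // 0 invalid argument
    }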
diff --git a/component/ascend-for-volcano/plugin/factory_test.go b/component/ascend-for-volcano/plugin/factory_test.go
index 18513c1f1fa1121f27784f87fc804beffac38a75..0cfbe29a92c8bed1fad1c3f01befaf6dfb627c77 100644
--- a/component/ascend-for-volcano/plugin/factory_test.go
+++ b/component/ascend-for-volcano/plugin/factory_test.go
@@ -21,6 +21,7 @@ package plugin

 import (
     "reflect"
+    "strconv"
     "strings"
     "sync"
     "testing"
@@ -682,3 +683,230 @@ func TestGetGraceDeleteTime(t *testing.T) {
         })
     }
 }
+
+type taskOrderFnArgs struct {
+    l interface{}
+    r interface{}
+}
+
+type taskOrderFnTest struct {
+    name   string
+    fields fields
+    args   taskOrderFnArgs
+    want   int
+}
+
+func createTaskOrderFnTask(jobId string, taskNum int) []*api.TaskInfo {
+    tTask := test.FakeNormalTestTasks(taskNum)
+    for index, task := range tTask {
+        task.Job = api.JobID(jobId)
+        task.Pod.Annotations[PodRankIndexKey] = strconv.Itoa(index)
+    }
+    return tTask
+}
+
+func buildTaskOrderFnTest() []taskOrderFnTest {
+    jobIdStr := "job1"
+    tTask := createTaskOrderFnTask(jobIdStr, util.NPUIndex2)
+    var tests []taskOrderFnTest
+    tests = append(tests, createTaskOrderFnCasesOfTaskNil()...)
+    tests = append(tests, createTaskOrderFnCasesOfJobNotExist(tTask[util.NPUIndex0], tTask[util.NPUIndex1])...)
+    tests = append(tests, createTaskOrderFnCasesOfPodGroupLabelNotExist(jobIdStr,
+        tTask[util.NPUIndex0], tTask[util.NPUIndex1])...)
+    tests = append(tests, createTaskOrderFnCasesOfCompareTask(jobIdStr,
+        tTask[util.NPUIndex0], tTask[util.NPUIndex1])...)
+    return tests
+}
+
+func createTaskOrderFnCasesOfTaskNil() []taskOrderFnTest {
+    return []taskOrderFnTest{
+        {
+            name: "01-TaskOrderFn nil task",
+            fields: fields{
+                NPUPlugins: map[string]sets.Empty{},
+                ScheduleEnv: ScheduleEnv{
+                    ClusterCache: NewClusterCache(),
+                    FrameAttr:    VolcanoFrame{},
+                },
+            },
+            args: taskOrderFnArgs{l: nil, r: nil},
+            want: 0,
+        },
+        {
+            name: "02-TaskOrderFn right value nil task",
+            fields: fields{
+                NPUPlugins: map[string]sets.Empty{},
+                ScheduleEnv: ScheduleEnv{
+                    ClusterCache: NewClusterCache(),
+                    FrameAttr:    VolcanoFrame{},
+                },
+            },
+            args: taskOrderFnArgs{l: &api.TaskInfo{}, r: nil},
+            want: 0,
+        },
+    }
+}
+
+func createTaskOrderFnCasesOfJobNotExist(task1, task2 *api.TaskInfo) []taskOrderFnTest {
+    return []taskOrderFnTest{
+        {
+            name: "03-TaskOrderFn job not exist",
+            fields: fields{
+                NPUPlugins: map[string]sets.Empty{},
+                ScheduleEnv: ScheduleEnv{
+                    ClusterCache: NewClusterCache(),
+                    FrameAttr:    VolcanoFrame{},
+                },
+            },
+            args: taskOrderFnArgs{l: task1, r: task2},
+            want: 0,
+        },
+    }
+}
+
+func createTaskOrderFnCasesOfPodGroupLabelNotExist(jobIdStr string, task1, task2 *api.TaskInfo) []taskOrderFnTest {
+    jobId := api.JobID(jobIdStr)
+    schedulerJob := fakeSchedulerJobEmptyTask(jobIdStr, "")
+    clusterCache := NewClusterCache()
+    clusterCache.Jobs = map[api.JobID]SchedulerJob{jobId: schedulerJob}
+
+    return []taskOrderFnTest{
+        {
+            name: "04-TaskOrderFn job podgroup label not exist",
+            fields: fields{
+                NPUPlugins: map[string]sets.Empty{},
+                ScheduleEnv: ScheduleEnv{
+                    ClusterCache: clusterCache,
+                    FrameAttr:    VolcanoFrame{},
+                },
+            },
+            args: taskOrderFnArgs{l: task1, r: task2},
+            want: 0,
+        },
+    }
+}
+
+func createTaskOrderFnCasesOfCompareTask(jobIdStr string, task1, task2 *api.TaskInfo) []taskOrderFnTest {
+    jobId := api.JobID(jobIdStr)
+    schedulerJob := fakeSchedulerJobEmptyTask(jobIdStr, "")
+    schedulerJob.Label[PodGroupScheduleKey] = PodGroupScheduleValue
+    clusterCache := NewClusterCache()
+    clusterCache.Jobs = map[api.JobID]SchedulerJob{jobId: schedulerJob}
+    return []taskOrderFnTest{
+        {
+            name: "05-TaskOrderFn job l < r",
+            fields: fields{
+                NPUPlugins: map[string]sets.Empty{},
+                ScheduleEnv: ScheduleEnv{
+                    ClusterCache: clusterCache,
+                    FrameAttr:    VolcanoFrame{},
+                },
+            },
+            args: taskOrderFnArgs{l: task1, r: task2},
+            want: -1,
+        },
+        {
+            name: "06-TaskOrderFn job l > r",
+            fields: fields{
+                NPUPlugins: map[string]sets.Empty{},
+                ScheduleEnv: ScheduleEnv{
+                    ClusterCache: clusterCache,
+                    FrameAttr:    VolcanoFrame{},
+                },
+            },
+            args: taskOrderFnArgs{l: task2, r: task1},
+            want: 1,
+        },
+    }
+}
+
+// TestTaskOrderFn test of TaskOrderFn
+func TestTaskOrderFn(t *testing.T) {
+    tests := buildTaskOrderFnTest()
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            sHandle := &ScheduleHandler{
+                NPUPlugins:  tt.fields.NPUPlugins,
+                ScheduleEnv: tt.fields.ScheduleEnv,
+            }
+            if got := sHandle.TaskOrderFn(tt.args.l, tt.args.r); got != tt.want {
+                t.Errorf("TaskOrderFn() = %v, want %v", got, tt.want)
+            }
+        })
+    }
+}
+
+func fakeSchedulerJobEmptyTask(jobName, namespace string) SchedulerJob {
+    job := SchedulerJob{
+        SchedulerJobAttr: util.SchedulerJobAttr{
+            ComJob: util.ComJob{
+                Name:      api.JobID(jobName),
+                NameSpace: namespace,
+                Selector:  map[string]string{},
+                Label:     map[string]string{},
+            },
+            NPUJob: &util.NPUJob{
+                ReqNPUName: util.NPU910CardName,
+                ReqNPUNum:  0,
+                Tasks:      make(map[api.TaskID]util.NPUTask),
+            },
+        },
+    }
+    return job
+}
+
+type taskRankIDFnArgs struct {
+    task *api.TaskInfo
+}
+
+type taskRankIdFnTest struct {
+    name    string
+    fields  fields
+    args    taskRankIDFnArgs
+    want    int
+    wantErr bool
+}
+
+func buildObtainTaskRankIdCases() []taskRankIdFnTest {
+    task1 := test.FakeNormalTestTask("pod1", "node1", "acjob")
+    delete(task1.Pod.Annotations, PodRankIndexKey)
+    task2 := test.FakeNormalTestTask("pod1", "node1", "acjob")
+    task2.Pod.Annotations[PodRankIndexKey] = ""
+
+    return []taskRankIdFnTest{
+        {
+            name:    "01-obtainTaskRankId, task is nil",
+            args:    taskRankIDFnArgs{task: nil},
+            want:    0,
+            wantErr: true,
+        },
+        {
+            name:    "02-obtainTaskRankId, pod annotation not exist",
+            args:    taskRankIDFnArgs{task: task1},
+            want:    0,
+            wantErr: true,
+        },
+        {
+            name:    "03-obtainTaskRankId, pod annotation not int",
+            args:    taskRankIDFnArgs{task: task2},
+            want:    0,
+            wantErr: true,
+        },
+    }
+}
+
+// TestObtainTaskRankId test obtainTaskRankId
+func TestObtainTaskRankId(t *testing.T) {
+    tests := buildObtainTaskRankIdCases()
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            sHandle := &ScheduleHandler{
+                NPUPlugins:  map[string]sets.Empty{},
+                ScheduleEnv: ScheduleEnv{ClusterCache: NewClusterCache(), FrameAttr: VolcanoFrame{}},
+            }
+            res, err := sHandle.obtainTaskRankId(tt.args.task)
+            if res != tt.want || (err != nil) != tt.wantErr {
+                t.Errorf("obtainTaskRankId() = %v, want %v", res, tt.want)
+            }
+        })
+    }
+}
diff --git a/component/clusterd/pkg/common/constant/constants.go b/component/clusterd/pkg/common/constant/constants.go
index 835ab116c5a6d1c44d4075e71cce8cbc79ea059c..84fd8d6d527540173ea9cd1e05091bc4fc2c76bd 100644
--- a/component/clusterd/pkg/common/constant/constants.go
+++ b/component/clusterd/pkg/common/constant/constants.go
@@ -49,7 +49,7 @@ const (
     // AscendJobRefKind reference kind is AscendJob
     AscendJobRefKind = "AscendJob"
     // MaxSupportNodeNum max support node num
-    MaxSupportNodeNum = 5000
+    MaxSupportNodeNum = 16000
     // MaxSupportJobNum max support job num
     MaxSupportJobNum = 10000
     // MaxCmQueueLen max cm queue len support