From 93cddfc7c0f1955b8db7e98f17bd56b144dfa3b9 Mon Sep 17 00:00:00 2001
From: "Wu,Qiang-Roy"
Date: Fri, 15 Aug 2025 14:24:56 +0800
Subject: [PATCH] watch cm: job-reschedule-reason to obtain the reschedule times

---
 .../pkg/model/model.go                        |  32 +++
 .../pkg/model/slownode/slownode.go            |   6 +-
 .../slownode/cluster/job_processor.go         |  64 +++++-
 .../slownode/cluster/job_processor_test.go    | 113 ++++++++++
 .../slownode/cluster/job_summary_test.go      | 208 +++++++++++++++++-
 .../slownode/dataparse/data_parse.go          |  31 +--
 .../slownode/dataparse/data_parse_test.go     |   2 +-
 .../slownode/slownodejob/job_context.go       |  24 +-
 .../pkg/utils/grpc/client_test.go             |  77 +++++++
 .../pkg/utils/k8s/cm_watcher.go               | 185 ++++++++++++++++
 .../pkg/utils/k8s/cm_watcher_test.go          | 111 ++++++++++
 .../pkg/utils/k8s/kubeclient.go               |  11 -
 .../pkg/utils/k8s/kubeclient_test.go          |   9 -
 .../pkg/utils/network.go                      |  14 +-
 .../pkg/utils/network_test.go                 | 142 ++++++++++++
 .../pkg/utils/storage.go                      |   4 +
 .../pkg/utils/storage_test.go                 |   2 +-
 .../pkg/utils/utils.go                        |   2 +-
 .../pkg/utils/utils_test.go                   | 169 ++------------
 component/clusterd/build/Dockerfile           |   2 +-
 20 files changed, 992 insertions(+), 216 deletions(-)
 create mode 100644 component/ascend-faultdiag-online/pkg/utils/k8s/cm_watcher.go
 create mode 100644 component/ascend-faultdiag-online/pkg/utils/k8s/cm_watcher_test.go
 create mode 100644 component/ascend-faultdiag-online/pkg/utils/network_test.go

diff --git a/component/ascend-faultdiag-online/pkg/model/model.go b/component/ascend-faultdiag-online/pkg/model/model.go
index 5ac8eb9af..c72a6ffba 100644
--- a/component/ascend-faultdiag-online/pkg/model/model.go
+++ b/component/ascend-faultdiag-online/pkg/model/model.go
@@ -48,3 +48,35 @@ type JobSummary struct {
 	// Operator add/delete
 	Operator string
 }
+
+// RescheduleReason the reason why the training job was rescheduled
+type RescheduleReason struct {
+	// RescheduleReason the reason why the training job was rescheduled
+	RescheduleReason string `json:"RescheduleReason"`
+	// PodName the name of the pod that the training job is in
+	PodName string `json:"PodName"`
+	// NodeName the name of the node that the training job is in
+	NodeName string `json:"NodeName"`
+	// NodeRankIndex the rank index of the node
+	NodeRankIndex string `json:"NodeRankIndex"`
+}
+
+// RescheduleRecords the record of a single reschedule
+type RescheduleRecords struct {
+	// LogFileFormatTime log file format time
+	LogFileFormatTime string `json:"LogFileFormatTime"`
+	// RescheduleTimeStamp reschedule timestamp
+	RescheduleTimeStamp int64 `json:"RescheduleTimeStamp"`
+	// ReasonOfTask the reasons why the training job was rescheduled
+	ReasonOfTask []RescheduleReason `json:"ReasonOfTask"`
+}
+
+// RescheduleData the reschedule data struct for the training job
+type RescheduleData struct {
+	// JobId including namespace/jobName-jobId
+	JobId string `json:"jobID"` // sample:default/default-test-mindspore-f4121ec4-590e-4cdc-a422-ac256b898659
+	// TotalRescheduleTimes the total reschedule count
+	TotalRescheduleTimes int `json:"TotalRescheduleTimes"`
+	// RescheduleRecords the records of reschedules
+	RescheduleRecords []RescheduleRecords `json:"RescheduleRecords"`
+}
diff --git a/component/ascend-faultdiag-online/pkg/model/slownode/slownode.go b/component/ascend-faultdiag-online/pkg/model/slownode/slownode.go
index 8ab5fbb7a..e0e364a4a 100644
--- a/component/ascend-faultdiag-online/pkg/model/slownode/slownode.go
+++ b/component/ascend-faultdiag-online/pkg/model/slownode/slownode.go
@@ -182,11 +182,11 @@ type ApiRes struct {
 // Server is a struct for cm data in job-summary
 type Server struct {
 	//
Sn is the serial number of the server - Sn string `json:"sn"` + Sn string // Ip is the ip address of the server - Ip string `json:"ip"` + Ip string // RankIds is the rank ids of the server, e.g. ["0", "1", "2"] - RankIds []string `json:"rankIds"` + RankIds []string } // JobSummary is a struct for cm data in job-summary diff --git a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_processor.go b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_processor.go index e41f35e65..806197e93 100644 --- a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_processor.go +++ b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_processor.go @@ -21,6 +21,7 @@ import ( "fmt" "os" "path/filepath" + "strings" "time" corev1 "k8s.io/api/core/v1" @@ -30,6 +31,7 @@ import ( "ascend-common/common-utils/hwlog" "ascend-faultdiag-online/pkg/core/context" "ascend-faultdiag-online/pkg/core/model/enum" + "ascend-faultdiag-online/pkg/model" "ascend-faultdiag-online/pkg/model/slownode" "ascend-faultdiag-online/pkg/service/servicefunc/slownode/algo" "ascend-faultdiag-online/pkg/service/servicefunc/slownode/constants" @@ -45,9 +47,14 @@ const ( slowNodeOn = 1 // slowNodeOff stop slow node feature slowNodeOff = 0 + + rescheduleNamespace = "mindx-dl" + rescheduleCmName = "job-reschedule-reason" + rescheduleRecordsKey = "recent-reschedule-records" ) var jobSummaryWatcher = utils.NewStorage[string]() +var rescheduleWatcher = utils.NewStorage[string]() type jobProcessor struct { ctx *slownodejob.JobContext @@ -56,8 +63,7 @@ type jobProcessor struct { func (j *jobProcessor) logPrefix() string { if j.ctx != nil { - return fmt.Sprintf("[FD-OL SLOWNODE]job(name=%s, namespace=%s, jobId=%s)", - j.ctx.Job.JobName, j.ctx.Job.Namespace, j.ctx.Job.JobId) + return j.ctx.LogPrefix() } return fmt.Sprintf("[FD-OL SLOWNODE]job(name=%s, namespace=%s, jobId=%s)", j.job.JobName, j.job.Namespace, j.job.JobId) @@ -125,6 +131,7 @@ func (j *jobProcessor) delete() { return } grpcClient.UnsubscribeJobSummary(registerId) + jobSummaryWatcher.Delete(j.job.KeyGenerator()) slownodejob.GetJobCtxMap().Delete(j.job.KeyGenerator()) } @@ -153,6 +160,7 @@ func (j *jobProcessor) start() { hwlog.RunLog.Errorf("%s created or updated cm feaild: %v", j.logPrefix(), err) return } + j.startRescheduleWatcher() j.ctx.StartAllProfiling() j.waitNodeReport() } @@ -172,6 +180,8 @@ func (j *jobProcessor) stop() { } algo.NewController(j.ctx).Stop() j.ctx.Stop() + j.stopRescheduleWatcher() + rescheduleWatcher.Delete(j.job.KeyGenerator()) jobOnceMap.Delete(j.ctx.Job.JobId) j.removeData() } @@ -267,6 +277,56 @@ func (j *jobProcessor) waitNodeReport() { }() } +func (j *jobProcessor) startRescheduleWatcher() { + var cmWatcher = k8s.GetCmWatcher() + registerId := cmWatcher.Subscribe(rescheduleNamespace, rescheduleCmName, j.rescheduleProcessor) + rescheduleWatcher.Store(j.job.KeyGenerator(), registerId) +} + +func (j *jobProcessor) rescheduleProcessor(oldCm, newCm *corev1.ConfigMap, op watch.EventType) { + if op != watch.Added && op != watch.Modified { + return + } + hwlog.RunLog.Infof("%v got reschedule data: %v", j.logPrefix(), newCm) + // the format of rescheduleRecordsKey, refer to: + // https://www.hiascend.com/document/detail/zh/mindcluster/71RC1/clustersched/dlug/dl_resume_060.html + // convert configMap to map[string]any + var data map[string]model.RescheduleData + records, exists := newCm.Data[rescheduleRecordsKey] + if !exists || records == "" { + 
hwlog.RunLog.Warnf("%v %v not in cm data: %+v or records is empty", + j.logPrefix(), rescheduleRecordsKey, newCm.Data) + return + } + if err := json.Unmarshal([]byte(records), &data); err != nil { + hwlog.RunLog.Errorf("%v convert reschedule-records: %s to map[string]any failed: %v", + j.logPrefix(), records, err) + return + } + for _, rescheduleData := range data { + if strings.HasSuffix(rescheduleData.JobId, j.ctx.Job.JobId) { + if rescheduleData.TotalRescheduleTimes != j.ctx.GetRescheduleCount() { + hwlog.RunLog.Infof("%v detected the TotalRescheduleTimes: %d changed(local count: %d), "+ + "stop and start slow node detection", + j.logPrefix(), rescheduleData.TotalRescheduleTimes, j.ctx.GetRescheduleCount()) + j.ctx.SetRescheduleCount(rescheduleData.TotalRescheduleTimes) + j.stop() + j.start() + } + return + } + } +} + +func (j *jobProcessor) stopRescheduleWatcher() { + registerId, exists := rescheduleWatcher.Load(j.job.KeyGenerator()) + if !exists { + return + } + var cmWatcher = k8s.GetCmWatcher() + cmWatcher.Unsubscribe(rescheduleNamespace, rescheduleCmName, registerId) +} + // JobProcessor store the slow node feat config into the confMap in cluster func JobProcessor(oldData, newData *slownode.Job, operator watch.EventType) { hwlog.RunLog.Infof("[FD-OL SLOWNODE]got job cm data, operator: %s, newData: %+v, oldData: %+v", diff --git a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_processor_test.go b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_processor_test.go index 78e247d25..b651f55b3 100644 --- a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_processor_test.go +++ b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_processor_test.go @@ -25,11 +25,124 @@ import ( "github.com/smartystreets/goconvey/convey" "k8s.io/apimachinery/pkg/watch" + "ascend-faultdiag-online/pkg/core/config" + "ascend-faultdiag-online/pkg/core/context" "ascend-faultdiag-online/pkg/model/slownode" "ascend-faultdiag-online/pkg/service/servicefunc/slownode/slownodejob" "ascend-faultdiag-online/pkg/utils/constants" ) +func TestWaitNodeReport(t *testing.T) { + patches := gomonkey.NewPatches() + defer patches.Reset() + timeout := 1 + + // Mock context + context.FdCtx = &context.FaultDiagContext{} + context.FdCtx.Config = &config.FaultDiagConfig{ + Cluster: config.Cluster{ + NodeReportTimeout: timeout, + }, + } + convey.Convey("Test waitNodeReport", t, func() { + ctx := ctxGenerator() + ctx.StopChan = make(chan struct{}) + ctx.NodeReportSignal = make(chan struct{}) + j := &jobProcessor{ + ctx: ctx, + job: &slownode.Job{}, + } + convey.Convey("When node reports before timeout", func() { + j.waitNodeReport() + ctx.NodeReportSignal <- struct{}{} + }) + convey.Convey("When timeout occurs", func() { + mockStop := gomonkey.ApplyPrivateMethod( + reflect.TypeOf(&jobProcessor{}), + "stop", + func(*jobProcessor) { + fmt.Println("stop called") + }, + ) + defer mockStop.Reset() + ctx.StopChan = make(chan struct{}) + ctx.NodeReportSignal = make(chan struct{}) + j.waitNodeReport() + time.Sleep(time.Second) + }) + convey.Convey("When job is stopped", func() { + ctx.StopChan = make(chan struct{}) + ctx.NodeReportSignal = make(chan struct{}) + j.waitNodeReport() + close(j.ctx.StopChan) + time.Sleep(time.Millisecond) + }) + }) +} + +func TestJobProcessor(t *testing.T) { + patches := gomonkey.NewPatches() + defer patches.Reset() + job := &slownode.Job{} + convey.Convey("Test JobProcessor", t, func() { + 
testJobProcessorWithCase1(job) + testJobProcessorWithCase2(job) + }) +} + +func testJobProcessorWithCase1(job *slownode.Job) { + convey.Convey("When operator is Added", func() { + job.JobName = testJobName + mockAdd := gomonkey.ApplyPrivateMethod(&jobProcessor{}, "add", func(*jobProcessor) { + fmt.Println("mock start") + }) + defer mockAdd.Reset() + output := captureOutput(func() { + JobProcessor(nil, job, watch.Added) + }) + convey.So(output, convey.ShouldContainSubstring, "mock start") + }) + + convey.Convey("When operator is Modified", func() { + mockUpdate := gomonkey.ApplyPrivateMethod(&jobProcessor{}, "update", func(*jobProcessor) { + fmt.Println("mock update") + }) + defer mockUpdate.Reset() + output := captureOutput(func() { + JobProcessor(nil, job, watch.Modified) + }) + convey.So(output, convey.ShouldContainSubstring, "mock update") + }) + + convey.Convey("When operator is Deleted", func() { + mockDelete := gomonkey.ApplyPrivateMethod(&jobProcessor{}, "delete", func(*jobProcessor) { + fmt.Println("mock delete") + }) + defer mockDelete.Reset() + output := captureOutput(func() { + JobProcessor(nil, job, watch.Deleted) + }) + convey.So(output, convey.ShouldContainSubstring, "mock delete") + }) +} + +func testJobProcessorWithCase2(job *slownode.Job) { + convey.Convey("When job name is empty", func() { + job.JobName = "" + output := captureOutput(func() { + JobProcessor(nil, job, watch.Added) + }) + convey.So(output, convey.ShouldEqual, "") + }) + convey.Convey("When operator is unknown", func() { + job.JobName = testJobName + output := captureOutput(func() { + JobProcessor(nil, job, watch.Bookmark) + }) + convey.So(output, convey.ShouldEqual, "") + }) +} + func TestJobRestartProcessor(t *testing.T) { var ip = "127.0.0.1" patches := gomonkey.ApplyPrivateMethod(reflect.TypeOf(&jobProcessor{}), "stop", func(*jobProcessor) { diff --git a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_summary_test.go b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_summary_test.go index e31b0f360..151b6035b 100644 --- a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_summary_test.go +++ b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/cluster/job_summary_test.go @@ -17,20 +17,25 @@ package cluster import ( "encoding/json" - "testing" - "reflect" "fmt" + "reflect" + "testing" + "github.com/agiledragon/gomonkey/v2" "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/assert" "ascend-common/common-utils/hwlog" + "ascend-faultdiag-online/pkg/core/model/enum" "ascend-faultdiag-online/pkg/model" "ascend-faultdiag-online/pkg/model/slownode" + "ascend-faultdiag-online/pkg/service/servicefunc/slownode/common" + "ascend-faultdiag-online/pkg/service/servicefunc/slownode/slownodejob" ) func init() { config := hwlog.LogConfig{ - OnlyToStdout: true, + OnlyToStdout: true, } err := hwlog.InitRunLogger(&config, nil) if err != nil { @@ -38,6 +43,109 @@ func init() { } } +var ( + testJobName = "testJobName" + testNamespace = "testNamespace" + testJobId = "testJobId" +) + +func ctxGenerator() *slownodejob.JobContext { + ctx := &slownodejob.JobContext{ + Job: &slownode.Job{}, + } + ctx.Job.JobName = testJobName + ctx.Job.Namespace = testNamespace + return ctx +} + +func jobSummaryGenerator(t *testing.T) *model.JobSummary { + var jobSummary = &model.JobSummary{ + JobName: testJobName, + Namespace: testNamespace, + JobId: testJobId, + } + var data = `{ + "server_list": [ + { + "pod_id": "123", 
+ "server_id": "127.0.0.1", + "server_sn": "321123", + "device": [ + { + "rank_id": "1" + }, + { + "rank_id": "2" + } + ] + } + ]}` + err := json.Unmarshal([]byte(data), &jobSummary.HcclJson) + assert.Nil(t, err) + return jobSummary +} + +func TestJobSummaryProcessor(t *testing.T) { + slownodejob.GetJobCtxMap().Clear() + var jobSummary = jobSummaryGenerator(t) + var ctx = ctxGenerator() + defer slownodejob.GetJobCtxMap().Clear() + convey.Convey("test jobSummaryProcessor", t, func() { + testJobSummaryProcessorCase1(ctx, jobSummary) + testJobSummaryProcessorCase2(ctx, jobSummary) + }) +} + +func testJobSummaryProcessorCase1(ctx *slownodejob.JobContext, jobSummary *model.JobSummary) { + convey.Convey("test no ctx found", func() { + output := captureOutput(func() { + jobSummaryProcessor(jobSummary) + }) + convey.So(output, convey.ShouldEqual, "") + }) + convey.Convey("test found ctx with different jobId", func() { + slownodejob.GetJobCtxMap().Insert(ctx.Job.KeyGenerator(), ctx) + // job summary status is empty, do the default case + jobSummaryProcessor(jobSummary) + convey.So(ctx.Job.JobId, convey.ShouldEqual, testJobId) + convey.So(ctx.TrainingJobStatus, convey.ShouldEqual, "") + }) +} + +func testJobSummaryProcessorCase2(ctx *slownodejob.JobContext, jobSummary *model.JobSummary) { + convey.Convey("test found ctx with job status is add", func() { + patch := gomonkey.ApplyFunc(jobStatusProcessor, func(*slownodejob.JobContext, *model.JobSummary) { + fmt.Println("mock the jobStatusProcessor") + }) + defer patch.Reset() + slownodejob.GetJobCtxMap().Clear() + slownodejob.GetJobCtxMap().Insert(ctx.Job.KeyGenerator(), ctx) + jobSummary.Operator = add + output := captureOutput(func() { + jobSummaryProcessor(jobSummary) + }) + + convey.So(ctx.Job.JobId, convey.ShouldEqual, testJobId) + convey.So(ctx.TrainingJobStatus, convey.ShouldEqual, "") + convey.So(output, convey.ShouldContainSubstring, "mock the jobStatusProcessor") + }) + convey.Convey("test found ctx with job status is delete", func() { + patch := gomonkey.ApplyPrivateMethod( + reflect.TypeOf(&jobProcessor{}), "delete", func(*jobProcessor) { fmt.Println("mock delete") }, + ) + defer patch.Reset() + slownodejob.GetJobCtxMap().Clear() + slownodejob.GetJobCtxMap().Insert(ctx.Job.KeyGenerator(), ctx) + jobSummary.Operator = del + output := captureOutput(func() { + jobSummaryProcessor(jobSummary) + }) + convey.So(ctx.Job.JobId, convey.ShouldEqual, testJobId) + convey.So(ctx.TrainingJobStatus, convey.ShouldEqual, "") + convey.So(output, convey.ShouldContainSubstring, "mock delete") + }) +} + func TestServersGenerator(t *testing.T) { convey.Convey("test serversGenerator", t, func() { var hcclJson = model.HcclJson{} @@ -71,3 +179,97 @@ func TestServersGenerator(t *testing.T) { convey.So(reflect.DeepEqual(servers, expect), convey.ShouldBeTrue) }) } + +func TestJobStatusProcessor(t *testing.T) { + slownodejob.GetJobCtxMap().Clear() + // prepare the jobSummary + var jobSummary = jobSummaryGenerator(t) + var ctx = ctxGenerator() + defer slownodejob.GetJobCtxMap().Clear() + convey.Convey("Test jobStatusProcessor", t, func() { + testJobStatusProcessorWithOtherJobStatus(ctx, jobSummary) + testJobStatusProcessorWithJobStatusIsRunning(ctx, jobSummary) + }) +} + +func testJobStatusProcessorWithJobStatusIsRunning(ctx *slownodejob.JobContext, jobSummary *model.JobSummary) { + convey.Convey("test job status is runing", func() { + jobSummary.JobStatus = enum.IsRunning + convey.Convey("test ctx is not running", func() { + // start job + patch := 
gomonkey.ApplyPrivateMethod( + reflect.TypeOf(&jobProcessor{}), "start", func(*jobProcessor) { fmt.Println("mock start") }, + ) + defer patch.Reset() + output := captureOutput(func() { + jobStatusProcessor(ctx, jobSummary) + }) + convey.So(output, convey.ShouldContainSubstring, "mock start") + }) + convey.Convey("test ctx is running and not reschedule", func() { + // servers are equal -> empty output + output := captureOutput(func() { + jobStatusProcessor(ctx, jobSummary) + }) + convey.So(output, convey.ShouldBeEmpty) + }) + convey.Convey("test ctx is running and reschedule", func() { + setUnexportedFiled(ctx, "isRunning", true) + // servers are not equal -> stop and start + patch := gomonkey.ApplyPrivateMethod( + reflect.TypeOf(&jobProcessor{}), "stop", func(*jobProcessor) { fmt.Println("mock stop") }, + ) + patch.ApplyPrivateMethod( + reflect.TypeOf(&jobProcessor{}), "start", func(*jobProcessor) { fmt.Println("mock start") }, + ) + patch.ApplyFunc(common.AreServersEqual, func(a, b []slownode.Server) bool { return false }) + defer patch.Reset() + output := captureOutput(func() { + jobStatusProcessor(ctx, jobSummary) + }) + convey.So(output, convey.ShouldContainSubstring, "mock stop") + convey.So(output, convey.ShouldContainSubstring, "mock start") + }) + }) +} + +func testJobStatusProcessorWithOtherJobStatus(ctx *slownodejob.JobContext, jobSummary *model.JobSummary) { + convey.Convey("test update server and jobStatus is empty", func() { + jobStatusProcessor(ctx, jobSummary) + servers := serversGenerator(jobSummary.HcclJson) + convey.So(reflect.DeepEqual(ctx.Job.Servers, servers), convey.ShouldBeTrue) + }) + convey.Convey("test job status is pending", func() { + patch := gomonkey.ApplyPrivateMethod( + reflect.TypeOf(&jobProcessor{}), "stop", func(*jobProcessor) { fmt.Println("mock stop") }, + ) + defer patch.Reset() + jobSummary.JobStatus = enum.IsPending + output := captureOutput(func() { + jobStatusProcessor(ctx, jobSummary) + }) + convey.So(output, convey.ShouldContainSubstring, "mock stop") + }) + convey.Convey("test job status is failed", func() { + patch := gomonkey.ApplyPrivateMethod( + reflect.TypeOf(&jobProcessor{}), "stop", func(*jobProcessor) { fmt.Println("mock stop") }, + ) + defer patch.Reset() + jobSummary.JobStatus = enum.IsFailed + output := captureOutput(func() { + jobStatusProcessor(ctx, jobSummary) + }) + convey.So(output, convey.ShouldContainSubstring, "mock stop") + }) + convey.Convey("test job status is complete", func() { + patch := gomonkey.ApplyPrivateMethod( + reflect.TypeOf(&jobProcessor{}), "delete", func(*jobProcessor) { fmt.Println("mock delete") }, + ) + defer patch.Reset() + jobSummary.JobStatus = enum.IsCompleted + output := captureOutput(func() { + jobStatusProcessor(ctx, jobSummary) + }) + convey.So(output, convey.ShouldContainSubstring, "mock delete") + }) +} diff --git a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/dataparse/data_parse.go b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/dataparse/data_parse.go index 4bf547388..054e76208 100644 --- a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/dataparse/data_parse.go +++ b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/dataparse/data_parse.go @@ -63,8 +63,7 @@ func (d *Controller) request(command enum.Command) error { if err != nil { return err } - hwlog.RunLog.Infof("[FD-OL SLOWNODE]job(name=%s, namespace=%s) %s data parse, confJson: %s", - d.ctx.Job.JobName, d.ctx.Job.Namespace, command, string(confJson)) + 
hwlog.RunLog.Infof("%v %s data parse, confJson: %s", d.ctx.LogPrefix(), command, string(confJson)) apiPath := fmt.Sprintf("feature/slownode/%s/%s", d.ctx.Deployment, command) resp, err := context.FdCtx.Request(apiPath, string(confJson)) if err != nil { @@ -77,24 +76,21 @@ func (d *Controller) request(command enum.Command) error { if res.Status != enum.Success { return errors.New(res.Msg) } - hwlog.RunLog.Infof("[FD-OL SLOWNODE]job(name=%s, namespace=%s) %s data parse success, resp: %s", - d.ctx.Job.JobName, d.ctx.Job.Namespace, command, resp) + hwlog.RunLog.Infof("%v %s data parse success, resp: %s", d.ctx.LogPrefix(), command, resp) return nil } // Start start the data parse func (d *Controller) Start() { if err := d.request(enum.Start); err != nil { - hwlog.RunLog.Errorf("[FD-OL SLOWNODE]job(name=%s, jobId=%s) started data parse failed: %v", - d.ctx.Job.JobName, d.ctx.Job.JobId, err) + hwlog.RunLog.Errorf("%v started data parse failed: %v", d.ctx.LogPrefix(), err) } } // Stop stop the data parse func (d *Controller) Stop() { if err := d.request(enum.Stop); err != nil { - hwlog.RunLog.Errorf("[FD-OL SLOWNODE]job(name=%s, jobId=%s) stopped data parse failed: %v", - d.ctx.Job.JobName, d.ctx.Job.JobId, err) + hwlog.RunLog.Errorf("%v stopped data parse failed: %v", d.ctx.LogPrefix(), err) } } @@ -103,31 +99,28 @@ func (d *Controller) Stop() { // only occurs in cluster func (d *Controller) MergeParallelGroupInfoWatcher() { go func() { - logPrefix := fmt.Sprintf("[FD-OL SLOWNODE]job(name=%s, jobId=%s)", d.ctx.Job.JobName, d.ctx.Job.JobId) - hwlog.RunLog.Infof("%s started watching merge parallel group info signal", logPrefix) + hwlog.RunLog.Infof("%s started watching merge parallel group info signal, timeout: %d", + d.ctx.LogPrefix(), context.FdCtx.Config.AllNodesReportTimeout) select { case <-d.ctx.MergeParallelGroupInfoSignal: - d.handleMergeSignal(logPrefix, "received signal") + d.handleMergeSignal("received signal") case <-time.After(time.Duration(context.FdCtx.Config.AllNodesReportTimeout) * time.Second): - d.handleMergeSignal( - logPrefix, - fmt.Sprintf("timeout after %d seconds", context.FdCtx.Config.AllNodesReportTimeout), - ) + d.handleMergeSignal(fmt.Sprintf("timeout after %d seconds", context.FdCtx.Config.AllNodesReportTimeout)) case _, ok := <-d.ctx.StopChan: if !ok { - hwlog.RunLog.Infof("%s stopped, exiting merge signal watcher", logPrefix) + hwlog.RunLog.Infof("%s stopped, exiting merge signal watcher", d.ctx.LogPrefix()) return } } }() } -func (d *Controller) handleMergeSignal(logPrefix, triggerReason string) { +func (d *Controller) handleMergeSignal(triggerReason string) { hwlog.RunLog.Infof("%s %s, merging parallel group info (reported nodes: %v)", - logPrefix, triggerReason, d.ctx.GetReportedNodeIps()) + d.ctx.LogPrefix(), triggerReason, d.ctx.GetReportedNodeIps()) d.ctx.AddStep() // Advance cluster step (e.g., from 1 to 2) d.ctx.StopHeavyProfiling() d.Start() - hwlog.RunLog.Infof("%s merge succeeded, exiting watcher", logPrefix) + hwlog.RunLog.Infof("%s merge succeeded, exiting watcher", d.ctx.LogPrefix()) } diff --git a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/dataparse/data_parse_test.go b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/dataparse/data_parse_test.go index 168fd5451..706bd9dbc 100644 --- a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/dataparse/data_parse_test.go +++ b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/dataparse/data_parse_test.go @@ -199,6 +199,6 @@ func 
testHandleMergeSignal(ctx *slownodejob.JobContext) { convey.Convey("test handleMergeSignal", func() { patch := gomonkey.ApplyMethod(reflect.TypeOf(ctx), "StopHeavyProfiling", func(*slownodejob.JobContext) {}) defer patch.Reset() - NewController(ctx).handleMergeSignal("", "") + NewController(ctx).handleMergeSignal("") }) } diff --git a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/slownodejob/job_context.go b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/slownodejob/job_context.go index 803b4d61d..37c721321 100644 --- a/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/slownodejob/job_context.go +++ b/component/ascend-faultdiag-online/pkg/service/servicefunc/slownode/slownodejob/job_context.go @@ -43,6 +43,8 @@ type cluster struct { IsDegradation bool // NodeReportSignal node report signal NodeReportSignal chan struct{} + // rescheduleCount the reschedule count of training job + rescheduleCount int } // AddAlgoRecord add the slow node algo result in JobContext @@ -91,6 +93,20 @@ func (c *cluster) TriggerMerge() { } } +// GetRescheduleCount get the reschedule count of the training job +func (c *cluster) GetRescheduleCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.rescheduleCount +} + +// SetRescheduleCount set the reschedule count of the training job +func (c *cluster) SetRescheduleCount(count int) { + c.mu.Lock() + c.rescheduleCount = count + defer c.mu.Unlock() +} + type node struct { // RealRankId realRankIds parsed in data parse RealRankIds []string @@ -117,10 +133,7 @@ type JobContext struct { } // NewSlowNode returns a new SlowNode object -func NewJobContext( - job *slownode.Job, - deployment enum.DeployMode, -) *JobContext { +func NewJobContext(job *slownode.Job, deployment enum.DeployMode) *JobContext { if job == nil { hwlog.RunLog.Error("[FD-OL SLOWNODE]create slow node JobContext failed: job is nil") return nil @@ -195,7 +208,8 @@ func (ctx *JobContext) IsStartedHeavyProfiling() bool { } func (ctx *JobContext) LogPrefix() string { - return fmt.Sprintf("[FD-OL SLOWNODE]job(name=%s, jobId=%s)", ctx.Job.JobName, ctx.Job.JobId) + return fmt.Sprintf("[FD-OL SLOWNODE]job(name=%s, namespace=%v, jobId=%s)", + ctx.Job.JobName, ctx.Job.Namespace, ctx.Job.JobId) } // StartAllProfiling start all the profiling diff --git a/component/ascend-faultdiag-online/pkg/utils/grpc/client_test.go b/component/ascend-faultdiag-online/pkg/utils/grpc/client_test.go index 92ddaaa3e..3d0b68779 100644 --- a/component/ascend-faultdiag-online/pkg/utils/grpc/client_test.go +++ b/component/ascend-faultdiag-online/pkg/utils/grpc/client_test.go @@ -36,6 +36,7 @@ import ( "ascend-faultdiag-online/pkg/utils" "ascend-faultdiag-online/pkg/utils/grpc/job" "ascend-faultdiag-online/pkg/utils/grpc/profiling" + "ascend-faultdiag-online/pkg/utils/grpc/pubfault" ) var ( @@ -74,6 +75,48 @@ func TestMain(m *testing.M) { fmt.Printf("exit_code = %v\n", code) } +func TestConnect(t *testing.T) { + var validIp = "127.0.0.1" + var invalidIp = "0000" + convey.Convey("test connect", t, func() { + // c.conn is not nil + c := &Client{} + c.conn = &grpc.ClientConn{} + err := c.connect(validIp) + convey.So(err, convey.ShouldBeNil) + c.conn = nil + // invalid ip + err = c.connect(invalidIp) + convey.So(err.Error(), convey.ShouldContainSubstring, "invalid host") + // dial faild + connectFailed = true + err = c.connect(validIp) + convey.So(err.Error(), convey.ShouldContainSubstring, "failed to connect to grpc server") + // success + connectFailed = false + err = 
c.connect(validIp) + convey.So(err, convey.ShouldBeNil) + }) +} + +func TestClose(t *testing.T) { + c := &Client{} + patch := gomonkey.ApplyMethod(reflect.TypeOf(&grpc.ClientConn{}), "Close", func(*grpc.ClientConn) error { + c.conn = nil + return nil + }) + defer patch.Reset() + convey.Convey("test Close", t, func() { + convey.So(c.conn, convey.ShouldBeNil) + // conn is nil + c.Close() + convey.So(c.conn, convey.ShouldBeNil) + c.conn = &grpc.ClientConn{} + c.Close() + convey.So(c.conn, convey.ShouldBeNil) + }) +} + func TestGrpc(t *testing.T) { // test connect connectFailed = true @@ -125,6 +168,40 @@ func TestProfiling(t *testing.T) { assert.Nil(t, err) } +func TestProfilingSwitch(t *testing.T) { + var c = &Client{conn: &grpc.ClientConn{}} + c.tc = profiling.NewTrainingDataTraceClient(c.conn) + patch := gomonkey.ApplyMethodFunc( + reflect.TypeOf(c.tc), + "ModifyTrainingDataTraceSwitch", + func(context.Context, *profiling.DataTypeReq, ...grpc.CallOption) (*profiling.DataTypeRes, error) { + return &profiling.DataTypeRes{}, nil + }, + ) + defer patch.Reset() + convey.Convey("test profilingSwitch", t, func() { + _, err := c.profilingSwitch(&profiling.DataTypeReq{}) + convey.So(err, convey.ShouldBeNil) + }) +} + +func TestReportFault(t *testing.T) { + var c = &Client{conn: &grpc.ClientConn{}} + c.pf = pubfault.NewPubFaultClient(c.conn) + patch := gomonkey.ApplyMethodFunc( + reflect.TypeOf(c.pf), + "SendPublicFault", + func(context.Context, *pubfault.PublicFaultRequest, ...grpc.CallOption) (*pubfault.RespStatus, error) { + return &pubfault.RespStatus{}, nil + }, + ) + defer patch.Reset() + convey.Convey("test ReportFault", t, func() { + err := c.ReportFault([]*pubfault.Fault{}) + convey.So(err, convey.ShouldBeNil) + }) +} + func TestRegisterJobSummary(t *testing.T) { c := &Client{conn: &grpc.ClientConn{}} c.jc = job.NewJobClient(c.conn) diff --git a/component/ascend-faultdiag-online/pkg/utils/k8s/cm_watcher.go b/component/ascend-faultdiag-online/pkg/utils/k8s/cm_watcher.go new file mode 100644 index 000000000..0bcbe04ad --- /dev/null +++ b/component/ascend-faultdiag-online/pkg/utils/k8s/cm_watcher.go @@ -0,0 +1,185 @@ +/* Copyright(C) 2024. Huawei Technologies Co.,Ltd. All rights reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +// Package k8s is a tool to watch the config map +package k8s + +import ( + "context" + "fmt" + "sync" + + "github.com/google/uuid" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" + + "ascend-common/common-utils/hwlog" + "ascend-faultdiag-online/pkg/utils" +) + +var ( + // CmWatcher is a instance of cmWatcher + CmWatcher *cmWatcher + cmWatcherCnce sync.Once + storage = utils.NewStorage[*cmData]() +) + +type callbackFunc func(oldObj, newObj *corev1.ConfigMap, op watch.EventType) + +type cmData struct { + oldCm *corev1.ConfigMap + newCm *corev1.ConfigMap + op watch.EventType +} + +type callback struct { + registerId string + f callbackFunc +} + +type cmWatcher struct { + mu sync.Mutex + watchers map[string]context.CancelFunc + callbackMap map[string][]callback +} + +// GetCmWatcher return a singleton of cmWatcher +func GetCmWatcher() *cmWatcher { + cmWatcherCnce.Do(func() { + CmWatcher = &cmWatcher{ + watchers: make(map[string]context.CancelFunc), + callbackMap: make(map[string][]callback), + } + }) + return CmWatcher +} + +func (c *cmWatcher) keyGenerator(namespace, cmName string) string { + return fmt.Sprintf("%v/%v", namespace, cmName) +} + +func (c *cmWatcher) runInformer(ctx context.Context, namespace, cmName string) { + client, err := GetClient() + if err != nil { + hwlog.RunLog.Errorf("[FD-OL]get k8s client failed: %v", err) + return + } + factory := informers.NewFilteredSharedInformerFactory( + client.ClientSet, + 0, + namespace, + func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", cmName).String() + }, + ) + + informer := factory.Core().V1().ConfigMaps().Informer() + informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj any) { + go c.cmProcessor(nil, obj, watch.Added) + }, + UpdateFunc: func(oldObj, newObj any) { + go c.cmProcessor(oldObj, newObj, watch.Modified) + }, + DeleteFunc: func(obj any) { + go c.cmProcessor(nil, obj, watch.Deleted) + }, + }) + + stopChan := make(chan struct{}) + key := c.keyGenerator(namespace, cmName) + go func() { + <-ctx.Done() + close(stopChan) + hwlog.RunLog.Infof("[FD-OL]config map for %v stopped", key) + }() + hwlog.RunLog.Infof("[FD-OL]start to watch config map: %v", key) + informer.Run(stopChan) +} + +func (c *cmWatcher) cmProcessor(oldData, newData any, op watch.EventType) { + // convert data to configMap object + var oldCm, newCm *corev1.ConfigMap + var ok bool + if oldData != nil { + oldCm, ok = oldData.(*corev1.ConfigMap) + if !ok { + hwlog.RunLog.Errorf("[FD-OL]could not convert data: %v to config map object", oldData) + return + } + } + newCm, ok = newData.(*corev1.ConfigMap) + if !ok { + hwlog.RunLog.Errorf("[FD-OL]could not convert data: %v to config map object", newData) + return + } + key := c.keyGenerator(newCm.Namespace, newCm.Name) + storage.Store(key, &cmData{oldCm: oldCm, newCm: newCm, op: op}) + c.mu.Lock() + for _, cb := range c.callbackMap[key] { + go cb.f(oldCm, newCm, op) + } + c.mu.Unlock() +} + +// Subscribe subscribe the config map by namespace and cm name, call f if data available +func (c *cmWatcher) Subscribe(namespace, cmName string, f callbackFunc) string { + key := c.keyGenerator(namespace, cmName) + data, ok := storage.Load(key) + if ok { + go f(data.oldCm, data.newCm, data.op) + } + c.mu.Lock() + defer c.mu.Unlock() + registerId := uuid.New().String() + if _, exists := 
c.watchers[key]; exists {
+		callbacks := c.callbackMap[key]
+		callbacks = append(callbacks, callback{registerId: registerId, f: f})
+		c.callbackMap[key] = callbacks
+		return registerId
+	}
+	c.callbackMap[key] = []callback{{registerId: registerId, f: f}}
+	ctx, cancel := context.WithCancel(context.Background())
+	c.watchers[key] = cancel
+	go c.runInformer(ctx, namespace, cmName)
+	return registerId
+}
+
+// Unsubscribe removes a subscription of the config map by its register id
+func (c *cmWatcher) Unsubscribe(namespace, cmName, registerId string) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	var key = c.keyGenerator(namespace, cmName)
+	callbacks := c.callbackMap[key]
+	for i := 0; i < len(callbacks); i++ {
+		if callbacks[i].registerId == registerId {
+			callbacks[i] = callbacks[len(callbacks)-1]
+			callbacks = callbacks[:len(callbacks)-1]
+			c.callbackMap[key] = callbacks
+			break
+		}
+	}
+	if len(callbacks) == 0 {
+		if cancel, exists := c.watchers[key]; exists {
+			cancel()
+		}
+		delete(c.watchers, key)
+		delete(c.callbackMap, key)
+		storage.Delete(key)
+	}
+}
diff --git a/component/ascend-faultdiag-online/pkg/utils/k8s/cm_watcher_test.go b/component/ascend-faultdiag-online/pkg/utils/k8s/cm_watcher_test.go
new file mode 100644
index 000000000..6569bddf9
--- /dev/null
+++ b/component/ascend-faultdiag-online/pkg/utils/k8s/cm_watcher_test.go
@@ -0,0 +1,111 @@
+/* Copyright(C) 2024. Huawei Technologies Co.,Ltd. All rights reserved.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/ + +// Package k8s is a DT collection for func in cm_watcher.go +package k8s + +import ( + "context" + "testing" + "time" + + "github.com/agiledragon/gomonkey/v2" + "github.com/smartystreets/goconvey/convey" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" +) + +const ( + testNamespace1 = "testNamespace1" + testCmName1 = "testCmName1" + testNamespace2 = "testNamespace2" + testCmName2 = "testCmName2" + + subCount = 10 +) + +func TestSubAndUnSub(t *testing.T) { + GetCmWatcher() + var count = 0 + patch := gomonkey.ApplyPrivateMethod( + CmWatcher, + "runInformer", + func(*cmWatcher, context.Context, string, string) { + count++ + }, + ) + defer patch.Reset() + var f = func(oldObj, newObj *corev1.ConfigMap, op watch.EventType) {} + var registerIds []string + convey.Convey("test sub and unsub", t, func() { + for i := 0; i < subCount; i++ { + registerIds = append(registerIds, CmWatcher.Subscribe(testNamespace1, testNamespace1, f)) + } + for i := 0; i < subCount; i++ { + registerIds = append(registerIds, CmWatcher.Subscribe(testNamespace2, testNamespace2, f)) + } + var length = 2 + convey.So(len(CmWatcher.callbackMap), convey.ShouldEqual, length) + convey.So(len(CmWatcher.watchers), convey.ShouldEqual, length) + convey.So(len(registerIds), convey.ShouldEqual, length*subCount) + time.Sleep(time.Millisecond) + convey.So(count, convey.ShouldEqual, length) + for _, registerId := range registerIds { + CmWatcher.Unsubscribe(testNamespace1, testNamespace1, registerId) + CmWatcher.Unsubscribe(testNamespace2, testNamespace2, registerId) + } + convey.So(len(CmWatcher.callbackMap), convey.ShouldEqual, 0) + convey.So(len(CmWatcher.watchers), convey.ShouldEqual, 0) + }) +} + +func TestCmProcessor(t *testing.T) { + GetCmWatcher() + patch := gomonkey.ApplyPrivateMethod( + CmWatcher, + "runInformer", + func(*cmWatcher, context.Context, string, string) {}, + ) + defer patch.Reset() + convey.Convey("test cm processor", t, func() { + // convert to configMap failed + wrongOldData := "xxx" + wrongNewData := "xxx" + CmWatcher.cmProcessor(wrongOldData, wrongNewData, watch.Added) + CmWatcher.cmProcessor(nil, wrongNewData, watch.Added) + newData := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: testCmName1, + Namespace: testNamespace1, + }, + } + CmWatcher.cmProcessor(nil, newData, watch.Added) + time.Sleep(time.Millisecond) + + // sub + f := func(oldObj, newObj *corev1.ConfigMap, op watch.EventType) { + assert.Equal(t, newObj.Name, testCmName1) + assert.Equal(t, newObj.Namespace, testNamespace1) + } + + registerId := CmWatcher.Subscribe(testNamespace1, testCmName1, f) + time.Sleep(time.Millisecond) + // unsub no data in storage + CmWatcher.Unsubscribe(testNamespace1, testCmName1, registerId) + _, ok := storage.Load(CmWatcher.keyGenerator(testNamespace1, testCmName1)) + convey.So(ok, convey.ShouldBeFalse) + }) +} diff --git a/component/ascend-faultdiag-online/pkg/utils/k8s/kubeclient.go b/component/ascend-faultdiag-online/pkg/utils/k8s/kubeclient.go index 81d9edd3a..db06ab7f2 100644 --- a/component/ascend-faultdiag-online/pkg/utils/k8s/kubeclient.go +++ b/component/ascend-faultdiag-online/pkg/utils/k8s/kubeclient.go @@ -88,17 +88,6 @@ func (c *Client) CreateConfigMap(cm *corev1.ConfigMap) (*corev1.ConfigMap, error return newCM, nil } -// GetConfigMap get config map by name and name space -func (c *Client) GetConfigMap(cmName, cmNamespace string) (*corev1.ConfigMap, error) { - newCM, err := 
c.ClientSet.CoreV1().ConfigMaps(cmNamespace).Get(context.TODO(), cmName, metav1.GetOptions{ - ResourceVersion: "0", - }) - if err != nil { - return nil, err - } - return newCM, nil -} - // UpdateConfigMap update config map func (c *Client) UpdateConfigMap(cm *corev1.ConfigMap) (*corev1.ConfigMap, error) { if cm == nil { diff --git a/component/ascend-faultdiag-online/pkg/utils/k8s/kubeclient_test.go b/component/ascend-faultdiag-online/pkg/utils/k8s/kubeclient_test.go index f0c504c8e..94006e666 100644 --- a/component/ascend-faultdiag-online/pkg/utils/k8s/kubeclient_test.go +++ b/component/ascend-faultdiag-online/pkg/utils/k8s/kubeclient_test.go @@ -123,7 +123,6 @@ func TestClientK8s(t *testing.T) { ClientSet: fake.NewSimpleClientset(), } convey.Convey("test ClientK8s method 'CreateConfigMap'", t, testCreateConfigMap) - convey.Convey("test ClientK8s method 'GetConfigMap'", t, testGetConfigMap) convey.Convey("test ClientK8s method 'UpdateConfigMap'", t, testUpdateConfigMap) convey.Convey("test ClientK8s method 'CreateOrUpdateConfigMap'", t, testCreateOrUpdateCM) } @@ -136,14 +135,6 @@ func testCreateConfigMap() { convey.So(err, convey.ShouldBeNil) } -func testGetConfigMap() { - if testK8sClient == nil { - panic("testK8sClient is nil") - } - _, err := testK8sClient.GetConfigMap("", "") - convey.So(err, convey.ShouldBeNil) -} - func testUpdateConfigMap() { if testK8sClient == nil { panic("testK8sClient is nil") diff --git a/component/ascend-faultdiag-online/pkg/utils/network.go b/component/ascend-faultdiag-online/pkg/utils/network.go index 4e78c2588..8d4b5ba71 100644 --- a/component/ascend-faultdiag-online/pkg/utils/network.go +++ b/component/ascend-faultdiag-online/pkg/utils/network.go @@ -28,31 +28,23 @@ import ( // GetNodeIp get the ip address of node pod func GetNodeIp() (string, error) { - // 获取环境变量 XDL_IP xdlIp := os.Getenv(constants.XdlIpField) - // 如果环境变量存在,直接返回 if xdlIp != "" { return xdlIp, nil } - // 如果没有环境变量,输出警告并调用 GetLocalIP 获取本地 IP - hwlog.RunLog.Warnf("%v environment variable not set", constants.XdlIpField) - // 获取本地所有网络接口的地址 + // no env, output the warn log and get local ip + hwlog.RunLog.Warnf("[FD-OL]%v environment variable not set", constants.XdlIpField) addrs, err := net.InterfaceAddrs() if err != nil { return "", err } - - // 遍历所有地址,查找 IPv4 地址 for _, addr := range addrs { - // 检查是否为 IP 地址,并且是否为 IPv4 地址 + // check the ip address is valid or not if ipNet, ok := addr.(*net.IPNet); ok && !ipNet.IP.IsLoopback() && ipNet.IP.To4() != nil { - // 返回第一个找到的非 loopback 的 IPv4 地址 return ipNet.IP.String(), nil } } - - // 如果没有找到有效的 IPv4 地址,返回错误 return "", fmt.Errorf("no valid IP address found") } diff --git a/component/ascend-faultdiag-online/pkg/utils/network_test.go b/component/ascend-faultdiag-online/pkg/utils/network_test.go new file mode 100644 index 000000000..a6d7b52b3 --- /dev/null +++ b/component/ascend-faultdiag-online/pkg/utils/network_test.go @@ -0,0 +1,142 @@ +/* +Copyright(C)2025. Huawei Technologies Co.,Ltd. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// Package utils is a DT collection for func in network.go +package utils + +import ( + "fmt" + "net" + "os" + "testing" + + "github.com/agiledragon/gomonkey/v2" + "github.com/smartystreets/goconvey/convey" + + "ascend-faultdiag-online/pkg/utils/constants" +) + +const ( + loopbackIp = "127.0.0.1" + mockEnvIp = "192.168.1.100" + mockNetIp = "192.168.1.101" + mockMask8 = 8 + mockMask24 = 24 + mockMask32 = 32 +) + +func TestGetNodeIp(t *testing.T) { + convey.Convey("test GetNodeIp", t, func() { + testGetNodeIpWithEnv() + testGetNodeIpWithoutEnv() + testGetNodeIpWithError() + testGetNodeIpWithInvalidIp() + }) +} + +func testGetNodeIpWithEnv() { + convey.Convey("test GetNodeIp with env", func() { + patch := gomonkey.ApplyFunc(os.Getenv, func(key string) string { + if key == constants.XdlIpField { + return mockEnvIp + } + return "" + }) + defer patch.Reset() + ip, err := GetNodeIp() + convey.So(err, convey.ShouldBeNil) + convey.So(ip, convey.ShouldEqual, mockEnvIp) + // net.InterfaceAddrs() is valid and got the same ip + patch.ApplyFunc(net.InterfaceAddrs, func() ([]net.Addr, error) { + return []net.Addr{ + &net.IPNet{IP: net.ParseIP(loopbackIp), Mask: net.CIDRMask(mockMask8, mockMask32)}, + &net.IPNet{IP: net.ParseIP(mockNetIp), Mask: net.CIDRMask(mockMask24, mockMask32)}, + }, nil + }) + ip, err = GetNodeIp() + convey.So(err, convey.ShouldBeNil) + convey.So(ip, convey.ShouldEqual, mockEnvIp) + }) +} + +func testGetNodeIpWithoutEnv() { + convey.Convey("test testGetNodeIpWithout env", func() { + patch := gomonkey.ApplyFunc(os.Getenv, func(key string) string { + return "" + }) + patch.ApplyFunc(net.InterfaceAddrs, func() ([]net.Addr, error) { + return []net.Addr{ + &net.IPNet{IP: net.ParseIP(loopbackIp), Mask: net.CIDRMask(mockMask8, mockMask32)}, + &net.IPNet{IP: net.ParseIP(mockNetIp), Mask: net.CIDRMask(mockMask24, mockMask32)}, + }, nil + }) + defer patch.Reset() + ip, err := GetNodeIp() + convey.So(err, convey.ShouldBeNil) + convey.So(ip, convey.ShouldEqual, mockNetIp) + }) +} + +func testGetNodeIpWithError() { + convey.Convey("test testGetNodeIp with error", func() { + patch := gomonkey.ApplyFunc(os.Getenv, func(key string) string { + return "" + }) + patch.ApplyFunc(net.InterfaceAddrs, func() ([]net.Addr, error) { + return []net.Addr{ + &net.IPNet{IP: net.ParseIP(loopbackIp), Mask: net.CIDRMask(mockMask8, mockMask32)}, + }, fmt.Errorf("no valid IP") + }) + defer patch.Reset() + ip, err := GetNodeIp() + convey.So(err.Error(), convey.ShouldEqual, "no valid IP") + convey.So(ip, convey.ShouldBeEmpty) + }) +} + +func testGetNodeIpWithInvalidIp() { + convey.Convey("test testGetNodeIp with invalid ip", func() { + patch := gomonkey.ApplyFunc(os.Getenv, func(key string) string { + return "" + }) + patch.ApplyFunc(net.InterfaceAddrs, func() ([]net.Addr, error) { + return []net.Addr{ + &net.IPNet{IP: net.ParseIP(loopbackIp), Mask: net.CIDRMask(mockMask8, mockMask32)}, + }, nil + }) + defer patch.Reset() + ip, err := GetNodeIp() + convey.So(err.Error(), convey.ShouldEqual, "no valid IP address found") + convey.So(ip, convey.ShouldBeEmpty) + }) +} + +func TestGetClusterIp(t *testing.T) { + convey.Convey("test GetClusterIp", t, func() { + // env exists + var ip = "127.0.0.1" + patch := gomonkey.ApplyFunc(os.Getenv, func(key string) string { + if key == constants.PodIP { + return ip + } + return "" + }) + defer convey.So(GetClusterIp(), convey.ShouldEqual, ip) + patch.Reset() + // env not exist + convey.So(GetClusterIp(), convey.ShouldEqual, "") + }) +} diff --git 
a/component/ascend-faultdiag-online/pkg/utils/storage.go b/component/ascend-faultdiag-online/pkg/utils/storage.go index 1be725aac..deb30d7ac 100644 --- a/component/ascend-faultdiag-online/pkg/utils/storage.go +++ b/component/ascend-faultdiag-online/pkg/utils/storage.go @@ -45,6 +45,10 @@ func (s *Storage[T]) Load(key string) (T, bool) { return res, ok } +func (s *Storage[T]) Delete(key string) { + s.data.Delete(key) +} + // NewStorage got a new storage instance func NewStorage[T any]() *Storage[T] { return &Storage[T]{} diff --git a/component/ascend-faultdiag-online/pkg/utils/storage_test.go b/component/ascend-faultdiag-online/pkg/utils/storage_test.go index e0abd4521..d8ec47c50 100644 --- a/component/ascend-faultdiag-online/pkg/utils/storage_test.go +++ b/component/ascend-faultdiag-online/pkg/utils/storage_test.go @@ -62,7 +62,7 @@ func TestStorage(t *testing.T) { convey.So(ok, convey.ShouldBeFalse) convey.So(res, convey.ShouldBeNil) // clear - storage.Clear() + storage.Delete(key) // load the existed key res, ok = storage.Load(key) convey.So(ok, convey.ShouldBeFalse) diff --git a/component/ascend-faultdiag-online/pkg/utils/utils.go b/component/ascend-faultdiag-online/pkg/utils/utils.go index 54986cae1..13d404da1 100644 --- a/component/ascend-faultdiag-online/pkg/utils/utils.go +++ b/component/ascend-faultdiag-online/pkg/utils/utils.go @@ -134,7 +134,7 @@ func Retry[T any](f func() (T, error), cg *RetryConfig) (T, error) { return res, errors.New("retry failed: func is nil") } if cg.RetryCount > maxRetryCount || cg.SleepTime.Milliseconds() < minSleepTime.Milliseconds() { - return res, fmt.Errorf("config check failed: execced the max retry count: %d or less than min sleep time: %v", + return res, fmt.Errorf("config check failed: excced the max retry count: %d or less than min sleep time: %v", maxRetryCount, minSleepTime) } var err error diff --git a/component/ascend-faultdiag-online/pkg/utils/utils_test.go b/component/ascend-faultdiag-online/pkg/utils/utils_test.go index 96d95c3a9..9d9a40cd2 100644 --- a/component/ascend-faultdiag-online/pkg/utils/utils_test.go +++ b/component/ascend-faultdiag-online/pkg/utils/utils_test.go @@ -20,13 +20,9 @@ package utils import ( "context" "errors" - "fmt" - "net" - "os" "testing" "time" - "github.com/agiledragon/gomonkey/v2" "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" @@ -153,151 +149,6 @@ func init() { hwlog.InitRunLogger(&hwLogConfig, context.Background()) } -const ( - loopbackIp = "127.0.0.1" - mockEnvIp = "192.168.1.100" - mockNetIp = "192.168.1.101" - mockMask8 = 8 - mockMask24 = 24 - mockMask32 = 32 -) - -// TestGetNodeIpExistAndIpValid测试GetNodeIp函数环境变量存在且net.InterfaceAddrds返回值有效 -func TestGetNodeIpExistAndIpValid(t *testing.T) { - // 打桩os.Getenv返回值 - patch := gomonkey.NewPatches() - defer patch.Reset() - - // 测试环境变量存在的情况 - patch.ApplyFunc(os.Getenv, func(key string) string { - if key == constants.XdlIpField { - return mockEnvIp - } - return "" - }) - - ip, err := GetNodeIp() - assert.NoError(t, err) - assert.Equal(t, mockEnvIp, ip) - - // 打桩net.InterfaceAddrs返回值有效 - patch.ApplyFunc(net.InterfaceAddrs, func() ([]net.Addr, error) { - return []net.Addr{ - &net.IPNet{IP: net.ParseIP(loopbackIp), Mask: net.CIDRMask(mockMask8, mockMask32)}, - &net.IPNet{IP: net.ParseIP(mockNetIp), Mask: net.CIDRMask(mockMask24, mockMask32)}, - }, nil - }) - - ip, err = GetNodeIp() - assert.NoError(t, err) - assert.Equal(t, mockEnvIp, ip) -} - -// TestGetNodeIpNotExistAndIpValid测试GetNodeIp函数环境变量不存在且net.InterfaceAddrds返回值有效 -func 
TestGetNodeIpNotExistAndIpValid(t *testing.T) { - // 打桩os.Getenv返回值 - patch := gomonkey.NewPatches() - defer patch.Reset() - - // 测试环境变量不存在的情况 - patch.ApplyFunc(os.Getenv, func(key string) string { - return "" - }) - - // 打桩net.InterfaceAddrs返回值有效 - patch.ApplyFunc(net.InterfaceAddrs, func() ([]net.Addr, error) { - return []net.Addr{ - &net.IPNet{IP: net.ParseIP(loopbackIp), Mask: net.CIDRMask(mockMask8, mockMask32)}, - &net.IPNet{IP: net.ParseIP(mockNetIp), Mask: net.CIDRMask(mockMask24, mockMask32)}, - }, nil - }) - - ip, err := GetNodeIp() - assert.Equal(t, err, nil) - assert.Equal(t, mockNetIp, ip) -} - -// TestGetNodeIpExistAndInterfaceErr测试GetNodeIp函数环境变量不存在且net.InterfaceAddrds返回值无效 -func TestGetNodeIpExistAndInterfaceErr(t *testing.T) { - // 打桩os.Getenv返回值 - patch := gomonkey.NewPatches() - defer patch.Reset() - - // 测试环境变量存在的情况 - patch.ApplyFunc(os.Getenv, func(key string) string { - return "" - }) - - // 打桩net.InterfaceAddrs返回值无效 - patch.ApplyFunc(net.InterfaceAddrs, func() ([]net.Addr, error) { - return []net.Addr{ - &net.IPNet{IP: net.ParseIP(loopbackIp), Mask: net.CIDRMask(mockMask8, mockMask32)}, - }, fmt.Errorf("no valid IP") - }) - - ip, err := GetNodeIp() - assert.Error(t, err) - assert.Equal(t, "", ip) - assert.Contains(t, err.Error(), "no valid IP") -} - -// TestGetNodeIpExistAndIpInvalid测试GetNodeIp函数环境变量不存在且net.InterfaceAddrds返回值无效 -func TestGetNodeIpExistAndIpInvalid(t *testing.T) { - // 打桩os.Getenv返回值 - patch := gomonkey.NewPatches() - defer patch.Reset() - - // 测试环境变量存在的情况 - patch.ApplyFunc(os.Getenv, func(key string) string { - return "" - }) - - // 打桩net.InterfaceAddrs返回值无效 - patch.ApplyFunc(net.InterfaceAddrs, func() ([]net.Addr, error) { - return []net.Addr{ - &net.IPNet{IP: net.ParseIP(loopbackIp), Mask: net.CIDRMask(mockMask8, mockMask32)}, - }, nil - }) - - ip, err := GetNodeIp() - assert.Error(t, err) - assert.Equal(t, "", ip) - assert.Contains(t, err.Error(), "no valid IP address found") -} - -// TestGetClusterIpWithEnvExist测试GetCluserIP函数环境变量存在 -func TestGetClusterIpWithEnvExist(t *testing.T) { - // 打桩os.Getenv返回值 - patch := gomonkey.NewPatches() - defer patch.Reset() - - // 测试环境变量存在的情况 - patch.ApplyFunc(os.Getenv, func(key string) string { - if key == constants.PodIP { - return mockEnvIp - } - return "" - }) - - ip := GetClusterIp() - assert.Equal(t, mockEnvIp, ip) -} - -// TestGetClusterIpWithEnvExist测试GetCluserIP函数环境变量不存在 -func TestGetClusterIpWithEnvNotExist(t *testing.T) { - // 打桩os.Getenv返回值 - patch := gomonkey.NewPatches() - defer patch.Reset() - - // 测试环境变量存在的情况 - patch.ApplyFunc(os.Getenv, func(key string) string { - return "" - }) - - ip := GetClusterIp() - assert.Equal(t, "", ip) -} - func TestWriteAndReadUniqueId(t *testing.T) { convey.Convey("test write and isRestarted", t, func() { // no file, no restarted @@ -323,5 +174,25 @@ func TestRetry(t *testing.T) { res, err := Retry(f, &RetryConfig{RetryCount: count, SleepTime: sleepTime}) convey.So(err, convey.ShouldNotBeNil) convey.So(res, convey.ShouldEqual, -1) + // test invalid parameters + sleepTime = time.Millisecond + _, err = Retry(f, &RetryConfig{RetryCount: count, SleepTime: sleepTime}) + convey.So(err.Error(), convey.ShouldContainSubstring, "less than min sleep time") + sleepTime = time.Second + count = maxRetryCount + 1 + _, err = Retry(f, &RetryConfig{RetryCount: count, SleepTime: sleepTime}) + convey.So(err.Error(), convey.ShouldContainSubstring, "excced the max retry count") + // nil func + f = nil + _, err = Retry(f, &RetryConfig{RetryCount: count, SleepTime: sleepTime}) + 
convey.So(err.Error(), convey.ShouldContainSubstring, "retry failed: func is nil") + + // default cg + f = func() (int, error) { + return 0, nil + } + res, err = Retry(f, nil) + convey.So(res, convey.ShouldEqual, 0) + convey.So(err, convey.ShouldBeNil) }) } diff --git a/component/clusterd/build/Dockerfile b/component/clusterd/build/Dockerfile index 7dd77ee32..f4541aeb5 100644 --- a/component/clusterd/build/Dockerfile +++ b/component/clusterd/build/Dockerfile @@ -20,4 +20,4 @@ RUN chown -R hwMindX:hwMindX /home/hwMindX &&\ echo 'source /etc/profile' >> /home/hwMindX/.bashrc # hwMindX is used as the default user of the container -USER hwMindX \ No newline at end of file +USER hwMindX -- Gitee
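
Note on the watched data format: the new rescheduleProcessor reads the "recent-reschedule-records" key of the
mindx-dl/job-reschedule-reason ConfigMap, unmarshals it into map[string]model.RescheduleData, matches the entry
whose jobID ends with the local job id, and restarts slow node detection when TotalRescheduleTimes differs from the
stored count. A minimal illustrative payload is sketched below; the field names follow the struct tags added in
model.go, while the top-level map key and all values are placeholder assumptions (the authoritative format is the
MindCluster document referenced in job_processor.go).

    {
      "<namespace>/<jobName>": {
        "jobID": "default/default-test-mindspore-f4121ec4-590e-4cdc-a422-ac256b898659",
        "TotalRescheduleTimes": 1,
        "RescheduleRecords": [
          {
            "LogFileFormatTime": "<log file format time>",
            "RescheduleTimeStamp": 1700000000,
            "ReasonOfTask": [
              {
                "RescheduleReason": "<fault reason>",
                "PodName": "<pod name>",
                "NodeName": "<node name>",
                "NodeRankIndex": "0"
              }
            ]
          }
        ]
      }
    }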