diff --git a/component/clusterd/go.mod b/component/clusterd/go.mod index 76e7dc4b11fd392baa6fc2719de70ec74526c861..6045de4d3f28cb1e0ca495129a530d195c5b3a34 100644 --- a/component/clusterd/go.mod +++ b/component/clusterd/go.mod @@ -1,6 +1,6 @@ module clusterd -go 1.20 +go 1.21 require ( ascend-common v0.0.0 diff --git a/component/clusterd/pkg/application/fault/grpc_service.go b/component/clusterd/pkg/application/fault/grpc_service.go index 3ab8695e7f6e9a575f41cfb7f966a2159156958b..0fe83f6f1cbc86e110c27c83e1b63ef1bed5ed90 100644 --- a/component/clusterd/pkg/application/fault/grpc_service.go +++ b/component/clusterd/pkg/application/fault/grpc_service.go @@ -18,10 +18,12 @@ package fault import ( "context" "fmt" + "reflect" "sync" "ascend-common/common-utils/hwlog" "clusterd/pkg/application/config" + "clusterd/pkg/application/faultmanager" "clusterd/pkg/common/constant" "clusterd/pkg/domain/common" "clusterd/pkg/domain/job" @@ -34,6 +36,7 @@ type FaultServer struct { faultPublisher map[string]*config.ConfigPublisher[*fault.FaultMsgSignal] lock sync.RWMutex fault.UnimplementedFaultServer + faultCh chan map[string]constant.JobFaultInfo } // NewFaultServer create a fault server @@ -42,6 +45,10 @@ func NewFaultServer(ctx context.Context) *FaultServer { serviceCtx: ctx, faultPublisher: make(map[string]*config.ConfigPublisher[*fault.FaultMsgSignal]), lock: sync.RWMutex{}, + faultCh: make(chan map[string]constant.JobFaultInfo, 5), + } + if err := faultmanager.RegisterForJobFaultRank(server.faultCh, reflect.TypeOf(server).Name()); err != nil { + hwlog.RunLog.Error("RegisterForJobFaultRank fail") } go server.checkFaultFromFaultCenter() return server diff --git a/component/clusterd/pkg/application/fault/publish_fault_plugin.go b/component/clusterd/pkg/application/fault/publish_fault_plugin.go index 34a70195c1a981efb9a9b6f59d086c7b86c5d08e..f5120a6baafefc87e4e575ffdd2c50cb3b7f1eb5 100644 --- a/component/clusterd/pkg/application/fault/publish_fault_plugin.go +++ b/component/clusterd/pkg/application/fault/publish_fault_plugin.go @@ -6,36 +6,26 @@ package fault import ( "sort" "sync" - "time" "github.com/golang/protobuf/proto" "ascend-common/common-utils/hwlog" "clusterd/pkg/application/config" - "clusterd/pkg/application/faultmanager" "clusterd/pkg/common/constant" "clusterd/pkg/domain/job" "clusterd/pkg/interface/grpc/fault" ) -const ( - publishFaultInterval = 1 -) - func (s *FaultServer) checkFaultFromFaultCenter() { - ticker := time.NewTicker(time.Duration(publishFaultInterval) * time.Second) - defer ticker.Stop() for { select { case <-s.serviceCtx.Done(): return - case <-ticker.C: - hwlog.RunLog.Debug("ticker publish fault from global center") - if faultmanager.GlobalFaultProcessCenter == nil { - hwlog.RunLog.Warnf("global center is nil, try it after %d second", publishFaultInterval) + case allJobFaultInfo, ok := <-s.faultCh: + if !ok { + hwlog.RunLog.Info("faultCh has been closed.") return } - allJobFaultInfo := faultmanager.QueryJobsFaultInfo(constant.NotHandleFault) hwlog.RunLog.Debugf("global fault info: %v", allJobFaultInfo) s.checkPublishFault(allJobFaultInfo) } diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/base_fault_center.go b/component/clusterd/pkg/application/faultmanager/cmprocess/base_fault_center.go index bfabccc84e8c755981945f94dbe6acfdd1f432e7..ff450ceb1c96f3b673f6e20a746df923c61c324c 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/base_fault_center.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/base_fault_center.go @@ -6,19 +6,17 @@ package cmprocess import ( "fmt" "sync" - "time" "ascend-common/common-utils/hwlog" "clusterd/pkg/common/constant" + "clusterd/pkg/domain/faultdomain" "clusterd/pkg/domain/faultdomain/cmmanager" ) type baseFaultCenter[T constant.ConfigMapInterface] struct { processorList []constant.FaultProcessor - lastProcessTime int64 subscribeChannelList []chan int mutex sync.Mutex - processPeriod int64 cmManager *cmmanager.FaultCenterCmManager[T] centerType int } @@ -26,37 +24,33 @@ type baseFaultCenter[T constant.ConfigMapInterface] struct { func newBaseFaultCenter[T constant.ConfigMapInterface](cmManager *cmmanager.FaultCenterCmManager[T], centerType int) baseFaultCenter[T] { return baseFaultCenter[T]{ processorList: make([]constant.FaultProcessor, 0), - lastProcessTime: 0, subscribeChannelList: make([]chan int, 0), mutex: sync.Mutex{}, - processPeriod: constant.FaultCenterProcessPeriod, cmManager: cmManager, centerType: centerType, } } -func (baseCenter *baseFaultCenter[T]) isProcessLimited(currentTime int64) bool { - return baseCenter.lastProcessTime+baseCenter.processPeriod > currentTime -} - func (baseCenter *baseFaultCenter[T]) Process() { - currentTime := time.Now().UnixMilli() - if baseCenter.isProcessLimited(currentTime) { - return - } - baseCenter.lastProcessTime = currentTime updateOriginalCm := baseCenter.updateOriginalCm() - baseCenter.setProcessingCm(baseCenter.getOriginalCm()) + origCm := baseCenter.getOriginalCm() + var processingCm map[string]T + if baseCenter.centerType == constant.DeviceProcessType { + processingCm = make(map[string]T) + for cmName, deviceInfo := range origCm { + processingCm[faultdomain.CmNameToNodeName(cmName)] = deviceInfo + } + } else { + processingCm = origCm + } for _, processor := range baseCenter.processorList { - processingCm := baseCenter.getProcessingCm() info := constant.OneConfigmapContent[T]{ AllConfigmap: processingCm, UpdateConfigmap: updateOriginalCm, } processingCm = processor.Process(info).(constant.OneConfigmapContent[T]).AllConfigmap - baseCenter.setProcessingCm(processingCm) } - if baseCenter.setProcessedCm(baseCenter.getProcessingCm()) { + if baseCenter.setProcessedCm(processingCm) { baseCenter.notifySubscriber() } } @@ -96,14 +90,6 @@ func (baseCenter *baseFaultCenter[T]) getOriginalCm() map[string]T { return baseCenter.cmManager.GetOriginalCm().Data } -func (baseCenter *baseFaultCenter[T]) setProcessingCm(cm map[string]T) { - baseCenter.cmManager.SetProcessingCm(cmmanager.ConfigMap[T]{Data: cm}) -} - -func (baseCenter *baseFaultCenter[T]) getProcessingCm() map[string]T { - return baseCenter.cmManager.GetProcessingCm().Data -} - func (baseCenter *baseFaultCenter[T]) setProcessedCm(cm map[string]T) bool { return baseCenter.cmManager.SetProcessedCm(cmmanager.ConfigMap[T]{Data: cm}) } diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/base_fault_center_test.go b/component/clusterd/pkg/application/faultmanager/cmprocess/base_fault_center_test.go index 5477643f05a8574db5ee6bbe4051944e7ab91d79..16883407bb41a7905eb329a2ffb74e75b4077c18 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/base_fault_center_test.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/base_fault_center_test.go @@ -6,6 +6,7 @@ package cmprocess import ( "testing" + "ascend-common/common-utils/hwlog" "clusterd/pkg/common/constant" "clusterd/pkg/domain/faultdomain/cmmanager" ) @@ -16,6 +17,11 @@ func (f *fakeProcessor) Process(info any) any { return info } +func TestMain(m *testing.M) { + hwlog.InitRunLogger(&hwlog.LogConfig{OnlyToStdout: true}, nil) + m.Run() +} + func TestBaseFaultCenterProcess(t *testing.T) { t.Run("TestBaseFaultCenterProcess", func(t *testing.T) { manager := cmmanager.DeviceCenterCmManager diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/device_fault_center.go b/component/clusterd/pkg/application/faultmanager/cmprocess/device_fault_center.go index bcbd0dc9124b8115231f19bda02965786f36b90f..e1ba0259c28fe1a14aef60f6d983a0c900efd117 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/device_fault_center.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/device_fault_center.go @@ -16,7 +16,7 @@ var DeviceCenter *deviceFaultProcessCenter // deviceFaultProcessCenter type deviceFaultProcessCenter struct { - baseFaultCenter[*constant.DeviceInfo] + baseFaultCenter[*constant.AdvanceDeviceFaultCm] } func init() { diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/publicfault/pub_fault_processor.go b/component/clusterd/pkg/application/faultmanager/cmprocess/publicfault/pub_fault_processor.go index 9e8f8156e4a6c205ab4b2ad31b57b74cda3ee521..3a2ce9776f79bc994d1cf2322f995e79fe57fa84 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/publicfault/pub_fault_processor.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/publicfault/pub_fault_processor.go @@ -4,13 +4,10 @@ package publicfault import ( - "encoding/json" "strconv" - "strings" "ascend-common/common-utils/hwlog" "clusterd/pkg/common/constant" - "clusterd/pkg/common/util" "clusterd/pkg/domain/faultdomain" "clusterd/pkg/domain/publicfault" ) @@ -20,13 +17,13 @@ var PubFaultProcessor *pubFaultProcessor type pubFaultProcessor struct { pubFaultInfo map[string]*constant.PubFaultCache - devCMInfo *constant.DeviceInfo + devCMInfo *constant.AdvanceDeviceFaultCm } func init() { PubFaultProcessor = &pubFaultProcessor{ pubFaultInfo: make(map[string]*constant.PubFaultCache), - devCMInfo: &constant.DeviceInfo{}, + devCMInfo: &constant.AdvanceDeviceFaultCm{}, } } @@ -35,7 +32,7 @@ func (p *pubFaultProcessor) Process(info any) any { if publicfault.PubFaultCache == nil || len(publicfault.PubFaultCache.GetPubFault()) == 0 { return info } - processContent, ok := info.(constant.OneConfigmapContent[*constant.DeviceInfo]) + processContent, ok := info.(constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]) if !ok { hwlog.RunLog.Error("input is not DeviceInfo type", info) return info @@ -47,45 +44,26 @@ func (p *pubFaultProcessor) Process(info any) any { hwlog.RunLog.Errorf("public fault processor process failed, error: %v", err) return processContent } - for devCMName, devCMInfo := range deviceInfos { - nodeName := strings.TrimPrefix(devCMName, constant.DeviceInfoPrefix) + for nodeName, devCMInfo := range deviceInfos { pubFaults, ok := copyFaultCache[nodeName] if !ok { continue } p.pubFaultInfo = pubFaults p.devCMInfo = devCMInfo - p.faultJoin(nodeName) + p.faultJoin() } processContent.AllConfigmap = deviceInfos return processContent } -func (p *pubFaultProcessor) faultJoin(nodeName string) []constant.DeviceFault { - faultKey, faultList := faultdomain.GetFaultListInfo(p.devCMInfo) - var dpCMFaults []constant.DeviceFault - if err := json.Unmarshal([]byte(faultList), &dpCMFaults); err != nil { - hwlog.RunLog.Errorf("unmarshal fault list for node <%s> failed, error: %v", nodeName, err) - return nil - } - devType := faultdomain.GetDeviceType(p.devCMInfo) - - var newFaultList []constant.DeviceFault - if err := util.DeepCopy(&newFaultList, &dpCMFaults); err != nil { - hwlog.RunLog.Errorf("deep copy device cm faults failed, err: %v", err) - return nil - } - - dpNPUFaultLevelMap := make(map[string]string) - for _, dpCMFault := range newFaultList { - dpNPUFaultLevelMap[dpCMFault.NPUName] = dpCMFault.FaultLevel - } - +func (p *pubFaultProcessor) faultJoin() { + modified := false for _, pubFaultCache := range p.pubFaultInfo { // add public fault to fault list - pubFaultCache.FaultDevNames = convertNPUIdsToName(pubFaultCache.FaultDevIds, devType) + pubFaultCache.FaultDevNames = convertNPUIdsToName(pubFaultCache.FaultDevIds, p.devCMInfo.DeviceType) for _, faultDevName := range pubFaultCache.FaultDevNames { - newFaultList = append(newFaultList, constant.DeviceFault{ + fault := constant.DeviceFault{ FaultType: constant.PublicFaultType, NPUName: faultDevName, LargeModelFaultLevel: pubFaultCache.FaultLevel, @@ -97,32 +75,14 @@ func (p *pubFaultProcessor) faultJoin(nodeName string) []constant.DeviceFault { FaultTime: pubFaultCache.FaultTime, FaultLevel: pubFaultCache.FaultLevel, }}, - }) - } - - for _, pubFaultDev := range pubFaultCache.FaultDevNames { - // public fault id does not exist in dp cm - faultLevel, ok := dpNPUFaultLevelMap[pubFaultDev] - if !ok { - p.updateAvailAndUnhealthy(pubFaultCache.FaultLevel, pubFaultDev) - continue } - // public fault id existed in dp cm - seriousLevel := faultdomain.GetMostSeriousFaultLevel([]string{pubFaultCache.FaultLevel, faultLevel}) - p.updateAvailAndUnhealthy(seriousLevel, pubFaultDev) + p.devCMInfo.AddFault(fault) + modified = true } } - p.updateFaultList(newFaultList, faultKey) - return newFaultList -} - -func (p *pubFaultProcessor) updateFaultList(newFaultList []constant.DeviceFault, faultKey string) { - faultListData, err := json.Marshal(newFaultList) - if err != nil { - hwlog.RunLog.Errorf("marshal device fault list failed, error: %v", err) - return + if modified { + faultdomain.SortDataForAdvanceDeviceInfo(p.devCMInfo) } - p.devCMInfo.DeviceList[faultKey] = string(faultListData) } func convertNPUIdsToName(phyIds []int32, devType string) []string { @@ -133,10 +93,3 @@ func convertNPUIdsToName(phyIds []int32, devType string) []string { } return npuNames } - -func (p *pubFaultProcessor) updateAvailAndUnhealthy(faultLevel string, NPUName string) { - if faultLevel == constant.SeparateNPU { - faultdomain.DelDevFromAvailList(p.devCMInfo, []string{NPUName}) - faultdomain.AddDevFromUnhealthyList(p.devCMInfo, []string{NPUName}) - } -} diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/publicfault/pub_fault_processor_test.go b/component/clusterd/pkg/application/faultmanager/cmprocess/publicfault/pub_fault_processor_test.go index 318e02597df2d8a613baedf9bcc2e23dcf0e04c9..672edaca5912429823729b39b56c422b8900cf85 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/publicfault/pub_fault_processor_test.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/publicfault/pub_fault_processor_test.go @@ -4,11 +4,14 @@ package publicfault import ( + "sort" "testing" "github.com/smartystreets/goconvey/convey" + "ascend-common/common-utils/hwlog" "clusterd/pkg/common/constant" + "clusterd/pkg/common/util" "clusterd/pkg/domain/faultdomain" "clusterd/pkg/domain/publicfault" ) @@ -49,8 +52,8 @@ func TestProcessor(t *testing.T) { func testNilCache() { resetFaultCache() - ori := constant.OneConfigmapContent[*constant.DeviceInfo]{ - AllConfigmap: oriDevInfo1, + ori := constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]{ + AllConfigmap: faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](oriDevInfo1), UpdateConfigmap: nil, } res := PubFaultProcessor.Process(ori) @@ -67,25 +70,27 @@ func testInputInvalid() { func testNodeNameInvalid() { resetFaultCache() publicfault.PubFaultCache.AddPubFaultToCache(&testCacheData, testNodeName3, faultKey1) - content := constant.OneConfigmapContent[*constant.DeviceInfo]{ - AllConfigmap: oriDevInfo1, + content := constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]{ + AllConfigmap: faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](oriDevInfo1), UpdateConfigmap: nil, } - exp := PubFaultProcessor.Process(content).(constant.OneConfigmapContent[*constant.DeviceInfo]) + exp := PubFaultProcessor.Process(content).(constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]) convey.So(content, convey.ShouldResemble, exp) } func testDiff() { resetFaultCache() publicfault.PubFaultCache.AddPubFaultToCache(&testCacheData, testNodeName1, faultKey1) - content := constant.OneConfigmapContent[*constant.DeviceInfo]{ - AllConfigmap: oriDevInfo1, + content := constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]{ + AllConfigmap: faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](oriDevInfo1), UpdateConfigmap: nil, } - resContent := PubFaultProcessor.Process(content).(constant.OneConfigmapContent[*constant.DeviceInfo]) - ori := faultdomain.GetAdvanceDeviceCmForNodeMap(resContent.AllConfigmap) - exp := faultdomain.GetAdvanceDeviceCmForNodeMap(expDeviceInfo1) - convey.So(ori, convey.ShouldResemble, exp) + resContent := PubFaultProcessor.Process(content).(constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]) + sortDeviceFaultList(resContent.AllConfigmap) + want := faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](expDeviceInfo1) + sortDeviceFaultList(want) + result := resContent.AllConfigmap + convey.So(result, convey.ShouldResemble, want) } func testCommon() { @@ -93,14 +98,19 @@ func testCommon() { const card5 = 5 testCacheData.FaultDevIds = []int32{0, card5} publicfault.PubFaultCache.AddPubFaultToCache(&testCacheData, testNodeName2, faultKey2) - content := constant.OneConfigmapContent[*constant.DeviceInfo]{ - AllConfigmap: oriDevInfo2, + content := constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]{ + AllConfigmap: faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](oriDevInfo2), UpdateConfigmap: nil, } - resContent := PubFaultProcessor.Process(content).(constant.OneConfigmapContent[*constant.DeviceInfo]) - res := faultdomain.GetAdvanceDeviceCmForNodeMap(resContent.AllConfigmap) - exp := faultdomain.GetAdvanceDeviceCmForNodeMap(expDeviceInfo2) - convey.So(res, convey.ShouldResemble, exp) + resContent := PubFaultProcessor.Process(content).(constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]) + hwlog.RunLog.Infof(util.ObjToString(resContent.AllConfigmap)) + hwlog.RunLog.Infof( + util.ObjToString(faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](expDeviceInfo2))) + sortDeviceFaultList(resContent.AllConfigmap) + result := resContent.AllConfigmap + want := faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](expDeviceInfo2) + sortDeviceFaultList(want) + convey.So(result, convey.ShouldResemble, want) } func resetFaultCache() { @@ -108,3 +118,17 @@ func resetFaultCache() { delete(publicfault.PubFaultCache.GetPubFault(), nodeName) } } + +func sortDeviceFaultList(advanceFaultCm map[string]*constant.AdvanceDeviceFaultCm) { + for _, advanceDeviceCm := range advanceFaultCm { + for _, fault := range advanceDeviceCm.FaultDeviceList { + sort.Slice(fault, func(i, j int) bool { + return util.MakeDataHash(fault[i]) < util.MakeDataHash(fault[j]) + }) + } + sort.Strings(advanceDeviceCm.CardUnHealthy) + sort.Strings(advanceDeviceCm.NetworkUnhealthy) + sort.Strings(advanceDeviceCm.Recovering) + sort.Strings(advanceDeviceCm.AvailableDeviceList) + } +} diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/uce/uce_fault_processor.go b/component/clusterd/pkg/application/faultmanager/cmprocess/uce/uce_fault_processor.go index be65346d0d60a5c494a921b1c4ba48311cb7456c..3d8555d44f254f972eb480489196bf172167e914 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/uce/uce_fault_processor.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/uce/uce_fault_processor.go @@ -31,7 +31,7 @@ type uceFaultProcessor struct { // node->DeviceName->uceDeviceInfo uceDeviceOfNode map[string]constant.UceNodeInfo jobServerInfoMap constant.JobServerInfoMap - nodeDeviceCmMap map[string]constant.AdvanceDeviceFaultCm + nodeDeviceCmMap map[string]*constant.AdvanceDeviceFaultCm } func init() { @@ -50,7 +50,7 @@ func (processor *uceFaultProcessor) initUceDeviceFromNodeAndReportInfo(jobId str } for _, deviceOfJob := range devicesOfJobOnNode.DeviceList { - deviceName := processor.nodeDeviceCmMap[nodeName].ServerType + "-" + deviceOfJob.DeviceID + deviceName := processor.nodeDeviceCmMap[nodeName].DeviceType + "-" + deviceOfJob.DeviceID uceReportInfo := collector.ReportInfoCollector.GetInfo(jobId, nodeName, deviceName) jobUceDevice := constant.UceDeviceInfo{ DeviceName: deviceName, @@ -72,32 +72,28 @@ func (processor *uceFaultProcessor) initUceDeviceFromNodeAndReportInfo(jobId str // Process uce fault func (processor *uceFaultProcessor) Process(info any) any { - processContent, ok := info.(constant.OneConfigmapContent[*constant.DeviceInfo]) + processContent, ok := info.(constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]) if !ok { hwlog.RunLog.Errorf("%v cannot convert to DeviceInfo", info) return info } - deviceInfos := processContent.AllConfigmap processor.jobServerInfoMap = job.GetJobServerInfoMap() - processor.nodeDeviceCmMap = faultdomain.GetAdvanceDeviceCmForNodeMap(deviceInfos) - hwlog.RunLog.Debugf("current deviceInfos %s", util.ObjToString(deviceInfos)) - hwlog.RunLog.Debugf("current nodeDeviceCmMap %s", util.ObjToString(processor.nodeDeviceCmMap)) + processor.nodeDeviceCmMap = processContent.AllConfigmap + hwlog.RunLog.Debugf("current nodeDeviceCmMap %v", processor.nodeDeviceCmMap) processor.uceDeviceOfNode = processor.getUceDeviceOfNodes() - hwlog.RunLog.Debugf("current uceDeviceOfNode %s", util.ObjToString(processor.uceDeviceOfNode)) + hwlog.RunLog.Debugf("current uceDeviceOfNode %v", processor.uceDeviceOfNode) processor.uceDevicesOfUceJob = processor.getUceDevicesForUceTolerateJobs() - hwlog.RunLog.Debugf("current uceDevicesOfUceJob %s", util.ObjToString(processor.uceDevicesOfUceJob)) + hwlog.RunLog.Debugf("current uceDevicesOfUceJob %v", processor.uceDevicesOfUceJob) currentTime := time.Now().UnixMilli() hwlog.RunLog.Debugf("currentTime %d", currentTime) processor.processUceFaultInfo(currentTime) - faultdomain.AdvanceDeviceCmForNodeMapToString(processor.nodeDeviceCmMap, deviceInfos) - hwlog.RunLog.Debugf("result deviceInfos %s", util.ObjToString(deviceInfos)) - processContent.AllConfigmap = deviceInfos + hwlog.RunLog.Debugf("result deviceInfos %v", processContent.AllConfigmap) return processContent } @@ -109,7 +105,8 @@ func (processor *uceFaultProcessor) processUceFaultInfo(currentTime int64) { } func (processor *uceFaultProcessor) processEachNodeUceFaultInfo( - nodeName string, deviceInfo constant.AdvanceDeviceFaultCm, currentTime int64) constant.AdvanceDeviceFaultCm { + nodeName string, deviceInfo *constant.AdvanceDeviceFaultCm, currentTime int64) *constant.AdvanceDeviceFaultCm { + modified := false for _, uceJob := range processor.uceDevicesOfUceJob { for deviceName, uceDevice := range uceJob.UceNode[nodeName].DeviceInfo { log := fmt.Sprintf("filter uce device: %s on node %s, "+ @@ -120,24 +117,27 @@ func (processor *uceFaultProcessor) processEachNodeUceFaultInfo( util.ReadableMsTime(uceDevice.RecoverTime)) if processor.canFilterUceDeviceFaultInfo(uceDevice, currentTime) { hwlog.RunLog.Warn("uceFaultProcessor " + log) - deviceInfo.FaultDeviceList = processor.filterUceDeviceFaultInfo(deviceName, deviceInfo.FaultDeviceList) + processor.filterUceDeviceFaultInfo(deviceName, deviceInfo) + modified = true } else { hwlog.RunLog.Warn("uceFaultProcessor cannot " + log) } } } + if modified { + faultdomain.SortDataForAdvanceDeviceInfo(deviceInfo) + } return deviceInfo } func (processor *uceFaultProcessor) filterUceDeviceFaultInfo( - deviceName string, deviceFaultMap map[string][]constant.DeviceFault) map[string][]constant.DeviceFault { - for _, fault := range deviceFaultMap[deviceName] { + deviceName string, advanceDevInfo *constant.AdvanceDeviceFaultCm) { + for _, fault := range advanceDevInfo.FaultDeviceList[deviceName] { // filter device's uce fault if faultdomain.IsUceFault(fault.FaultCode) { - deviceFaultMap = faultdomain.DeleteFaultFromFaultMap(deviceFaultMap, fault) + advanceDevInfo.DelFault(fault) } } - return deviceFaultMap } func (processor *uceFaultProcessor) canFilterUceDeviceFaultInfo(uceDevice constant.UceDeviceInfo, currentTime int64) bool { @@ -222,7 +222,8 @@ func (processor *uceFaultProcessor) getUceDevicesForUceTolerateJobs() map[string return uceJobs } -func (processor *uceFaultProcessor) getUceFaultDevices(nodeName string, deviceInfo constant.AdvanceDeviceFaultCm) constant.UceNodeInfo { +func (processor *uceFaultProcessor) getUceFaultDevices( + nodeName string, deviceInfo *constant.AdvanceDeviceFaultCm) constant.UceNodeInfo { nodeInfo := constant.UceNodeInfo{ NodeName: nodeName, DeviceInfo: make(map[string]constant.UceDeviceInfo), diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/uce/uce_fault_processor_test.go b/component/clusterd/pkg/application/faultmanager/cmprocess/uce/uce_fault_processor_test.go index 3a29a4140a0a95359a33602c71677a7e9a32842a..1a69daa82ac4df345efb6b3248feeb837cb85ab8 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/uce/uce_fault_processor_test.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/uce/uce_fault_processor_test.go @@ -289,7 +289,7 @@ func TestUceFaultProcessorGetUceDeviceOfNodes(t *testing.T) { t.Errorf("init data failed. %v", testFileErr) } - UceProcessor.nodeDeviceCmMap = faultdomain.GetAdvanceDeviceCmForNodeMap(cmDeviceInfos) + UceProcessor.nodeDeviceCmMap = faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](cmDeviceInfos) deviceOfNodes := UceProcessor.getUceDeviceOfNodes() if !reflect.DeepEqual(deviceOfNodes, uceNodesInfos) { t.Errorf("getUceDeviceOfNodes() = %v, want %v", @@ -306,7 +306,7 @@ func TestUceFaultProcessorGetUceDevicesForUceTolerateJobs(t *testing.T) { } UceProcessor.jobServerInfoMap = jobServerInfoMap - UceProcessor.nodeDeviceCmMap = faultdomain.GetAdvanceDeviceCmForNodeMap(cmDeviceInfos) + UceProcessor.nodeDeviceCmMap = faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](cmDeviceInfos) UceProcessor.uceDeviceOfNode = UceProcessor.getUceDeviceOfNodes() UceProcessor.uceDevicesOfUceJob = UceProcessor.getUceDevicesForUceTolerateJobs() if !reflect.DeepEqual(UceProcessor.uceDevicesOfUceJob, expectUceJobsInfo) { @@ -324,16 +324,16 @@ func TestUceFaultProcessorProcessUceFaultInfo(t *testing.T) { } UceProcessor.jobServerInfoMap = jobServerInfoMap - UceProcessor.nodeDeviceCmMap = faultdomain.GetAdvanceDeviceCmForNodeMap(cmDeviceInfos) + UceProcessor.nodeDeviceCmMap = faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](cmDeviceInfos) UceProcessor.uceDeviceOfNode = UceProcessor.getUceDeviceOfNodes() UceProcessor.uceDevicesOfUceJob = UceProcessor.getUceDevicesForUceTolerateJobs() currentTime := 109 * time.Second.Milliseconds() UceProcessor.processUceFaultInfo(currentTime) - faultdomain.AdvanceDeviceCmForNodeMapToString(UceProcessor.nodeDeviceCmMap, cmDeviceInfos) - result := faultdomain.GetAdvanceDeviceCmForNodeMap(cmDeviceInfos) - want := faultdomain.GetAdvanceDeviceCmForNodeMap(expectProcessedDeviceInfos) + result := UceProcessor.nodeDeviceCmMap + want := faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](expectProcessedDeviceInfos) if !reflect.DeepEqual(result, want) { - t.Errorf("processUceFaultInfo() = %v, want %v", + t.Errorf("orgcm:\n%v\n\nresult:\n%v\n\nwant:\n%v", + util.ObjToString(faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](cmDeviceInfos)), util.ObjToString(result), util.ObjToString(want)) } }) @@ -349,16 +349,15 @@ func TestUceFaultProcessorScenario1(t *testing.T) { collector.ReportInfoCollector = reportInfos UceProcessor.jobServerInfoMap = jobServerInfoMap - UceProcessor.nodeDeviceCmMap = faultdomain.GetAdvanceDeviceCmForNodeMap(cmDeviceInfos) + UceProcessor.nodeDeviceCmMap = faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](cmDeviceInfos) UceProcessor.uceDeviceOfNode = UceProcessor.getUceDeviceOfNodes() UceProcessor.uceDevicesOfUceJob = UceProcessor.getUceDevicesForUceTolerateJobs() currentTime := 100 * time.Second.Milliseconds() UceProcessor.processUceFaultInfo(currentTime) - faultdomain.AdvanceDeviceCmForNodeMapToString(UceProcessor.nodeDeviceCmMap, cmDeviceInfos) - result := faultdomain.GetAdvanceDeviceCmForNodeMap(cmDeviceInfos) - want := faultdomain.GetAdvanceDeviceCmForNodeMap(expectProcessedDeviceInfos) + result := UceProcessor.nodeDeviceCmMap + want := faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](expectProcessedDeviceInfos) if !reflect.DeepEqual(result, want) { - t.Errorf("processUceFaultInfo() = %v, want %v", + t.Errorf("processUceFaultInfo() = %v, \n\nwant %v", util.ObjToString(result), util.ObjToString(want)) } }) @@ -366,14 +365,14 @@ func TestUceFaultProcessorScenario1(t *testing.T) { func TestUceFaultProcessorScenario2(t *testing.T) { t.Run("TestUceFaultProcessorScenario2", func(t *testing.T) { - cmDeviceInfos, expectProcessedDeviceInfos, jobServerInfoMap, reportInfos, testFileErr := + cmDeviceInfos, expProcessedDeviceInfos, jobServerInfoMap, reportInfos, testFileErr := readObjectFromUceScenarioTestYaml() if testFileErr != nil { t.Errorf("init data failed. %v", testFileErr) } - content := constant.OneConfigmapContent[*constant.DeviceInfo]{ - AllConfigmap: cmDeviceInfos, - UpdateConfigmap: []constant.InformerCmItem[*constant.DeviceInfo]{ + content := constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]{ + AllConfigmap: faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](cmDeviceInfos), + UpdateConfigmap: []constant.InformerCmItem[*constant.AdvanceDeviceFaultCm]{ { IsAdd: false, Data: nil}, @@ -396,9 +395,9 @@ func TestUceFaultProcessorScenario2(t *testing.T) { mockUnixMilli.Reset() }() - resultContent := UceProcessor.Process(content).(constant.OneConfigmapContent[*constant.DeviceInfo]) - result := faultdomain.GetAdvanceDeviceCmForNodeMap(resultContent.AllConfigmap) - want := faultdomain.GetAdvanceDeviceCmForNodeMap(expectProcessedDeviceInfos) + resultContent := UceProcessor.Process(content).(constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]) + result := resultContent.AllConfigmap + want := faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](expProcessedDeviceInfos) if !reflect.DeepEqual(result, want) { t.Errorf("processUceFaultInfo() = %v, want %v", util.ObjToString(result), util.ObjToString(want)) diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/uceaccompany/uce_accompany_fault_processor.go b/component/clusterd/pkg/application/faultmanager/cmprocess/uceaccompany/uce_accompany_fault_processor.go index a4179076ee4120633f78266435cb63b3d5a4a13a..16e4a64ef27e8e3e4e99c4d2a3ca54be4458897e 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/uceaccompany/uce_accompany_fault_processor.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/uceaccompany/uce_accompany_fault_processor.go @@ -27,12 +27,13 @@ type uceAccompanyFaultProcessor struct { uceAccompanyFaultQue map[string]map[string][]constant.DeviceFault // uceFaultTime uceFaultTime map[string]map[string]int64 - deviceCmForNodeMap map[string]constant.AdvanceDeviceFaultCm + deviceCmForNodeMap map[string]*constant.AdvanceDeviceFaultCm } func init() { UceAccompanyProcessor = &uceAccompanyFaultProcessor{ DiagnosisAccompanyTimeout: constant.DiagnosisAccompanyTimeout, + deviceCmForNodeMap: make(map[string]*constant.AdvanceDeviceFaultCm), uceAccompanyFaultQue: make(map[string]map[string][]constant.DeviceFault), uceFaultTime: make(map[string]map[string]int64), } @@ -45,7 +46,7 @@ func (processor *uceAccompanyFaultProcessor) uceAccompanyFaultInQue() { } func (processor *uceAccompanyFaultProcessor) uceAccompanyFaultInQueForNode( - nodeName string, deviceInfo constant.AdvanceDeviceFaultCm) { + nodeName string, deviceInfo *constant.AdvanceDeviceFaultCm) { if _, ok := processor.uceAccompanyFaultQue[nodeName]; !ok { processor.uceAccompanyFaultQue[nodeName] = make(map[string][]constant.DeviceFault) } @@ -101,21 +102,22 @@ func (processor *uceAccompanyFaultProcessor) inQue(nodeName, deviceName string, func (processor *uceAccompanyFaultProcessor) filterFaultInfos(currentTime int64) { for nodeName, nodeFaults := range processor.uceAccompanyFaultQue { - faultMap := processor.deviceCmForNodeMap[nodeName] + deviceFaultCm, found := processor.deviceCmForNodeMap[nodeName] + if !found { + continue + } for deviceName, deviceFaultQue := range nodeFaults { - newQue, newFaultMap := - processor.filterFaultDevice(faultMap.FaultDeviceList, currentTime, nodeName, deviceName, deviceFaultQue) + newQue := processor.filterFaultDevice(deviceFaultCm, currentTime, nodeName, deviceName, deviceFaultQue) nodeFaults[deviceName] = newQue - faultMap.FaultDeviceList = newFaultMap } - processor.deviceCmForNodeMap[nodeName] = faultMap } } func (processor *uceAccompanyFaultProcessor) filterFaultDevice( - faultMap map[string][]constant.DeviceFault, currentTime int64, nodeName, deviceName string, - deviceFaultQue []constant.DeviceFault) ([]constant.DeviceFault, map[string][]constant.DeviceFault) { + deviceFaultCm *constant.AdvanceDeviceFaultCm, currentTime int64, nodeName, deviceName string, + deviceFaultQue []constant.DeviceFault) []constant.DeviceFault { newDeviceFaultQue := make([]constant.DeviceFault, 0) + modified := false for _, fault := range deviceFaultQue { uceFaultTime := processor.getDeviceUceFaultTime(nodeName, deviceName) errorMsg := fmt.Sprintf("filterFaultDevice cannot find uce fault time for device %s of node %s", @@ -123,26 +125,32 @@ func (processor *uceAccompanyFaultProcessor) filterFaultDevice( accompanyFaultTime := faultdomain.GetFaultTime(fault, errorMsg) // if is accompanied fault, filter if processor.isAccompaniedFaultByUce(uceFaultTime, accompanyFaultTime) { - hwlog.RunLog.Warnf("filter uce accompany fault %s, fault time: %s", - util.ObjToString(fault), util.ReadableMsTime(accompanyFaultTime)) - faultMap = faultdomain.DeleteFaultFromFaultMap(faultMap, fault) + hwlog.RunLog.Warnf("filter uce accompany fault %v, fault time: %s", + fault, util.ReadableMsTime(accompanyFaultTime)) + deviceFaultCm.DelFault(fault) + modified = true continue } // if current is not exceed diagnosis time, // then cannot decide fault is accompany or not, filter, and in que to decide in next turn. if !processor.isCurrentExceedDiagnosisTimeout(currentTime, accompanyFaultTime) { - hwlog.RunLog.Warnf("filter uce accompany like fault %s, fault time: %s", - util.ObjToString(fault), util.ReadableMsTime(accompanyFaultTime)) - faultMap = faultdomain.DeleteFaultFromFaultMap(faultMap, fault) + hwlog.RunLog.Warnf("filter uce accompany like fault %v, fault time: %s", + fault, util.ReadableMsTime(accompanyFaultTime)) + deviceFaultCm.DelFault(fault) + modified = true newDeviceFaultQue = append(newDeviceFaultQue, fault) continue } // cannot filter, add the aic/aiv fault into faultMap - faultMap = faultdomain.AddFaultIntoFaultMap(faultMap, fault) - hwlog.RunLog.Warnf("cannot filter uce accompany like fault %s, uce fault time: %s", - util.ObjToString(fault), util.ReadableMsTime(uceFaultTime)) + deviceFaultCm.AddFault(fault) + modified = true + hwlog.RunLog.Warnf("cannot filter uce accompany like fault %v, uce fault time: %s", + fault, util.ReadableMsTime(uceFaultTime)) } - return newDeviceFaultQue, faultMap + if modified { + faultdomain.SortDataForAdvanceDeviceInfo(deviceFaultCm) + } + return newDeviceFaultQue } func (processor *uceAccompanyFaultProcessor) getDeviceUceFaultTime(nodeName, deviceName string) int64 { @@ -164,24 +172,18 @@ func (processor *uceAccompanyFaultProcessor) isCurrentExceedDiagnosisTimeout( // Process uce accompany fault func (processor *uceAccompanyFaultProcessor) Process(info any) any { - processContent, ok := info.(constant.OneConfigmapContent[*constant.DeviceInfo]) + processContent, ok := info.(constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]) if !ok { hwlog.RunLog.Errorf("%v cannot convert to DeviceInfo", info) return info } - deviceInfos := processContent.AllConfigmap - processor.deviceCmForNodeMap = faultdomain.GetAdvanceDeviceCmForNodeMap(deviceInfos) - hwlog.RunLog.Debugf("current deviceInfos: %s", util.ObjToString(deviceInfos)) - hwlog.RunLog.Debugf("current deviceCmForNodeMap: %s", util.ObjToString(processor.deviceCmForNodeMap)) + processor.deviceCmForNodeMap = processContent.AllConfigmap + hwlog.RunLog.Debugf("current deviceInfos: %v", processContent.AllConfigmap) processor.uceAccompanyFaultInQue() - hwlog.RunLog.Debugf("current uceAccompanyFaultQue: %s", util.ObjToString(processor.uceAccompanyFaultQue)) + hwlog.RunLog.Debugf("current uceAccompanyFaultQue: %v", processor.uceAccompanyFaultQue) currentTime := time.Now().UnixMilli() processor.filterFaultInfos(currentTime) - faultdomain.AdvanceDeviceCmForNodeMapToString(processor.deviceCmForNodeMap, deviceInfos) - - hwlog.RunLog.Debugf("uceAccompanyFaultProcessor result: %s", util.ObjToString(deviceInfos)) - processContent.AllConfigmap = deviceInfos return processContent } diff --git a/component/clusterd/pkg/application/faultmanager/cmprocess/uceaccompany/uce_accompany_fault_processor_test.go b/component/clusterd/pkg/application/faultmanager/cmprocess/uceaccompany/uce_accompany_fault_processor_test.go index 951406d5ead9cf7e868d6bc3bb5e4d62c7abb3ed..63180c9c82c24af2dbbf3ac6067cd15bfddf4d47 100644 --- a/component/clusterd/pkg/application/faultmanager/cmprocess/uceaccompany/uce_accompany_fault_processor_test.go +++ b/component/clusterd/pkg/application/faultmanager/cmprocess/uceaccompany/uce_accompany_fault_processor_test.go @@ -36,12 +36,12 @@ func TestUceAccompanyFaultProcessorProcess(t *testing.T) { if testFileErr != nil { t.Errorf("init data failed. %v", testFileErr) } - UceAccompanyProcessor.deviceCmForNodeMap = faultdomain.GetAdvanceDeviceCmForNodeMap(cmDeviceInfos) + UceAccompanyProcessor.deviceCmForNodeMap = + faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](cmDeviceInfos) UceAccompanyProcessor.uceAccompanyFaultInQue() UceAccompanyProcessor.filterFaultInfos(CurrentTime) - faultdomain.AdvanceDeviceCmForNodeMapToString(UceAccompanyProcessor.deviceCmForNodeMap, cmDeviceInfos) - if !reflect.DeepEqual(faultdomain.GetAdvanceDeviceCmForNodeMap(cmDeviceInfos), - faultdomain.GetAdvanceDeviceCmForNodeMap(expectProcessedDeviceInfos)) { + if !reflect.DeepEqual(UceAccompanyProcessor.deviceCmForNodeMap, + faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](expectProcessedDeviceInfos)) { t.Errorf("result = %v, want %v", util.ObjToString(cmDeviceInfos), util.ObjToString(expectProcessedDeviceInfos)) } @@ -60,9 +60,9 @@ func TestUceAccompanyFaultProcessorProcessE2E(t *testing.T) { if testFileErr != nil { t.Errorf("init data failed. %v", testFileErr) } - content := constant.OneConfigmapContent[*constant.DeviceInfo]{ - AllConfigmap: cmDeviceInfos, - UpdateConfigmap: []constant.InformerCmItem[*constant.DeviceInfo]{{}}, + content := constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]{ + AllConfigmap: faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](cmDeviceInfos), + UpdateConfigmap: []constant.InformerCmItem[*constant.AdvanceDeviceFaultCm]{{}}, } mockTime := time.Time{} mockUnixMilli := gomonkey.ApplyPrivateMethod(mockTime, "UnixMilli", func() int64 { @@ -75,12 +75,13 @@ func TestUceAccompanyFaultProcessorProcessE2E(t *testing.T) { mockNow.Reset() mockUnixMilli.Reset() }() - resultContent := UceAccompanyProcessor.Process(content).(constant.OneConfigmapContent[*constant.DeviceInfo]) + res := UceAccompanyProcessor.Process(content).(constant.OneConfigmapContent[*constant.AdvanceDeviceFaultCm]) - if !reflect.DeepEqual(faultdomain.GetAdvanceDeviceCmForNodeMap(resultContent.AllConfigmap), - faultdomain.GetAdvanceDeviceCmForNodeMap(expectProcessedDeviceInfos)) { - t.Errorf("result = %v, want %v", - util.ObjToString(cmDeviceInfos), util.ObjToString(expectProcessedDeviceInfos)) + exp := faultdomain.GetAdvanceFaultCm[*constant.AdvanceDeviceFaultCm](expectProcessedDeviceInfos) + if !reflect.DeepEqual(res.AllConfigmap, exp) { + t.Errorf("result:\n%v\nwant:\n%v", + util.ObjToString(res.AllConfigmap), + util.ObjToString(exp)) } if len(UceAccompanyProcessor.uceAccompanyFaultQue["node1"]["Ascend910-1"]) != 1 && @@ -111,7 +112,7 @@ func TestUceAccompanyFaultProcessorProcessForAddFault(t *testing.T) { }, }, } - UceAccompanyProcessor.deviceCmForNodeMap = make(map[string]constant.AdvanceDeviceFaultCm) + UceAccompanyProcessor.deviceCmForNodeMap = map[string]*constant.AdvanceDeviceFaultCm{"node1": {}} UceAccompanyProcessor.filterFaultInfos(CurrentTime) if len(UceAccompanyProcessor.deviceCmForNodeMap[nodeName].FaultDeviceList[deviceName]) != 1 { t.Error("TestUceAccompanyFaultProcessorProcessForAddFault fail") diff --git a/component/clusterd/pkg/application/faultmanager/fault_process_center.go b/component/clusterd/pkg/application/faultmanager/fault_process_center.go index 80e8d48ee99a9764c772f77a34ce9919ae0016c7..3f1485436c1bb0ddfe2fba1d21aa2034bff850b1 100644 --- a/component/clusterd/pkg/application/faultmanager/fault_process_center.go +++ b/component/clusterd/pkg/application/faultmanager/fault_process_center.go @@ -45,12 +45,12 @@ func (center *faultProcessCenter) notifyFaultCenterProcess(whichToProcess int) { // Work faultProcessCenter work goroutine func (center *faultProcessCenter) Work(ctx context.Context) { go func() { - hwlog.RunLog.Info("faultProcessCenter start work") + hwlog.RunLog.Info("faultProcessCenter start work!") centerTicker := time.NewTicker(time.Second) for { select { case <-ctx.Done(): - hwlog.RunLog.Info("faultProcessCenter stop work") + hwlog.RunLog.Info("faultProcessCenter stop work!") return case whichToProcess := <-center.notifyProcessChan: switch whichToProcess { @@ -104,7 +104,7 @@ func QueryJobsFaultInfo(faultLevel string) map[string]constant.JobFaultInfo { } // QueryDeviceInfoToReport query device info to report -func QueryDeviceInfoToReport() map[string]*constant.DeviceInfo { +func QueryDeviceInfoToReport() map[string]*constant.AdvanceDeviceFaultCm { infos := cmprocess.DeviceCenter.GetProcessedCm() for _, info := range infos { info.UpdateTime = time.Now().Unix() @@ -148,3 +148,8 @@ func PubFaultCollector(oldPubFaultInfo, newPubFaultInfo *api.PubFaultInfo, opera } publicfault.PubFaultCollector(newPubFaultInfo) } + +// RegisterForJobFaultRank register for job fault info +func RegisterForJobFaultRank(ch chan map[string]constant.JobFaultInfo, src string) error { + return jobprocess.FaultJobCenter.Register(ch, src) +} diff --git a/component/clusterd/pkg/application/faultmanager/jobprocess/fault_job_center.go b/component/clusterd/pkg/application/faultmanager/jobprocess/fault_job_center.go index 1bd6d380a145cdafad2894b34023c867c32e424a..f18e4a91f330e7cff67c4fe0693f9e25decf3ed1 100644 --- a/component/clusterd/pkg/application/faultmanager/jobprocess/fault_job_center.go +++ b/component/clusterd/pkg/application/faultmanager/jobprocess/fault_job_center.go @@ -4,8 +4,10 @@ package jobprocess import ( - "time" + "fmt" + "sync" + "ascend-common/common-utils/hwlog" "clusterd/pkg/application/faultmanager/cmprocess" "clusterd/pkg/application/faultmanager/jobprocess/faultrank" "clusterd/pkg/application/faultmanager/jobprocess/relationfault" @@ -16,26 +18,28 @@ import ( var FaultJobCenter *faultJobProcessCenter type faultJobProcessCenter struct { - lastProcessTime int64 - processorList []constant.FaultProcessor + processorList []constant.FaultProcessor + subscribeChannelList []*subscriber + mutex sync.Mutex +} + +type subscriber struct { + ch chan map[string]constant.JobFaultInfo + src string } func init() { FaultJobCenter = &faultJobProcessCenter{ - lastProcessTime: 0, processorList: []constant.FaultProcessor{ relationfault.RelationProcessor, faultrank.JobFaultRankProcessor, }, + mutex: sync.Mutex{}, + subscribeChannelList: make([]*subscriber, 0), } } func (fJobCenter *faultJobProcessCenter) Process() { - currentTime := time.Now().UnixMilli() - if fJobCenter.isProcessLimited(currentTime) { - return - } - fJobCenter.lastProcessTime = currentTime content := constant.AllConfigmapContent{ DeviceCm: cmprocess.DeviceCenter.GetProcessedCm(), SwitchCm: cmprocess.SwitchCenter.GetProcessedCm(), @@ -44,8 +48,37 @@ func (fJobCenter *faultJobProcessCenter) Process() { for _, processor := range fJobCenter.processorList { processor.Process(content) } + fJobCenter.notifySubscriber() +} + +// Register notify chan +func (fJobCenter *faultJobProcessCenter) Register(ch chan map[string]constant.JobFaultInfo, src string) error { + if ch == nil { + return fmt.Errorf("invalid chanel for send") + } + fJobCenter.mutex.Lock() + defer fJobCenter.mutex.Unlock() + length := len(fJobCenter.subscribeChannelList) + if length > constant.MaxFaultCenterSubscriber { + return fmt.Errorf("the number of registrants is %d, cannot add any more", length) + } + fJobCenter.subscribeChannelList = append(fJobCenter.subscribeChannelList, &subscriber{ + ch: ch, + src: src, + }) + return nil } -func (fJobCenter *faultJobProcessCenter) isProcessLimited(currentTime int64) bool { - return fJobCenter.lastProcessTime+constant.FaultCenterProcessPeriod > currentTime +func (fJobCenter *faultJobProcessCenter) notifySubscriber() { + faultRankInfos := faultrank.JobFaultRankProcessor.GetJobFaultRankInfosFilterLevel(constant.NotHandleFault) + for _, sub := range fJobCenter.subscribeChannelList { + if sub.ch == nil { + continue + } + select { + case sub.ch <- faultRankInfos: + default: + hwlog.RunLog.Warnf("notify %s fault rank failed.", sub.src) + } + } } diff --git a/component/clusterd/pkg/application/faultmanager/jobprocess/faultrank/job_fault_rank_processor.go b/component/clusterd/pkg/application/faultmanager/jobprocess/faultrank/job_fault_rank_processor.go index 6dc7ff21a9dff2c554e480014edac2824ff6ac0a..b344c13c39e855b2dbb1cab70bdf865123e164ac 100644 --- a/component/clusterd/pkg/application/faultmanager/jobprocess/faultrank/job_fault_rank_processor.go +++ b/component/clusterd/pkg/application/faultmanager/jobprocess/faultrank/job_fault_rank_processor.go @@ -75,16 +75,15 @@ func (processor *jobRankFaultInfoProcessor) setJobFaultRankInfos(faultInfos map[ } func (processor *jobRankFaultInfoProcessor) findFaultRankForJob( - nodeDeviceInfoMap map[string]constant.AdvanceDeviceFaultCm, + advanceDeviceInfo *constant.AdvanceDeviceFaultCm, nodeName string, serverList map[string]constant.ServerHccl, jobId string) []constant.FaultRank { - advanceDeviceInfo := nodeDeviceInfoMap[nodeName] devicesOfJobOnNode, ok := serverList[nodeName] - faultRankList := make([]constant.FaultRank, 0) - if !ok || len(devicesOfJobOnNode.DeviceList) == 0 { - return faultRankList + if advanceDeviceInfo == nil || !ok || len(devicesOfJobOnNode.DeviceList) == 0 { + return make([]constant.FaultRank, 0) } + faultRankList := make([]constant.FaultRank, 0) for _, deviceInfo := range devicesOfJobOnNode.DeviceList { - deviceName := advanceDeviceInfo.ServerType + "-" + deviceInfo.DeviceID + deviceName := advanceDeviceInfo.DeviceType + "-" + deviceInfo.DeviceID faultList := advanceDeviceInfo.FaultDeviceList[deviceName] podRank, podUid := pod.GetPodRankAndPodUid(jobId, deviceInfo.RankID) uceInManagementPlane := false @@ -158,13 +157,7 @@ func (processor *jobRankFaultInfoProcessor) Process(info any) any { hwlog.RunLog.Error("convert info to AllConfigmapContent failed") return info } - deviceInfos := allConfigmap.DeviceCm - deviceCmForNodeMap := faultdomain.GetAdvanceDeviceCmForNodeMap(deviceInfos) - hwlog.RunLog.Debugf("deviceInfos: %#v", deviceInfos) - nodeInfos := allConfigmap.NodeCm - hwlog.RunLog.Debugf("nodeInfos: %#v", nodeInfos) - switchInfos := allConfigmap.SwitchCm - hwlog.RunLog.Debugf("switchInfos: %#v", switchInfos) + hwlog.RunLog.Debugf("allConfigmap info: %#v", util.ObjToString(allConfigmap)) jobFaultInfos := make(map[string]constant.JobFaultInfo) jobServerInfoMap := job.GetJobServerInfoMap() @@ -176,7 +169,7 @@ func (processor *jobRankFaultInfoProcessor) Process(info any) any { } hwlog.RunLog.Debugf("serverList: %d", len(serverList)) faultList, nodeStatusList, faultDeviceList := processor.findNodeDeviceAndSwitchFault(serverList, - nodeInfos, switchInfos, deviceCmForNodeMap, jobId) + allConfigmap.NodeCm, allConfigmap.SwitchCm, allConfigmap.DeviceCm, jobId) jobFaultInfo.FaultList = faultList if len(jobFaultInfo.FaultList) > 0 { hwlog.RunLog.Debugf("jobFaultInfo: %#v", jobFaultInfo) @@ -194,7 +187,7 @@ func (processor *jobRankFaultInfoProcessor) Process(info any) any { func (processor *jobRankFaultInfoProcessor) findNodeDeviceAndSwitchFault( serverList map[string]constant.ServerHccl, nodeInfos map[string]*constant.NodeInfo, - switchInfos map[string]*constant.SwitchInfo, deviceCmForNodeMap map[string]constant.AdvanceDeviceFaultCm, + switchInfos map[string]*constant.SwitchInfo, deviceCmForNodeMap map[string]*constant.AdvanceDeviceFaultCm, jobId string) ([]constant.FaultRank, []string, []constant.FaultDevice) { faultList := make([]constant.FaultRank, 0) faultDeviceList := make([]constant.FaultDevice, 0) @@ -225,7 +218,8 @@ func (processor *jobRankFaultInfoProcessor) findNodeDeviceAndSwitchFault( faultDeviceList = append(faultDeviceList, convertToFaultDevice(&server, "", constant.SeparateNPU, constant.EmptyDeviceId, constant.FaultTypeNode)) } - faultRankList := processor.findFaultRankForJob(deviceCmForNodeMap, nodeName, serverList, jobId) + advanceDeviceInfo := deviceCmForNodeMap[nodeName] + faultRankList := processor.findFaultRankForJob(advanceDeviceInfo, nodeName, serverList, jobId) faultList = append(faultList, faultRankList...) faultDeviceList = append(faultDeviceList, getFautDeviceInfoByFaultRank(&server, faultRankList)...) } diff --git a/component/clusterd/pkg/application/faultmanager/jobprocess/faultrank/job_fault_rank_processor_test.go b/component/clusterd/pkg/application/faultmanager/jobprocess/faultrank/job_fault_rank_processor_test.go index 222c6379757436f7ed1ea260cdc8c8ea3b3ec86d..ddbefdd694b591ba200852e6eb5ebcd23273e4c9 100644 --- a/component/clusterd/pkg/application/faultmanager/jobprocess/faultrank/job_fault_rank_processor_test.go +++ b/component/clusterd/pkg/application/faultmanager/jobprocess/faultrank/job_fault_rank_processor_test.go @@ -69,7 +69,11 @@ func TestFaultProcessorImplProcess(t *testing.T) { mockKube.Reset() mockJob.Reset() }() - JobFaultRankProcessor.Process(constant.AllConfigmapContent{}) + JobFaultRankProcessor.Process(constant.AllConfigmapContent{ + DeviceCm: make(map[string]*constant.AdvanceDeviceFaultCm), + SwitchCm: make(map[string]*constant.SwitchInfo), + NodeCm: make(map[string]*constant.NodeInfo), + }) faultRankInfos := JobFaultRankProcessor.GetJobFaultRankInfos() if len(faultRankInfos[jobId].FaultList) != len(jobServerMap.InfoMap[jobId][nodeName].DeviceList) { t.Error("TestFaultProcessorImplProcess fail") @@ -172,9 +176,9 @@ func TestFindFaultRankForJob(t *testing.T) { func testNoDevicesOnNode(processor *jobRankFaultInfoProcessor) { convey.Convey("When no devices on node", func() { - nodeDeviceInfoMap := map[string]constant.AdvanceDeviceFaultCm{ + nodeDeviceInfoMap := map[string]*constant.AdvanceDeviceFaultCm{ "node1": { - ServerType: "server-type", + DeviceType: "server-type", FaultDeviceList: map[string][]constant.DeviceFault{}, }, } @@ -184,16 +188,17 @@ func testNoDevicesOnNode(processor *jobRankFaultInfoProcessor) { }, } - faultRanks := processor.findFaultRankForJob(nodeDeviceInfoMap, "node1", serverList, "job1") + faultRanks := processor.findFaultRankForJob( + nodeDeviceInfoMap["node1"], "node1", serverList, "job1") convey.So(faultRanks, convey.ShouldBeEmpty) }) } func testUceInManagementPlane(processor *jobRankFaultInfoProcessor) { convey.Convey("When UCE fault in management plane", func() { - nodeDeviceInfoMap := map[string]constant.AdvanceDeviceFaultCm{ + nodeDeviceInfoMap := map[string]*constant.AdvanceDeviceFaultCm{ "node1": { - ServerType: "server-type", + DeviceType: "server-type", FaultDeviceList: map[string][]constant.DeviceFault{ "server-type-device1": { {FaultCode: constant.UceFaultCode, FaultLevel: constant.RestartBusiness}, @@ -215,7 +220,8 @@ func testUceInManagementPlane(processor *jobRankFaultInfoProcessor) { }) defer patches.Reset() - faultRanks := processor.findFaultRankForJob(nodeDeviceInfoMap, "node1", serverList, "job1") + faultRanks := processor.findFaultRankForJob( + nodeDeviceInfoMap["node1"], "node1", serverList, "job1") convey.So(faultRanks, convey.ShouldHaveLength, 1) convey.So(faultRanks[0].FaultCode, convey.ShouldEqual, constant.UceFaultCode) convey.So(faultRanks[0].DoStepRetry, convey.ShouldBeTrue) @@ -224,9 +230,9 @@ func testUceInManagementPlane(processor *jobRankFaultInfoProcessor) { func testUceInBusinessPlane(processor *jobRankFaultInfoProcessor) { convey.Convey("When UCE fault in business plane", func() { - nodeDeviceInfoMap := map[string]constant.AdvanceDeviceFaultCm{ + nodeDeviceInfoMap := map[string]*constant.AdvanceDeviceFaultCm{ "node1": { - ServerType: "server-type", + DeviceType: "server-type", FaultDeviceList: map[string][]constant.DeviceFault{}, }, } @@ -249,7 +255,8 @@ func testUceInBusinessPlane(processor *jobRankFaultInfoProcessor) { return true }) - faultRanks := processor.findFaultRankForJob(nodeDeviceInfoMap, "node1", serverList, "job1") + faultRanks := processor.findFaultRankForJob( + nodeDeviceInfoMap["node1"], "node1", serverList, "job1") convey.So(faultRanks, convey.ShouldHaveLength, 1) convey.So(faultRanks[0].FaultCode, convey.ShouldEqual, constant.UceFaultCode) convey.So(faultRanks[0].DoStepRetry, convey.ShouldBeTrue) diff --git a/component/clusterd/pkg/application/faultmanager/jobprocess/relationfault/relation_fault_process_test.go b/component/clusterd/pkg/application/faultmanager/jobprocess/relationfault/relation_fault_process_test.go index be30f7e2d2f29e835a69bf0b73d3df6bd7d6eee3..0bc258d11a71b841718ecb71e1cde4d8482fc85c 100644 --- a/component/clusterd/pkg/application/faultmanager/jobprocess/relationfault/relation_fault_process_test.go +++ b/component/clusterd/pkg/application/faultmanager/jobprocess/relationfault/relation_fault_process_test.go @@ -16,7 +16,6 @@ import ( "ascend-common/common-utils/hwlog" "clusterd/pkg/common/constant" "clusterd/pkg/common/util" - "clusterd/pkg/domain/faultdomain" "clusterd/pkg/domain/job" "clusterd/pkg/interface/kube" ) @@ -329,7 +328,7 @@ func testInvalidInfoType(processor *relationFaultProcessor) { func testValidInfoType(processor *relationFaultProcessor) { convey.Convey("When info type is valid", func() { content := constant.AllConfigmapContent{ - DeviceCm: map[string]*constant.DeviceInfo{}, + DeviceCm: map[string]*constant.AdvanceDeviceFaultCm{}, SwitchCm: map[string]*constant.SwitchInfo{}, NodeCm: map[string]*constant.NodeInfo{}, } @@ -480,8 +479,8 @@ func testSuccess(fJob *FaultJob) { func testInitByDeviceFault(fJob *FaultJob) { convey.Convey("When initializing by device fault", func() { cardName := "server-type-device1" - nodeFaultInfo := constant.AdvanceDeviceFaultCm{ - ServerType: "server-type", + nodeFaultInfo := &constant.AdvanceDeviceFaultCm{ + DeviceType: "server-type", FaultDeviceList: map[string][]constant.DeviceFault{ cardName: { { @@ -724,7 +723,6 @@ func testValidConfig() { func TestInitFaultJobs(t *testing.T) { convey.Convey("Test InitFaultJobs", t, func() { processor := &relationFaultProcessor{ - deviceInfoCm: map[string]*constant.DeviceInfo{}, switchInfoCm: map[string]*constant.SwitchInfo{}, faultJobs: make(map[string]*FaultJob), } @@ -736,11 +734,10 @@ func TestInitFaultJobs(t *testing.T) { func testEmptyServerList(processor *relationFaultProcessor) { convey.Convey("When server list is empty", func() { - patches := gomonkey.ApplyFunc(faultdomain.GetAdvanceDeviceCmForNodeMap, - func(deviceInfoCm map[string]*constant.DeviceInfo) map[string]constant.AdvanceDeviceFaultCm { - return map[string]constant.AdvanceDeviceFaultCm{"node1": {SuperPodID: 1}} - }) + patches := gomonkey.NewPatches() defer patches.Reset() + deviceInfo := map[string]*constant.AdvanceDeviceFaultCm{"node1": {SuperPodID: 1}} + processor.deviceInfoCm = deviceInfo patches.ApplyFunc(job.GetJobServerInfoMap, func() constant.JobServerInfoMap { return constant.JobServerInfoMap{ @@ -758,11 +755,10 @@ func testEmptyServerList(processor *relationFaultProcessor) { func testInitFaultJob(processor *relationFaultProcessor) { convey.Convey("When initializing fault job", func() { - patches := gomonkey.ApplyFunc(faultdomain.GetAdvanceDeviceCmForNodeMap, - func(deviceInfoCm map[string]*constant.DeviceInfo) map[string]constant.AdvanceDeviceFaultCm { - return map[string]constant.AdvanceDeviceFaultCm{"node1": {SuperPodID: 1}} - }) + patches := gomonkey.NewPatches() defer patches.Reset() + deviceInfo := map[string]*constant.AdvanceDeviceFaultCm{"node1": {SuperPodID: 1}} + processor.deviceInfoCm = deviceInfo patches.ApplyFunc(job.GetJobServerInfoMap, func() constant.JobServerInfoMap { return constant.JobServerInfoMap{ diff --git a/component/clusterd/pkg/application/faultmanager/jobprocess/relationfault/relation_fault_processor.go b/component/clusterd/pkg/application/faultmanager/jobprocess/relationfault/relation_fault_processor.go index 71206d40169a7eb57acf18d107ae77396cdc8fe2..2195e835e6896bf36354d8085defa9fc6a9d059c 100644 --- a/component/clusterd/pkg/application/faultmanager/jobprocess/relationfault/relation_fault_processor.go +++ b/component/clusterd/pkg/application/faultmanager/jobprocess/relationfault/relation_fault_processor.go @@ -45,7 +45,7 @@ func loadConfig() { type relationFaultProcessor struct { faultJobs map[string]*FaultJob - deviceInfoCm map[string]*constant.DeviceInfo + deviceInfoCm map[string]*constant.AdvanceDeviceFaultCm switchInfoCm map[string]*constant.SwitchInfo nodeInfoCm map[string]*constant.NodeInfo } @@ -71,7 +71,6 @@ func (processor *relationFaultProcessor) Process(info any) any { } func (processor *relationFaultProcessor) InitFaultJobs() { - deviceCmForNodeMap := faultdomain.GetAdvanceDeviceCmForNodeMap(processor.deviceInfoCm) faultJobs := make(map[string]*FaultJob) jobServerInfoMap := job.GetJobServerInfoMap() for jobId, serverLists := range jobServerInfoMap.InfoMap { @@ -85,11 +84,17 @@ func (processor *relationFaultProcessor) InitFaultJobs() { } tmpFaultJob.initFaultJobAttr() for nodeName, serverList := range serverLists { - tmpFaultJob.IsA3Job = deviceCmForNodeMap[nodeName].SuperPodID >= 0 tmpFaultJob.PodNames[serverList.ServerName] = serverList.PodID tmpFaultJob.NameSpace = serverList.PodNameSpace - tmpFaultJob.initBySwitchFault(processor.switchInfoCm[constant.SwitchInfoPrefix+nodeName], serverList) - tmpFaultJob.initByDeviceFault(deviceCmForNodeMap[nodeName], serverList) + switchInfo, ok := processor.switchInfoCm[constant.SwitchInfoPrefix+nodeName] + if ok { + tmpFaultJob.initBySwitchFault(switchInfo, serverList) + } + deviceInfo, ok := processor.deviceInfoCm[nodeName] + if ok { + tmpFaultJob.IsA3Job = deviceInfo.SuperPodID >= 0 + tmpFaultJob.initByDeviceFault(deviceInfo, serverList) + } } faultJobs[jobId] = tmpFaultJob hwlog.RunLog.Debugf("init fault job %v", util.ObjToString(faultJobs)) @@ -320,12 +325,12 @@ func (fJob *FaultJob) addFaultStrategyForTimeOutCode(fault *constant.FaultInfo) } } -func (fJob *FaultJob) initByDeviceFault(nodeFaultInfo constant.AdvanceDeviceFaultCm, serverList constant.ServerHccl) { +func (fJob *FaultJob) initByDeviceFault(nodeFaultInfo *constant.AdvanceDeviceFaultCm, serverList constant.ServerHccl) { if fJob.SeparateNodes.Has(serverList.ServerName) { return } for _, deviceInfo := range serverList.DeviceList { - deviceName := nodeFaultInfo.ServerType + "-" + deviceInfo.DeviceID + deviceName := nodeFaultInfo.DeviceType + "-" + deviceInfo.DeviceID fault, ok := nodeFaultInfo.FaultDeviceList[deviceName] if !ok { continue diff --git a/component/clusterd/pkg/application/recover/fault_recover_service.go b/component/clusterd/pkg/application/recover/fault_recover_service.go index b055da3fcd61c09c1517af8847b727d7b0205f8a..9f5d0af680c82279655c2c658413886683992324 100644 --- a/component/clusterd/pkg/application/recover/fault_recover_service.go +++ b/component/clusterd/pkg/application/recover/fault_recover_service.go @@ -6,6 +6,7 @@ package recover import ( "context" "fmt" + "reflect" "sync" "time" @@ -27,6 +28,7 @@ type FaultRecoverService struct { eventCtl map[string]*EventController initJob map[string]common.JobBaseInfo lock sync.RWMutex + faultCh chan map[string]constant.JobFaultInfo pb.UnimplementedRecoverServer } @@ -37,6 +39,10 @@ func NewFaultRecoverService(keepAlive int, ctx context.Context) *FaultRecoverSer s.serviceCtx = ctx s.eventCtl = make(map[string]*EventController) s.initJob = make(map[string]common.JobBaseInfo) + s.faultCh = make(chan map[string]constant.JobFaultInfo, 5) + if err := faultmanager.RegisterForJobFaultRank(s.faultCh, reflect.TypeOf(s).Name()); err != nil { + hwlog.RunLog.Errorf("RegisterForJobFaultRank fail") + } go s.checkFaultFromFaultCenter() return s } @@ -101,12 +107,7 @@ func (s *FaultRecoverService) dealWithJobFaultInfo(jobFaultInfoList []constant.J wg.Wait() } -func (s *FaultRecoverService) checkFault() { - if faultmanager.GlobalFaultProcessCenter == nil { - hwlog.RunLog.Warnf("global center is nil, try it after %d second", globalFaultBeaconSecond) - return - } - allJobFaultInfo := faultmanager.QueryJobsFaultInfo(constant.NotHandleFault) +func (s *FaultRecoverService) checkFault(allJobFaultInfo map[string]constant.JobFaultInfo) { var registeredJobInfo []constant.JobFaultInfo for jobId, jobFaultInfo := range allJobFaultInfo { if !s.registered(jobId) { @@ -121,15 +122,12 @@ func (s *FaultRecoverService) checkFault() { } func (s *FaultRecoverService) checkFaultFromFaultCenter() { - ticker := time.NewTicker(time.Duration(globalFaultBeaconSecond) * time.Second) - defer ticker.Stop() for { select { case <-s.serviceCtx.Done(): return - case <-ticker.C: - hwlog.RunLog.Debug("ticker check npu fault from global center") - s.checkFault() + case allJobFaultInfo := <-s.faultCh: + s.checkFault(allJobFaultInfo) } } } diff --git a/component/clusterd/pkg/application/recover/fault_recover_service_test.go b/component/clusterd/pkg/application/recover/fault_recover_service_test.go index 8e69e2a0cb0becc60dc80eca6bb46f4b20a44b1e..824e4f413ae41a292bf6be38ff416f00503ca71f 100644 --- a/component/clusterd/pkg/application/recover/fault_recover_service_test.go +++ b/component/clusterd/pkg/application/recover/fault_recover_service_test.go @@ -495,25 +495,18 @@ func TestDealWithJobFaultInfo(t *testing.T) { func TestCheckFault(t *testing.T) { convey.Convey("Testing checkFault", t, func() { service := fakeService() - patches := gomonkey.ApplyFunc(faultmanager.QueryJobsFaultInfo, - func(faultLevel string) map[string]constant.JobFaultInfo { - return map[string]constant.JobFaultInfo{ - fakeJobID1: {JobId: fakeJobID1, FaultList: []constant.FaultRank{{}}}, - fakeJobID2: {JobId: fakeJobID2, FaultList: []constant.FaultRank{{}}}, - } - }).ApplyFunc(service.registered, func(jobId string) bool { - if jobId == "job1" { - return true - } - return false - }).ApplyFunc(service.dealWithJobFaultInfo, func(jobFaultInfoList []constant.JobFaultInfo) { + patches := gomonkey.ApplyFunc(service.dealWithJobFaultInfo, func(jobFaultInfoList []constant.JobFaultInfo) { convey.So(jobFaultInfoList, convey.ShouldHaveLength, 1) }) defer patches.Reset() - service.checkFault() + info := map[string]constant.JobFaultInfo{ + fakeJobID1: {JobId: fakeJobID1, FaultList: []constant.FaultRank{{}}}, + fakeJobID2: {JobId: fakeJobID2, FaultList: []constant.FaultRank{{}}}, + } + service.checkFault(info) faultmanager.GlobalFaultProcessCenter = nil - service.checkFault() + service.checkFault(info) }) } diff --git a/component/clusterd/pkg/application/resource/report.go b/component/clusterd/pkg/application/resource/report.go index 9c2d1317be40b9190619939e89fc624f6e4870ed..eb3051659b197b3151ee8acfc09e8e70dcb4ad33 100644 --- a/component/clusterd/pkg/application/resource/report.go +++ b/component/clusterd/pkg/application/resource/report.go @@ -18,6 +18,7 @@ import ( "clusterd/pkg/application/faultmanager" "clusterd/pkg/common/constant" "clusterd/pkg/domain/device" + "clusterd/pkg/domain/faultdomain" "clusterd/pkg/domain/node" "clusterd/pkg/domain/switchinfo" "clusterd/pkg/interface/kube" @@ -65,7 +66,8 @@ func Report(ctx context.Context) { }) switch whichToReport { case constant.DeviceProcessType: - deviceArr := device.GetSafeData(faultmanager.QueryDeviceInfoToReport()) + deviceArr := device.GetSafeData(faultdomain.AdvanceFaultMapToOriginalFaultMap[*constant.DeviceInfo]( + faultmanager.QueryDeviceInfoToReport())) updateDeviceInfoCm(deviceArr) case constant.NodeProcessType: nodeArr := node.GetSafeData(faultmanager.QueryNodeInfoToReport()) @@ -74,7 +76,8 @@ func Report(ctx context.Context) { switchArr := switchinfo.GetSafeData(faultmanager.QuerySwitchInfoToReport()) updateSwitchInfoCm(switchArr) case constant.AllProcessType: - deviceArr := device.GetSafeData(faultmanager.QueryDeviceInfoToReport()) + deviceArr := device.GetSafeData(faultdomain.AdvanceFaultMapToOriginalFaultMap[*constant.DeviceInfo]( + faultmanager.QueryDeviceInfoToReport())) nodeArr := node.GetSafeData(faultmanager.QueryNodeInfoToReport()) switchArr := switchinfo.GetSafeData(faultmanager.QuerySwitchInfoToReport()) updateAllCm(deviceArr, nodeArr, switchArr) diff --git a/component/clusterd/pkg/common/constant/const.go b/component/clusterd/pkg/common/constant/const.go index e67f0e2f16bfa2da227834da8dd3008fd691fb8f..8d25e33a41421b4b8f7667a4cc4e3c8fae28a514 100644 --- a/component/clusterd/pkg/common/constant/const.go +++ b/component/clusterd/pkg/common/constant/const.go @@ -179,6 +179,18 @@ const ( SubHealthFault = "SubHealthFault" ) +// About cm keys +const ( + // CmRecoveringSuffix Recovering Suffix + CmRecoveringSuffix = "-Recovering" + // CmCardUnhealthySuffix CardUnhealthy Suffix + CmCardUnhealthySuffix = "-Unhealthy" + // CmCardNetworkUnhealthySuffix NetworkUnhealthy Suffix + CmCardNetworkUnhealthySuffix = "-NetworkUnhealthy" + // CmFaultListSuffix FaultList Suffix + CmFaultListSuffix = "-Fault" +) + // support device type const ( Ascend910 = "Ascend910" diff --git a/component/clusterd/pkg/common/constant/constants.go b/component/clusterd/pkg/common/constant/constants.go index 2500d3eb9ff4bfa285e280920a12f261e1f86a56..7d6845de090a79f5f2c56ebe4fcceaea85515f8b 100644 --- a/component/clusterd/pkg/common/constant/constants.go +++ b/component/clusterd/pkg/common/constant/constants.go @@ -80,7 +80,6 @@ const ( JobReportRecoverTimeout = 10 * 1000 JobReportInfoExpiredTimeout = 10 * 1000 JobReportCompleteTimeout = 30 * 1000 - FaultCenterProcessPeriod = 3 * 1000 MaxFaultCenterSubscriber = 10 UnknownFaultTime = -1 ) diff --git a/component/clusterd/pkg/common/constant/methods.go b/component/clusterd/pkg/common/constant/methods.go new file mode 100644 index 0000000000000000000000000000000000000000..76cbe80168f26b8fcdd712c37545d63185101dbd --- /dev/null +++ b/component/clusterd/pkg/common/constant/methods.go @@ -0,0 +1,311 @@ +// Copyright (c) Huawei Technologies Co., Ltd. 2025-2025. All rights reserved. + +// Package constant a series of para +package constant + +import ( + "maps" + + "k8s.io/utils/strings/slices" + + "ascend-common/api" + "ascend-common/common-utils/hwlog" + "clusterd/pkg/common/util" +) + +var normalFaultLevel = []string{NotHandleFault, SubHealthFault, NormalNPU, NormalNetwork} + +func (cm *AdvanceDeviceFaultCm) addFaultIntoFaultList(addFault DeviceFault) bool { + if cm.FaultDeviceList == nil { + cm.FaultDeviceList = make(map[string][]DeviceFault) + } + if _, ok := cm.FaultDeviceList[addFault.NPUName]; !ok { + cm.FaultDeviceList[addFault.NPUName] = make([]DeviceFault, 0) + } + deviceFaults := cm.FaultDeviceList[addFault.NPUName] + found := false + for _, fault := range deviceFaults { + if equalDeviceFault(&addFault, &fault) { + found = true + break + } + } + if !found { + deviceFaults = append(deviceFaults, addFault) + } + cm.FaultDeviceList[addFault.NPUName] = deviceFaults + return !found +} + +// AddFault add fault in the AdvanceDeviceFaultCm +// If the fault is more than normalFaultLevel, then should add into CardUnHealthy/NetworkUnhealthy +// And remove from AvailableDeviceList +func (cm *AdvanceDeviceFaultCm) AddFault(fault DeviceFault) { + if !cm.addFaultIntoFaultList(fault) { + return + } + if !slices.Contains(normalFaultLevel, fault.FaultLevel) { + if fault.FaultType == CardUnhealthy || fault.FaultType == PublicFaultType { + cm.AvailableDeviceList = util.DeleteStringSliceItem(cm.AvailableDeviceList, fault.NPUName) + if !slices.Contains(cm.CardUnHealthy, fault.NPUName) { + cm.CardUnHealthy = append(cm.CardUnHealthy, fault.NPUName) + } + } else if fault.FaultType == CardNetworkUnhealthy { + cm.AvailableDeviceList = util.DeleteStringSliceItem(cm.AvailableDeviceList, fault.NPUName) + if !slices.Contains(cm.NetworkUnhealthy, fault.NPUName) { + cm.NetworkUnhealthy = append(cm.NetworkUnhealthy, fault.NPUName) + } + } else { + hwlog.RunLog.Errorf("unrecognizable fault type %s", fault.FaultType) + } + } +} + +func (cm *AdvanceDeviceFaultCm) delFaultFromFaultList(delFault DeviceFault) bool { + if cm.FaultDeviceList == nil { + return false + } + if _, ok := cm.FaultDeviceList[delFault.NPUName]; !ok { + return false + } + deviceFaults := cm.FaultDeviceList[delFault.NPUName] + + newDeviceFaults := make([]DeviceFault, 0) + found := false + for _, fault := range deviceFaults { + if equalDeviceFault(&delFault, &fault) { + found = true + continue + } + newDeviceFaults = append(newDeviceFaults, fault) + } + if len(newDeviceFaults) == 0 { + delete(cm.FaultDeviceList, delFault.NPUName) + } else { + cm.FaultDeviceList[delFault.NPUName] = newDeviceFaults + } + return found +} + +// DelFault delete fault in the AdvanceDeviceFaultCm +// Delete fault cannot add npu into AvailableDeviceList, because some job run on the npu +func (cm *AdvanceDeviceFaultCm) DelFault(fault DeviceFault) { + if !cm.delFaultFromFaultList(fault) { + return + } + deviceFaults := cm.FaultDeviceList[fault.NPUName] + delFromCardUnhealthy := true + delFromCardNetworkUnhealthy := true + for _, devFault := range deviceFaults { + if !slices.Contains(normalFaultLevel, devFault.FaultLevel) { + if devFault.FaultType == CardUnhealthy || devFault.FaultType == PublicFaultType { + delFromCardUnhealthy = false + } else if devFault.FaultType == CardNetworkUnhealthy { + delFromCardNetworkUnhealthy = false + } else { + hwlog.RunLog.Errorf("unrecognizable fault type %s", devFault.FaultType) + } + } + } + if delFromCardUnhealthy { + cm.CardUnHealthy = util.DeleteStringSliceItem(cm.CardUnHealthy, fault.NPUName) + } + if delFromCardNetworkUnhealthy { + cm.NetworkUnhealthy = util.DeleteStringSliceItem(cm.NetworkUnhealthy, fault.NPUName) + } +} + +// IsSame compare two AdvanceDeviceFaultCm, do not care UpdateTime +func (cm *AdvanceDeviceFaultCm) IsSame(another ConfigMapInterface) bool { + if cm == nil { + hwlog.RunLog.Error("cm is nil") + return false + } + thatCm, ok := another.(*AdvanceDeviceFaultCm) + if !ok { + return false + } + eq := func(faultListOne []DeviceFault, faultListOther []DeviceFault) bool { + if len(faultListOne) != len(faultListOther) { + return false + } + for i, fault := range faultListOne { + if !equalDeviceFault(&fault, &faultListOther[i]) { + return false + } + } + return true + } + return cm.DeviceType == thatCm.DeviceType && + cm.CmName == thatCm.CmName && + cm.SuperPodID == thatCm.SuperPodID && + cm.ServerIndex == thatCm.ServerIndex && + slices.Equal(cm.AvailableDeviceList, thatCm.AvailableDeviceList) && + slices.Equal(cm.Recovering, thatCm.Recovering) && + slices.Equal(cm.CardUnHealthy, thatCm.CardUnHealthy) && + slices.Equal(cm.NetworkUnhealthy, thatCm.NetworkUnhealthy) && + maps.EqualFunc(cm.FaultDeviceList, thatCm.FaultDeviceList, eq) +} + +// GetCmName return cm name +func (cm *AdvanceDeviceFaultCm) GetCmName() string { + if cm == nil { + hwlog.RunLog.Error("cm is nil") + } + return cm.CmName +} + +// GetRecoveringKey return cm RecoveringKey +func (cm *AdvanceDeviceFaultCm) GetRecoveringKey() string { + if cm == nil { + hwlog.RunLog.Error("cm is nil") + } + return api.ResourceNamePrefix + cm.DeviceType + CmRecoveringSuffix +} + +// GetCardUnHealthyKey return cm CardUnHealthyKey +func (cm *AdvanceDeviceFaultCm) GetCardUnHealthyKey() string { + if cm == nil { + hwlog.RunLog.Error("cm is nil") + } + return api.ResourceNamePrefix + cm.DeviceType + CmCardUnhealthySuffix +} + +// GetNetworkUnhealthyKey return cm NetworkUnhealthyKey +func (cm *AdvanceDeviceFaultCm) GetNetworkUnhealthyKey() string { + if cm == nil { + hwlog.RunLog.Error("cm is nil") + } + return api.ResourceNamePrefix + cm.DeviceType + CmCardNetworkUnhealthySuffix +} + +// GetFaultDeviceListKey return cm FaultDeviceListKey +func (cm *AdvanceDeviceFaultCm) GetFaultDeviceListKey() string { + if cm == nil { + hwlog.RunLog.Error("cm is nil") + } + return api.ResourceNamePrefix + cm.DeviceType + CmFaultListSuffix +} + +// GetAvailableDeviceListKey return cm AvailableDeviceListKey +func (cm *AdvanceDeviceFaultCm) GetAvailableDeviceListKey() string { + if cm == nil { + hwlog.RunLog.Error("cm is nil") + } + return api.ResourceNamePrefix + cm.DeviceType +} + +// GetCmName get configmap name of device info +func (cm *DeviceInfo) GetCmName() string { + return cm.CmName +} + +// GetCmName get configmap name of switch info +func (cm *SwitchInfo) GetCmName() string { + return cm.CmName +} + +// GetCmName get configmap name of node info +func (cm *NodeInfo) GetCmName() string { + return cm.CmName +} + +// IsSame compare with another cm +func (cm *DeviceInfo) IsSame(another ConfigMapInterface) bool { + anotherDeviceInfo, ok := another.(*DeviceInfo) + if !ok { + hwlog.RunLog.Warnf("compare with cm which is not DeviceInfo") + return false + } + return !DeviceInfoBusinessDataIsNotEqual(cm, anotherDeviceInfo) +} + +// IsSame compare with another cm +func (cm *SwitchInfo) IsSame(another ConfigMapInterface) bool { + anotherSwitchInfo, ok := another.(*SwitchInfo) + if !ok { + hwlog.RunLog.Warnf("compare with cm which is not SwitchInfo") + return false + } + return !SwitchInfoBusinessDataIsNotEqual(cm, anotherSwitchInfo) +} + +// IsSame compare with another cm +func (cm *NodeInfo) IsSame(another ConfigMapInterface) bool { + anotherNodeInfo, ok := another.(*NodeInfo) + if !ok { + hwlog.RunLog.Warnf("compare with cm which is not NodeInfo") + return false + } + return !NodeInfoBusinessDataIsNotEqual(cm, anotherNodeInfo) +} + +// DeviceInfoBusinessDataIsNotEqual determine the business data is not equal +func DeviceInfoBusinessDataIsNotEqual(oldDevInfo *DeviceInfo, devInfo *DeviceInfo) bool { + if oldDevInfo == nil && devInfo == nil { + hwlog.RunLog.Debug("both oldDevInfo and devInfo are nil") + return false + } + if oldDevInfo == nil || devInfo == nil { + hwlog.RunLog.Debug("one of oldDevInfo and devInfo is not empty, and the other is empty") + return true + } + if len(oldDevInfo.DeviceList) != len(devInfo.DeviceList) { + hwlog.RunLog.Debug("the length of the deviceList of oldDevInfo is not equal to that of the deviceList of devInfo") + return true + } + for nKey, nValue := range oldDevInfo.DeviceList { + oValue, exists := devInfo.DeviceList[nKey] + if !exists || nValue != oValue { + hwlog.RunLog.Debug("neither oldDevInfo nor devInfo is empty, but oldDevInfo is not equal to devInfo") + return true + } + } + hwlog.RunLog.Debug("oldDevInfo is equal to devInfo") + return false +} + +// SwitchInfoBusinessDataIsNotEqual judge is the faultcode and fault level is the same as known, if is not same returns true +func SwitchInfoBusinessDataIsNotEqual(oldSwitch, newSwitch *SwitchInfo) bool { + if oldSwitch == nil && newSwitch == nil { + return false + } + if (oldSwitch != nil && newSwitch == nil) || (oldSwitch == nil && newSwitch != nil) { + return true + } + if newSwitch.FaultLevel != oldSwitch.FaultLevel || newSwitch.NodeStatus != oldSwitch.NodeStatus || + len(newSwitch.FaultCode) != len(oldSwitch.FaultCode) { + return true + } + hwlog.RunLog.Debug("oldSwitch is equal to newSwitch") + return false +} + +// NodeInfoBusinessDataIsNotEqual determine the business data is not equal +func NodeInfoBusinessDataIsNotEqual(oldNodeInfo *NodeInfo, newNodeInfo *NodeInfo) bool { + if oldNodeInfo == nil && newNodeInfo == nil { + hwlog.RunLog.Debug("both oldNodeInfo and newNodeInfo are nil") + return false + } + if oldNodeInfo == nil || newNodeInfo == nil { + hwlog.RunLog.Debug("one of oldNodeInfo and newNodeInfo is not empty, and the other is empty") + return true + } + if oldNodeInfo.NodeStatus != newNodeInfo.NodeStatus || + len(oldNodeInfo.FaultDevList) != len(newNodeInfo.FaultDevList) { + hwlog.RunLog.Debug("neither oldNodeInfo nor newNodeInfo is empty, but oldNodeInfo is not equal to newNodeInfo") + return true + } + hwlog.RunLog.Debug("oldNodeInfo is equal to newNodeInfo") + return false +} + +func equalDeviceFault(one, other *DeviceFault) bool { + return one.FaultType == other.FaultType && + one.NPUName == other.NPUName && + one.LargeModelFaultLevel == other.LargeModelFaultLevel && + one.FaultLevel == other.FaultLevel && + one.FaultHandling == other.FaultHandling && + one.FaultCode == other.FaultCode && + maps.Equal(one.FaultTimeAndLevelMap, other.FaultTimeAndLevelMap) +} diff --git a/component/clusterd/pkg/common/constant/type.go b/component/clusterd/pkg/common/constant/type.go index 58deb303cfdefc13f4bc67c2d52318529f9bc16a..18b7bd2cc76e2706cb5c7585696c02b118f633f7 100644 --- a/component/clusterd/pkg/common/constant/type.go +++ b/component/clusterd/pkg/common/constant/type.go @@ -3,10 +3,6 @@ // Package constant a series of para package constant -import ( - "ascend-common/common-utils/hwlog" -) - // FaultTimeAndLevel of each fault code // some fault may not have accurate fault time and level, // for example: duration fault use current time as `FaultTime` @@ -216,14 +212,16 @@ type FaultProcessor interface { // AdvanceDeviceFaultCm more structure device info type AdvanceDeviceFaultCm struct { - ServerType string - CmName string - SuperPodID int32 - ServerIndex int32 - FaultDeviceList map[string][]DeviceFault - CardUnHealthy []string - NetworkUnhealthy []string - UpdateTime int64 + DeviceType string + CmName string + SuperPodID int32 + ServerIndex int32 + FaultDeviceList map[string][]DeviceFault + AvailableDeviceList []string + Recovering []string + CardUnHealthy []string + NetworkUnhealthy []string + UpdateTime int64 } // InformerCmItem informer configmap item of queue or buffer @@ -240,7 +238,7 @@ type OneConfigmapContent[T ConfigMapInterface] struct { // AllConfigmapContent contains all kind of configmap content type AllConfigmapContent struct { - DeviceCm map[string]*DeviceInfo + DeviceCm map[string]*AdvanceDeviceFaultCm SwitchCm map[string]*SwitchInfo NodeCm map[string]*NodeInfo } @@ -251,111 +249,6 @@ type ConfigMapInterface interface { IsSame(another ConfigMapInterface) bool } -// GetCmName get configmap name of device info -func (cm *DeviceInfo) GetCmName() string { - return cm.CmName -} - -// GetCmName get configmap name of switch info -func (cm *SwitchInfo) GetCmName() string { - return cm.CmName -} - -// GetCmName get configmap name of node info -func (cm *NodeInfo) GetCmName() string { - return cm.CmName -} - -// IsSame compare with another cm -func (cm *DeviceInfo) IsSame(another ConfigMapInterface) bool { - anotherDeviceInfo, ok := another.(*DeviceInfo) - if !ok { - hwlog.RunLog.Warnf("compare with cm which is not DeviceInfo") - return false - } - return !DeviceInfoBusinessDataIsNotEqual(cm, anotherDeviceInfo) -} - -// IsSame compare with another cm -func (cm *SwitchInfo) IsSame(another ConfigMapInterface) bool { - anotherSwitchInfo, ok := another.(*SwitchInfo) - if !ok { - hwlog.RunLog.Warnf("compare with cm which is not SwitchInfo") - return false - } - return !SwitchInfoBusinessDataIsNotEqual(cm, anotherSwitchInfo) -} - -// IsSame compare with another cm -func (cm *NodeInfo) IsSame(another ConfigMapInterface) bool { - anotherNodeInfo, ok := another.(*NodeInfo) - if !ok { - hwlog.RunLog.Warnf("compare with cm which is not NodeInfo") - return false - } - return !NodeInfoBusinessDataIsNotEqual(cm, anotherNodeInfo) -} - -// DeviceInfoBusinessDataIsNotEqual determine the business data is not equal -func DeviceInfoBusinessDataIsNotEqual(oldDevInfo *DeviceInfo, devInfo *DeviceInfo) bool { - if oldDevInfo == nil && devInfo == nil { - hwlog.RunLog.Debug("both oldDevInfo and devInfo are nil") - return false - } - if oldDevInfo == nil || devInfo == nil { - hwlog.RunLog.Debug("one of oldDevInfo and devInfo is not empty, and the other is empty") - return true - } - if len(oldDevInfo.DeviceList) != len(devInfo.DeviceList) { - hwlog.RunLog.Debug("the length of the deviceList of oldDevInfo is not equal to that of the deviceList of devInfo") - return true - } - for nKey, nValue := range oldDevInfo.DeviceList { - oValue, exists := devInfo.DeviceList[nKey] - if !exists || nValue != oValue { - hwlog.RunLog.Debug("neither oldDevInfo nor devInfo is empty, but oldDevInfo is not equal to devInfo") - return true - } - } - hwlog.RunLog.Debug("oldDevInfo is equal to devInfo") - return false -} - -// SwitchInfoBusinessDataIsNotEqual judge is the faultcode and fault level is the same as known, if is not same returns true -func SwitchInfoBusinessDataIsNotEqual(oldSwitch, newSwitch *SwitchInfo) bool { - if oldSwitch == nil && newSwitch == nil { - return false - } - if (oldSwitch != nil && newSwitch == nil) || (oldSwitch == nil && newSwitch != nil) { - return true - } - if newSwitch.FaultLevel != oldSwitch.FaultLevel || newSwitch.NodeStatus != oldSwitch.NodeStatus || - len(newSwitch.FaultCode) != len(oldSwitch.FaultCode) { - return true - } - hwlog.RunLog.Debug("oldSwitch is equal to newSwitch") - return false -} - -// NodeInfoBusinessDataIsNotEqual determine the business data is not equal -func NodeInfoBusinessDataIsNotEqual(oldNodeInfo *NodeInfo, newNodeInfo *NodeInfo) bool { - if oldNodeInfo == nil && newNodeInfo == nil { - hwlog.RunLog.Debug("both oldNodeInfo and newNodeInfo are nil") - return false - } - if oldNodeInfo == nil || newNodeInfo == nil { - hwlog.RunLog.Debug("one of oldNodeInfo and newNodeInfo is not empty, and the other is empty") - return true - } - if oldNodeInfo.NodeStatus != newNodeInfo.NodeStatus || - len(oldNodeInfo.FaultDevList) != len(newNodeInfo.FaultDevList) { - hwlog.RunLog.Debug("neither oldNodeInfo nor newNodeInfo is empty, but oldNodeInfo is not equal to newNodeInfo") - return true - } - hwlog.RunLog.Debug("oldNodeInfo is equal to newNodeInfo") - return false -} - // FaultRank defines the structure for storing fault rank information. // It includes the rank ID and fault code. type FaultRank struct { diff --git a/component/clusterd/pkg/domain/faultdomain/cmmanager/configmap_manager.go b/component/clusterd/pkg/domain/faultdomain/cmmanager/configmap_manager.go index cc1f02e3c2957f2dc5e60b964f88969c4ec6aebb..795a71293349ddcdc84167922f4cdad6ffb8a1ad 100644 --- a/component/clusterd/pkg/domain/faultdomain/cmmanager/configmap_manager.go +++ b/component/clusterd/pkg/domain/faultdomain/cmmanager/configmap_manager.go @@ -16,39 +16,35 @@ type ConfigMap[T constant.ConfigMapInterface] struct { Data map[string]T } -var DeviceCenterCmManager *FaultCenterCmManager[*constant.DeviceInfo] +var DeviceCenterCmManager *FaultCenterCmManager[*constant.AdvanceDeviceFaultCm] var SwitchCenterCmManager *FaultCenterCmManager[*constant.SwitchInfo] var NodeCenterCmManager *FaultCenterCmManager[*constant.NodeInfo] type FaultCenterCmManager[T constant.ConfigMapInterface] struct { - mutex sync.RWMutex - cmBuffer *collector.ConfigmapCollectBuffer[T] - originalCm ConfigMap[T] - processingCm ConfigMap[T] - processedCm ConfigMap[T] + mutex sync.RWMutex + cmBuffer *collector.ConfigmapCollectBuffer[T] + originalCm ConfigMap[T] + processedCm ConfigMap[T] } func init() { - DeviceCenterCmManager = &FaultCenterCmManager[*constant.DeviceInfo]{ - mutex: sync.RWMutex{}, - originalCm: ConfigMap[*constant.DeviceInfo]{Data: make(map[string]*constant.DeviceInfo)}, - processingCm: ConfigMap[*constant.DeviceInfo]{Data: make(map[string]*constant.DeviceInfo)}, - processedCm: ConfigMap[*constant.DeviceInfo]{Data: make(map[string]*constant.DeviceInfo)}, - cmBuffer: collector.DeviceCmCollectBuffer, + DeviceCenterCmManager = &FaultCenterCmManager[*constant.AdvanceDeviceFaultCm]{ + mutex: sync.RWMutex{}, + originalCm: ConfigMap[*constant.AdvanceDeviceFaultCm]{Data: make(map[string]*constant.AdvanceDeviceFaultCm)}, + processedCm: ConfigMap[*constant.AdvanceDeviceFaultCm]{Data: make(map[string]*constant.AdvanceDeviceFaultCm)}, + cmBuffer: collector.DeviceCmCollectBuffer, } SwitchCenterCmManager = &FaultCenterCmManager[*constant.SwitchInfo]{ - mutex: sync.RWMutex{}, - originalCm: ConfigMap[*constant.SwitchInfo]{Data: make(map[string]*constant.SwitchInfo)}, - processingCm: ConfigMap[*constant.SwitchInfo]{Data: make(map[string]*constant.SwitchInfo)}, - processedCm: ConfigMap[*constant.SwitchInfo]{Data: make(map[string]*constant.SwitchInfo)}, - cmBuffer: collector.SwitchCmCollectBuffer, + mutex: sync.RWMutex{}, + originalCm: ConfigMap[*constant.SwitchInfo]{Data: make(map[string]*constant.SwitchInfo)}, + processedCm: ConfigMap[*constant.SwitchInfo]{Data: make(map[string]*constant.SwitchInfo)}, + cmBuffer: collector.SwitchCmCollectBuffer, } NodeCenterCmManager = &FaultCenterCmManager[*constant.NodeInfo]{ - mutex: sync.RWMutex{}, - originalCm: ConfigMap[*constant.NodeInfo]{Data: make(map[string]*constant.NodeInfo)}, - processingCm: ConfigMap[*constant.NodeInfo]{Data: make(map[string]*constant.NodeInfo)}, - processedCm: ConfigMap[*constant.NodeInfo]{Data: make(map[string]*constant.NodeInfo)}, - cmBuffer: collector.NodeCmCollectBuffer, + mutex: sync.RWMutex{}, + originalCm: ConfigMap[*constant.NodeInfo]{Data: make(map[string]*constant.NodeInfo)}, + processedCm: ConfigMap[*constant.NodeInfo]{Data: make(map[string]*constant.NodeInfo)}, + cmBuffer: collector.NodeCmCollectBuffer, } } @@ -58,18 +54,6 @@ func (manager *FaultCenterCmManager[T]) GetOriginalCm() ConfigMap[T] { return manager.originalCm.deepCopy() } -func (manager *FaultCenterCmManager[T]) SetProcessingCm(cm ConfigMap[T]) { - manager.mutex.Lock() - defer manager.mutex.Unlock() - manager.processingCm = cm.deepCopy() -} - -func (manager *FaultCenterCmManager[T]) GetProcessingCm() ConfigMap[T] { - manager.mutex.RLock() - defer manager.mutex.RUnlock() - return manager.processingCm.deepCopy() -} - func (manager *FaultCenterCmManager[T]) SetProcessedCm(cm ConfigMap[T]) bool { manager.mutex.Lock() defer manager.mutex.Unlock() diff --git a/component/clusterd/pkg/domain/faultdomain/cmmanager/configmap_manager_test.go b/component/clusterd/pkg/domain/faultdomain/cmmanager/configmap_manager_test.go index a6cd5cbe4c465b2fb3fcbc03c2715cd7de8f3b51..76854b0833a0ae628bc24866675fd5dd32bf579e 100644 --- a/component/clusterd/pkg/domain/faultdomain/cmmanager/configmap_manager_test.go +++ b/component/clusterd/pkg/domain/faultdomain/cmmanager/configmap_manager_test.go @@ -8,8 +8,13 @@ import ( "sync" "testing" + "github.com/agiledragon/gomonkey/v2" + "github.com/smartystreets/goconvey/convey" + "ascend-common/common-utils/hwlog" "clusterd/pkg/common/constant" + "clusterd/pkg/common/util" + "clusterd/pkg/domain/faultdomain/collector" ) func TestMain(m *testing.M) { @@ -35,24 +40,16 @@ func TestFaultCenterCmManagerSetAndGetConfigmap(t *testing.T) { t.Errorf("TestFaultCenterCmManagerSetAndGetDeviceInfoCm failed") } faultManager := FaultCenterCmManager[*constant.DeviceInfo]{ - mutex: sync.RWMutex{}, - originalCm: ConfigMap[*constant.DeviceInfo]{}, - processingCm: ConfigMap[*constant.DeviceInfo]{}, - processedCm: ConfigMap[*constant.DeviceInfo]{}, + mutex: sync.RWMutex{}, + originalCm: ConfigMap[*constant.DeviceInfo]{}, + processedCm: ConfigMap[*constant.DeviceInfo]{}, } faultManager.updateOriginalCm(cm1, true) faultManager.updateOriginalCm(cm2, true) if !reflect.DeepEqual(deviceCM, faultManager.GetOriginalCm()) { t.Errorf("TestFaultCenterCmManagerSetAndGetDeviceInfoCm failed") } - faultManager.SetProcessingCm(faultManager.GetOriginalCm()) - if !reflect.DeepEqual(deviceCM, faultManager.GetProcessingCm()) { - t.Errorf("TestFaultCenterCmManagerSetAndGetDeviceInfoCm failed") - } - faultManager.SetProcessedCm(faultManager.GetProcessingCm()) - if !reflect.DeepEqual(deviceCM, faultManager.GetProcessedCm()) { - t.Errorf("TestFaultCenterCmManagerSetAndGetDeviceInfoCm failed") - } + faultManager.updateOriginalCm(cm1, false) faultManager.updateOriginalCm(cm2, false) if len(faultManager.GetOriginalCm().Data) != 0 { @@ -60,3 +57,69 @@ func TestFaultCenterCmManagerSetAndGetConfigmap(t *testing.T) { } }) } + +func TestConfigMapEqual(t *testing.T) { + patches := gomonkey.NewPatches() + defer patches.Reset() + + convey.Convey("Test ConfigMap.equal()", t, func() { + cm1 := ConfigMap[*constant.AdvanceDeviceFaultCm]{ + Data: map[string]*constant.AdvanceDeviceFaultCm{ + "node": { + FaultDeviceList: nil, + AvailableDeviceList: []string{"2", "1"}, + Recovering: []string{"4", "3"}, + CardUnHealthy: []string{"6", "5"}, + NetworkUnhealthy: []string{"6", "5"}, + UpdateTime: 10, + }, + }, + } + cm2 := new(ConfigMap[*constant.AdvanceDeviceFaultCm]) + util.DeepCopy(cm2, cm1) + + convey.Convey("should return true for equal config maps", func() { + convey.So(cm1.equal(*cm2), convey.ShouldBeTrue) + }) + }) +} + +func TestUpdateBatchOriginalCm(t *testing.T) { + patches := gomonkey.NewPatches() + defer patches.Reset() + + convey.Convey("Test UpdateBatchOriginalCm()", t, func() { + manager := FaultCenterCmManager[*constant.AdvanceDeviceFaultCm]{ + mutex: sync.RWMutex{}, + cmBuffer: &collector.ConfigmapCollectBuffer[*constant.AdvanceDeviceFaultCm]{}, + originalCm: ConfigMap[*constant.AdvanceDeviceFaultCm]{}, + processedCm: ConfigMap[*constant.AdvanceDeviceFaultCm]{}, + } + manager.UpdateBatchOriginalCm() + convey.So(manager.originalCm.Data, convey.ShouldBeEmpty) + }) +} + +func TestSetAndGetProcessedCm(t *testing.T) { + patches := gomonkey.NewPatches() + defer patches.Reset() + + convey.Convey("Test SetProcessedCm() and GetProcessedCm()", t, func() { + manager := FaultCenterCmManager[*constant.AdvanceDeviceFaultCm]{ + mutex: sync.RWMutex{}, + cmBuffer: &collector.ConfigmapCollectBuffer[*constant.AdvanceDeviceFaultCm]{}, + originalCm: ConfigMap[*constant.AdvanceDeviceFaultCm]{}, + processedCm: ConfigMap[*constant.AdvanceDeviceFaultCm]{}, + } + cm := ConfigMap[*constant.AdvanceDeviceFaultCm]{ + Data: map[string]*constant.AdvanceDeviceFaultCm{ + "node": {}, + }, + } + manager.SetProcessedCm(cm) + convey.So(manager.processedCm, convey.ShouldResemble, cm) + + processedCm := manager.GetProcessedCm() + convey.So(processedCm, convey.ShouldResemble, cm) + }) +} diff --git a/component/clusterd/pkg/domain/faultdomain/collector/cm_collector.go b/component/clusterd/pkg/domain/faultdomain/collector/cm_collector.go index 7ffe01740edb78b153f1d10d52436068ca0b1fb2..8ebe5f4dba39f702a026e990b5a5b0cf470c7393 100644 --- a/component/clusterd/pkg/domain/faultdomain/collector/cm_collector.go +++ b/component/clusterd/pkg/domain/faultdomain/collector/cm_collector.go @@ -8,9 +8,10 @@ import ( "ascend-common/common-utils/hwlog" "clusterd/pkg/common/constant" + "clusterd/pkg/domain/faultdomain" ) -var DeviceCmCollectBuffer *ConfigmapCollectBuffer[*constant.DeviceInfo] +var DeviceCmCollectBuffer *ConfigmapCollectBuffer[*constant.AdvanceDeviceFaultCm] var NodeCmCollectBuffer *ConfigmapCollectBuffer[*constant.NodeInfo] var SwitchCmCollectBuffer *ConfigmapCollectBuffer[*constant.SwitchInfo] @@ -21,10 +22,10 @@ type ConfigmapCollectBuffer[T constant.ConfigMapInterface] struct { } func init() { - DeviceCmCollectBuffer = &ConfigmapCollectBuffer[*constant.DeviceInfo]{ + DeviceCmCollectBuffer = &ConfigmapCollectBuffer[*constant.AdvanceDeviceFaultCm]{ mutex: sync.Mutex{}, - buffer: make(map[string]*[]constant.InformerCmItem[*constant.DeviceInfo]), - lastItem: make(map[string]constant.InformerCmItem[*constant.DeviceInfo]), + buffer: make(map[string]*[]constant.InformerCmItem[*constant.AdvanceDeviceFaultCm]), + lastItem: make(map[string]constant.InformerCmItem[*constant.AdvanceDeviceFaultCm]), } NodeCmCollectBuffer = &ConfigmapCollectBuffer[*constant.NodeInfo]{ mutex: sync.Mutex{}, @@ -86,7 +87,10 @@ func informerItemEqual[T constant.ConfigMapInterface](lastItem, newItem constant func informInfoUpdate(newInfo any, whichToInformer int, isAdd bool) { switch whichToInformer { case constant.DeviceProcessType: - DeviceCmCollectBuffer.Push(newInfo.(*constant.DeviceInfo), isAdd) + advanceFaultForNode := + faultdomain.GetAdvanceFaultForNode(newInfo.(*constant.DeviceInfo)).(*constant.AdvanceDeviceFaultCm) + faultdomain.SortDataForAdvanceDeviceInfo(advanceFaultForNode) + DeviceCmCollectBuffer.Push(advanceFaultForNode, isAdd) case constant.NodeProcessType: NodeCmCollectBuffer.Push(newInfo.(*constant.NodeInfo), isAdd) case constant.SwitchProcessType: diff --git a/component/clusterd/pkg/domain/faultdomain/collector/cm_collector_test.go b/component/clusterd/pkg/domain/faultdomain/collector/cm_collector_test.go index 17bf8d7d264ae03adfbdfd62c8c757b1967b2d2f..2824d1e9794112ec67751fbb16a81bc47a8ba81d 100644 --- a/component/clusterd/pkg/domain/faultdomain/collector/cm_collector_test.go +++ b/component/clusterd/pkg/domain/faultdomain/collector/cm_collector_test.go @@ -17,10 +17,10 @@ const ( ) func resetDeviceCmCollector() { - DeviceCmCollectBuffer = &ConfigmapCollectBuffer[*constant.DeviceInfo]{ + DeviceCmCollectBuffer = &ConfigmapCollectBuffer[*constant.AdvanceDeviceFaultCm]{ mutex: sync.Mutex{}, - buffer: make(map[string]*[]constant.InformerCmItem[*constant.DeviceInfo]), - lastItem: make(map[string]constant.InformerCmItem[*constant.DeviceInfo]), + buffer: make(map[string]*[]constant.InformerCmItem[*constant.AdvanceDeviceFaultCm]), + lastItem: make(map[string]constant.InformerCmItem[*constant.AdvanceDeviceFaultCm]), } } func TestCmInfoCollector(t *testing.T) { diff --git a/component/clusterd/pkg/domain/faultdomain/fault_utils.go b/component/clusterd/pkg/domain/faultdomain/fault_utils.go index 41992b4d73cfd2b61bebc9f757bd0f494d8a62c8..c2df068a2772739fc7790a44e578657e3075ceca 100644 --- a/component/clusterd/pkg/domain/faultdomain/fault_utils.go +++ b/component/clusterd/pkg/domain/faultdomain/fault_utils.go @@ -53,69 +53,107 @@ func CmNameToNodeName(cmName string) string { return strings.TrimPrefix(cmName, constant.DeviceInfoPrefix) } -func nodeNameToCmName(nodeName string) string { - return constant.DeviceInfoPrefix + nodeName +// GetAdvanceFaultCm return more usable fault cm, ONLY FOR TESTCASE +func GetAdvanceFaultCm[U, T constant.ConfigMapInterface]( + cmInfos map[string]T) map[string]U { + result := make(map[string]U) + for _, info := range cmInfos { + result[CmNameToNodeName(info.GetCmName())] = GetAdvanceFaultForNode(info).(U) + } + return result } -// GetAdvanceDeviceCmForNodeMap get advance device cm for node map -func GetAdvanceDeviceCmForNodeMap( - deviceInfoCms map[string]*constant.DeviceInfo) map[string]constant.AdvanceDeviceFaultCm { - advanceDeviceCmForNodeMap := make(map[string]constant.AdvanceDeviceFaultCm) - for _, deviceInfo := range deviceInfoCms { - advanceDeviceCmForNodeMap[CmNameToNodeName(deviceInfo.CmName)] = GetAdvanceDeviceCm(deviceInfo) +// GetAdvanceFaultForNode return more usable fault cm for one node +func GetAdvanceFaultForNode[T constant.ConfigMapInterface](cmForNode T) constant.ConfigMapInterface { + switch cm := any(cmForNode).(type) { + case *constant.DeviceInfo: + return GetAdvanceDeviceCm(cm) + case *constant.NodeInfo: + return cm + case *constant.SwitchInfo: + return cm + case *constant.AdvanceDeviceFaultCm: + return cm + default: + hwlog.RunLog.Errorf("cmForNode type is not support.") + return nil } - return advanceDeviceCmForNodeMap } -// GetAdvanceDeviceCm deviceName->faults -func GetAdvanceDeviceCm(devInfo *constant.DeviceInfo) constant.AdvanceDeviceFaultCm { - advanceDeviceCm := constant.AdvanceDeviceFaultCm{ +// GetAdvanceDeviceCm return more usable device cm +func GetAdvanceDeviceCm(devInfo *constant.DeviceInfo) *constant.AdvanceDeviceFaultCm { + advanceDeviceCm := &constant.AdvanceDeviceFaultCm{ CmName: devInfo.CmName, SuperPodID: devInfo.SuperPodID, ServerIndex: devInfo.ServerIndex, UpdateTime: devInfo.UpdateTime, - ServerType: GetDeviceType(devInfo), + DeviceType: GetDeviceType(devInfo), } - if faultList, ok := devInfo.DeviceList[GetFaultListKey(devInfo)]; ok { - var devicesFault []constant.DeviceFault - err := json.Unmarshal([]byte(faultList), &devicesFault) - if err != nil { - hwlog.RunLog.Errorf("get fault list for node %v failed. "+ - "Json unmarshall exception: %v", devInfo.CmName, err) - return advanceDeviceCm - } - deviceFaultMap := make(map[string][]constant.DeviceFault) - for _, deviceFault := range devicesFault { - if _, ok := deviceFaultMap[deviceFault.NPUName]; !ok { - deviceFaultMap[deviceFault.NPUName] = make([]constant.DeviceFault, 0) - } - hwlog.RunLog.Debugf("device fault: %s of cm %s, time: %s", - util.ObjToString(deviceFault), devInfo.CmName, util.ReadableMsTime(devInfo.UpdateTime)) - // device plugin may merge multiple fault codes in one string - deviceFaults := splitDeviceFault(deviceFault, CmNameToNodeName(devInfo.CmName)) - deviceFaultMap[deviceFault.NPUName] = append(deviceFaultMap[deviceFault.NPUName], deviceFaults...) - } - advanceDeviceCm.FaultDeviceList = deviceFaultMap - } else { + advanceDeviceCm.FaultDeviceList = getFaultListInfo(devInfo) + advanceDeviceCm.NetworkUnhealthy = getNetworkUnhealthyCardList(devInfo) + advanceDeviceCm.CardUnHealthy = getCardUnHealthy(devInfo) + advanceDeviceCm.AvailableDeviceList = getAvailableDevices(devInfo) + advanceDeviceCm.Recovering = getRecoveringDevList(devInfo) + return advanceDeviceCm +} + +func getFaultListInfo(devInfo *constant.DeviceInfo) map[string][]constant.DeviceFault { + _, faultList := getFaultListString(devInfo) + if len(faultList) == 0 { hwlog.RunLog.Infof("get fault list for node %v failed. fault list does not exist", devInfo.CmName) + return make(map[string][]constant.DeviceFault) } - if networkUnhealthyCardList, ok := devInfo.DeviceList[GetNetworkUnhealthyKey(devInfo)]; ok { - cardList := strings.Split(networkUnhealthyCardList, ",") - advanceDeviceCm.NetworkUnhealthy = cardList - } else { - hwlog.RunLog.Infof("get NetworkUnhealthy list for node %v failed. fault list does not exist", - devInfo.CmName) + var devicesFault []constant.DeviceFault + err := json.Unmarshal([]byte(faultList), &devicesFault) + if err != nil { + hwlog.RunLog.Errorf("get fault list for node %v failed. "+ + "Json unmarshall exception: %v", devInfo.CmName, err) + return make(map[string][]constant.DeviceFault) } - if cardUnhealthyCardList, ok := devInfo.DeviceList[GetCardUnhealthyKey(devInfo)]; ok { - var cardList []string - if len(cardUnhealthyCardList) == 0 { - cardList = make([]string, 0) - } else { - cardList = strings.Split(cardUnhealthyCardList, ",") + deviceFaultMap := make(map[string][]constant.DeviceFault) + for _, deviceFault := range devicesFault { + if _, ok := deviceFaultMap[deviceFault.NPUName]; !ok { + deviceFaultMap[deviceFault.NPUName] = make([]constant.DeviceFault, 0) } - advanceDeviceCm.CardUnHealthy = cardList + hwlog.RunLog.Debugf("device fault: %s of cm %s, time: %s", + util.ObjToString(deviceFault), devInfo.CmName, util.ReadableMsTime(devInfo.UpdateTime)) + // device plugin may merge multiple fault codes in one string + deviceFaults := splitDeviceFault(deviceFault, CmNameToNodeName(devInfo.CmName)) + deviceFaultMap[deviceFault.NPUName] = append(deviceFaultMap[deviceFault.NPUName], deviceFaults...) } - return advanceDeviceCm + return deviceFaultMap +} + +func getCardUnHealthy(devInfo *constant.DeviceInfo) []string { + _, info := getCardUnhealthyString(devInfo) + if len(info) == 0 { + return make([]string, 0) + } + return strings.Split(info, ",") +} + +func getNetworkUnhealthyCardList(devInfo *constant.DeviceInfo) []string { + _, info := getNetworkUnhealthyString(devInfo) + if len(info) == 0 { + return make([]string, 0) + } + return strings.Split(info, ",") +} + +func getAvailableDevices(devInfo *constant.DeviceInfo) []string { + _, info := getAvailDevListString(devInfo) + if len(info) == 0 { + return make([]string, 0) + } + return strings.Split(info, ",") +} + +func getRecoveringDevList(devInfo *constant.DeviceInfo) []string { + _, info := getRecoveringString(devInfo) + if len(info) == 0 { + return make([]string, 0) + } + return strings.Split(info, ",") } // GetDeviceType get device type from device info @@ -180,7 +218,9 @@ func splitDeviceFault(faultInfo constant.DeviceFault, nodeName string) []constan func mergeDeviceFault(notGroupDeviceFaults []constant.DeviceFault) ([]constant.DeviceFault, error) { faultsGroupByType := faultsGroupByType(notGroupDeviceFaults) result := make([]constant.DeviceFault, 0) - for _, faultsGroup := range faultsGroupByType { + faultTypes := getSortedKeys(faultsGroupByType) + for _, faultType := range faultTypes { + faultsGroup := faultsGroupByType[faultType] deviceName := faultsGroup[0].NPUName fautLevels := make([]string, 0) newTimeAndLevelMap := make(map[string]constant.FaultTimeAndLevel, len(faultsGroup)) @@ -211,83 +251,76 @@ func mergeDeviceFault(notGroupDeviceFaults []constant.DeviceFault) ([]constant.D return result, nil } -// DeleteFaultFromFaultMap delete fault from faultMap -func DeleteFaultFromFaultMap(faultMap map[string][]constant.DeviceFault, - delFault constant.DeviceFault) map[string][]constant.DeviceFault { - if faultMap == nil { - return make(map[string][]constant.DeviceFault) +func AdvanceFaultMapToOriginalFaultMap[U, T constant.ConfigMapInterface](advanceFaultCm map[string]T) map[string]U { + orgFaultCm := make(map[string]U) + for _, advanceCmForNode := range advanceFaultCm { + orgFaultCm[advanceCmForNode.GetCmName()] = AdvanceCmToOriginalCm(advanceCmForNode).(U) } - deviceFaults, ok := faultMap[delFault.NPUName] - if !ok { - return faultMap - } - newDeviceFaults := make([]constant.DeviceFault, 0) - for _, fault := range deviceFaults { - if reflect.DeepEqual(delFault, fault) { - continue - } - newDeviceFaults = append(newDeviceFaults, fault) - } - faultMap[delFault.NPUName] = newDeviceFaults - return faultMap + return orgFaultCm } -// AddFaultIntoFaultMap add fault into faultMap -func AddFaultIntoFaultMap(faultMap map[string][]constant.DeviceFault, - addFault constant.DeviceFault) map[string][]constant.DeviceFault { - if faultMap == nil { - faultMap = make(map[string][]constant.DeviceFault) - } - deviceFaults, ok := faultMap[addFault.NPUName] - if !ok { - deviceFaults = make([]constant.DeviceFault, 0) +func AdvanceCmToOriginalCm[T constant.ConfigMapInterface](advanceCmForNode T) constant.ConfigMapInterface { + switch cm := any(advanceCmForNode).(type) { + case *constant.AdvanceDeviceFaultCm: + return AdvanceDevCmToOrigCm(cm) + case *constant.SwitchInfo: + return cm + case *constant.NodeInfo: + return cm + default: + hwlog.RunLog.Errorf("AdvanceFaultCmToOriginalCmForNode don't support this type.") + return nil } - isExisting := false - for _, fault := range deviceFaults { - if reflect.DeepEqual(addFault, fault) { - isExisting = true - break - } +} + +// AdvanceDevCmToOrigCm convert advance device cm to original format +func AdvanceDevCmToOrigCm(advanceDeviceCm *constant.AdvanceDeviceFaultCm) *constant.DeviceInfo { + orgDeviceCm := &constant.DeviceInfo{ + DeviceInfoNoName: constant.DeviceInfoNoName{ + DeviceList: make(map[string]string), + UpdateTime: advanceDeviceCm.UpdateTime, + }, + CmName: advanceDeviceCm.CmName, + SuperPodID: advanceDeviceCm.SuperPodID, + ServerIndex: advanceDeviceCm.ServerIndex, } - if !isExisting { - deviceFaults = append(deviceFaults, addFault) + + mergeCode(advanceDeviceCm) + + orgDeviceCm.DeviceList[advanceDeviceCm.GetFaultDeviceListKey()] = + util.ObjToString(faultMapToFaultList(advanceDeviceCm.FaultDeviceList)) + + orgDeviceCm.DeviceList[advanceDeviceCm.GetNetworkUnhealthyKey()] = "" + if len(advanceDeviceCm.NetworkUnhealthy) > 0 { + orgDeviceCm.DeviceList[advanceDeviceCm.GetNetworkUnhealthyKey()] = + strings.Join(advanceDeviceCm.NetworkUnhealthy, ",") } - faultMap[addFault.NPUName] = deviceFaults - return faultMap -} -// AdvanceDeviceCmForNodeMapToString convert advance device cm to original format -func AdvanceDeviceCmForNodeMapToString( - advanceDeviceCm map[string]constant.AdvanceDeviceFaultCm, orgDeviceCm map[string]*constant.DeviceInfo) { - for nodeName, advanceCm := range advanceDeviceCm { - advanceCm = mergeCodeAndRemoveUnhealthy(advanceCm) - cmName := nodeNameToCmName(nodeName) - deviceInfo, found := orgDeviceCm[cmName] - if !found { - continue - } - faultListKey := GetFaultListKey(deviceInfo) - if faultListKey != "" { - orgDeviceCm[cmName].DeviceList[faultListKey] = - util.ObjToString(faultMapToFaultList(advanceCm.FaultDeviceList)) - } + orgDeviceCm.DeviceList[advanceDeviceCm.GetCardUnHealthyKey()] = "" + if len(advanceDeviceCm.CardUnHealthy) > 0 { + orgDeviceCm.DeviceList[advanceDeviceCm.GetCardUnHealthyKey()] = + strings.Join(advanceDeviceCm.CardUnHealthy, ",") + } - networkUnhealthyKey := GetNetworkUnhealthyKey(deviceInfo) - if networkUnhealthyKey != "" { - orgDeviceCm[cmName].DeviceList[networkUnhealthyKey] = strings.Join(advanceCm.NetworkUnhealthy, ",") - } + orgDeviceCm.DeviceList[advanceDeviceCm.GetRecoveringKey()] = "" + if len(advanceDeviceCm.Recovering) > 0 { + orgDeviceCm.DeviceList[advanceDeviceCm.GetRecoveringKey()] = + strings.Join(advanceDeviceCm.Recovering, ",") + } - cardUnhealthyKey := GetCardUnhealthyKey(deviceInfo) - if cardUnhealthyKey != "" { - orgDeviceCm[cmName].DeviceList[cardUnhealthyKey] = strings.Join(advanceCm.CardUnHealthy, ",") - } + orgDeviceCm.DeviceList[advanceDeviceCm.GetAvailableDeviceListKey()] = "" + if len(advanceDeviceCm.AvailableDeviceList) > 0 { + orgDeviceCm.DeviceList[advanceDeviceCm.GetAvailableDeviceListKey()] = + strings.Join(advanceDeviceCm.AvailableDeviceList, ",") } + return orgDeviceCm } func faultMapToFaultList(deviceFaultMap map[string][]constant.DeviceFault) []constant.DeviceFault { deviceFaultList := make([]constant.DeviceFault, 0) - for _, faultList := range deviceFaultMap { - deviceFaultList = append(deviceFaultList, faultList...) + deviceNames := getSortedKeys(deviceFaultMap) + for _, deviceName := range deviceNames { + deviceFaultList = append(deviceFaultList, deviceFaultMap[deviceName]...) } return deviceFaultList } @@ -313,17 +346,8 @@ func isFaultDeletable(faults []constant.DeviceFault, faultTypes []string, faultL return true } -func mergeCodeAndRemoveUnhealthy(advanceDeviceCm constant.AdvanceDeviceFaultCm) constant.AdvanceDeviceFaultCm { +func mergeCode(advanceDeviceCm *constant.AdvanceDeviceFaultCm) { for deviceName, faults := range advanceDeviceCm.FaultDeviceList { - deletableFaultLevels := []string{constant.NotHandleFault, constant.SubHealthFault} - if isFaultDeletable(faults, []string{constant.CardUnhealthy, constant.PublicFaultType}, deletableFaultLevels) { - advanceDeviceCm.CardUnHealthy = util.DeleteStringSliceItem(advanceDeviceCm.CardUnHealthy, deviceName) - hwlog.RunLog.Debugf("remove device %s from CardUnHealthy", deviceName) - } - if isFaultDeletable(faults, []string{constant.CardNetworkUnhealthy}, deletableFaultLevels) { - advanceDeviceCm.NetworkUnhealthy = util.DeleteStringSliceItem(advanceDeviceCm.NetworkUnhealthy, deviceName) - hwlog.RunLog.Debugf("remove device %s from NetworkUnhealthy", deviceName) - } if len(faults) == 0 { continue } @@ -334,51 +358,29 @@ func mergeCodeAndRemoveUnhealthy(advanceDeviceCm constant.AdvanceDeviceFaultCm) } advanceDeviceCm.FaultDeviceList[deviceName] = mergedFaults } - return advanceDeviceCm } -// GetFaultListKey get FaultList key in DeviceInfo -func GetFaultListKey(devInfo *constant.DeviceInfo) string { - for key, _ := range devInfo.DeviceList { - if strings.Contains(key, constant.NPUPreName) && strings.Contains(key, "-Fault") { - return key - } - } - return "" +func getNetworkUnhealthyString(devInfo *constant.DeviceInfo) (string, string) { + key := api.ResourceNamePrefix + GetDeviceType(devInfo) + constant.CmCardNetworkUnhealthySuffix + return key, devInfo.DeviceList[key] } -// GetNetworkUnhealthyKey get networkUnhealthy key in DeviceInfo -func GetNetworkUnhealthyKey(devInfo *constant.DeviceInfo) string { - for key, _ := range devInfo.DeviceList { - if strings.Contains(key, constant.NPUPreName) && strings.Contains(key, "-NetworkUnhealthy") { - return key - } - } - return "" +func getCardUnhealthyString(devInfo *constant.DeviceInfo) (string, string) { + key := api.ResourceNamePrefix + GetDeviceType(devInfo) + constant.CmCardUnhealthySuffix + return key, devInfo.DeviceList[key] } -// GetCardUnhealthyKey get CardUnhealthy key in DeviceInfo -func GetCardUnhealthyKey(devInfo *constant.DeviceInfo) string { - for key, _ := range devInfo.DeviceList { - if strings.Contains(key, constant.NPUPreName) && strings.Contains(key, "-Unhealthy") { - return key - } - } - return "" +func getRecoveringString(devInfo *constant.DeviceInfo) (string, string) { + key := api.ResourceNamePrefix + GetDeviceType(devInfo) + constant.CmRecoveringSuffix + return key, devInfo.DeviceList[key] } -// GetFaultListInfo get fault list info -func GetFaultListInfo(devCMInfo *constant.DeviceInfo) (string, string) { - for faultKey, faultInfo := range devCMInfo.DeviceList { - if strings.Contains(faultKey, constant.NPUPreName) && strings.Contains(faultKey, "-Fault") { - return faultKey, faultInfo - } - } - return "", "" +func getFaultListString(devInfo *constant.DeviceInfo) (string, string) { + key := api.ResourceNamePrefix + GetDeviceType(devInfo) + constant.CmFaultListSuffix + return key, devInfo.DeviceList[key] } -// GetAvailDevListInfo get available device list info -func GetAvailDevListInfo(devCMInfo *constant.DeviceInfo) (string, string) { +func getAvailDevListString(devCMInfo *constant.DeviceInfo) (string, string) { availKey := api.ResourceNamePrefix + GetDeviceType(devCMInfo) availDevList, ok := devCMInfo.DeviceList[availKey] if !ok { @@ -387,48 +389,6 @@ func GetAvailDevListInfo(devCMInfo *constant.DeviceInfo) (string, string) { return availKey, availDevList } -// DelDevFromAvailList delete device from available device list -func DelDevFromAvailList(devCMInfo *constant.DeviceInfo, npuNames []string) { - availKey, availList := GetAvailDevListInfo(devCMInfo) - if len(availList) == 0 { - return - } - splitList := strings.Split(availList, ",") - for _, npuName := range npuNames { - splitList = util.DeleteStringSliceItem(splitList, npuName) - } - devCMInfo.DeviceList[availKey] = strings.Join(splitList, ",") - return -} - -// GetUnhealthyListInfo get unhealthy list info -func GetUnhealthyListInfo(devCMInfo *constant.DeviceInfo) (string, []string) { - for unHealthyKey, unHealthyCards := range devCMInfo.DeviceList { - if strings.Contains(unHealthyKey, constant.NPUPreName) && strings.Contains(unHealthyKey, "-Unhealthy") { - var cardList []string - if len(unHealthyCards) == 0 { - cardList = make([]string, 0) - } else { - cardList = strings.Split(unHealthyCards, ",") - } - return unHealthyKey, cardList - } - } - return "", []string{} -} - -// AddDevFromUnhealthyList add device from unhealthy list -func AddDevFromUnhealthyList(devCMInfo *constant.DeviceInfo, npuNames []string) { - unHealthyKey, unHealthyList := GetUnhealthyListInfo(devCMInfo) - for _, npuName := range npuNames { - if !util.IsSliceContain(npuName, unHealthyList) { - unHealthyList = append(unHealthyList, npuName) - } - } - sort.Strings(unHealthyList) - devCMInfo.DeviceList[unHealthyKey] = strings.Join(unHealthyList, ",") -} - // IsUceFault check faultCode is uce func IsUceFault(faultCode string) bool { if strings.Contains(faultCode, constant.UceFaultCode) { @@ -538,3 +498,69 @@ func ValidBusinessRecoverTime(recoverTime int64) bool { func ValidBusinessUceReportInfo(info *constant.ReportInfo) bool { return ValidBusinessRecoverTime(info.RecoverTime) } + +// SortDataForAdvanceDeviceInfo sort the field of deviceInfo +func SortDataForAdvanceDeviceInfo(deviceInfo *constant.AdvanceDeviceFaultCm) { + sort.Strings(deviceInfo.AvailableDeviceList) + sort.Strings(deviceInfo.CardUnHealthy) + sort.Strings(deviceInfo.NetworkUnhealthy) + sort.Strings(deviceInfo.Recovering) + for _, faultList := range deviceInfo.FaultDeviceList { + sort.Slice(faultList, func(i, j int) bool { + if compareDeviceFault(faultList[i], faultList[j]) <= 0 { + return true + } + return false + }) + } +} + +func compareDeviceFault(a, b constant.DeviceFault) int { + if res := strings.Compare(a.FaultType, b.FaultType); res != 0 { + return res + } + if res := strings.Compare(a.NPUName, b.NPUName); res != 0 { + return res + } + if res := strings.Compare(a.LargeModelFaultLevel, b.LargeModelFaultLevel); res != 0 { + return res + } + if res := strings.Compare(a.FaultLevel, b.FaultLevel); res != 0 { + return res + } + if res := strings.Compare(a.FaultHandling, b.FaultHandling); res != 0 { + return res + } + if res := strings.Compare(a.FaultCode, b.FaultCode); res != 0 { + return res + } + keysA := getSortedKeys(a.FaultTimeAndLevelMap) + keysB := getSortedKeys(b.FaultTimeAndLevelMap) + for i := 0; i < len(keysA); i++ { + if cmp := strings.Compare(keysA[i], keysB[i]); cmp != 0 { + return cmp + } + valA := a.FaultTimeAndLevelMap[keysA[i]] + valB := b.FaultTimeAndLevelMap[keysB[i]] + if cmp := compareFaultTimeAndLevel(valA, valB); cmp != 0 { + return cmp + } + } + return 0 +} + +func compareFaultTimeAndLevel(a, b constant.FaultTimeAndLevel) int { + if res := a.FaultTime - b.FaultTime; res != 0 { + return int(res) + } + return strings.Compare(a.FaultLevel, b.FaultLevel) +} + +func getSortedKeys[T any](m map[string]T) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + sort.Strings(keys) + return keys +} diff --git a/component/clusterd/pkg/domain/faultdomain/fault_utils_test.go b/component/clusterd/pkg/domain/faultdomain/fault_utils_test.go deleted file mode 100644 index e79d86b287db3cd8b5a6a2ea2ac181302819f1ac..0000000000000000000000000000000000000000 --- a/component/clusterd/pkg/domain/faultdomain/fault_utils_test.go +++ /dev/null @@ -1,572 +0,0 @@ -// Copyright (c) Huawei Technologies Co., Ltd. 2024-2025. All rights reserved. - -// Package faultmanager contain fault process -package faultdomain - -import ( - "reflect" - "sort" - "testing" - "time" - - "github.com/agiledragon/gomonkey/v2" - "k8s.io/api/core/v1" - - "ascend-common/common-utils/hwlog" - "clusterd/pkg/common/constant" - "clusterd/pkg/common/util" -) - -const ( - jobId = "JobId" - nodeName = "Node" - time100Seconds = int64(100000) - time120Seconds = int64(120000) - time1Seconds = int64(1000) - deviceId = "0" - rankID = "8" - cmName = "mindx-dl-deviceinfo-" + nodeName - deviceName = constant.Ascend910 + "-" + deviceId - originalDeviceFaultCodeCnt = 2 -) - -var ( - jobServerMap = constant.JobServerInfoMap{ - InfoMap: map[string]map[string]constant.ServerHccl{ - jobId: { - nodeName: { - DeviceList: []constant.Device{{ - DeviceID: deviceId, - RankID: rankID, - }}, - ServerName: nodeName, - }, - }, - }, - } - originalDeviceCm = &constant.DeviceInfo{ - CmName: cmName, - DeviceInfoNoName: constant.DeviceInfoNoName{ - DeviceList: map[string]string{ - "huawei.com/Ascend910-Fault": ` -[ - { - "fault_type": "CardUnhealthy", - "fault_code": "80E01801 , 80C98009 ", - "fault_time_and_level_map": - { - "80E01801": {"fault_time":100000, "fault_level": "RestartBusiness"}, - "80C98009": {"fault_time":120000, "fault_level": "NotHandleFault"} - },"npu_name": "Ascend910-0" - } -]`, - }, - }, - } -) - -func TestMain(m *testing.M) { - hwlog.InitRunLogger(&hwlog.LogConfig{OnlyToStdout: true}, nil) - m.Run() -} - -func TestSplitDeviceFault(t *testing.T) { - t.Run("TestSplitDeviceFault", func(t *testing.T) { - npuName := "Ascend910-0" - var faultInfo = constant.DeviceFault{ - NPUName: npuName, - FaultCode: "0x1,0x2", - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - "0x1": {FaultLevel: constant.NotHandleFault, FaultTime: 1}, - "0x2": {FaultLevel: constant.SubHealthFault, FaultTime: 1}, - }, - } - - got := splitDeviceFault(faultInfo, "node1") - want := []constant.DeviceFault{ - { - NPUName: npuName, - FaultCode: "0x1", - FaultLevel: constant.NotHandleFault, - LargeModelFaultLevel: constant.NotHandleFault, - FaultHandling: constant.NotHandleFault, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - "0x1": {FaultLevel: constant.NotHandleFault, FaultTime: 1}, - }, - }, { - NPUName: npuName, - FaultCode: "0x2", - FaultLevel: constant.SubHealthFault, - LargeModelFaultLevel: constant.SubHealthFault, - FaultHandling: constant.SubHealthFault, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - "0x2": {FaultLevel: constant.SubHealthFault, FaultTime: 1}, - }, - }, - } - - if !reflect.DeepEqual(got, want) { - t.Errorf("splitDeviceFault() = %v, want %v", got, want) - } - }) -} - -// TestSplitDeviceFaultWithManuallySeparateFaultLevel should split out a DeviceFault as middle data, -// when dp report constant.ManuallySeparateNPU -func TestSplitDeviceFaultWithManuallySeparateFaultLevel(t *testing.T) { - t.Run("TestSplitDeviceFaultWithManuallySeparateFaultLevel", func(t *testing.T) { - npuName := "Ascend910-0" - var faultInfo = constant.DeviceFault{ - NPUName: npuName, - FaultCode: "", - FaultLevel: constant.ManuallySeparateNPU, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{}, - } - - got := splitDeviceFault(faultInfo, "node1") - want := []constant.DeviceFault{ - { - NPUName: npuName, - FaultCode: constant.ManuallySeparateNPU, - FaultLevel: constant.ManuallySeparateNPU, - LargeModelFaultLevel: constant.ManuallySeparateNPU, - FaultHandling: constant.ManuallySeparateNPU, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - constant.ManuallySeparateNPU: { - FaultTime: constant.UnknownFaultTime, - FaultLevel: constant.ManuallySeparateNPU, - }, - }, - }, - } - - if !reflect.DeepEqual(got, want) { - t.Errorf("splitDeviceFault() = %v, want %v", got, want) - } - }) -} - -// TestMergeSameTypeDeviceFault should be merged, when fault type is same -func TestMergeSameTypeDeviceFault(t *testing.T) { - t.Run("Test_mergeDeviceFault", func(t *testing.T) { - npuName := "Ascend910-0" - split := []constant.DeviceFault{ - { - FaultType: constant.CardUnhealthy, - NPUName: npuName, - FaultCode: "0x1", - FaultLevel: constant.NotHandleFault, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - "0x1": {FaultLevel: constant.NotHandleFault, FaultTime: 1}, - "0x2": {FaultLevel: constant.SubHealthFault, FaultTime: 1}, - }, - }, - { - FaultType: constant.CardUnhealthy, - NPUName: npuName, - FaultCode: "0x2", - FaultLevel: constant.SubHealthFault, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - "0x1": {FaultLevel: constant.NotHandleFault, FaultTime: 1}, - "0x2": {FaultLevel: constant.SubHealthFault, FaultTime: 1}, - }, - }, - } - want := []constant.DeviceFault{ - { - FaultType: constant.CardUnhealthy, - NPUName: npuName, - FaultCode: "0x1,0x2", - FaultLevel: constant.SubHealthFault, - LargeModelFaultLevel: constant.SubHealthFault, - FaultHandling: constant.SubHealthFault, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - "0x1": {FaultLevel: constant.NotHandleFault, FaultTime: 1}, - "0x2": {FaultLevel: constant.SubHealthFault, FaultTime: 1}, - }, - }, - } - got, err := mergeDeviceFault(split) - if err != nil { - t.Errorf("mergeDeviceFault() error = %v", err) - } - if !reflect.DeepEqual(got, want) { - t.Errorf("mergeDeviceFault() got = %v, want %v", util.ObjToString(got), util.ObjToString(want)) - } - }) -} - -// TestMergeDifferentTypeDeviceFault should not be merged, when fault type isn't same -func TestMergeDifferentTypeDeviceFault(t *testing.T) { - t.Run("Test_mergeDeviceFault", func(t *testing.T) { - npuName := "Ascend910-0" - split := []constant.DeviceFault{ - { - FaultType: constant.CardUnhealthy, - NPUName: npuName, - FaultCode: "0x1", - FaultLevel: constant.NotHandleFault, - LargeModelFaultLevel: constant.NotHandleFault, - FaultHandling: constant.NotHandleFault, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - "0x1": {FaultLevel: constant.NotHandleFault, FaultTime: 1}, - }, - }, - { - FaultType: constant.CardNetworkUnhealthy, - NPUName: npuName, - FaultCode: "0x2", - FaultLevel: constant.SubHealthFault, - LargeModelFaultLevel: constant.SubHealthFault, - FaultHandling: constant.SubHealthFault, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - "0x2": {FaultLevel: constant.SubHealthFault, FaultTime: 1}, - }, - }, - } - got, err := mergeDeviceFault(split) - if err != nil { - t.Errorf("mergeDeviceFault() error = %v", err) - } - sort.Slice(got, func(i, j int) bool { - return got[i].FaultType > got[j].FaultType - }) - if !reflect.DeepEqual(got, split) { - t.Errorf("mergeDeviceFault() got = %v, want %v", util.ObjToString(got), util.ObjToString(split)) - } - }) -} - -// TestMergeManuallySeparateNPUTypeDeviceFault should combine other fault info and constant.ManuallySeparateNPU. -func TestMergeManuallySeparateNPUTypeDeviceFault(t *testing.T) { - t.Run("TestMergeManuallySeparateNPUTypeDeviceFault", func(t *testing.T) { - npuName := "Ascend910-0" - split := []constant.DeviceFault{ - { - FaultType: constant.CardUnhealthy, - NPUName: npuName, - FaultCode: constant.ManuallySeparateNPU, - FaultLevel: constant.ManuallySeparateNPU, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{}, - }, - { - FaultType: constant.CardUnhealthy, - NPUName: npuName, - FaultCode: constant.UceFaultCode, - FaultLevel: constant.RestartBusiness, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - constant.UceFaultCode: { - FaultTime: constant.UnknownFaultTime, - FaultLevel: constant.RestartBusiness, - }, - }, - }, - } - got, err := mergeDeviceFault(split) - if err != nil { - t.Errorf("mergeDeviceFault() error = %v", err) - } - want := []constant.DeviceFault{ - { - FaultType: constant.CardUnhealthy, - NPUName: npuName, - FaultCode: constant.UceFaultCode, - FaultLevel: constant.ManuallySeparateNPU, - LargeModelFaultLevel: constant.ManuallySeparateNPU, - FaultHandling: constant.ManuallySeparateNPU, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - constant.UceFaultCode: { - FaultTime: constant.UnknownFaultTime, - FaultLevel: constant.RestartBusiness, - }, - }, - }, - } - if !reflect.DeepEqual(got, want) { - t.Error("TestMergeManuallySeparateNPUTypeDeviceFault fail") - } - }) -} - -// TestGetAdvanceDeviceCm should get advanceDeviceCm from originalDeviceCm -func TestGetAdvanceDeviceCm(t *testing.T) { - advanceDeviceCm := GetAdvanceDeviceCm(originalDeviceCm) - if len(advanceDeviceCm.FaultDeviceList[deviceName]) != originalDeviceFaultCodeCnt { - t.Errorf("TestGetAdvanceDeviceCm failed") - return - } - faultTimeAndLevel, ok := advanceDeviceCm.FaultDeviceList[deviceName][0].FaultTimeAndLevelMap[constant.UceFaultCode] - if !ok || faultTimeAndLevel.FaultTime != time100Seconds || - faultTimeAndLevel.FaultLevel != constant.RestartBusiness { - t.Errorf("TestGetAdvanceDeviceCm failed") - return - } -} - -// TestValidBusinessUceReportInfo valid business uce report info -func TestValidBusinessUceReportInfo(t *testing.T) { - t.Run("TestValidBusinessUceReportInfo", func(t *testing.T) { - reportInfo := &constant.ReportInfo{ - RecoverTime: time100Seconds - time1Seconds, - CompleteTime: 0, - } - mockTime := time.Time{} - mockUnixMilli := gomonkey.ApplyPrivateMethod(mockTime, "UnixMilli", func() int64 { - return time100Seconds - }) - mockNow := gomonkey.ApplyFunc(time.Now, func() time.Time { - return mockTime - }) - defer func() { - mockNow.Reset() - mockUnixMilli.Reset() - }() - if !ValidBusinessUceReportInfo(reportInfo) { - t.Error("TestValidBusinessUceReportInfo fail") - } - reportInfo.RecoverTime = 0 - if ValidBusinessUceReportInfo(reportInfo) { - t.Error("TestValidBusinessUceReportInfo fail") - } - }) -} - -// TestCanDoStepRetry check uceDeviceInfo can do step retry -func TestCanDoStepRetry(t *testing.T) { - uceDeviceInfo := &constant.UceDeviceInfo{ - DeviceName: deviceName, - FaultTime: time100Seconds, - RecoverTime: time100Seconds + time1Seconds, - CompleteTime: 0, - } - t.Run("TestCanDoStepRetry", func(t *testing.T) { - mockTime := time.Time{} - mockUnixMilli := gomonkey.ApplyPrivateMethod(mockTime, "UnixMilli", func() int64 { - return time120Seconds - }) - mockNow := gomonkey.ApplyFunc(time.Now, func() time.Time { - return mockTime - }) - defer func() { - mockNow.Reset() - mockUnixMilli.Reset() - }() - if !CanDoStepRetry(uceDeviceInfo) { - t.Error("TestCanDoStepRetry fail") - } - }) -} - -// TestGetContainedElementIdx should return id of the item from slice -func TestGetContainedElementIdx(t *testing.T) { - arr := []string{"1", "2"} - t.Run("TestGetContainedElementIdx", func(t *testing.T) { - if got := GetContainedElementIdx("1", arr); got != 0 { - t.Error("GetContainedElementIdx() fail") - } - if got := GetContainedElementIdx("3", arr); got != -1 { - t.Error("GetContainedElementIdx() fail") - } - }) -} - -// TestGetFaultTime should return fault time from DeviceFault -func TestGetFaultTime(t *testing.T) { - fault := constant.DeviceFault{ - FaultCode: constant.UceFaultCode, - FaultTimeAndLevelMap: map[string]constant.FaultTimeAndLevel{ - constant.UceFaultCode: { - FaultTime: time100Seconds, - FaultLevel: constant.RestartBusiness, - }, - }, - } - t.Run("TestGetFaultTime", func(t *testing.T) { - if got := GetFaultTime(fault, ""); got != time100Seconds { - t.Error("GetFaultTime fail") - } - fault.FaultTimeAndLevelMap = make(map[string]constant.FaultTimeAndLevel) - if got := GetFaultTime(fault, ""); got != constant.DeviceNotFault { - t.Error("GetFaultTime fail") - } - }) -} - -// TestFaultCodeJudge check fault code is right -func TestFaultCodeJudge(t *testing.T) { - t.Run("TestFaultCodeJudgeAic", func(t *testing.T) { - if got := IsUceAccompanyFault(constant.AicFaultCode); got == false { - t.Error("TestFaultCodeJudgeAic fail") - } - }) - t.Run("TestFaultCodeJudgeLinkDownFault", func(t *testing.T) { - if got := IsLinkDownFault(constant.LinkDownFaultCode); got == false { - t.Error("TestFaultCodeJudgeLinkDownFault fail") - } - }) - t.Run("TestFaultCodeJudgeCqeFault", func(t *testing.T) { - if got := IsCqeFault(constant.DevCqeFaultCode); got == false { - t.Error("TestFaultCodeJudgeCqeFault fail") - } - }) - t.Run("TestFaultCodeJudgeUceFault", func(t *testing.T) { - if got := IsUceFault(constant.UceFaultCode); got == false { - t.Error("TestFaultCodeJudgeUceFault fail") - } - }) -} - -// TestAdvanceDeviceCmForNodeMapToString should return string-format CM from AdvanceDeviceCm -func TestAdvanceDeviceCmForNodeMapToString(t *testing.T) { - deviceInfoCms := map[string]*constant.DeviceInfo{ - cmName: originalDeviceCm, - } - t.Run("TestAdvanceDeviceCmForNodeMapToString", func(t *testing.T) { - advanceMap := GetAdvanceDeviceCmForNodeMap(deviceInfoCms) - orgDeviceCm := make(map[string]*constant.DeviceInfo) - util.DeepCopy(&orgDeviceCm, deviceInfoCms) - AdvanceDeviceCmForNodeMapToString(advanceMap, orgDeviceCm) - if !reflect.DeepEqual(GetAdvanceDeviceCmForNodeMap(orgDeviceCm), GetAdvanceDeviceCmForNodeMap(deviceInfoCms)) { - t.Error("TestAdvanceDeviceCmForNodeMapToString fail") - } - }) -} - -// TestAddFaultAndDeleteFaultMap should add or delete fault right -func TestAddFaultAndDeleteFaultMap(t *testing.T) { - addFault := constant.DeviceFault{ - NPUName: deviceName, - } - t.Run("TestAddFaultIntoFaultMap", func(t *testing.T) { - faultMap := AddFaultIntoFaultMap(nil, addFault) - if len(faultMap[addFault.NPUName]) != 1 { - t.Error("TestAddFaultIntoFaultMap fail") - } - }) - t.Run("TestDeleteFaultFromFaultMap", func(t *testing.T) { - faultMap := DeleteFaultFromFaultMap(nil, addFault) - if len(faultMap[addFault.NPUName]) != 0 { - t.Error("TestDeleteFaultFromFaultMap fail") - } - faultMap = AddFaultIntoFaultMap(nil, addFault) - faultMap = DeleteFaultFromFaultMap(faultMap, addFault) - if len(faultMap[addFault.NPUName]) != 0 { - t.Error("TestDeleteFaultFromFaultMap fail") - } - }) -} - -// TestGetAdvanceDeviceCmForNodeMap should get AdvanceDeviceCm -func TestGetAdvanceDeviceCmForNodeMap(t *testing.T) { - deviceInfoCms := map[string]*constant.DeviceInfo{ - cmName: originalDeviceCm, - } - t.Run("TestGetAdvanceDeviceConfigmap", func(t *testing.T) { - got := GetAdvanceDeviceCmForNodeMap(deviceInfoCms) - if len(got[nodeName].FaultDeviceList[deviceName]) != originalDeviceFaultCodeCnt { - t.Error("TestGetAdvanceDeviceConfigmap fail") - } - }) -} - -// TestGetNodeAndDeviceFromJobIdAndRankId should return right node and device according to the jobId and rankID -func TestGetNodeAndDeviceFromJobIdAndRankId(t *testing.T) { - t.Run("TestGetNodeAndDeviceFromJobIdAndRankId", func(t *testing.T) { - serverName, device, err := GetNodeAndDeviceFromJobIdAndRankId(jobId, rankID, jobServerMap) - if serverName != nodeName || device != deviceId || err != nil { - t.Error("TestGetNodeAndDeviceFromJobIdAndRankId fail") - } - }) -} - -// TestIsNodeReady check node is ready -func TestIsNodeReady(t *testing.T) { - node := &v1.Node{ - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{{ - Type: v1.NodeReady, - Status: v1.ConditionTrue, - }}, - }, - } - t.Run("TestIsNodeReady", func(t *testing.T) { - if !IsNodeReady(node) { - t.Error("TestIsNodeReady fail") - } - }) -} - -func getCardNotHandleFaults() []constant.DeviceFault { - return append([]constant.DeviceFault{}, constant.DeviceFault{ - FaultType: constant.CardUnhealthy, - FaultLevel: constant.NotHandleFault, - }) -} - -func getCardNotHandleAndPublicFaultSeparateNPU() []constant.DeviceFault { - return append([]constant.DeviceFault{}, constant.DeviceFault{ - FaultType: constant.CardUnhealthy, - FaultLevel: constant.NotHandleFault, - }, constant.DeviceFault{ - FaultType: constant.PublicFaultType, - FaultLevel: constant.SeparateNPU, - }) -} - -func getCardNetworkNotHandleAndPublicFaultSeparateNPU() []constant.DeviceFault { - return append([]constant.DeviceFault{}, constant.DeviceFault{ - FaultType: constant.CardNetworkUnhealthy, - FaultLevel: constant.NotHandleFault, - }, constant.DeviceFault{ - FaultType: constant.PublicFaultType, - FaultLevel: constant.SeparateNPU, - }) -} - -func getCardNotHandleAndPublicFaultSubHealth() []constant.DeviceFault { - return append([]constant.DeviceFault{}, constant.DeviceFault{ - FaultType: constant.CardUnhealthy, - FaultLevel: constant.NotHandleFault, - }, constant.DeviceFault{ - FaultType: constant.PublicFaultType, - FaultLevel: constant.SubHealthFault, - }) -} - -// TestIsFaultDeletable check faults in specified fault type are NotHandleFault and SubHealthFault -func TestIsFaultDeletable(t *testing.T) { - t.Run("TestIsFaultDeletable", func(t *testing.T) { - deletableFaultLevels := []string{constant.NotHandleFault, constant.SubHealthFault} - faults := getCardNotHandleFaults() - if !isFaultDeletable(faults, []string{constant.CardUnhealthy}, deletableFaultLevels) { - t.Error("when only NotHandleFault in CardUnhealthy then should remove") - } - if !isFaultDeletable(faults, []string{constant.CardNetworkUnhealthy}, deletableFaultLevels) { - t.Error("when no fault in CardNetworkUnhealthy then should remove from CardNetworkUnhealthy") - } - - faults = getCardNotHandleAndPublicFaultSeparateNPU() - if isFaultDeletable(faults, []string{constant.CardUnhealthy, constant.PublicFaultType}, deletableFaultLevels) { - t.Error("when PublicFaultType is SeparateNPU then should not remove from CardUnhealthy") - } - - faults = getCardNetworkNotHandleAndPublicFaultSeparateNPU() - if !isFaultDeletable(faults, []string{constant.CardNetworkUnhealthy}, deletableFaultLevels) { - t.Error("when PublicFaultType is SeparateNPU and CardNetworkUnhealthy is NotHandleFault " + - "then should remove from CardNetworkUnhealthy") - } - - faults = append([]constant.DeviceFault{}, constant.DeviceFault{FaultType: constant.PublicFaultType}) - if isFaultDeletable(faults, []string{constant.CardUnhealthy, constant.PublicFaultType}, deletableFaultLevels) { - t.Error("when PublicFaultType is SeparateNPU then should not remove from CardUnhealthy") - } - faults = make([]constant.DeviceFault, 0) - if !isFaultDeletable(faults, []string{constant.CardUnhealthy, constant.PublicFaultType}, deletableFaultLevels) { - t.Error("when no faults then should remove from CardUnhealthy") - } - faults = getCardNotHandleAndPublicFaultSubHealth() - if !isFaultDeletable(faults, []string{constant.CardUnhealthy, constant.PublicFaultType}, deletableFaultLevels) { - t.Error("when SubHealthFault and NotHandleFault faults then should remove from CardUnhealthy") - } - }) -}