From 3166c2ecc9a957f1ecc88e27271069f35007eec9 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 29 Apr 2025 11:20:44 +0800 Subject: [PATCH 01/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/common/fault_code.go | 10 ++++++- .../pkg/device/ascendcommon.go | 30 +++++++++++++++---- .../pkg/server/manager.go | 3 +- 3 files changed, 35 insertions(+), 8 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index c40f30e2e..9267d2757 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/util/sets" "ascend-common/common-utils/hwlog" @@ -1169,12 +1170,19 @@ func DelOnceFrequencyFault() { recoverFaultFrequencyMap = make(map[int32]string, GeneralMapSize) } +var limiter = rate.NewLimiter(rate.Every(1*time.Second), 10) + // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { + if err := limiter.Wait(nil); err != nil { + hwlog.RunLog.Infof("Rate limit error: %v", err) + return + } + hwlog.RunLog.Info("got token,will report fault") defer func() { TriggerUpdate("A fault has occurred") }() - hwlog.RunLog.Infof("receive devFaultInfo: %v, hex code: %v", devFaultInfo, + hwlog.RunLog.Infof("receive devFaultInfo: %#v, hex code: %v", devFaultInfo, strconv.FormatInt(devFaultInfo.EventID, Hex)) if devFaultInfo.EventID == 0 { return diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 18dfd4b3a..9b0e6e3d3 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -26,6 +26,7 @@ import ( "time" "github.com/containerd/containerd" + "golang.org/x/time/rate" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -125,6 +126,7 @@ type DevManager interface { LogFaultModeChange(*common.NpuDevice, []int32, string) GetUsedChips() sets.String UpdateDeviceUsedStatus(groupDevice map[string][]*common.NpuDevice) + WriteFaultToEvent() } // SetDmgr set devmanager @@ -1078,10 +1080,16 @@ func (tool *AscendTools) writeNewFaultCode(deviceMap map[string][]*common.NpuDev isFirstFlushFault = false } +var allFaultInfo = make(chan npuCommon.DevFaultInfo, 100) +var limiter = rate.NewLimiter(rate.Every(1*time.Second), 1) + func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, devFaultInfoMap map[int32][]npuCommon.DevFaultInfo) { if devFaultInfo, ok := devFaultInfoMap[device.LogicID]; ok { - tool.writeFaultToEvent(devFaultInfo) + for _, faultInfo := range devFaultInfo { + allFaultInfo <- faultInfo + } + //tool.writeFaultToEvent(devFaultInfo) } common.SetNewFaultAndCacheOnceRecoverFault(device.LogicID, devFaultInfoMap[device.LogicID], device) common.SetNetworkNewFaultAndCacheOnceRecoverFault(device.LogicID, devFaultInfoMap[device.LogicID], device) @@ -1237,11 +1245,21 @@ func (tool *AscendTools) SetResetFailedTimes(deviceLogicId int32, count int) { tool.resetFailedTimesMap[deviceLogicId] = count } -func (tool *AscendTools) writeFaultToEvent(devFaultInfo []npuCommon.DevFaultInfo) { - for _, faultInfo := range devFaultInfo { - if err := tool.doWriteFaultToEvent(faultInfo); err != nil { - hwlog.RunLog.Errorf("failed to write device fault to event, %v", err) - continue +func (tool *AscendTools) WriteFaultToEvent() { + for { + select { + case faultInfo := <-allFaultInfo: + hwlog.RunLog.Infof("current k8s event chan len:%d", len(allFaultInfo)) + if err := limiter.Wait(nil); err != nil { + hwlog.RunLog.Errorf("Rate limit error: %v", err) + continue + } + if err := tool.doWriteFaultToEvent(faultInfo); err != nil { + hwlog.RunLog.Errorf("failed to write device fault to event, %v", err) + continue + } + default: + time.Sleep(time.Second) } } } diff --git a/component/ascend-device-plugin/pkg/server/manager.go b/component/ascend-device-plugin/pkg/server/manager.go index 8120afa3c..14ed94f8b 100644 --- a/component/ascend-device-plugin/pkg/server/manager.go +++ b/component/ascend-device-plugin/pkg/server/manager.go @@ -437,7 +437,8 @@ func (hdm *HwDevManager) ListenDevice(ctx context.Context) { if common.ParamOption.CheckCachedPods { go hdm.manager.GetKubeClient().PodInformerInspector(ctx) } - + // report device fault to k8s event + go hdm.manager.WriteFaultToEvent() initTime := time.Now() ticker := time.NewTicker(time.Duration(common.ParamOption.ListAndWatchPeriod) * time.Second) defer ticker.Stop() -- Gitee From ed339b80860b39cab72522e44e23be8241a5294e Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 09:38:14 +0800 Subject: [PATCH 02/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 2 +- component/ascend-device-plugin/pkg/device/ascendcommon.go | 2 +- .../ascend-device-plugin/pkg/device/ascendcommon_test.go | 5 +++-- component/ascend-device-plugin/pkg/server/manager.go | 1 + 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 9267d2757..da0ed3b8c 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -1170,7 +1170,7 @@ func DelOnceFrequencyFault() { recoverFaultFrequencyMap = make(map[int32]string, GeneralMapSize) } -var limiter = rate.NewLimiter(rate.Every(1*time.Second), 10) +var limiter = rate.NewLimiter(rate.Every(1*time.Second), 1000) // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 9b0e6e3d3..8d8825812 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1081,7 +1081,7 @@ func (tool *AscendTools) writeNewFaultCode(deviceMap map[string][]*common.NpuDev } var allFaultInfo = make(chan npuCommon.DevFaultInfo, 100) -var limiter = rate.NewLimiter(rate.Every(1*time.Second), 1) +var limiter = rate.NewLimiter(rate.Every(1*time.Second), 10) func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, devFaultInfoMap map[int32][]npuCommon.DevFaultInfo) { diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go index a899a7b1d..190bda5f8 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go @@ -454,8 +454,9 @@ func TestWriteFaultToEvent(t *testing.T) { return errors.New("write fault to event fail") }) defer mockDoWriteFaultToEventMethod.Reset() - faultInfo := []npuCommon.DevFaultInfo{{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode}} - tool.writeFaultToEvent(faultInfo) + //faultInfo := []npuCommon.DevFaultInfo{{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode}} + allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode} + tool.WriteFaultToEvent() }) } diff --git a/component/ascend-device-plugin/pkg/server/manager.go b/component/ascend-device-plugin/pkg/server/manager.go index 14ed94f8b..af8ed9d2c 100644 --- a/component/ascend-device-plugin/pkg/server/manager.go +++ b/component/ascend-device-plugin/pkg/server/manager.go @@ -434,6 +434,7 @@ func (hdm *HwDevManager) ListenDevice(ctx context.Context) { hdm.separateNPUIDFromDeviceInfoIntoCache() go hdm.pollFaultCodeCM(ctx) go hdm.Serve(ctx) + go hdm.manager.WriteFaultToEvent() if common.ParamOption.CheckCachedPods { go hdm.manager.GetKubeClient().PodInformerInspector(ctx) } -- Gitee From 31b8a303b4689dc63f3876160043c35f46bbd9ef Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 09:51:48 +0800 Subject: [PATCH 03/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/device/ascendcommon.go | 7 ++++++- .../ascend-device-plugin/pkg/device/ascendcommon_test.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 8d8825812..4beacbf04 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1087,7 +1087,12 @@ func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, devFaultInfoMap map[int32][]npuCommon.DevFaultInfo) { if devFaultInfo, ok := devFaultInfoMap[device.LogicID]; ok { for _, faultInfo := range devFaultInfo { - allFaultInfo <- faultInfo + select { + case allFaultInfo <- faultInfo: + hwlog.RunLog.Info("fault has been put into queue") + default: + hwlog.RunLog.Infof("there is too many fault already in queue,queue len:%d, will discard it", len(allFaultInfo)) + } } //tool.writeFaultToEvent(devFaultInfo) } diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go index 190bda5f8..caacd56b5 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go @@ -454,7 +454,7 @@ func TestWriteFaultToEvent(t *testing.T) { return errors.New("write fault to event fail") }) defer mockDoWriteFaultToEventMethod.Reset() - //faultInfo := []npuCommon.DevFaultInfo{{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode}} + //faultInfo := []npuCommon.D evFaultInfo{{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode}} allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode} tool.WriteFaultToEvent() }) -- Gitee From 4908faf2228f6a28567441cbede29cac4746fbdc Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 10:13:23 +0800 Subject: [PATCH 04/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 3 ++- component/ascend-device-plugin/pkg/device/ascendcommon.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index da0ed3b8c..2987e646a 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -16,6 +16,7 @@ package common import ( + "context" "encoding/json" "fmt" "sort" @@ -1174,7 +1175,7 @@ var limiter = rate.NewLimiter(rate.Every(1*time.Second), 1000) // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { - if err := limiter.Wait(nil); err != nil { + if err := limiter.Wait(context.TODO()); err != nil { hwlog.RunLog.Infof("Rate limit error: %v", err) return } diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 4beacbf04..f143c620b 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -16,6 +16,7 @@ package device import ( + "context" "encoding/json" "fmt" "regexp" @@ -1255,7 +1256,7 @@ func (tool *AscendTools) WriteFaultToEvent() { select { case faultInfo := <-allFaultInfo: hwlog.RunLog.Infof("current k8s event chan len:%d", len(allFaultInfo)) - if err := limiter.Wait(nil); err != nil { + if err := limiter.Wait(context.TODO()); err != nil { hwlog.RunLog.Errorf("Rate limit error: %v", err) continue } -- Gitee From 34e5616969118efff5be21fd1cd3d7619341e5d8 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 10:21:01 +0800 Subject: [PATCH 05/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/device/ascendcommon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index f143c620b..9f49ec704 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1265,7 +1265,7 @@ func (tool *AscendTools) WriteFaultToEvent() { continue } default: - time.Sleep(time.Second) + time.Sleep(time.Millisecond * 50) } } } -- Gitee From c616aa345559c69d6e40e16e048094f5c5a713d1 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 10:34:38 +0800 Subject: [PATCH 06/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 2 +- component/ascend-device-plugin/pkg/device/ascendcommon.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 2987e646a..49cfb6898 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -1171,7 +1171,7 @@ func DelOnceFrequencyFault() { recoverFaultFrequencyMap = make(map[int32]string, GeneralMapSize) } -var limiter = rate.NewLimiter(rate.Every(1*time.Second), 1000) +var limiter = rate.NewLimiter(rate.Every(1*time.Minute), 1000) // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 9f49ec704..72e745c5c 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1082,7 +1082,7 @@ func (tool *AscendTools) writeNewFaultCode(deviceMap map[string][]*common.NpuDev } var allFaultInfo = make(chan npuCommon.DevFaultInfo, 100) -var limiter = rate.NewLimiter(rate.Every(1*time.Second), 10) +var limiter = rate.NewLimiter(rate.Every(1*time.Minute), 10) func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, devFaultInfoMap map[int32][]npuCommon.DevFaultInfo) { @@ -1260,10 +1260,13 @@ func (tool *AscendTools) WriteFaultToEvent() { hwlog.RunLog.Errorf("Rate limit error: %v", err) continue } + startTime := time.Now() if err := tool.doWriteFaultToEvent(faultInfo); err != nil { hwlog.RunLog.Errorf("failed to write device fault to event, %v", err) continue } + endTime := time.Now() + hwlog.RunLog.Infof("used time:%v", endTime.Sub(startTime)) default: time.Sleep(time.Millisecond * 50) } -- Gitee From a210dd87f30be5f6a752e13de1f0a88393c1bb56 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 10:44:07 +0800 Subject: [PATCH 07/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 2 +- component/ascend-device-plugin/pkg/device/ascendcommon.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 49cfb6898..2b8ca457e 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -1171,7 +1171,7 @@ func DelOnceFrequencyFault() { recoverFaultFrequencyMap = make(map[int32]string, GeneralMapSize) } -var limiter = rate.NewLimiter(rate.Every(1*time.Minute), 1000) +var limiter = rate.NewLimiter(rate.Every(1*time.Minute)/1000, 1000) // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 72e745c5c..d38eebf40 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1082,7 +1082,7 @@ func (tool *AscendTools) writeNewFaultCode(deviceMap map[string][]*common.NpuDev } var allFaultInfo = make(chan npuCommon.DevFaultInfo, 100) -var limiter = rate.NewLimiter(rate.Every(1*time.Minute), 10) +var limiter = rate.NewLimiter(rate.Every(time.Minute/10), 10) func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, devFaultInfoMap map[int32][]npuCommon.DevFaultInfo) { -- Gitee From 4b80d978bd10c256cb080460f447f03edf7b4cfc Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 11:01:54 +0800 Subject: [PATCH 08/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/common/constants.go | 11 +++++++++++ .../pkg/common/fault_code.go | 5 ++--- .../pkg/device/ascendcommon.go | 17 +++++++++++------ .../ascend-device-plugin/pkg/server/manager.go | 3 +-- 4 files changed, 25 insertions(+), 11 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/constants.go b/component/ascend-device-plugin/pkg/common/constants.go index 8447a7f91..5164cac05 100644 --- a/component/ascend-device-plugin/pkg/common/constants.go +++ b/component/ascend-device-plugin/pkg/common/constants.go @@ -891,3 +891,14 @@ const ( // MaxPodEventRetryTimes max try time for pod add event while cache none MaxPodEventRetryTimes = 4 ) + +const ( + // WriteEventInterval interval between each check + WriteEventInterval = 50 + // WriteEventRateLimit upper limit rate of write fault to k8s event + WriteEventRateLimit = 10 + // FaultCallBackRateLimit upper limit rate of call back receive fault from driver + FaultCallBackRateLimit = 1000 + // WriteEventChanLenLimit upper limit of length of cache event + WriteEventChanLenLimit = 100 +) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 2b8ca457e..5b170cf62 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -19,13 +19,13 @@ import ( "context" "encoding/json" "fmt" + "golang.org/x/time/rate" "sort" "strconv" "strings" "sync" "time" - "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/util/sets" "ascend-common/common-utils/hwlog" @@ -126,6 +126,7 @@ var ( RestartNPU, PreSeparateNPU, SeparateNPU, SubHealthFault) // NetworkFaultCodes is a set that contains all the network fault codes NetworkFaultCodes = sets.NewInt64(LinkDownFaultCode) + limiter = rate.NewLimiter(rate.Every(1*time.Minute)/FaultCallBackRateLimit, FaultCallBackRateLimit) ) // fault customization @@ -1171,8 +1172,6 @@ func DelOnceFrequencyFault() { recoverFaultFrequencyMap = make(map[int32]string, GeneralMapSize) } -var limiter = rate.NewLimiter(rate.Every(1*time.Minute)/1000, 1000) - // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { if err := limiter.Wait(context.TODO()); err != nil { diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index d38eebf40..0292d464e 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -127,7 +127,7 @@ type DevManager interface { LogFaultModeChange(*common.NpuDevice, []int32, string) GetUsedChips() sets.String UpdateDeviceUsedStatus(groupDevice map[string][]*common.NpuDevice) - WriteFaultToEvent() + WriteFaultToEvent(ctx context.Context) } // SetDmgr set devmanager @@ -1081,8 +1081,8 @@ func (tool *AscendTools) writeNewFaultCode(deviceMap map[string][]*common.NpuDev isFirstFlushFault = false } -var allFaultInfo = make(chan npuCommon.DevFaultInfo, 100) -var limiter = rate.NewLimiter(rate.Every(time.Minute/10), 10) +var allFaultInfo = make(chan npuCommon.DevFaultInfo, common.WriteEventChanLenLimit) +var limiter = rate.NewLimiter(rate.Every(time.Minute/common.WriteEventRateLimit), common.WriteEventRateLimit) func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, devFaultInfoMap map[int32][]npuCommon.DevFaultInfo) { @@ -1095,7 +1095,6 @@ func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, hwlog.RunLog.Infof("there is too many fault already in queue,queue len:%d, will discard it", len(allFaultInfo)) } } - //tool.writeFaultToEvent(devFaultInfo) } common.SetNewFaultAndCacheOnceRecoverFault(device.LogicID, devFaultInfoMap[device.LogicID], device) common.SetNetworkNewFaultAndCacheOnceRecoverFault(device.LogicID, devFaultInfoMap[device.LogicID], device) @@ -1251,9 +1250,15 @@ func (tool *AscendTools) SetResetFailedTimes(deviceLogicId int32, count int) { tool.resetFailedTimesMap[deviceLogicId] = count } -func (tool *AscendTools) WriteFaultToEvent() { +func (tool *AscendTools) WriteFaultToEvent(ctx context.Context) { for { select { + case _, ok := <-ctx.Done(): + if !ok { + hwlog.RunLog.Info("stop signal chanel closed") + } + hwlog.RunLog.Info("write fault to k8s event stop") + return case faultInfo := <-allFaultInfo: hwlog.RunLog.Infof("current k8s event chan len:%d", len(allFaultInfo)) if err := limiter.Wait(context.TODO()); err != nil { @@ -1268,7 +1273,7 @@ func (tool *AscendTools) WriteFaultToEvent() { endTime := time.Now() hwlog.RunLog.Infof("used time:%v", endTime.Sub(startTime)) default: - time.Sleep(time.Millisecond * 50) + time.Sleep(time.Millisecond * common.WriteEventInterval) } } } diff --git a/component/ascend-device-plugin/pkg/server/manager.go b/component/ascend-device-plugin/pkg/server/manager.go index af8ed9d2c..3ccd80d27 100644 --- a/component/ascend-device-plugin/pkg/server/manager.go +++ b/component/ascend-device-plugin/pkg/server/manager.go @@ -434,12 +434,11 @@ func (hdm *HwDevManager) ListenDevice(ctx context.Context) { hdm.separateNPUIDFromDeviceInfoIntoCache() go hdm.pollFaultCodeCM(ctx) go hdm.Serve(ctx) - go hdm.manager.WriteFaultToEvent() if common.ParamOption.CheckCachedPods { go hdm.manager.GetKubeClient().PodInformerInspector(ctx) } // report device fault to k8s event - go hdm.manager.WriteFaultToEvent() + go hdm.manager.WriteFaultToEvent(ctx) initTime := time.Now() ticker := time.NewTicker(time.Duration(common.ParamOption.ListAndWatchPeriod) * time.Second) defer ticker.Stop() -- Gitee From f67344a726e89c19ce3bf022b6a0f0670ce2cfa2 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 11:10:51 +0800 Subject: [PATCH 09/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/device/ascendcommon.go | 7 +++---- .../ascend-device-plugin/pkg/device/ascendcommon_test.go | 4 ++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 0292d464e..53b77d225 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -19,6 +19,7 @@ import ( "context" "encoding/json" "fmt" + "golang.org/x/time/rate" "regexp" "sort" "strconv" @@ -27,7 +28,6 @@ import ( "time" "github.com/containerd/containerd" - "golang.org/x/time/rate" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -52,6 +52,8 @@ var ( lastCheckNodeLabel int64 useIpv4 = true re = regexp.MustCompile(`"fault_time":\d+,`) + allFaultInfo = make(chan npuCommon.DevFaultInfo, common.WriteEventChanLenLimit) + limiter = rate.NewLimiter(rate.Every(time.Minute/common.WriteEventRateLimit), common.WriteEventRateLimit) ) const ( @@ -1081,9 +1083,6 @@ func (tool *AscendTools) writeNewFaultCode(deviceMap map[string][]*common.NpuDev isFirstFlushFault = false } -var allFaultInfo = make(chan npuCommon.DevFaultInfo, common.WriteEventChanLenLimit) -var limiter = rate.NewLimiter(rate.Every(time.Minute/common.WriteEventRateLimit), common.WriteEventRateLimit) - func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, devFaultInfoMap map[int32][]npuCommon.DevFaultInfo) { if devFaultInfo, ok := devFaultInfoMap[device.LogicID]; ok { diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go index caacd56b5..e81f6f97c 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go @@ -16,6 +16,7 @@ package device import ( + "context" "encoding/json" "errors" "fmt" @@ -454,9 +455,8 @@ func TestWriteFaultToEvent(t *testing.T) { return errors.New("write fault to event fail") }) defer mockDoWriteFaultToEventMethod.Reset() - //faultInfo := []npuCommon.D evFaultInfo{{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode}} allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode} - tool.WriteFaultToEvent() + tool.WriteFaultToEvent(context.TODO()) }) } -- Gitee From c0733d74df80c3d392a1ba86064711ceb7adde7b Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 11:19:21 +0800 Subject: [PATCH 10/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 2 +- component/ascend-device-plugin/pkg/device/ascendcommon.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 5b170cf62..aadef4fb6 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -19,13 +19,13 @@ import ( "context" "encoding/json" "fmt" - "golang.org/x/time/rate" "sort" "strconv" "strings" "sync" "time" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/util/sets" "ascend-common/common-utils/hwlog" diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 53b77d225..4d376ea04 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -19,7 +19,6 @@ import ( "context" "encoding/json" "fmt" - "golang.org/x/time/rate" "regexp" "sort" "strconv" @@ -28,6 +27,7 @@ import ( "time" "github.com/containerd/containerd" + "golang.org/x/time/rate" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" -- Gitee From f04842da45ab3cfc90e48e4423714eaa419f6381 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 11:38:40 +0800 Subject: [PATCH 11/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend-device-plugin/pkg/device/ascendcommon_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go index e81f6f97c..62e4ebf59 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go @@ -456,7 +456,9 @@ func TestWriteFaultToEvent(t *testing.T) { }) defer mockDoWriteFaultToEventMethod.Reset() allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode} - tool.WriteFaultToEvent(context.TODO()) + ctx := context.TODO() + tool.WriteFaultToEvent(ctx) + ctx.Done() }) } -- Gitee From dc31391cad5c0068fb3f0661ba855b61b55f919a Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 14:08:35 +0800 Subject: [PATCH 12/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend-device-plugin/pkg/device/ascendcommon_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go index 62e4ebf59..fa66522b7 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go @@ -24,6 +24,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/agiledragon/gomonkey/v2" "github.com/containerd/containerd" @@ -457,8 +458,11 @@ func TestWriteFaultToEvent(t *testing.T) { defer mockDoWriteFaultToEventMethod.Reset() allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode} ctx := context.TODO() + go func() { + time.Sleep(time.Second) + ctx.Done() + }() tool.WriteFaultToEvent(ctx) - ctx.Done() }) } -- Gitee From bb228c25aa5ad3aebf6881b805d0777a56195c20 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 14:40:16 +0800 Subject: [PATCH 13/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend-device-plugin/pkg/common/fault_code.go | 2 +- .../ascend-device-plugin/pkg/device/ascendcommon.go | 12 +++++------- .../pkg/device/ascendcommon_test.go | 4 ++-- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index aadef4fb6..4369fc0d3 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -1175,7 +1175,7 @@ func DelOnceFrequencyFault() { // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { if err := limiter.Wait(context.TODO()); err != nil { - hwlog.RunLog.Infof("Rate limit error: %v", err) + hwlog.RunLog.Infof("fault callback rate limit error: %v", err) return } hwlog.RunLog.Info("got token,will report fault") diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 4d376ea04..f5b08a232 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1089,9 +1089,10 @@ func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, for _, faultInfo := range devFaultInfo { select { case allFaultInfo <- faultInfo: - hwlog.RunLog.Info("fault has been put into queue") + hwlog.RunLog.Debugf("fault %#v has been put into queue", faultInfo) default: - hwlog.RunLog.Infof("there is too many fault already in queue,queue len:%d, will discard it", len(allFaultInfo)) + hwlog.RunLog.Warnf("there is too many fault already in queue, queue len:%d, "+ + "will not wrote to k8s event: %#v", len(allFaultInfo), faultInfo) } } } @@ -1261,16 +1262,13 @@ func (tool *AscendTools) WriteFaultToEvent(ctx context.Context) { case faultInfo := <-allFaultInfo: hwlog.RunLog.Infof("current k8s event chan len:%d", len(allFaultInfo)) if err := limiter.Wait(context.TODO()); err != nil { - hwlog.RunLog.Errorf("Rate limit error: %v", err) + hwlog.RunLog.Errorf("write k8s event limiter err: %v", err) continue } - startTime := time.Now() if err := tool.doWriteFaultToEvent(faultInfo); err != nil { - hwlog.RunLog.Errorf("failed to write device fault to event, %v", err) + hwlog.RunLog.Errorf("failed to write device fault to k8s event, %v", err) continue } - endTime := time.Now() - hwlog.RunLog.Infof("used time:%v", endTime.Sub(startTime)) default: time.Sleep(time.Millisecond * common.WriteEventInterval) } diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go index fa66522b7..7528cd795 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go @@ -457,10 +457,10 @@ func TestWriteFaultToEvent(t *testing.T) { }) defer mockDoWriteFaultToEventMethod.Reset() allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode} - ctx := context.TODO() + ctx, cancel := context.WithCancel(context.Background()) go func() { time.Sleep(time.Second) - ctx.Done() + cancel() }() tool.WriteFaultToEvent(ctx) }) -- Gitee From 9082de22df84f670f237e1576eb70b2b54970ccb Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 14:45:44 +0800 Subject: [PATCH 14/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/device/ascendcommon_test.go | 2 +- .../agent/ms_mgr/test_msrun_plugin.py | 166 ++++++++++++++++++ 2 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 component/taskd/tests/ut/python/framework/agent/ms_mgr/test_msrun_plugin.py diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go index 7528cd795..3fa549864 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go @@ -456,7 +456,7 @@ func TestWriteFaultToEvent(t *testing.T) { return errors.New("write fault to event fail") }) defer mockDoWriteFaultToEventMethod.Reset() - allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode} + allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: FaultOnce, EventID: common.CardDropFaultCode} ctx, cancel := context.WithCancel(context.Background()) go func() { time.Sleep(time.Second) diff --git a/component/taskd/tests/ut/python/framework/agent/ms_mgr/test_msrun_plugin.py b/component/taskd/tests/ut/python/framework/agent/ms_mgr/test_msrun_plugin.py new file mode 100644 index 000000000..c23d38bdf --- /dev/null +++ b/component/taskd/tests/ut/python/framework/agent/ms_mgr/test_msrun_plugin.py @@ -0,0 +1,166 @@ +import unittest +import os +from unittest.mock import MagicMock, patch + +from taskd.python.framework.agent.ms_mgr.msrun_plugin import MSRunPlugin + + +class TestMSRunPlugin(unittest.TestCase): + + def setUp(self): + self.plugin = MSRunPlugin() + + def test_register_callbacks(self): + operator = "test_operator" + func = MagicMock() + self.plugin.register_callbacks(operator, func) + self.assertEqual(self.plugin._MSRunPlugin__func_map[operator], func) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.wait_to_start') # 请将 your_module_name 替换为实际的模块名 + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 + def test_start_mindspore_workers(self, mock_func_map, mock_wait_to_start): + mock_start_worker_func = MagicMock() + mock_func_map.get.return_value = mock_start_worker_func + mock_wait_to_start.return_value = True + self.plugin.start_mindspore_workers() + mock_start_worker_func.assert_called() + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.fault_ranks', [{"RankId": 0, "Status": "recovered"}]) # 请将 your_module_name 替换为实际的模块名 + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.rank_table_version', 0) # 请将 your_module_name 替换为实际的模块名 + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.read_rank_table_version', return_value=1) # 请将 your_module_name 替换为实际的模块名 + def test_all_fault_has_recovered(self): + result = self.plugin.all_fault_has_recovered() + self.assertTrue(result) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.fault_ranks', [{"RankId": 0, "Status": "fault"}]) # 请将 your_module_name 替换为实际的模块名 + def test_all_fault_has_not_recovered(self): + result = self.plugin.all_fault_has_recovered() + self.assertFalse(result) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.update_reset_info') # 请将 your_module_name 替换为实际的模块名 + def test_get_fault_status(self): + mock_fault_local_ranks = [0] + mock_fault_status = True + mock_unrecovered_status = False + mock_retry_status = False + mock_fault_status_obj = MagicMock( + local_ranks=mock_fault_local_ranks, + is_fault=mock_fault_status, + is_unrecovered=mock_unrecovered_status, + is_retried=mock_retry_status + ) + self.plugin.fault_ranks = [{"RankId": 0, "Status": "fault"}] + self.plugin.pre_fault_ranks = None + self.plugin.retry_time = 1 + self.plugin.pre_retry_time = 0 + self.plugin.node_global_rank_ids = [0] + with patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.FaultStatus', return_value=mock_fault_status_obj): # 请将 your_module_name 替换为实际的模块名 + result = self.plugin.get_fault_status() + self.assertEqual(result.local_ranks, mock_fault_local_ranks) + self.assertEqual(result.is_fault, mock_fault_status) + self.assertEqual(result.is_unrecovered, mock_unrecovered_status) + self.assertEqual(result.is_retried, mock_retry_status) + + def test_read_rank_table_version(self): + with patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.safe_get_file_info', return_value="1"): # 请将 your_module_name 替换为实际的模块名 + result = self.plugin.read_rank_table_version() + self.assertEqual(result, 1) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.start_mindspore_workers') # 请将 your_module_name 替换为实际的模块名 + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin._init_grpc_client_if_needed') # 请将 your_module_name 替换为实际的模块名 + def test_start(self, mock_init_grpc, mock_start_workers, mock_func_map): + mock_kill_worker_func = MagicMock() + mock_start_worker_func = MagicMock() + mock_monitor_func = MagicMock() + mock_func_map.get.side_effect = [mock_kill_worker_func, mock_start_worker_func, mock_monitor_func] + self.plugin.start() + mock_start_workers.assert_called() + mock_init_grpc.assert_called() + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.rank_info', {0: {"status": "ok"}}) # 请将 your_module_name 替换为实际的模块名 + def test_update_rank_status(self): + mock_rank_status_dict = {0: {"status": "ok"}} + self.plugin.update_rank_status(mock_rank_status_dict) + self.assertEqual(self.plugin.rank_info, mock_rank_status_dict) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.fault_processor.get_reset_info_from_cm') # 请将 your_module_name 替换为实际的模块名 + def test_update_reset_info(self, mock_get_reset_info): + mock_reset_data = MagicMock() + mock_reset_data.fault_ranks = [{"RankId": 0, "Status": "fault"}] + mock_reset_data.retry_time = 1 + mock_reset_data.grace_exit = 0 + mock_reset_data.restart_type = "test_type" + mock_get_reset_info.return_value = mock_reset_data + self.plugin.update_reset_info() + self.assertEqual(self.plugin.fault_ranks, mock_reset_data.fault_ranks) + self.assertEqual(self.plugin.retry_time, mock_reset_data.retry_time) + self.assertEqual(self.plugin.grace_exit, mock_reset_data.grace_exit) + self.assertEqual(self.plugin.restart_type, mock_reset_data.restart_type) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.fault_processor.get_reset_info_from_cm') # 请将 your_module_name 替换为实际的模块名 + def test_wait_to_start(self, mock_get_reset_info): + mock_reset_data = MagicMock() + mock_reset_data.fault_ranks = [] + mock_reset_data.retry_time = 0 + mock_reset_data.fault_flush = False + mock_get_reset_info.return_value = mock_reset_data + self.plugin.node_global_rank_ids = [0] + result = self.plugin.wait_to_start() + self.assertTrue(result) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.grace_exit', 1) # 请将 your_module_name 替换为实际的模块名 + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 + def test_handle_grace_exit(self, mock_func_map): + mock_kill_worker_func = MagicMock() + mock_func_map.get.return_value = mock_kill_worker_func + result = self.plugin._handle_grace_exit() + self.assertTrue(result) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 + def test_handle_fault_status(self, mock_func_map): + mock_fault_status = MagicMock(is_fault=True) + mock_kill_worker_func = MagicMock() + mock_func_map.get.return_value = mock_kill_worker_func + self.plugin._handle_fault_status(mock_fault_status) + mock_kill_worker_func.assert_called() + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 + def test_handle_process_retry_fault(self, mock_func_map): + mock_fault_status = MagicMock(is_retried=True, is_unrecovered=False) + mock_kill_worker_func = MagicMock() + mock_start_worker_func = MagicMock() + mock_func_map.get.side_effect = [mock_kill_worker_func, mock_start_worker_func] + self.plugin.all_fault_has_recovered = MagicMock(return_value=True) + result = self.plugin._handle_process_retry_fault(mock_fault_status) + self.assertTrue(result) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 + def test_handle_hardware_fault(self, mock_func_map): + mock_fault_status = MagicMock(is_unrecovered=True) + mock_kill_worker_func = MagicMock() + mock_start_worker_func = MagicMock() + mock_func_map.get.side_effect = [mock_kill_worker_func, mock_start_worker_func] + self.plugin.all_fault_has_recovered = MagicMock(return_value=True) + result = self.plugin._handle_hardware_fault(mock_fault_status) + self.assertTrue(result) + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.all_rank_succeed', True) # 请将 your_module_name 替换为实际的模块名 + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 + def test_handle_all_process_succeed(self, mock_func_map): + mock_kill_worker_func = MagicMock() + mock_func_map.get.return_value = mock_kill_worker_func + self.plugin._handle_all_process_succeed() + mock_kill_worker_func.assert_called() + + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.rank_status', MSRunPlugin.RANK_STATUS_UNHEALTHY) # 请将 your_module_name 替换为实际的模块名 + @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 + def test_handle_exist_unhealthy_process(self, mock_func_map): + mock_kill_worker_func = MagicMock() + mock_func_map.get.return_value = mock_kill_worker_func + self.plugin._handle_exist_unhealthy_process() + mock_kill_worker_func.assert_called() + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file -- Gitee From 47191ce57645893dc8c8da6fe5b5bc6c1670834a Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 15:16:19 +0800 Subject: [PATCH 15/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../agent/ms_mgr/test_msrun_plugin.py | 166 ------------------ 1 file changed, 166 deletions(-) delete mode 100644 component/taskd/tests/ut/python/framework/agent/ms_mgr/test_msrun_plugin.py diff --git a/component/taskd/tests/ut/python/framework/agent/ms_mgr/test_msrun_plugin.py b/component/taskd/tests/ut/python/framework/agent/ms_mgr/test_msrun_plugin.py deleted file mode 100644 index c23d38bdf..000000000 --- a/component/taskd/tests/ut/python/framework/agent/ms_mgr/test_msrun_plugin.py +++ /dev/null @@ -1,166 +0,0 @@ -import unittest -import os -from unittest.mock import MagicMock, patch - -from taskd.python.framework.agent.ms_mgr.msrun_plugin import MSRunPlugin - - -class TestMSRunPlugin(unittest.TestCase): - - def setUp(self): - self.plugin = MSRunPlugin() - - def test_register_callbacks(self): - operator = "test_operator" - func = MagicMock() - self.plugin.register_callbacks(operator, func) - self.assertEqual(self.plugin._MSRunPlugin__func_map[operator], func) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.wait_to_start') # 请将 your_module_name 替换为实际的模块名 - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 - def test_start_mindspore_workers(self, mock_func_map, mock_wait_to_start): - mock_start_worker_func = MagicMock() - mock_func_map.get.return_value = mock_start_worker_func - mock_wait_to_start.return_value = True - self.plugin.start_mindspore_workers() - mock_start_worker_func.assert_called() - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.fault_ranks', [{"RankId": 0, "Status": "recovered"}]) # 请将 your_module_name 替换为实际的模块名 - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.rank_table_version', 0) # 请将 your_module_name 替换为实际的模块名 - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.read_rank_table_version', return_value=1) # 请将 your_module_name 替换为实际的模块名 - def test_all_fault_has_recovered(self): - result = self.plugin.all_fault_has_recovered() - self.assertTrue(result) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.fault_ranks', [{"RankId": 0, "Status": "fault"}]) # 请将 your_module_name 替换为实际的模块名 - def test_all_fault_has_not_recovered(self): - result = self.plugin.all_fault_has_recovered() - self.assertFalse(result) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.update_reset_info') # 请将 your_module_name 替换为实际的模块名 - def test_get_fault_status(self): - mock_fault_local_ranks = [0] - mock_fault_status = True - mock_unrecovered_status = False - mock_retry_status = False - mock_fault_status_obj = MagicMock( - local_ranks=mock_fault_local_ranks, - is_fault=mock_fault_status, - is_unrecovered=mock_unrecovered_status, - is_retried=mock_retry_status - ) - self.plugin.fault_ranks = [{"RankId": 0, "Status": "fault"}] - self.plugin.pre_fault_ranks = None - self.plugin.retry_time = 1 - self.plugin.pre_retry_time = 0 - self.plugin.node_global_rank_ids = [0] - with patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.FaultStatus', return_value=mock_fault_status_obj): # 请将 your_module_name 替换为实际的模块名 - result = self.plugin.get_fault_status() - self.assertEqual(result.local_ranks, mock_fault_local_ranks) - self.assertEqual(result.is_fault, mock_fault_status) - self.assertEqual(result.is_unrecovered, mock_unrecovered_status) - self.assertEqual(result.is_retried, mock_retry_status) - - def test_read_rank_table_version(self): - with patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.safe_get_file_info', return_value="1"): # 请将 your_module_name 替换为实际的模块名 - result = self.plugin.read_rank_table_version() - self.assertEqual(result, 1) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.start_mindspore_workers') # 请将 your_module_name 替换为实际的模块名 - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin._init_grpc_client_if_needed') # 请将 your_module_name 替换为实际的模块名 - def test_start(self, mock_init_grpc, mock_start_workers, mock_func_map): - mock_kill_worker_func = MagicMock() - mock_start_worker_func = MagicMock() - mock_monitor_func = MagicMock() - mock_func_map.get.side_effect = [mock_kill_worker_func, mock_start_worker_func, mock_monitor_func] - self.plugin.start() - mock_start_workers.assert_called() - mock_init_grpc.assert_called() - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.rank_info', {0: {"status": "ok"}}) # 请将 your_module_name 替换为实际的模块名 - def test_update_rank_status(self): - mock_rank_status_dict = {0: {"status": "ok"}} - self.plugin.update_rank_status(mock_rank_status_dict) - self.assertEqual(self.plugin.rank_info, mock_rank_status_dict) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.fault_processor.get_reset_info_from_cm') # 请将 your_module_name 替换为实际的模块名 - def test_update_reset_info(self, mock_get_reset_info): - mock_reset_data = MagicMock() - mock_reset_data.fault_ranks = [{"RankId": 0, "Status": "fault"}] - mock_reset_data.retry_time = 1 - mock_reset_data.grace_exit = 0 - mock_reset_data.restart_type = "test_type" - mock_get_reset_info.return_value = mock_reset_data - self.plugin.update_reset_info() - self.assertEqual(self.plugin.fault_ranks, mock_reset_data.fault_ranks) - self.assertEqual(self.plugin.retry_time, mock_reset_data.retry_time) - self.assertEqual(self.plugin.grace_exit, mock_reset_data.grace_exit) - self.assertEqual(self.plugin.restart_type, mock_reset_data.restart_type) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.fault_processor.get_reset_info_from_cm') # 请将 your_module_name 替换为实际的模块名 - def test_wait_to_start(self, mock_get_reset_info): - mock_reset_data = MagicMock() - mock_reset_data.fault_ranks = [] - mock_reset_data.retry_time = 0 - mock_reset_data.fault_flush = False - mock_get_reset_info.return_value = mock_reset_data - self.plugin.node_global_rank_ids = [0] - result = self.plugin.wait_to_start() - self.assertTrue(result) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.grace_exit', 1) # 请将 your_module_name 替换为实际的模块名 - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 - def test_handle_grace_exit(self, mock_func_map): - mock_kill_worker_func = MagicMock() - mock_func_map.get.return_value = mock_kill_worker_func - result = self.plugin._handle_grace_exit() - self.assertTrue(result) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 - def test_handle_fault_status(self, mock_func_map): - mock_fault_status = MagicMock(is_fault=True) - mock_kill_worker_func = MagicMock() - mock_func_map.get.return_value = mock_kill_worker_func - self.plugin._handle_fault_status(mock_fault_status) - mock_kill_worker_func.assert_called() - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 - def test_handle_process_retry_fault(self, mock_func_map): - mock_fault_status = MagicMock(is_retried=True, is_unrecovered=False) - mock_kill_worker_func = MagicMock() - mock_start_worker_func = MagicMock() - mock_func_map.get.side_effect = [mock_kill_worker_func, mock_start_worker_func] - self.plugin.all_fault_has_recovered = MagicMock(return_value=True) - result = self.plugin._handle_process_retry_fault(mock_fault_status) - self.assertTrue(result) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 - def test_handle_hardware_fault(self, mock_func_map): - mock_fault_status = MagicMock(is_unrecovered=True) - mock_kill_worker_func = MagicMock() - mock_start_worker_func = MagicMock() - mock_func_map.get.side_effect = [mock_kill_worker_func, mock_start_worker_func] - self.plugin.all_fault_has_recovered = MagicMock(return_value=True) - result = self.plugin._handle_hardware_fault(mock_fault_status) - self.assertTrue(result) - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.all_rank_succeed', True) # 请将 your_module_name 替换为实际的模块名 - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 - def test_handle_all_process_succeed(self, mock_func_map): - mock_kill_worker_func = MagicMock() - mock_func_map.get.return_value = mock_kill_worker_func - self.plugin._handle_all_process_succeed() - mock_kill_worker_func.assert_called() - - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.rank_status', MSRunPlugin.RANK_STATUS_UNHEALTHY) # 请将 your_module_name 替换为实际的模块名 - @patch('taskd.python.framework.agent.ms_mgr.msrun_plugin.MSRunPlugin.__func_map') # 请将 your_module_name 替换为实际的模块名 - def test_handle_exist_unhealthy_process(self, mock_func_map): - mock_kill_worker_func = MagicMock() - mock_func_map.get.return_value = mock_kill_worker_func - self.plugin._handle_exist_unhealthy_process() - mock_kill_worker_func.assert_called() - - -if __name__ == '__main__': - unittest.main() \ No newline at end of file -- Gitee From 16de92261ff40f82c98948ec648a147e7f67fe4d Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 15:18:32 +0800 Subject: [PATCH 16/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 1 - component/ascend-device-plugin/pkg/device/ascendcommon.go | 1 - 2 files changed, 2 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 4369fc0d3..1f6c769e8 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -1178,7 +1178,6 @@ func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { hwlog.RunLog.Infof("fault callback rate limit error: %v", err) return } - hwlog.RunLog.Info("got token,will report fault") defer func() { TriggerUpdate("A fault has occurred") }() diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index f5b08a232..72cd3058b 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1260,7 +1260,6 @@ func (tool *AscendTools) WriteFaultToEvent(ctx context.Context) { hwlog.RunLog.Info("write fault to k8s event stop") return case faultInfo := <-allFaultInfo: - hwlog.RunLog.Infof("current k8s event chan len:%d", len(allFaultInfo)) if err := limiter.Wait(context.TODO()); err != nil { hwlog.RunLog.Errorf("write k8s event limiter err: %v", err) continue -- Gitee From 4481d04fbfbd3d92312d183da241e1bfd9471214 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 15:47:12 +0800 Subject: [PATCH 17/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 1 + component/ascend-device-plugin/pkg/device/ascendcommon.go | 1 + 2 files changed, 2 insertions(+) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 1f6c769e8..6219d04be 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -1178,6 +1178,7 @@ func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { hwlog.RunLog.Infof("fault callback rate limit error: %v", err) return } + hwlog.RunLog.Info("call back got token") defer func() { TriggerUpdate("A fault has occurred") }() diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 72cd3058b..cb7d9c25b 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1264,6 +1264,7 @@ func (tool *AscendTools) WriteFaultToEvent(ctx context.Context) { hwlog.RunLog.Errorf("write k8s event limiter err: %v", err) continue } + hwlog.RunLog.Info("write fault to event got token") if err := tool.doWriteFaultToEvent(faultInfo); err != nil { hwlog.RunLog.Errorf("failed to write device fault to k8s event, %v", err) continue -- Gitee From 95fea220af601527ca13f75172386c2ee6c8caa0 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 16:00:34 +0800 Subject: [PATCH 18/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/device/ascendcommon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index cb7d9c25b..eb051c463 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1264,7 +1264,6 @@ func (tool *AscendTools) WriteFaultToEvent(ctx context.Context) { hwlog.RunLog.Errorf("write k8s event limiter err: %v", err) continue } - hwlog.RunLog.Info("write fault to event got token") if err := tool.doWriteFaultToEvent(faultInfo); err != nil { hwlog.RunLog.Errorf("failed to write device fault to k8s event, %v", err) continue @@ -1323,6 +1322,7 @@ func (tool *AscendTools) doWriteFaultToEvent(faultInfo npuCommon.DevFaultInfo) e if _, err := tool.client.CreateEvent(event); err != nil { return fmt.Errorf("failed to create event, %v", err) } + hwlog.RunLog.Infof("successfully create event for fault: %#v", faultInfo) return nil } -- Gitee From 2938220060c3ddde4ae23b609e5a1304354fef8b Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 16:00:37 +0800 Subject: [PATCH 19/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 1 - 1 file changed, 1 deletion(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 6219d04be..1f6c769e8 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -1178,7 +1178,6 @@ func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { hwlog.RunLog.Infof("fault callback rate limit error: %v", err) return } - hwlog.RunLog.Info("call back got token") defer func() { TriggerUpdate("A fault has occurred") }() -- Gitee From d14a69b04e6fcaac7385f7d0af2943678d03cbc9 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Tue, 6 May 2025 16:58:52 +0800 Subject: [PATCH 20/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/fault_code.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index 1f6c769e8..cac76330e 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -126,7 +126,7 @@ var ( RestartNPU, PreSeparateNPU, SeparateNPU, SubHealthFault) // NetworkFaultCodes is a set that contains all the network fault codes NetworkFaultCodes = sets.NewInt64(LinkDownFaultCode) - limiter = rate.NewLimiter(rate.Every(1*time.Minute)/FaultCallBackRateLimit, FaultCallBackRateLimit) + limiter = rate.NewLimiter(rate.Every(1*time.Minute/FaultCallBackRateLimit), FaultCallBackRateLimit) ) // fault customization -- Gitee From d383bc551b4e0badea5fa4cda1077be0debe47a6 Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Mon, 12 May 2025 15:48:06 +0800 Subject: [PATCH 21/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ascend-device-plugin/pkg/common/constants.go | 6 ++---- .../ascend-device-plugin/pkg/common/fault_code.go | 12 ++++++++---- .../ascend-device-plugin/pkg/device/ascendcommon.go | 9 ++++----- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/component/ascend-device-plugin/pkg/common/constants.go b/component/ascend-device-plugin/pkg/common/constants.go index 5164cac05..745f472fc 100644 --- a/component/ascend-device-plugin/pkg/common/constants.go +++ b/component/ascend-device-plugin/pkg/common/constants.go @@ -893,11 +893,9 @@ const ( ) const ( - // WriteEventInterval interval between each check - WriteEventInterval = 50 - // WriteEventRateLimit upper limit rate of write fault to k8s event + // WriteEventRateLimit upper limit rate of write fault to k8s event per minute WriteEventRateLimit = 10 - // FaultCallBackRateLimit upper limit rate of call back receive fault from driver + // FaultCallBackRateLimit upper limit rate of call back receive fault from driver per minute FaultCallBackRateLimit = 1000 // WriteEventChanLenLimit upper limit of length of cache event WriteEventChanLenLimit = 100 diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index cac76330e..91d250cd5 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -16,7 +16,6 @@ package common import ( - "context" "encoding/json" "fmt" "sort" @@ -965,8 +964,11 @@ func getMostSeriousFaultType(fautTypes []string) string { // SetDeviceInit set should init device's logicID func SetDeviceInit(logicID int32) { logicIDLock.Lock() + defer logicIDLock.Unlock() + if Int32Tool.Contains(initLogicIDs, logicID) { + return + } initLogicIDs = append(initLogicIDs, logicID) - logicIDLock.Unlock() } // GetAndCleanLogicID get should init device's logicID and clean cache @@ -1174,8 +1176,10 @@ func DelOnceFrequencyFault() { // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { - if err := limiter.Wait(context.TODO()); err != nil { - hwlog.RunLog.Infof("fault callback rate limit error: %v", err) + if !limiter.Allow() { + hwlog.RunLog.Warnf("fault callback rate limit overflowed, current fault: %#v will be discard", devFaultInfo) + hwlog.RunLog.Warnf("will set current device: %v into init status", devFaultInfo.LogicID) + SetDeviceInit(devFaultInfo.LogicID) return } defer func() { diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index eb051c463..c5b4be765 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -1092,7 +1092,7 @@ func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, hwlog.RunLog.Debugf("fault %#v has been put into queue", faultInfo) default: hwlog.RunLog.Warnf("there is too many fault already in queue, queue len:%d, "+ - "will not wrote to k8s event: %#v", len(allFaultInfo), faultInfo) + "will not write to k8s event: %#v", len(allFaultInfo), faultInfo) } } } @@ -1260,16 +1260,15 @@ func (tool *AscendTools) WriteFaultToEvent(ctx context.Context) { hwlog.RunLog.Info("write fault to k8s event stop") return case faultInfo := <-allFaultInfo: - if err := limiter.Wait(context.TODO()); err != nil { - hwlog.RunLog.Errorf("write k8s event limiter err: %v", err) + if !limiter.Allow() { + hwlog.RunLog.Warn("write k8s event limiter overflowed") + hwlog.RunLog.Warnf("current event for fault:%#v will be discard", faultInfo) continue } if err := tool.doWriteFaultToEvent(faultInfo); err != nil { hwlog.RunLog.Errorf("failed to write device fault to k8s event, %v", err) continue } - default: - time.Sleep(time.Millisecond * common.WriteEventInterval) } } } -- Gitee From 2ca32f77a0c675950dbbc94a93577d446f09414e Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Mon, 12 May 2025 16:51:06 +0800 Subject: [PATCH 22/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/ascend-device-plugin/pkg/common/constants.go b/component/ascend-device-plugin/pkg/common/constants.go index 745f472fc..f579d5d6a 100644 --- a/component/ascend-device-plugin/pkg/common/constants.go +++ b/component/ascend-device-plugin/pkg/common/constants.go @@ -896,7 +896,7 @@ const ( // WriteEventRateLimit upper limit rate of write fault to k8s event per minute WriteEventRateLimit = 10 // FaultCallBackRateLimit upper limit rate of call back receive fault from driver per minute - FaultCallBackRateLimit = 1000 + FaultCallBackRateLimit = 10 // WriteEventChanLenLimit upper limit of length of cache event WriteEventChanLenLimit = 100 ) -- Gitee From d7ddb56014fad1e3a3ba376ead2ff7e08ec0148c Mon Sep 17 00:00:00 2001 From: Lianjun Zhang Atlas Date: Mon, 12 May 2025 16:59:20 +0800 Subject: [PATCH 23/23] =?UTF-8?q?k8s=E4=B8=8A=E6=8A=A5=E6=95=85=E9=9A=9C?= =?UTF-8?q?=E8=87=B3=E7=9C=8Bk8s=20event=20=E5=A2=9E=E5=8A=A0=E9=99=90?= =?UTF-8?q?=E6=B5=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/ascend-device-plugin/pkg/common/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/ascend-device-plugin/pkg/common/constants.go b/component/ascend-device-plugin/pkg/common/constants.go index f579d5d6a..745f472fc 100644 --- a/component/ascend-device-plugin/pkg/common/constants.go +++ b/component/ascend-device-plugin/pkg/common/constants.go @@ -896,7 +896,7 @@ const ( // WriteEventRateLimit upper limit rate of write fault to k8s event per minute WriteEventRateLimit = 10 // FaultCallBackRateLimit upper limit rate of call back receive fault from driver per minute - FaultCallBackRateLimit = 10 + FaultCallBackRateLimit = 1000 // WriteEventChanLenLimit upper limit of length of cache event WriteEventChanLenLimit = 100 ) -- Gitee