diff --git a/component/ascend-device-plugin/pkg/common/constants.go b/component/ascend-device-plugin/pkg/common/constants.go index 8447a7f910be777e254569c8a716b7b0d13cd492..745f472fc4cdb66c859c1e72b45dda563b721f6e 100644 --- a/component/ascend-device-plugin/pkg/common/constants.go +++ b/component/ascend-device-plugin/pkg/common/constants.go @@ -891,3 +891,12 @@ const ( // MaxPodEventRetryTimes max try time for pod add event while cache none MaxPodEventRetryTimes = 4 ) + +const ( + // WriteEventRateLimit upper limit rate of write fault to k8s event per minute + WriteEventRateLimit = 10 + // FaultCallBackRateLimit upper limit rate of call back receive fault from driver per minute + FaultCallBackRateLimit = 1000 + // WriteEventChanLenLimit upper limit of length of cache event + WriteEventChanLenLimit = 100 +) diff --git a/component/ascend-device-plugin/pkg/common/fault_code.go b/component/ascend-device-plugin/pkg/common/fault_code.go index c40f30e2e5dbedf4cb82ff285bb6544fdaa478a0..91d250cd52e12d09b7df1e8cb065cb2f3a047d15 100644 --- a/component/ascend-device-plugin/pkg/common/fault_code.go +++ b/component/ascend-device-plugin/pkg/common/fault_code.go @@ -24,6 +24,7 @@ import ( "sync" "time" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/util/sets" "ascend-common/common-utils/hwlog" @@ -124,6 +125,7 @@ var ( RestartNPU, PreSeparateNPU, SeparateNPU, SubHealthFault) // NetworkFaultCodes is a set that contains all the network fault codes NetworkFaultCodes = sets.NewInt64(LinkDownFaultCode) + limiter = rate.NewLimiter(rate.Every(1*time.Minute/FaultCallBackRateLimit), FaultCallBackRateLimit) ) // fault customization @@ -962,8 +964,11 @@ func getMostSeriousFaultType(fautTypes []string) string { // SetDeviceInit set should init device's logicID func SetDeviceInit(logicID int32) { logicIDLock.Lock() + defer logicIDLock.Unlock() + if Int32Tool.Contains(initLogicIDs, logicID) { + return + } initLogicIDs = append(initLogicIDs, logicID) - logicIDLock.Unlock() } // GetAndCleanLogicID get should init device's logicID and clean cache @@ -1171,10 +1176,16 @@ func DelOnceFrequencyFault() { // SaveDevFaultInfo save device fault info , subscribe interface call back function func SaveDevFaultInfo(devFaultInfo common.DevFaultInfo) { + if !limiter.Allow() { + hwlog.RunLog.Warnf("fault callback rate limit overflowed, current fault: %#v will be discard", devFaultInfo) + hwlog.RunLog.Warnf("will set current device: %v into init status", devFaultInfo.LogicID) + SetDeviceInit(devFaultInfo.LogicID) + return + } defer func() { TriggerUpdate("A fault has occurred") }() - hwlog.RunLog.Infof("receive devFaultInfo: %v, hex code: %v", devFaultInfo, + hwlog.RunLog.Infof("receive devFaultInfo: %#v, hex code: %v", devFaultInfo, strconv.FormatInt(devFaultInfo.EventID, Hex)) if devFaultInfo.EventID == 0 { return diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon.go b/component/ascend-device-plugin/pkg/device/ascendcommon.go index 5e2b388d87e0eeed04ef572790c73ef987672169..aa47f9c3491b9000296117a1936fdb5b271b8a58 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon.go @@ -16,6 +16,7 @@ package device import ( + "context" "encoding/json" "fmt" "regexp" @@ -26,6 +27,7 @@ import ( "time" "github.com/containerd/containerd" + "golang.org/x/time/rate" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" @@ -50,6 +52,8 @@ var ( lastCheckNodeLabel int64 useIpv4 = true re = regexp.MustCompile(`"fault_time":\d+,`) + allFaultInfo = make(chan npuCommon.DevFaultInfo, common.WriteEventChanLenLimit) + limiter = rate.NewLimiter(rate.Every(time.Minute/common.WriteEventRateLimit), common.WriteEventRateLimit) ) const ( @@ -126,6 +130,7 @@ type DevManager interface { GetUsedChips() sets.String UpdateDeviceUsedStatus(groupDevice map[string][]*common.NpuDevice) GetDeviceIP(deviceType string, phyID int) (string, error) + WriteFaultToEvent(ctx context.Context) } // SetDmgr set devmanager @@ -1083,7 +1088,15 @@ func (tool *AscendTools) writeNewFaultCode(deviceMap map[string][]*common.NpuDev func (tool *AscendTools) flushFaultCodesWithInit(device *common.NpuDevice, devFaultInfoMap map[int32][]npuCommon.DevFaultInfo) { if devFaultInfo, ok := devFaultInfoMap[device.LogicID]; ok { - tool.writeFaultToEvent(devFaultInfo) + for _, faultInfo := range devFaultInfo { + select { + case allFaultInfo <- faultInfo: + hwlog.RunLog.Debugf("fault %#v has been put into queue", faultInfo) + default: + hwlog.RunLog.Warnf("there is too many fault already in queue, queue len:%d, "+ + "will not write to k8s event: %#v", len(allFaultInfo), faultInfo) + } + } } common.SetNewFaultAndCacheOnceRecoverFault(device.LogicID, devFaultInfoMap[device.LogicID], device) common.SetNetworkNewFaultAndCacheOnceRecoverFault(device.LogicID, devFaultInfoMap[device.LogicID], device) @@ -1239,11 +1252,25 @@ func (tool *AscendTools) SetResetFailedTimes(deviceLogicId int32, count int) { tool.resetFailedTimesMap[deviceLogicId] = count } -func (tool *AscendTools) writeFaultToEvent(devFaultInfo []npuCommon.DevFaultInfo) { - for _, faultInfo := range devFaultInfo { - if err := tool.doWriteFaultToEvent(faultInfo); err != nil { - hwlog.RunLog.Errorf("failed to write device fault to event, %v", err) - continue +func (tool *AscendTools) WriteFaultToEvent(ctx context.Context) { + for { + select { + case _, ok := <-ctx.Done(): + if !ok { + hwlog.RunLog.Info("stop signal chanel closed") + } + hwlog.RunLog.Info("write fault to k8s event stop") + return + case faultInfo := <-allFaultInfo: + if !limiter.Allow() { + hwlog.RunLog.Warn("write k8s event limiter overflowed") + hwlog.RunLog.Warnf("current event for fault:%#v will be discard", faultInfo) + continue + } + if err := tool.doWriteFaultToEvent(faultInfo); err != nil { + hwlog.RunLog.Errorf("failed to write device fault to k8s event, %v", err) + continue + } } } } @@ -1296,6 +1323,7 @@ func (tool *AscendTools) doWriteFaultToEvent(faultInfo npuCommon.DevFaultInfo) e if _, err := tool.client.CreateEvent(event); err != nil { return fmt.Errorf("failed to create event, %v", err) } + hwlog.RunLog.Infof("successfully create event for fault: %#v", faultInfo) return nil } diff --git a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go index a899a7b1deddc2683f1dd8431a8d17f548adc976..3fa549864dd3983943a746b51eeffd9794822e7f 100644 --- a/component/ascend-device-plugin/pkg/device/ascendcommon_test.go +++ b/component/ascend-device-plugin/pkg/device/ascendcommon_test.go @@ -16,6 +16,7 @@ package device import ( + "context" "encoding/json" "errors" "fmt" @@ -23,6 +24,7 @@ import ( "strconv" "strings" "testing" + "time" "github.com/agiledragon/gomonkey/v2" "github.com/containerd/containerd" @@ -454,8 +456,13 @@ func TestWriteFaultToEvent(t *testing.T) { return errors.New("write fault to event fail") }) defer mockDoWriteFaultToEventMethod.Reset() - faultInfo := []npuCommon.DevFaultInfo{{LogicID: 0, Assertion: 3, EventID: common.CardDropFaultCode}} - tool.writeFaultToEvent(faultInfo) + allFaultInfo <- npuCommon.DevFaultInfo{LogicID: 0, Assertion: FaultOnce, EventID: common.CardDropFaultCode} + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(time.Second) + cancel() + }() + tool.WriteFaultToEvent(ctx) }) } diff --git a/component/ascend-device-plugin/pkg/server/manager.go b/component/ascend-device-plugin/pkg/server/manager.go index 3154183ee95be602e5f1b6d913641685f0305e4f..d3e317d124998a842a08eec4fc421d12afd99c14 100644 --- a/component/ascend-device-plugin/pkg/server/manager.go +++ b/component/ascend-device-plugin/pkg/server/manager.go @@ -444,6 +444,8 @@ func (hdm *HwDevManager) ListenDevice(ctx context.Context) { } go hdm.updateNodeAnnotations(ctx) + // report device fault to k8s event + go hdm.manager.WriteFaultToEvent(ctx) initTime := time.Now() ticker := time.NewTicker(time.Duration(common.ParamOption.ListAndWatchPeriod) * time.Second) defer ticker.Stop() diff --git a/component/clusterd/pkg/interface/kube/informer_test.go b/component/clusterd/pkg/interface/kube/informer_test.go index 117c42d1678c8ca0931bb63a6cd5517bcf81b225..739df5fc328bf6366acd04a2058956302bb693d3 100644 --- a/component/clusterd/pkg/interface/kube/informer_test.go +++ b/component/clusterd/pkg/interface/kube/informer_test.go @@ -6,7 +6,7 @@ package kube import ( "reflect" "testing" - + "github.com/agiledragon/gomonkey/v2" "github.com/smartystreets/goconvey/convey" "k8s.io/api/core/v1"