From 1bcfe10bfb4d9d0eb3f8a9c07436198b68d9f669 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Thu, 21 Aug 2025 21:08:43 +0800 Subject: [PATCH 1/9] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91=E6=8F=90=E4=BA=A4=E6=9A=82=E5=81=9C=E8=AE=AD?= =?UTF-8?q?=E7=BB=83=E6=8F=92=E4=BB=B6=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../clusterd/pkg/common/constant/const.go | 17 +- .../taskd/taskd/go/common/constant/const.go | 46 ++++- .../taskd/taskd/go/common/utils/utils.go | 29 +++ .../infrastructure/storage/cluster_infos.go | 8 + .../go/framework_backend/manager/manager.go | 84 +++++++++ .../stop_train_plugin/stop_train_plugin.go | 170 ++++++++++++++++++ .../manager/plugins/utils/get_msgs.go | 144 +++++++++++++++ .../manager/service/cluster.go | 8 + .../manager/service/plugin_handler.go | 27 ++- .../manager/service/stream_handler.go | 10 ++ 10 files changed, 532 insertions(+), 11 deletions(-) create mode 100644 component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go create mode 100644 component/taskd/taskd/go/framework_backend/manager/plugins/utils/get_msgs.go diff --git a/component/clusterd/pkg/common/constant/const.go b/component/clusterd/pkg/common/constant/const.go index b689c79c0..59784ac32 100644 --- a/component/clusterd/pkg/common/constant/const.go +++ b/component/clusterd/pkg/common/constant/const.go @@ -1,6 +1,6 @@ // Copyright (c) Huawei Technologies Co., Ltd. 2024-2024. All rights reserved. 
-// Package common is grpc common types and functions +// Package constant is grpc common types and functions package constant // process signal type @@ -17,6 +17,8 @@ const ( SaveAndExitSignalType = "saveAndExit" // KeepAliveSignalType keep alive signal type KeepAliveSignalType = "keep-alive" + // FaultNodesExitSignalType fault nodes exit signal type + FaultNodesExitSignalType = "faultNodesExit" ) // recover strategy name @@ -250,3 +252,16 @@ const ( Failed = "failed" Start = "start" ) + +const ( + // FaultNodesExitAction action to notify fault nodes to exit + FaultNodesExitAction = "fault_nodes_exit" + // FaultNodesRestartAction action to notify fault nodes to restart + FaultNodesRestartAction = "fault_nodes_restart" + // OnGlobalRankAction on_global_rank action + OnGlobalRankAction = "on_global_rank" + // StopAction stop_train action + StopAction = "stop_train" + // ChangeStrategyAction change_strategy action + ChangeStrategyAction = "change_strategy" +) diff --git a/component/taskd/taskd/go/common/constant/const.go b/component/taskd/taskd/go/common/constant/const.go index 799a81ffe..ef3dbc823 100644 --- a/component/taskd/taskd/go/common/constant/const.go +++ b/component/taskd/taskd/go/common/constant/const.go @@ -125,6 +125,10 @@ const ( FaultRankCode = 202 ExitAgentCode = 203 SwitchNicCode = 204 + ProcessManageRecoverSignal = 205 + ProcessManageKeepAliveSignal = 206 + RestartAgentCode = 207 + RestartWorkersCode = 208 ProfilingAllCloseCmdCode = 700 ProfilingDefaultDomainOnCode = 710 ProfilingCommDomainOnCode = 701 @@ -268,8 +272,24 @@ const ( // StreamName and PluginName const ( - ProfilingStream = "ProfilingCollect" - ProfilingPluginName = "ProfilingPlugin" + ProfilingStream = "ProfilingCollect" + ResumeTrainingAfterFaultStream = "ResumeTrainingAfterFaultStream" + + ProfilingPluginName = "ProfilingPlugin" + StopTrainPluginName = "StopTrainPlugin" + StepRetryPluginName = "StepRetryPlugin" + ARFPluginName = "ARFPlugin" + ScaleTrainPluginName = "ScaleTrainPlugin" 
+ DumpPluginName = "DumpPlugin" +) + +// Plugin priority +const ( + Priority1 = iota + 1 + Priority2 + Priority3 + Priority4 + Priority5 ) const ( @@ -289,6 +309,21 @@ const ( SwitchOK = "switchOK" // SwitchFail value of switch fail SwitchFail = "switchFail" + + // SignalType key of SignalType + SignalType = "SignalType" + // Actions key of Actions + Actions = "Actions" + // FaultRanks key of FaultRanks + FaultRanks = "FaultRanks" + // ChangeStrategy key of ChangeStrategy + ChangeStrategy = "ChangeStrategy" + // Timeout key of Timeout + Timeout = "Timeout" + // NodeRankIds key of NodeRankIds + NodeRankIds = "NodeRankIds" + // ExtraParams key of ExtraParams + ExtraParams = "ExtraParams" ) const ( @@ -299,3 +334,10 @@ const ( // LocalProxyEnableOn local proxy enable value LocalProxyEnableOn = "on" ) + +const ( + // StopTrainAction stop train signal action + StopTrainAction = "stop_train" + // ControllerName is name of controller + ControllerName = "controller" +) diff --git a/component/taskd/taskd/go/common/utils/utils.go b/component/taskd/taskd/go/common/utils/utils.go index 66a796581..2ea14177a 100644 --- a/component/taskd/taskd/go/common/utils/utils.go +++ b/component/taskd/taskd/go/common/utils/utils.go @@ -17,6 +17,8 @@ package utils import ( "context" + "crypto/sha256" + "encoding/hex" "encoding/json" "fmt" "net" @@ -27,6 +29,7 @@ import ( "ascend-common/common-utils/hwlog" "ascend-common/common-utils/utils" + pb "clusterd/pkg/interface/grpc/recover" "taskd/common/constant" ) @@ -252,6 +255,7 @@ func ProfilingCmdToBizCode(cmd constant.ProfilingDomainCmd) int32 { return constant.ProfilingAllCloseCmdCode } +// GetClusterdAddr get ClusterD addr func GetClusterdAddr() (string, error) { proxyIp := os.Getenv(constant.LocalProxyEnableEnv) if proxyIp == constant.LocalProxyEnableOn { @@ -265,3 +269,28 @@ func GetClusterdAddr() (string, error) { } return ipFromEnv + constant.ClusterdPort, nil } + +// GetFaultRanksMapByList get fault rank map by list +func 
GetFaultRanksMapByList(faultRanks []*pb.FaultRank) map[int]int { + ranksMap := make(map[int]int) + for _, faultRank := range faultRanks { + rankIdInt, err := strconv.Atoi(faultRank.RankId) + if err != nil { + hwlog.RunLog.Warnf("convert rankId %s to int failed", faultRank.RankId) + continue + } + typeInt, err := strconv.Atoi(faultRank.FaultType) + if err != nil { + hwlog.RunLog.Warnf("convert type %s to int failed", faultRank.FaultType) + continue + } + ranksMap[rankIdInt] = typeInt + } + return ranksMap +} + +// StringTo64Hash convert string to its SHA-256 digest encoded as a 64-character hex string +func StringTo64Hash(s string) string { + hash := sha256.Sum256([]byte(s)) + return hex.EncodeToString(hash[:]) +} diff --git a/component/taskd/taskd/go/framework_backend/manager/infrastructure/storage/cluster_infos.go b/component/taskd/taskd/go/framework_backend/manager/infrastructure/storage/cluster_infos.go index 85ab25160..a11a508aa 100644 --- a/component/taskd/taskd/go/framework_backend/manager/infrastructure/storage/cluster_infos.go +++ b/component/taskd/taskd/go/framework_backend/manager/infrastructure/storage/cluster_infos.go @@ -90,3 +90,11 @@ func (c *ClusterInfos) updateCluster(clusterName string, newCluster *ClusterInfo } return nil } + +// GetCluster get the cluster info by cluster name +func (c *ClusterInfos) GetCluster(clusterName string) (*ClusterInfo, error) { + if cluster, exists := c.Clusters[clusterName]; exists { + return cluster, nil + } + return nil, fmt.Errorf("cluster name is unregistered : %v", clusterName) +} diff --git a/component/taskd/taskd/go/framework_backend/manager/manager.go b/component/taskd/taskd/go/framework_backend/manager/manager.go index 1992e91b9..8f01b36cb 100644 --- a/component/taskd/taskd/go/framework_backend/manager/manager.go +++ b/component/taskd/taskd/go/framework_backend/manager/manager.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "io" + "strconv" "sync/atomic" "time" @@ -27,6 +28,7 @@ import ( "google.golang.org/grpc/credentials/insecure" 
"ascend-common/common-utils/hwlog" + clusterd_constant "clusterd/pkg/common/constant" "clusterd/pkg/interface/grpc/profiling" "clusterd/pkg/interface/grpc/recover" "taskd/common/constant" @@ -166,6 +168,7 @@ func (m *BaseManager) registerClusterD(retryTime time.Duration) { return } + go m.subscribeProcessManageSignal(conn) go m.subscribeProfiling(conn, 0) go m.subscribeSwitchNic(conn) } @@ -358,3 +361,84 @@ func convertProfilingMsg(profilingSwitchData *profiling.ProfilingSwitch) constan } return profilingSwitch } + +func (m *BaseManager) subscribeProcessManageSignal(conn *grpc.ClientConn) { + recoverClient := pb.NewRecoverClient(conn) + clientInfo := &pb.ClientInfo{ + JobId: m.JobId, + Role: roleTaskd, + } + status, err := recoverClient.Init(m.svcCtx, clientInfo) + if err != nil || status.Code != common.OK { + hwlog.RunLog.Errorf("request Init failed, error: %v, response: %v", err, status) + return + } + status, err = recoverClient.Register(m.svcCtx, clientInfo) + if err != nil || status.Code != common.OK { + hwlog.RunLog.Errorf("request Register failed, error: %v, response: %v", err, status) + return + } + stream, err := recoverClient.SubscribeProcessManageSignal(m.svcCtx, clientInfo) + if err != nil { + hwlog.RunLog.Errorf("request SubscribeProcessManageSignal failed, error: %v", err) + return + } + for { + select { + case <-m.svcCtx.Done(): + hwlog.RunLog.Info("taskd exit, stop subscribe clusterd fault info") + return + case <-stream.Context().Done(): + hwlog.RunLog.Info("client stream exit, stop subscribe profiling info and re-register") + return + default: + responseMsg, recvErr := stream.Recv() + if recvErr != nil { + hwlog.RunLog.Error(recvErr) + } else { + hwlog.RunLog.Infof("receive manage signal info: %v", responseMsg) + m.enqueueProcessManageSignal(responseMsg, constant.ClusterDRank) + } + } + } +} + +func (m *BaseManager) enqueueProcessManageSignal(processManageSignal *pb.ProcessManageSignal, serverRank string) { + action := constant.KeepAlive + code := 
constant.ProcessManageKeepAliveSignal + var params map[string]string + if processManageSignal.SignalType != clusterd_constant.KeepAliveSignalType { + action = constant.Action + code = constant.ProcessManageRecoverSignal + params = map[string]string{ + constant.SignalType: processManageSignal.SignalType, + constant.Actions: utils.ObjToString(processManageSignal.Actions), + constant.FaultRanks: utils.ObjToString(utils.GetFaultRanksMapByList(processManageSignal.FaultRanks)), + constant.ChangeStrategy: processManageSignal.ChangeStrategy, + constant.Timeout: strconv.FormatInt(processManageSignal.Timeout, constant.TenBase), + } + } + + message := storage.BaseMessage{ + Header: storage.MsgHeader{ + BizType: "default", + Uuid: uuid.New().String(), + Src: &common.Position{ + Role: constant.ClusterRole, + ServerRank: serverRank, + }, + Timestamp: time.Now(), + }, + Body: storage.MsgBody{ + MsgType: action, + Code: int32(code), + Extension: params, + }, + } + err := m.MsgHd.MsgQueue.Enqueue(message) + if err != nil { + hwlog.RunLog.Infof("enqueue process manage signal %v error %v", processManageSignal, err) + return + } + hwlog.RunLog.Infof("enqueue process manage signal successfully, signal: %v", processManageSignal) +} diff --git a/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go b/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go new file mode 100644 index 000000000..fe2bb16c0 --- /dev/null +++ b/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go @@ -0,0 +1,170 @@ +/* Copyright(C) 2025. Huawei Technologies Co.,Ltd. All rights reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package stop_train_plugin provides the stop train manager plugin +package stop_train_plugin + +import ( + "errors" + "strconv" + + "ascend-common/common-utils/hwlog" + clusterd_constant "clusterd/pkg/common/constant" + pb "clusterd/pkg/interface/grpc/recover" + "taskd/common/constant" + "taskd/common/utils" + "taskd/framework_backend/manager/infrastructure" + "taskd/framework_backend/manager/infrastructure/storage" + plugin_utils "taskd/framework_backend/manager/plugins/utils" +) + +type StopTrainingPlugin struct { + hasToken bool + shot storage.SnapShot + signalInfo *plugin_utils.SignalInfo + HasSendMessages map[string]string +} + +type signalInfo struct { + SignalType string + Actions []string + FaultRanks []*pb.FaultRank + ChangeStrategy string + Timeout int64 + NodeRankIds []string + ExtraParams string +} + +func New() infrastructure.ManagerPlugin { + return &StopTrainingPlugin{ + HasSendMessages: make(map[string]string), + } +} + +func (s *StopTrainingPlugin) Name() string { + return constant.StopTrainPluginName +} + +func (s *StopTrainingPlugin) Predicate(shot storage.SnapShot) (infrastructure.PredicateResult, error) { + s.shot = shot + s.signalInfo = nil + if s.hasToken { + return infrastructure.PredicateResult{ + PluginName: s.Name(), + CandidateStatus: constant.CandidateStatus, + PredicateStream: map[string]string{constant.ResumeTrainingAfterFaultStream: ""}, + }, nil + } + err := s.getSignalInfo() + if err != nil { + hwlog.RunLog.Errorf("getSignalInfo error: %v", err) + return infrastructure.PredicateResult{PluginName: s.Name(), CandidateStatus: constant.UnselectStatus}, nil + } + 
if s.signalInfo.SignalType == clusterd_constant.StopTrainSignalType { + hwlog.RunLog.Info("get stop_train signal, apply for the token") + return infrastructure.PredicateResult{ + PluginName: s.Name(), + CandidateStatus: constant.CandidateStatus, + PredicateStream: map[string]string{constant.ResumeTrainingAfterFaultStream: ""}, + }, nil + } + return infrastructure.PredicateResult{CandidateStatus: constant.UnselectStatus}, nil +} + +func (s *StopTrainingPlugin) Release() error { + return nil +} + +func (s *StopTrainingPlugin) Handle() (infrastructure.HandleResult, error) { + hwlog.RunLog.Infof("plugin[%s] enter handle", s.Name()) + s.hasToken = true + if s.signalInfo == nil { + err := s.getSignalInfo() + if err != nil { + hwlog.RunLog.Errorf("getSignalInfo error: %v", err) + return infrastructure.HandleResult{Stage: constant.HandleStageException}, nil + } + } + if s.signalInfo.SignalType == clusterd_constant.GlobalFaultSignalType { + hwlog.RunLog.Info("get global fault signal, need to release token") + s.hasToken = false + s.HasSendMessages = make(map[string]string) + return infrastructure.HandleResult{Stage: constant.HandleStageFinal}, nil + } + return infrastructure.HandleResult{Stage: constant.HandleStageProcess}, nil +} + +func (s *StopTrainingPlugin) PullMsg() ([]infrastructure.Msg, error) { + msgs := make([]infrastructure.Msg, 0) + if s.signalInfo == nil { + hwlog.RunLog.Warn("signalInfo is nil") + return msgs, nil + } + if _, ok := s.HasSendMessages[utils.StringTo64Hash(s.signalInfo.SignalType+s.signalInfo. + Command[constant. + Actions])]; ok { + hwlog.RunLog.Debugf("the signal info has dealed, signal info: %v", s.signalInfo) + return msgs, nil + } + if s.signalInfo.SignalType == clusterd_constant.StopTrainSignalType || s.signalInfo. + SignalType == clusterd_constant.FaultNodesExitSignalType || s.signalInfo. + SignalType == clusterd_constant.GlobalFaultSignalType { + msgs = append(msgs, s.signalInfo.GetMsgs()...) 
+ } + s.HasSendMessages[utils.StringTo64Hash(s.signalInfo.SignalType+ + s.signalInfo.Command[constant.Actions])] = "" + hwlog.RunLog.Infof("pull msgs: %+v", msgs) + return msgs, nil +} + +func (s *StopTrainingPlugin) getSignalInfo() error { + clusterInfo, err := s.shot.ClusterInfos.GetCluster(constant.ClusterDRank) + if err != nil { + hwlog.RunLog.Errorf("Get clusterD info failed: %s", err.Error()) + return err + } + if clusterInfo == nil { + return errors.New("cluster info is nil") + } + s.signalInfo = &plugin_utils.SignalInfo{ + SignalType: clusterInfo.Command[constant.SignalType], + ChangeStrategy: clusterInfo.Command[constant.ChangeStrategy], + ExtraParams: clusterInfo.Command[constant.ExtraParams], + Command: clusterInfo.Command, + } + if s.signalInfo.SignalType == "" { + return nil + } + s.signalInfo.Timeout, err = strconv.ParseInt(clusterInfo.Command[constant.Timeout], constant.TenBase, constant.BitSize64) + if err != nil { + hwlog.RunLog.Errorf("ParseInt failed: %s", err.Error()) + return err + } + s.signalInfo.Actions, err = utils.StringToObj[[]string](clusterInfo.Command[constant.Actions]) + if err != nil { + hwlog.RunLog.Errorf("unmarshal actions failed: %s", err.Error()) + return err + } + s.signalInfo.FaultRanks, err = utils.StringToObj[map[int]int](clusterInfo.Command[constant.FaultRanks]) + if err != nil { + hwlog.RunLog.Errorf("unmarshal actions failed: %s", err.Error()) + return err + } + s.signalInfo.NodeRankIds, err = utils.StringToObj[[]string](clusterInfo.Command[constant.NodeRankIds]) + if err != nil { + hwlog.RunLog.Errorf("unmarshal actions failed: %s", err.Error()) + return err + } + return nil +} diff --git a/component/taskd/taskd/go/framework_backend/manager/plugins/utils/get_msgs.go b/component/taskd/taskd/go/framework_backend/manager/plugins/utils/get_msgs.go new file mode 100644 index 000000000..0291b7331 --- /dev/null +++ b/component/taskd/taskd/go/framework_backend/manager/plugins/utils/get_msgs.go @@ -0,0 +1,144 @@ +/* Copyright(C) 
2025. Huawei Technologies Co.,Ltd. All rights reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package utils provides SignalInfo and the helpers that build plugin messages from its actions +package utils + +import ( + clusterd_constant "clusterd/pkg/common/constant" + "taskd/common/constant" + "taskd/common/utils" + "taskd/framework_backend/manager/infrastructure" + "taskd/framework_backend/manager/infrastructure/storage" + "taskd/toolkit_backend/net/common" +) + +type SignalInfo struct { + SignalType string + Actions []string + FaultRanks map[int]int + ChangeStrategy string + Timeout int64 + NodeRankIds []string + ExtraParams string + Command map[string]string +} + +// GetMsgs returns msgs by actions +func (s *SignalInfo) GetMsgs() []infrastructure.Msg { + msgs := make([]infrastructure.Msg, 0) + for _, action := range s.Actions { + if action == clusterd_constant.StopAction { + msgs = append(msgs, s.getStopTrainActionMsgs()...) + } else if action == clusterd_constant.FaultNodesExitAction { + msgs = append(msgs, s.getFaultNodesExitActionMsgs()...) + } else if action == clusterd_constant.OnGlobalRankAction { + msgs = append(msgs, s.getOnGlobalRankActionMsgs()...) + } else if action == clusterd_constant.FaultNodesRestartAction { + msgs = append(msgs, s.getFaultNodesRestartActionMsgs()...) + } else if action == clusterd_constant.ChangeStrategyAction { + msgs = append(msgs, s.getChangeStrategyActionMsgs()...) 
+ } + } + return msgs +} + +func (s *SignalInfo) getStopTrainActionMsgs() []infrastructure.Msg { + return []infrastructure.Msg{ + { + Receiver: []string{constant.ControllerName}, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: constant.ProcessManageRecoverSignal, + Extension: map[string]string{ + constant.SignalType: s.SignalType, + constant.Actions: utils.ObjToString([]string{clusterd_constant.StopAction}), + constant.FaultRanks: s.Command[constant.FaultRanks], + }, + }, + }, + } +} + +func (s *SignalInfo) getFaultNodesExitActionMsgs() []infrastructure.Msg { + msgs := make([]infrastructure.Msg, 0) + for _, nodeRankId := range s.NodeRankIds { + msgs = append(msgs, infrastructure.Msg{ + Receiver: []string{common.AgentRole + nodeRankId}, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: constant.ExitAgentCode, + Extension: map[string]string{ + constant.SignalType: s.SignalType, + constant.Actions: utils.ObjToString([]string{clusterd_constant.FaultNodesExitAction}), + }, + }, + }) + } + return msgs +} + +func (s *SignalInfo) getFaultNodesRestartActionMsgs() []infrastructure.Msg { + msgs := make([]infrastructure.Msg, 0) + for _, nodeRankId := range s.NodeRankIds { + msgs = append(msgs, infrastructure.Msg{ + Receiver: []string{common.AgentRole + nodeRankId}, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: constant.RestartWorkersCode, + Extension: map[string]string{ + constant.SignalType: s.SignalType, + constant.Actions: utils.ObjToString([]string{clusterd_constant.FaultNodesRestartAction}), + constant.FaultRanks: s.Command[constant.FaultRanks], + }, + }, + }) + } + return msgs +} + +func (s *SignalInfo) getOnGlobalRankActionMsgs() []infrastructure.Msg { + return []infrastructure.Msg{ + { + Receiver: []string{constant.ControllerName}, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: constant.ProcessManageRecoverSignal, + Extension: map[string]string{ + constant.SignalType: s.SignalType, + constant.Actions: 
utils.ObjToString([]string{clusterd_constant.OnGlobalRankAction}), + constant.FaultRanks: s.Command[constant.FaultRanks], + }, + }, + }, + } +} + +func (s *SignalInfo) getChangeStrategyActionMsgs() []infrastructure.Msg { + return []infrastructure.Msg{ + { + Receiver: []string{constant.ControllerName}, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: constant.ProcessManageRecoverSignal, + Extension: map[string]string{ + constant.SignalType: s.SignalType, + constant.Actions: utils.ObjToString([]string{clusterd_constant.ChangeStrategyAction}), + constant.ChangeStrategy: s.ChangeStrategy, + constant.ExtraParams: s.ExtraParams, + }, + }, + }, + } +} diff --git a/component/taskd/taskd/go/framework_backend/manager/service/cluster.go b/component/taskd/taskd/go/framework_backend/manager/service/cluster.go index feb202c57..0c996de6a 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/cluster.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/cluster.go @@ -54,6 +54,14 @@ func (mpc *MsgProcessor) clusterAction(data storage.BaseMessage, clusterInfo *st clusterInfo.Command[constant.GlobalOpKey] = data.Body.Extension[constant.GlobalOpKey] clusterInfo.Command[constant.SwitchNicUUID] = data.Header.Uuid clusterInfo.Command[constant.SwitchJobID] = data.Body.Extension[constant.SwitchJobID] + case constant.ProcessManageRecoverSignal: + clusterInfo.Command[constant.SignalType] = data.Body.Extension[constant.SignalType] + clusterInfo.Command[constant.Actions] = data.Body.Extension[constant.Actions] + clusterInfo.Command[constant.FaultRanks] = data.Body.Extension[constant.FaultRanks] + clusterInfo.Command[constant.ChangeStrategy] = data.Body.Extension[constant.ChangeStrategy] + clusterInfo.Command[constant.Timeout] = data.Body.Extension[constant.Timeout] + clusterInfo.Command[constant.NodeRankIds] = data.Body.Extension[constant.NodeRankIds] + clusterInfo.Command[constant.ExtraParams] = data.Body.Extension[constant.ExtraParams] default: 
defaultDomainCmd, commDomainCmd, err := profilingCmd(data.Body.Code) if err != nil { diff --git a/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go b/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go index 59f48f146..208abf25e 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go @@ -23,6 +23,7 @@ import ( "taskd/framework_backend/manager/infrastructure/storage" "taskd/framework_backend/manager/plugins/faultdig" "taskd/framework_backend/manager/plugins/om" + "taskd/framework_backend/manager/plugins/stop_train_plugin" ) // PluginHandlerInterface define the interface of plugin handler @@ -42,15 +43,14 @@ type PluginHandler struct { // Init register all plugin func (p *PluginHandler) Init() error { - profilingPlugin := faultdig.NewProfilingPlugin() - if err := p.Register(profilingPlugin.Name(), profilingPlugin); err != nil { - hwlog.RunLog.Errorf("register plugin %s failed!", profilingPlugin.Name()) - return fmt.Errorf("register plugin %s failed", profilingPlugin.Name()) + plugins := []infrastructure.ManagerPlugin{ + faultdig.NewProfilingPlugin(), + om.NewOmPlugin(), + stop_train_plugin.New(), } - omPlugin := om.NewOmPlugin() - if err := p.Register(omPlugin.Name(), omPlugin); err != nil { - hwlog.RunLog.Errorf("register plugin %s failed!", omPlugin.Name()) - return fmt.Errorf("register plugin %s failed", omPlugin.Name()) + if err := p.RegisterPlugins(plugins); err != nil { + hwlog.RunLog.Errorf("register plugins failed, error: %v", err) + return err } return nil } @@ -80,6 +80,17 @@ func (p *PluginHandler) Register(pluginName string, plugin infrastructure.Manage return nil } +// RegisterPlugins register plugins in batches +func (p *PluginHandler) RegisterPlugins(plugins []infrastructure.ManagerPlugin) error { + for _, plugin := range plugins { + if err := p.Register(plugin.Name(), plugin); err != 
nil { + hwlog.RunLog.Errorf("register plugin %s failed!", plugin.Name()) + return err + } + } + return nil +} + // Handle execute the handle function of plugin func (p *PluginHandler) Handle(pluginName string) (infrastructure.HandleResult, error) { var result infrastructure.HandleResult diff --git a/component/taskd/taskd/go/framework_backend/manager/service/stream_handler.go b/component/taskd/taskd/go/framework_backend/manager/service/stream_handler.go index 49508c304..733de532c 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/stream_handler.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/stream_handler.go @@ -66,6 +66,16 @@ func (s *StreamHandler) Init() error { OmStream.GetName()) return err } + resumeTrainingAfterFaultStream := infrastructure.NewStream( + constant.ResumeTrainingAfterFaultStream, + map[string]int{ + constant.StopTrainPluginName: constant.Priority1, + }) + if err := s.SetStream(resumeTrainingAfterFaultStream); err != nil { + hwlog.RunLog.Errorf("init stream handler failed: set stream %s failed", + resumeTrainingAfterFaultStream.GetName()) + return err + } return nil } -- Gitee From b6a7bb92ce47d81fe8e78c51bd7eaca11b918192 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Thu, 21 Aug 2025 21:18:00 +0800 Subject: [PATCH 2/9] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91=E6=8F=90=E4=BA=A4=E6=9A=82=E5=81=9C=E8=AE=AD?= =?UTF-8?q?=E7=BB=83=E6=8F=92=E4=BB=B6=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../manager/plugins/stop_train_plugin/stop_train_plugin.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go b/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go index fe2bb16c0..c9d7e1c50 100644 --- 
a/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go +++ b/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go @@ -21,7 +21,7 @@ import ( "ascend-common/common-utils/hwlog" clusterd_constant "clusterd/pkg/common/constant" - pb "clusterd/pkg/interface/grpc/recover" + "clusterd/pkg/interface/grpc/recover" "taskd/common/constant" "taskd/common/utils" "taskd/framework_backend/manager/infrastructure" -- Gitee From c4398c12cd39a299cce15a027a3112a3dc4ebc2b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Fri, 22 Aug 2025 16:51:59 +0800 Subject: [PATCH 3/9] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91=E6=8F=90=E4=BA=A4=E6=9A=82=E5=81=9C=E8=AE=AD?= =?UTF-8?q?=E7=BB=83=E6=8F=92=E4=BB=B6=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../go/framework_backend/manager/service/plugin_handler_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler_test.go b/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler_test.go index 95fc81cd2..41777ebd4 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler_test.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler_test.go @@ -210,7 +210,7 @@ func TestInit_Success(t *testing.T) { handler := NewPluginHandler() err := handler.Init() assert.NoError(t, err) - assert.Len(t, handler.Plugins, 2) // Ensure example plugin is registered + assert.Len(t, handler.Plugins, 3) // Ensure example plugin is registered } func TestInitRegisterFailure(t *testing.T) { -- Gitee From a9e40bfdb729e813b424691f01cd0e30d8e614fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Fri, 22 Aug 2025 17:37:37 +0800 Subject: [PATCH 4/9] 
=?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91=E6=8F=90=E4=BA=A4clusterd=E5=85=AC=E5=85=B1?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E6=94=AF=E6=8C=81=E9=80=9A=E8=BF=87grpc?= =?UTF-8?q?=E4=B8=8Emanager=E4=BA=A4=E4=BA=92=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/application/recover/controller.go | 142 ++++++++++++++---- .../pkg/application/recover/om_controller.go | 6 + .../clusterd/pkg/common/constant/const.go | 2 + component/clusterd/pkg/domain/common/code.go | 2 + component/clusterd/pkg/domain/common/event.go | 3 + component/clusterd/pkg/domain/common/type.go | 1 + component/clusterd/pkg/domain/common/utils.go | 74 +++++++++ .../pkg/interface/grpc/recover/recover.pb.go | 129 +++++++++------- .../pkg/interface/grpc/recover/recover.proto | 3 + 9 files changed, 283 insertions(+), 79 deletions(-) diff --git a/component/clusterd/pkg/application/recover/controller.go b/component/clusterd/pkg/application/recover/controller.go index 5a249ce26..6876ebdfc 100644 --- a/component/clusterd/pkg/application/recover/controller.go +++ b/component/clusterd/pkg/application/recover/controller.go @@ -33,11 +33,18 @@ const ( ) var ( - saveAndExitActions = []string{"save_and_exit"} - stopTrainActions = []string{"stop_train"} - pauseTrainActions = []string{"pause_train"} - globalFaultActions = []string{"on_global_rank"} - changeStrategyActions = []string{"change_strategy"} + saveAndExitActions = []string{"save_and_exit"} + stopTrainActions = []string{"stop_train"} + pauseTrainActions = []string{"pause_train"} + globalFaultActions = []string{"on_global_rank"} + changeStrategyActions = []string{"change_strategy"} + notifyStrategySuccessEventMap = map[string]string{ + constant.ProcessRetryStrategyName: common.NotifyRetrySuccessEvent, + constant.ProcessRecoverStrategyName: common.NotifyRecoverSuccessEvent, + constant.ProcessDumpStrategyName: common.NotifyDumpSuccessEvent, + 
constant.ProcessExitStrategyName: common.NotifyExitSuccessEvent, + constant.ProcessContinueTrain: common.NotifyContinueSuccessEvent, + } ) // EventController is recover event controller @@ -46,6 +53,7 @@ type EventController struct { faultFlushing bool keepAliveSecond int uuid string + faultPodVersion map[string]string faultPod map[string]string events chan string latestStrategy []string @@ -89,6 +97,7 @@ func NewEventController(jobInfo common.JobBaseInfo, keepAlive int, serviceCtx co keepAliveSecond: keepAlive, uuid: "", faultPod: make(map[string]string), + faultPodVersion: make(map[string]string), events: make(chan string, eventChanLength), latestStrategy: []string{}, latestRecoverResult: []*pb.RecoverStatusRequest{}, @@ -132,10 +141,14 @@ func (ctl *EventController) GetFaultPod() map[string]string { func (ctl *EventController) mergeFaultPod(faultPod map[string]string) { ctl.lock.Lock() defer ctl.lock.Unlock() + faultPodVersion := common.GetPodVersion(ctl.jobInfo.JobId, faultPod) for podRank, podId := range faultPod { if _, ok := ctl.faultPod[podRank]; !ok { ctl.faultPod[podRank] = podId } + if _, ok := ctl.faultPodVersion[podRank]; !ok { + ctl.faultPodVersion[podRank] = faultPodVersion[podRank] + } } } @@ -176,6 +189,7 @@ func (ctl *EventController) reset(stop bool) { ctl.uuid = "" ctl.latestStrategy = ctl.latestStrategy[:0] ctl.faultPod = make(map[string]string) + ctl.faultPodVersion = make(map[string]string) if !ctl.isChanClosed { ctl.closeControllerChan() ctl.isChanClosed = true @@ -334,6 +348,31 @@ func (ctl *EventController) supportTargetRecoverStrategy(recoverStrategy string) return false } +func (ctl *EventController) supportTargetStrategy(recoverStrategy string) bool { + if !ctl.configTargetStrategy(recoverStrategy) { + return false + } + agentSupport := false + for _, strategy := range ctl.agentReportStrategies { + if strategy == recoverStrategy { + agentSupport = true + break + } + } + return agentSupport +} + +func (ctl *EventController) 
configTargetStrategy(recoverStrategy string) bool { + configed := false + for _, strategy := range ctl.jobInfo.MindXConfigStrategies { + if strategy == recoverStrategy { + configed = true + break + } + } + return configed +} + func (ctl *EventController) supportRestartProcessStrategy() bool { return ctl.restartFaultProcess && ctl.supportTargetRecoverStrategy(constant.ProcessRecoverInPlaceStrategyName) } @@ -502,20 +541,16 @@ func (ctl *EventController) handleSendResult(signal *pb.ProcessManageSignal, err ctl.addEvent(common.NotifyFailEvent) return } + if signal.SignalType == constant.FaultNodesExitSignalType { + ctl.addEvent(common.NotifyFaultNodesExitSuccessEvent) + return + } if signal.SignalType != constant.ChangeStrategySignalType { ctl.addEvent(common.NotifySuccessEvent) return } - if signal.ChangeStrategy == constant.ProcessRetryStrategyName { - ctl.addEvent(common.NotifyRetrySuccessEvent) - } else if signal.ChangeStrategy == constant.ProcessRecoverStrategyName { - ctl.addEvent(common.NotifyRecoverSuccessEvent) - } else if signal.ChangeStrategy == constant.ProcessDumpStrategyName { - ctl.addEvent(common.NotifyDumpSuccessEvent) - } else if signal.ChangeStrategy == constant.ProcessExitStrategyName { - ctl.addEvent(common.NotifyExitSuccessEvent) - } else if signal.ChangeStrategy == constant.ProcessContinueTrain { - ctl.addEvent(common.NotifyContinueSuccessEvent) + if event, ok := notifyStrategySuccessEventMap[signal.ChangeStrategy]; ok { + ctl.addEvent(event) } else { hwlog.RunLog.Errorf("unsupported strategy=%s, jobId=%s", signal.ChangeStrategy, signal.JobId) @@ -828,6 +863,16 @@ func (ctl *EventController) notifyFaultForRetryFaultCase(retryFaults, return common.NotifyFailEvent, common.OperateConfigMapError, nil } signal.FaultRanks = normalFaults + signal.NodeRankIds, err = common.GetNodeRankIdsByRankIds(ctl.jobInfo.JobId, allFaultRanks) + if err != nil { + hwlog.RunLog.Errorf("jobId=%s, GetNodeRankIdsByRankIds err:%v", ctl.jobInfo.JobId, err) + return 
common.NotifyFailEvent, common.OperateConfigMapError, nil + } + if !ctl.restartFaultProcess { + signal.Actions = append(signal.Actions, constant.FaultNodesExitAction) + } else { + signal.Actions = append(signal.Actions, constant.FaultNodesRestartAction) + } hwlog.RunLog.Infof("write configmap faultList success, %s", cm.Data[constant.ResetInfoCMDataKey]) } else { hwlog.RunLog.Infof("jobId=%s, uce error case", ctl.jobInfo.JobId) @@ -861,6 +906,14 @@ func (ctl *EventController) notifyFaultForNormalFaultCase(uceFaults, normalFault hwlog.RunLog.Errorf("update cache info fail, jobId=%s err=%v", ctl.jobInfo.JobId, err) return "", common.ServerInnerError, err } + signal := &pb.ProcessManageSignal{ + Uuid: ctl.uuid, + JobId: ctl.jobInfo.JobId, + SignalType: constant.GlobalFaultSignalType, + Actions: globalFaultActions, + ChangeStrategy: "", + } + signal.FaultRanks = append(signal.FaultRanks, allFaults...) if !ctl.restartFaultProcess { hwlog.RunLog.Infof("jobId=%s write reset json, restartFaultProcess: %v, faultRanks: %v", ctl.jobInfo.JobId, ctl.restartFaultProcess, allFaultRanks) @@ -871,15 +924,13 @@ func (ctl *EventController) notifyFaultForNormalFaultCase(uceFaults, normalFault return common.NotifyFailEvent, common.OperateConfigMapError, nil } hwlog.RunLog.Infof("write configmap faultList success, %s", cm.Data[constant.ResetInfoCMDataKey]) + signal.Actions = append(signal.Actions, constant.FaultNodesExitAction) + signal.NodeRankIds, err = common.GetNodeRankIdsByRankIds(ctl.jobInfo.JobId, allFaultRanks) + if err != nil { + hwlog.RunLog.Errorf("jobId=%s, GetNodeRankIdsByRankIds err:%v", ctl.jobInfo.JobId, err) + return common.NotifyFailEvent, common.OperateConfigMapError, nil + } } - signal := &pb.ProcessManageSignal{ - Uuid: ctl.uuid, - JobId: ctl.jobInfo.JobId, - SignalType: constant.GlobalFaultSignalType, - Actions: globalFaultActions, - ChangeStrategy: "", - } - signal.FaultRanks = append(signal.FaultRanks, allFaults...) 
return ctl.signalEnqueue(signal) } @@ -1007,6 +1058,7 @@ func (ctl *EventController) handleNotifyDecidedStrategy() (string, common.RespCo JobId: ctl.jobInfo.JobId, SignalType: constant.ChangeStrategySignalType, Actions: changeStrategyActions, + FaultRanks: ctl.cacheNormalFault, } var err error signal.ChangeStrategy, err = ctl.chooseStrategy() @@ -1046,6 +1098,14 @@ func (ctl *EventController) handleNotifyDecidedStrategy() (string, common.RespCo return ctl.handleRestartFaultProcess(signal) } hwlog.RunLog.Infof("jobId=%s, choose strategy:%s", ctl.jobInfo.JobId, signal.ChangeStrategy) + if signal.ChangeStrategy == constant.ProcessRetryStrategyName { + signal.FaultRanks = ctl.cacheRetryFault + } + signal.NodeRankIds, err = common.GetNodeRankIdsByFaultRanks(ctl.jobInfo.JobId, signal.FaultRanks) + if err != nil { + hwlog.RunLog.Errorf("jobId=%s, GetNodeRankIdsByFaultRanks err:%v", ctl.jobInfo.JobId, err) + return "", common.ServerInnerError, err + } return ctl.signalEnqueue(signal) } @@ -1054,7 +1114,7 @@ func (ctl *EventController) handleRestartFaultProcess(signal *pb.ProcessManageSi hwlog.RunLog.Infof("jobId:%s, enter handleRestartFaultProcess func, choose strategy:%s", ctl.jobInfo.JobId, signal.ChangeStrategy) if signal.ChangeStrategy == constant.ProcessRecoverInPlaceStrategyName { - _, allFaultRanks, err := ctl.updateCacheFaultAndPod() + allFaults, allFaultRanks, err := ctl.updateCacheFaultAndPod() if err != nil { hwlog.RunLog.Errorf("update cache info fail, jobId=%s err=%v", ctl.jobInfo.JobId, err) return "", common.ServerInnerError, err @@ -1069,6 +1129,13 @@ func (ctl *EventController) handleRestartFaultProcess(signal *pb.ProcessManageSi } hwlog.RunLog.Infof("write configmap faultList success, %s", cm.Data[constant.ResetInfoCMDataKey]) signal.ChangeStrategy = constant.ProcessRecoverStrategyName + signal.Actions = append(signal.Actions, constant.FaultNodesRestartAction) + signal.FaultRanks = allFaults + signal.NodeRankIds, err = 
common.GetNodeRankIdsByRankIds(ctl.jobInfo.JobId, allFaultRanks) + if err != nil { + hwlog.RunLog.Errorf("jobId=%s, GetNodeRankIdsByRankIds err:%v", ctl.jobInfo.JobId, err) + return common.NotifyFailEvent, common.OperateConfigMapError, nil + } } else if signal.ChangeStrategy != constant.ProcessRetryStrategyName { hwlog.RunLog.Warnf("choose strategy: %s, not restart fault process", signal.ChangeStrategy) ctl.restartFaultProcess = false @@ -1119,13 +1186,16 @@ func (ctl *EventController) extractRecoverResult() (common.RecoverResult, error) RecoverSuccess: true, }, nil } - strategy := latestResult[n-1].Strategy - code := latestResult[n-1].Status.Code - recoverSuccess := latestResult[n-1].Status.Code == int32(common.OK) + + lengthResult := len(latestResult) + strategy := latestResult[lengthResult-1].Strategy + code := latestResult[lengthResult-1].Status.Code + recoverSuccess := latestResult[lengthResult-1].Status.Code == int32(common.OK) return common.RecoverResult{ Strategy: strategy, Code: common.RespCode(code), RecoverSuccess: recoverSuccess, + IsolateRankIds: latestResult[lengthResult-1].IsolateRankIds, }, nil } @@ -1161,6 +1231,7 @@ func (ctl *EventController) handleCheckRecoverResult() (string, common.RespCode, if err != nil { return "", result.Code, err } + hwlog.RunLog.Infof("handleCheckRecoverResult result: %+v", result) switch result.Strategy { case constant.ProcessRetryStrategyName: if result.RecoverSuccess { @@ -1350,7 +1421,8 @@ func (ctl *EventController) listenScheduleResult() { } time.Sleep(time.Second * constant.SleepSecondBeforeCheckPGRunning) hwlog.RunLog.Infof("check pg running %d times", i) - if podgroup.JudgeIsRunningByJobKey(ctl.jobInfo.JobId) { + if podgroup.JudgeIsRunningByJobKey(ctl.jobInfo.JobId) && ctl.checkWhetherPodVersionChanged() { + hwlog.RunLog.Infof("job[%s] reschedule success", ctl.jobInfo.JobId) pgRunning = true break } @@ -1556,3 +1628,17 @@ func (ctl *EventController) shouldWaitHcclRoutingConvergence() bool { } return false } + 
+func (ctl *EventController) checkWhetherPodVersionChanged() bool { + for podRank, version := range ctl.faultPodVersion { + pod := pod.GetPodByRankIndex(ctl.jobInfo.JobId, podRank) + if pod.Name == "" { + continue + } + currentVersion, ok := pod.Labels[constant.PodVersion] + if ok { + return currentVersion != version + } + } + return false +} diff --git a/component/clusterd/pkg/application/recover/om_controller.go b/component/clusterd/pkg/application/recover/om_controller.go index 789e08250..2674b89a6 100644 --- a/component/clusterd/pkg/application/recover/om_controller.go +++ b/component/clusterd/pkg/application/recover/om_controller.go @@ -81,8 +81,14 @@ func (ctl *EventController) notifyContinueTrain() (string, common.RespCode, erro JobId: ctl.jobInfo.JobId, SignalType: constant.ChangeStrategySignalType, Actions: changeStrategyActions, + FaultRanks: ctl.cacheNormalFault, ChangeStrategy: constant.ProcessContinueTrain, } + var err error + signal.NodeRankIds, err = common.GetNodeRankIdsByFaultRanks(ctl.jobInfo.JobId, signal.FaultRanks) + if err != nil { + hwlog.RunLog.Warnf("jobId=%s, GetNodeRankIdsByFaultRanks err:%v", ctl.jobInfo.JobId, err) + } return ctl.signalEnqueue(signal) } diff --git a/component/clusterd/pkg/common/constant/const.go b/component/clusterd/pkg/common/constant/const.go index c1c592d61..0a49f9a8d 100644 --- a/component/clusterd/pkg/common/constant/const.go +++ b/component/clusterd/pkg/common/constant/const.go @@ -266,4 +266,6 @@ const ( ChangeStrategyAction = "change_strategy" // CardDropFault is the fault code of card drop fault CardDropFault = "40F84E00" + // PodVersion label key + PodVersion = "version" ) diff --git a/component/clusterd/pkg/domain/common/code.go b/component/clusterd/pkg/domain/common/code.go index 6a843fe63..5d27c8ef7 100644 --- a/component/clusterd/pkg/domain/common/code.go +++ b/component/clusterd/pkg/domain/common/code.go @@ -11,6 +11,8 @@ const ( OK RespCode = 0 // SuccessCode when query is fine SuccessCode RespCode = 
200 + // ExitIsolateRanksCode stands need to notify isolate ranks to exit + ExitIsolateRanksCode RespCode = 201 /* 4xx is client error which is not retryable diff --git a/component/clusterd/pkg/domain/common/event.go b/component/clusterd/pkg/domain/common/event.go index 71999735a..041b15353 100644 --- a/component/clusterd/pkg/domain/common/event.go +++ b/component/clusterd/pkg/domain/common/event.go @@ -98,4 +98,7 @@ const ( // KillPodAfterRestartProcessEvent kill pod when cant not restart process KillPodAfterRestartProcessEvent = "KillPodAfterRestartProcessEvent" + + // NotifyFaultNodesExitSuccessEvent notify fault nodes to exit successfully + NotifyFaultNodesExitSuccessEvent = "notifyFaultNodesExitSuccessEvent" ) diff --git a/component/clusterd/pkg/domain/common/type.go b/component/clusterd/pkg/domain/common/type.go index a4fb2fc73..b2dc49c04 100644 --- a/component/clusterd/pkg/domain/common/type.go +++ b/component/clusterd/pkg/domain/common/type.go @@ -61,4 +61,5 @@ type RecoverResult struct { Strategy string Code RespCode RecoverSuccess bool + IsolateRankIds []string } diff --git a/component/clusterd/pkg/domain/common/utils.go b/component/clusterd/pkg/domain/common/utils.go index 170a39983..462c5ab77 100644 --- a/component/clusterd/pkg/domain/common/utils.go +++ b/component/clusterd/pkg/domain/common/utils.go @@ -376,6 +376,46 @@ func GetPodMap(jobId string, rankList []string) (map[string]string, error) { return podMap, nil } +// GetPodRanks return a dict, key is fault pod rank, value "" +func GetPodRanks(jobId string, rankList []string) (map[string]string, error) { + podMap := make(map[string]string) + devicePerNode := pod.GetPodDeviceNumByJobId(jobId) + if devicePerNode <= 0 { + hwlog.RunLog.Errorf("get device num per pod failed, jobId: %s", jobId) + return nil, fmt.Errorf("get device num per pod failed, jobId: %s", jobId) + } + for _, rank := range rankList { + faultRank, err := strconv.Atoi(rank) + if err != nil { + hwlog.RunLog.Warnf("parse pod rank 
failed, err is %v", err) + continue + } + faultPodRank := faultRank / devicePerNode + podRank := strconv.Itoa(faultPodRank) + podMap[podRank] = "" + } + return podMap, nil +} + +// GetPodVersion return a dict, key is fault pod rank, value is pod version, only support acjob +func GetPodVersion(jobId string, podRankList map[string]string) map[string]string { + podMap := make(map[string]string) + for podRank, _ := range podRankList { + pod := pod.GetPodByRankIndex(jobId, podRank) + if pod.Name == "" { + hwlog.RunLog.Warnf("discard nil pod, jobId=%s", jobId) + continue + } + version, ok := pod.Labels[constant.PodVersion] + if !ok || version == "" { + hwlog.RunLog.Warnf("get pod version failed jobId=%s", jobId) + continue + } + podMap[podRank] = version + } + return podMap +} + func labelPodFault(jobId string, faultPodRankList []string, labeledMap map[string]string, faultReason string) (map[string]string, error) { if labeledMap == nil { @@ -496,3 +536,37 @@ func CalculateStringDivInt(dividendStr string, divisor int) int { } return dividend / divisor } + +// GetNodeRankIdsByRankIds returns the job's node rank id list by global rank id list +func GetNodeRankIdsByRankIds(jobId string, rankIds []string) ([]string, error) { + nodeRankIds := make([]string, 0) + faultPod, err := GetPodRanks(jobId, rankIds) + if err != nil { + hwlog.RunLog.Errorf("jobId=%s, get pod map err:%v", jobId, err) + return nodeRankIds, err + } + for nodeRankId, _ := range faultPod { + nodeRankIds = append(nodeRankIds, nodeRankId) + } + return nodeRankIds, nil +} + +// RemoveDuplicateNodeRanks remove duplicate node rank ids +func RemoveDuplicateNodeRanks(nodeRankIds []string, oldPods map[string]string) []string { + newNodeRankIds := make([]string, 0) + for _, nodeRankId := range nodeRankIds { + if _, ok := oldPods[nodeRankId]; !ok { + newNodeRankIds = append(newNodeRankIds, nodeRankId) + } + } + return newNodeRankIds +} + +// GetNodeRankIdsByFaultRanks returns the job's node rank id list by global fault 
rank list +func GetNodeRankIdsByFaultRanks(jobId string, faultRanks []*pb.FaultRank) ([]string, error) { + rankIds := make([]string, 0) + for _, faultRank := range faultRanks { + rankIds = append(rankIds, faultRank.RankId) + } + return GetNodeRankIdsByRankIds(jobId, rankIds) +} diff --git a/component/clusterd/pkg/interface/grpc/recover/recover.pb.go b/component/clusterd/pkg/interface/grpc/recover/recover.pb.go index 2bf59d8e8..1d8889585 100644 --- a/component/clusterd/pkg/interface/grpc/recover/recover.pb.go +++ b/component/clusterd/pkg/interface/grpc/recover/recover.pb.go @@ -169,6 +169,8 @@ type ProcessManageSignal struct { FaultRanks []*FaultRank `protobuf:"bytes,5,rep,name=faultRanks,proto3" json:"faultRanks,omitempty"` ChangeStrategy string `protobuf:"bytes,6,opt,name=changeStrategy,proto3" json:"changeStrategy,omitempty"` Timeout int64 `protobuf:"varint,7,opt,name=timeout,proto3" json:"timeout,omitempty"` + NodeRankIds []string `protobuf:"bytes,8,rep,name=nodeRankIds,proto3" json:"nodeRankIds,omitempty"` + ExtraParams string `protobuf:"bytes,9,opt,name=extraParams,proto3" json:"extraParams,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -248,6 +250,20 @@ func (m *ProcessManageSignal) GetTimeout() int64 { return 0 } +func (m *ProcessManageSignal) GetNodeRankIds() []string { + if m != nil { + return m.NodeRankIds + } + return nil +} + +func (m *ProcessManageSignal) GetExtraParams() string { + if m != nil { + return m.ExtraParams + } + return "" +} + type StopCompleteRequest struct { JobId string `protobuf:"bytes,1,opt,name=jobId,proto3" json:"jobId,omitempty"` Status *Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` @@ -362,6 +378,7 @@ type RecoverStatusRequest struct { JobId string `protobuf:"bytes,1,opt,name=jobId,proto3" json:"jobId,omitempty"` Status *Status `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"` Strategy string 
`protobuf:"bytes,3,opt,name=strategy,proto3" json:"strategy,omitempty"` + IsolateRankIds []string `protobuf:"bytes,4,rep,name=isolateRankIds,proto3" json:"isolateRankIds,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -413,6 +430,13 @@ func (m *RecoverStatusRequest) GetStrategy() string { return "" } +func (m *RecoverStatusRequest) GetIsolateRankIds() []string { + if m != nil { + return m.IsolateRankIds + } + return nil +} + type ProcessFaultRequest struct { JobId string `protobuf:"bytes,1,opt,name=jobId,proto3" json:"jobId,omitempty"` FaultRanks []*FaultRank `protobuf:"bytes,2,rep,name=faultRanks,proto3" json:"faultRanks,omitempty"` @@ -829,55 +853,58 @@ func init() { } var fileDescriptor_e825e73050144430 = []byte{ - // 799 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0x5d, 0x6f, 0xe3, 0x44, - 0x14, 0x5d, 0xe7, 0x3b, 0x37, 0x6c, 0xd9, 0x9d, 0xa6, 0xbb, 0x56, 0xb4, 0x2a, 0x61, 0x24, 0x90, - 0xc5, 0xc3, 0x50, 0x15, 0xf1, 0x59, 0x5e, 0xa0, 0x69, 0x45, 0x25, 0x08, 0x68, 0x52, 0x09, 0x89, - 0x27, 0x1c, 0x77, 0x92, 0x0c, 0x71, 0x3d, 0xc6, 0x33, 0x0e, 0x84, 0xbf, 0xc2, 0x0f, 0xe4, 0x57, - 0x20, 0xa1, 0x19, 0x7b, 0xec, 0x49, 0x48, 0xfa, 0x02, 0x6f, 0x77, 0x3e, 0xce, 0x3d, 0xe7, 0xde, - 0x39, 0xbe, 0x32, 0x3c, 0xcf, 0x58, 0x24, 0x36, 0x2c, 0x23, 0x69, 0x26, 0x94, 0xc0, 0x17, 0xd0, - 0x99, 0xa9, 0x50, 0xe5, 0x12, 0x21, 0x68, 0x45, 0xe2, 0x81, 0xf9, 0xde, 0xd8, 0x0b, 0xda, 0xd4, - 0xc4, 0x7a, 0x8f, 0x27, 0x0b, 0xe1, 0x37, 0xc6, 0x5e, 0xd0, 0xa7, 0x26, 0xc6, 0x9f, 0x00, 0x5c, - 0xc7, 0x9c, 0x25, 0xea, 0x2e, 0x59, 0x08, 0x34, 0x84, 0xf6, 0x2f, 0x62, 0x7e, 0xf7, 0x60, 0x60, - 0x7d, 0x5a, 0x2c, 0x34, 0x2e, 0x13, 0x31, 0xb3, 0x38, 0x1d, 0xe3, 0xaf, 0xa0, 0x7f, 0x1b, 0xe6, - 0xb1, 0xa2, 0x61, 0xb2, 0x46, 0xaf, 0xa0, 0x93, 0x85, 0xc9, 0xba, 0xc2, 0x95, 0x2b, 0xf4, 0x06, - 0xfa, 0x0b, 0x7d, 0xe9, 0x7e, 0x9b, 0x5a, 0x74, 0xbd, 0x81, 0xff, 0xf2, 
0xe0, 0xf4, 0x87, 0x4c, - 0x44, 0x4c, 0xca, 0xef, 0xc2, 0x24, 0x5c, 0xb2, 0x19, 0x5f, 0x26, 0x61, 0xac, 0xe9, 0xf2, 0x9c, - 0xdb, 0x5c, 0x26, 0xae, 0x85, 0x35, 0x5c, 0x61, 0xe7, 0x00, 0xd2, 0x60, 0x0c, 0x41, 0xd3, 0x1c, - 0x39, 0x3b, 0xc8, 0x87, 0x6e, 0x18, 0x29, 0x2e, 0x12, 0xe9, 0xb7, 0xc6, 0xcd, 0xa0, 0x4f, 0xed, - 0x12, 0x7d, 0x00, 0xb0, 0xb0, 0xf2, 0xa5, 0xdf, 0x1e, 0x37, 0x83, 0xc1, 0x25, 0x90, 0xaa, 0x22, - 0xea, 0x9c, 0xa2, 0xf7, 0xe1, 0x24, 0x5a, 0x85, 0xc9, 0x92, 0xcd, 0x54, 0x16, 0x2a, 0xb6, 0xdc, - 0xfa, 0x1d, 0xc3, 0xb4, 0xb7, 0xab, 0xd9, 0x14, 0x7f, 0x64, 0x22, 0x57, 0x7e, 0x77, 0xec, 0x05, - 0x4d, 0x6a, 0x97, 0xf8, 0x77, 0x38, 0x9d, 0x29, 0x91, 0x5e, 0x8b, 0xc7, 0x34, 0x66, 0x8a, 0x51, - 0xf6, 0x6b, 0xce, 0xa4, 0x3a, 0xd2, 0xed, 0x77, 0xa0, 0x23, 0xcd, 0x1b, 0x9a, 0x5a, 0x07, 0x97, - 0x5d, 0x52, 0x3c, 0x29, 0x2d, 0xb7, 0xf7, 0xb4, 0x37, 0x9f, 0xd2, 0x8e, 0xff, 0x80, 0x57, 0xb4, - 0x70, 0x88, 0x95, 0xf9, 0x34, 0xf9, 0x6e, 0xee, 0xc6, 0x93, 0x7d, 0xd1, 0xdd, 0x2f, 0x92, 0x72, - 0x56, 0xe8, 0xd0, 0xdd, 0xaf, 0x76, 0x30, 0x87, 0x61, 0xc5, 0x6d, 0x0a, 0xf8, 0x6f, 0x65, 0x8f, - 0xa0, 0x27, 0xed, 0x03, 0x14, 0x4f, 0x5d, 0xad, 0xf1, 0x8f, 0x95, 0x93, 0x0a, 0xa9, 0xff, 0x57, - 0x8d, 0xf8, 0x0a, 0x5e, 0xce, 0x7e, 0xe3, 0x2a, 0x5a, 0x4d, 0x79, 0x44, 0x99, 0x4c, 0x45, 0x22, - 0x99, 0x4d, 0x3b, 0x71, 0xd3, 0x4e, 0xd0, 0x0b, 0x68, 0x3e, 0xca, 0x65, 0x69, 0x50, 0x1d, 0xe2, - 0x00, 0x5e, 0x38, 0xe0, 0x1d, 0x49, 0xbb, 0x58, 0xfc, 0xa7, 0x07, 0x50, 0x5d, 0x95, 0x47, 0x08, - 0x3e, 0x84, 0x4e, 0xc2, 0xa3, 0xef, 0x53, 0xab, 0xf9, 0x35, 0xa9, 0x21, 0x64, 0x6a, 0x4e, 0x6e, - 0x12, 0x95, 0x6d, 0x69, 0x79, 0x6d, 0x74, 0x0b, 0x03, 0x67, 0x5b, 0x0b, 0x5c, 0xb3, 0x6d, 0x99, - 0x53, 0x87, 0xe8, 0x5d, 0x68, 0x6f, 0xc2, 0x38, 0x67, 0x65, 0xcb, 0x07, 0x64, 0xc2, 0x36, 0x3c, - 0x62, 0xdf, 0x72, 0xa9, 0x68, 0x71, 0xf2, 0x45, 0xe3, 0x33, 0x0f, 0xff, 0x0c, 0x6f, 0x15, 0x4c, - 0xe5, 0x6c, 0x39, 0x2c, 0xcf, 0x0e, 0x81, 0x49, 0xd9, 0x82, 0x72, 0xa5, 0x69, 0x33, 0xe3, 0x0f, - 0x2f, 0xe8, 
0x51, 0x1d, 0xda, 0x4e, 0xb5, 0xea, 0x4e, 0x11, 0x80, 0x9a, 0x5a, 0x9f, 0x3f, 0xb0, - 0x8d, 0xef, 0x19, 0x47, 0xe9, 0x10, 0x9d, 0x40, 0x43, 0xa4, 0xa6, 0xec, 0x1e, 0x6d, 0x88, 0x14, - 0x4f, 0xe1, 0xa4, 0x50, 0xa4, 0x5f, 0xc9, 0x60, 0x6a, 0xf6, 0x02, 0x66, 0xd9, 0xf7, 0x90, 0xb5, - 0x25, 0x9a, 0x8e, 0x25, 0xf0, 0x97, 0xb6, 0x42, 0xca, 0x64, 0x1e, 0x1f, 0x33, 0x8e, 0xe6, 0x30, - 0xe7, 0xa6, 0xc2, 0x1e, 0x2d, 0x57, 0x97, 0x7f, 0xb7, 0xa0, 0x5b, 0x3a, 0x1d, 0x9d, 0x43, 0xeb, - 0x2e, 0xe1, 0x0a, 0x0d, 0x48, 0x3d, 0x56, 0x47, 0xd6, 0xcb, 0xf8, 0x19, 0xc2, 0xd0, 0xa3, 0x6c, - 0xc9, 0xa5, 0x62, 0xd9, 0xd1, 0x3b, 0x37, 0xf0, 0x66, 0x96, 0xcf, 0x65, 0x94, 0xf1, 0x39, 0x3b, - 0x34, 0x20, 0x77, 0x70, 0x43, 0x72, 0xe0, 0x0a, 0x7e, 0x76, 0xe1, 0xa1, 0x8f, 0x01, 0x51, 0x96, - 0x8a, 0x4c, 0xb9, 0xb3, 0x07, 0x0d, 0xc9, 0x81, 0x51, 0xe4, 0xb2, 0x5f, 0xc1, 0x59, 0x01, 0xdb, - 0x1b, 0x1c, 0xe8, 0x35, 0x39, 0x3c, 0x4a, 0x5c, 0xf0, 0xa7, 0x70, 0xba, 0x07, 0x36, 0x8e, 0x39, - 0x23, 0x87, 0x26, 0x81, 0x0b, 0xac, 0xc4, 0xba, 0xdf, 0x31, 0xaa, 0x8a, 0x73, 0x3f, 0x6b, 0x17, - 0x16, 0x58, 0x23, 0x4c, 0x79, 0x74, 0x9f, 0x85, 0xd1, 0x1a, 0x0d, 0x9c, 0xaf, 0x62, 0xb7, 0xa9, - 0x7e, 0xd5, 0xd4, 0xea, 0x46, 0xd9, 0xd0, 0x97, 0x64, 0xff, 0x3b, 0x1d, 0x21, 0xf2, 0xaf, 0xef, - 0xde, 0x34, 0xf5, 0x73, 0x38, 0xab, 0xd2, 0x4c, 0x85, 0xe2, 0x8b, 0x6d, 0x71, 0x6f, 0xf7, 0x51, - 0xde, 0x26, 0xbb, 0xf6, 0x34, 0xd0, 0x0b, 0x3d, 0x0f, 0xd3, 0x78, 0xeb, 0x26, 0xd6, 0x45, 0x3e, - 0x27, 0xae, 0xf7, 0x5c, 0xcd, 0xef, 0xc1, 0xe0, 0x1b, 0x16, 0xc6, 0x6a, 0x75, 0xbd, 0x62, 0xa6, - 0xb4, 0x83, 0x7e, 0xf9, 0xba, 0xf3, 0x53, 0x8b, 0x5c, 0xa5, 0xf3, 0x79, 0xc7, 0xfc, 0x04, 0x7c, - 0xf4, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xef, 0x7f, 0xbc, 0x0d, 0x15, 0x08, 0x00, 0x00, + // 847 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xac, 0x56, 0xdd, 0x6e, 0x23, 0x35, + 0x14, 0xde, 0x49, 0xd2, 0x34, 0x39, 0xc3, 0x96, 0x5d, 0xb7, 0xdd, 0x1d, 0x55, 0xab, 0x25, 0x58, + 0x02, 
0x8d, 0xb8, 0x30, 0x55, 0x11, 0xbf, 0xe5, 0x06, 0xda, 0x5d, 0x51, 0x09, 0xc2, 0xca, 0x59, + 0x09, 0x89, 0x2b, 0x9c, 0xa9, 0x93, 0x9a, 0x4c, 0xc6, 0xc3, 0xd8, 0x13, 0x36, 0xbc, 0x05, 0xd7, + 0x3c, 0x02, 0xcf, 0x88, 0x84, 0xec, 0x19, 0xcf, 0x38, 0x21, 0xe9, 0x0d, 0x7b, 0x67, 0x9f, 0x39, + 0xdf, 0xf9, 0xce, 0xcf, 0xe7, 0xa3, 0x81, 0x87, 0x05, 0x4f, 0xe4, 0x8a, 0x17, 0x24, 0x2f, 0xa4, + 0x96, 0xf8, 0x1c, 0xfa, 0x13, 0xcd, 0x74, 0xa9, 0x10, 0x82, 0x5e, 0x22, 0x6f, 0x79, 0x14, 0x8c, + 0x82, 0xf8, 0x80, 0xda, 0xb3, 0xb1, 0x89, 0x6c, 0x26, 0xa3, 0xce, 0x28, 0x88, 0x87, 0xd4, 0x9e, + 0xf1, 0x67, 0x00, 0x57, 0xa9, 0xe0, 0x99, 0xbe, 0xc9, 0x66, 0x12, 0x9d, 0xc0, 0xc1, 0xaf, 0x72, + 0x7a, 0x73, 0x6b, 0x61, 0x43, 0x5a, 0x5d, 0x0c, 0xae, 0x90, 0x29, 0x77, 0x38, 0x73, 0xc6, 0xdf, + 0xc0, 0xf0, 0x25, 0x2b, 0x53, 0x4d, 0x59, 0xb6, 0x40, 0x4f, 0xa0, 0x5f, 0xb0, 0x6c, 0xd1, 0xe0, + 0xea, 0x1b, 0x7a, 0x06, 0xc3, 0x99, 0x71, 0x7a, 0xbd, 0xce, 0x1d, 0xba, 0x35, 0xe0, 0xbf, 0x3b, + 0x70, 0xfc, 0xaa, 0x90, 0x09, 0x57, 0xea, 0x07, 0x96, 0xb1, 0x39, 0x9f, 0x88, 0x79, 0xc6, 0x52, + 0x43, 0x57, 0x96, 0xc2, 0xc5, 0xb2, 0xe7, 0x36, 0xb1, 0x8e, 0x9f, 0xd8, 0x73, 0x00, 0x65, 0x31, + 0x96, 0xa0, 0x6b, 0x3f, 0x79, 0x16, 0x14, 0xc1, 0x21, 0x4b, 0xb4, 0x90, 0x99, 0x8a, 0x7a, 0xa3, + 0x6e, 0x3c, 0xa4, 0xee, 0x8a, 0x3e, 0x02, 0x98, 0xb9, 0xf4, 0x55, 0x74, 0x30, 0xea, 0xc6, 0xe1, + 0x05, 0x90, 0xa6, 0x22, 0xea, 0x7d, 0x45, 0x1f, 0xc2, 0x51, 0x72, 0xc7, 0xb2, 0x39, 0x9f, 0xe8, + 0x82, 0x69, 0x3e, 0x5f, 0x47, 0x7d, 0xcb, 0xb4, 0x65, 0x35, 0x6c, 0x5a, 0x2c, 0xb9, 0x2c, 0x75, + 0x74, 0x38, 0x0a, 0xe2, 0x2e, 0x75, 0x57, 0x34, 0x82, 0x30, 0x93, 0xb7, 0x9c, 0xda, 0xae, 0xa8, + 0x68, 0x60, 0x73, 0xf1, 0x4d, 0xc6, 0x83, 0xbf, 0xd1, 0x05, 0x7b, 0xc5, 0x0a, 0xb6, 0x54, 0xd1, + 0xd0, 0x12, 0xf8, 0x26, 0xfc, 0x06, 0x8e, 0x27, 0x5a, 0xe6, 0x57, 0x72, 0x99, 0xa7, 0x5c, 0x73, + 0xca, 0x7f, 0x2b, 0xb9, 0xd2, 0x7b, 0x26, 0xf6, 0x1e, 0xf4, 0x95, 0xd5, 0x81, 0xed, 0x57, 0x78, + 0x71, 0x48, 0x2a, 0x59, 0xd0, 0xda, 0xbc, 
0x55, 0x7f, 0xf7, 0xbe, 0xfa, 0xf1, 0x1f, 0xf0, 0x84, + 0x56, 0x2a, 0x73, 0xa5, 0xde, 0x4f, 0xbe, 0x19, 0xbb, 0x73, 0x6f, 0x6f, 0xcd, 0x04, 0xab, 0xa0, + 0x82, 0x57, 0x79, 0x98, 0x09, 0x36, 0x16, 0xfc, 0x67, 0x00, 0x27, 0x0d, 0xb9, 0xad, 0xe0, 0xff, + 0xd5, 0x7d, 0x06, 0x03, 0xe5, 0xa6, 0x58, 0xe9, 0xa5, 0xb9, 0x9b, 0x39, 0x0b, 0x25, 0x53, 0xa6, + 0x9b, 0x41, 0x55, 0xa2, 0xd9, 0xb2, 0xe2, 0x9f, 0x1a, 0xd9, 0x56, 0x35, 0xbd, 0xad, 0x66, 0xe0, + 0x4b, 0x78, 0x3c, 0xf9, 0x5d, 0xe8, 0xe4, 0x6e, 0x2c, 0x12, 0xca, 0x55, 0x2e, 0x33, 0xc5, 0x5d, + 0xd8, 0x6b, 0x3f, 0xec, 0x35, 0x7a, 0x04, 0xdd, 0xa5, 0x9a, 0xd7, 0xaf, 0xc1, 0x1c, 0x71, 0x0c, + 0x8f, 0x3c, 0xf0, 0x46, 0x4a, 0x9b, 0x58, 0xfc, 0x57, 0x00, 0xd0, 0xb8, 0xaa, 0x3d, 0x04, 0x1f, + 0x43, 0x3f, 0x13, 0xc9, 0x8f, 0xb9, 0xcb, 0xf9, 0x29, 0x69, 0x21, 0x64, 0x6c, 0xbf, 0xbc, 0xc8, + 0x74, 0xb1, 0xa6, 0xb5, 0xdb, 0xd9, 0x4b, 0x08, 0x3d, 0xb3, 0x49, 0x70, 0xc1, 0xd7, 0x75, 0x4c, + 0x73, 0x44, 0xef, 0xc3, 0xc1, 0x8a, 0xa5, 0x25, 0xaf, 0x47, 0x13, 0x92, 0x6b, 0xbe, 0x12, 0x09, + 0xff, 0x5e, 0x28, 0x4d, 0xab, 0x2f, 0x5f, 0x75, 0xbe, 0x08, 0xf0, 0x2f, 0xf0, 0x4e, 0xc5, 0x54, + 0x2f, 0xb2, 0xdd, 0xe9, 0xb9, 0x8d, 0x73, 0x5d, 0xb7, 0xa0, 0xbe, 0x19, 0xda, 0xc2, 0x0a, 0x29, + 0x88, 0x07, 0xd4, 0x1c, 0x5d, 0xa7, 0x7a, 0x6d, 0xa7, 0x08, 0x40, 0x4b, 0x6d, 0xbe, 0xdf, 0xf2, + 0x55, 0x14, 0xd8, 0x51, 0x9b, 0x23, 0x3a, 0x82, 0x8e, 0xcc, 0x6d, 0xd9, 0x03, 0xda, 0x91, 0x39, + 0x1e, 0xc3, 0x51, 0x95, 0x91, 0x99, 0x92, 0xc5, 0xb4, 0xec, 0x15, 0xcc, 0xb1, 0x6f, 0x21, 0x5b, + 0x49, 0x74, 0x3d, 0x49, 0xe0, 0xaf, 0x5d, 0x85, 0x94, 0xab, 0x32, 0xdd, 0x27, 0x1c, 0xc3, 0x61, + 0xbf, 0xdb, 0x0a, 0x07, 0xb4, 0xbe, 0x5d, 0xfc, 0xd3, 0x83, 0xc3, 0xfa, 0x45, 0xa0, 0xe7, 0xd0, + 0xbb, 0xc9, 0x84, 0x46, 0x21, 0x69, 0x77, 0xf8, 0x99, 0xd3, 0x3c, 0x7e, 0x80, 0x30, 0x0c, 0x28, + 0x9f, 0x0b, 0xa5, 0x79, 0xb1, 0xd7, 0xe7, 0x05, 0x3c, 0x9b, 0x94, 0x53, 0x95, 0x14, 0x62, 0xca, + 0x77, 0x6d, 0xe3, 0x0d, 0xdc, 0x09, 0xd9, 0xe1, 0x82, 0x1f, 0x9c, 0x07, 0xe8, 
0x53, 0x40, 0x94, + 0xe7, 0xb2, 0xd0, 0xfe, 0x92, 0x42, 0x27, 0x64, 0xc7, 0xce, 0xf2, 0xd9, 0x2f, 0xe1, 0xb4, 0x82, + 0x6d, 0x6d, 0x18, 0xf4, 0x94, 0xec, 0xde, 0x39, 0x3e, 0xf8, 0x73, 0x38, 0xde, 0x02, 0x5b, 0xc5, + 0x9c, 0x92, 0x5d, 0x1b, 0xc3, 0x07, 0x36, 0xc9, 0xfa, 0xef, 0x18, 0x35, 0xc5, 0xf9, 0xcf, 0xda, + 0x87, 0xc5, 0x4e, 0x08, 0x63, 0x91, 0xbc, 0x2e, 0x58, 0xb2, 0x40, 0xa1, 0xf7, 0x2a, 0x36, 0x9b, + 0x1a, 0x35, 0x4d, 0x6d, 0x3c, 0xea, 0x86, 0x3e, 0x26, 0xdb, 0xef, 0xf4, 0x0c, 0x91, 0xff, 0xbc, + 0x7b, 0xdb, 0xd4, 0x2f, 0xe1, 0xb4, 0x09, 0x33, 0x96, 0x5a, 0xcc, 0xd6, 0x95, 0xdf, 0xe6, 0x50, + 0xde, 0x25, 0x9b, 0xf2, 0xb4, 0xd0, 0x73, 0xb3, 0x37, 0xf3, 0x74, 0xed, 0x07, 0x36, 0x45, 0x3e, + 0x24, 0xbe, 0xf6, 0xfc, 0x9c, 0x3f, 0x80, 0xf0, 0x3b, 0xce, 0x52, 0x7d, 0x77, 0x75, 0xc7, 0x6d, + 0x69, 0x3b, 0xf5, 0xf2, 0x6d, 0xff, 0xe7, 0x1e, 0xb9, 0xcc, 0xa7, 0xd3, 0xbe, 0xfd, 0xe3, 0xf8, + 0xe4, 0xdf, 0x00, 0x00, 0x00, 0xff, 0xff, 0x12, 0x42, 0xc3, 0xa1, 0x82, 0x08, 0x00, 0x00, } diff --git a/component/clusterd/pkg/interface/grpc/recover/recover.proto b/component/clusterd/pkg/interface/grpc/recover/recover.proto index ba00cfd4f..f3ec9d7d4 100644 --- a/component/clusterd/pkg/interface/grpc/recover/recover.proto +++ b/component/clusterd/pkg/interface/grpc/recover/recover.proto @@ -26,6 +26,8 @@ message ProcessManageSignal { repeated FaultRank faultRanks = 5; string changeStrategy = 6; int64 timeout = 7; + repeated string nodeRankIds = 8; + string extraParams = 9; } message StopCompleteRequest { @@ -44,6 +46,7 @@ message RecoverStatusRequest{ string jobId = 1; Status status = 2; string strategy = 3; + repeated string isolateRankIds = 4; } message ProcessFaultRequest{ -- Gitee From aca632b3079c006452a463ee4525f620f1127aef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Fri, 22 Aug 2025 18:04:00 +0800 Subject: [PATCH 5/9] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91clean=20code?= MIME-Version: 1.0 
Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/application/recover/controller.go | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/component/clusterd/pkg/application/recover/controller.go b/component/clusterd/pkg/application/recover/controller.go index 6876ebdfc..33ed0310a 100644 --- a/component/clusterd/pkg/application/recover/controller.go +++ b/component/clusterd/pkg/application/recover/controller.go @@ -830,11 +830,10 @@ func (ctl *EventController) notifyFaultForRetryFaultCase(retryFaults, normalFaults []*pb.FaultRank) (string, common.RespCode, error) { hwlog.RunLog.Infof("jobId=%s enter notifyFaultForRetryFaultCase function", ctl.jobInfo.JobId) signal := &pb.ProcessManageSignal{ - Uuid: ctl.uuid, - JobId: ctl.jobInfo.JobId, - SignalType: constant.GlobalFaultSignalType, - Actions: globalFaultActions, - ChangeStrategy: "", + Uuid: ctl.uuid, + JobId: ctl.jobInfo.JobId, + SignalType: constant.GlobalFaultSignalType, + Actions: globalFaultActions, } if ctl.jobInfo.PlatFormMode { allFaults, err := ctl.writeConfirmFaultAndWaitPlatResultFault(retryFaults) @@ -849,7 +848,6 @@ func (ctl *EventController) notifyFaultForRetryFaultCase(retryFaults, allFaults, allFaultRanks := ctl.normalFaultAssociateSameNodeRank() normalFaults = allFaults ctl.setCacheFault(retryFaults, normalFaults) - faultPod, err := common.GetPodMap(ctl.jobInfo.JobId, allFaultRanks) if err != nil { hwlog.RunLog.Errorf("jobId=%s, get pod map err:%v", ctl.jobInfo.JobId, err) @@ -863,16 +861,6 @@ func (ctl *EventController) notifyFaultForRetryFaultCase(retryFaults, return common.NotifyFailEvent, common.OperateConfigMapError, nil } signal.FaultRanks = normalFaults - signal.NodeRankIds, err = common.GetNodeRankIdsByRankIds(ctl.jobInfo.JobId, allFaultRanks) - if err != nil { - hwlog.RunLog.Errorf("jobId=%s, GetNodeRankIdsByRankIds err:%v", ctl.jobInfo.JobId, err) - return common.NotifyFailEvent, common.OperateConfigMapError, nil - } - if 
!ctl.restartFaultProcess { - signal.Actions = append(signal.Actions, constant.FaultNodesExitAction) - } else { - signal.Actions = append(signal.Actions, constant.FaultNodesRestartAction) - } hwlog.RunLog.Infof("write configmap faultList success, %s", cm.Data[constant.ResetInfoCMDataKey]) } else { hwlog.RunLog.Infof("jobId=%s, uce error case", ctl.jobInfo.JobId) @@ -1062,6 +1050,15 @@ func (ctl *EventController) handleNotifyDecidedStrategy() (string, common.RespCo } var err error signal.ChangeStrategy, err = ctl.chooseStrategy() + if err != nil { + hwlog.RunLog.Errorf("jobId=%s, get pod map err:%v", ctl.jobInfo.JobId, err) + return "", common.ServerInnerError, err + } + return ctl.handleDecidedStrategyAfterChoose(signal) +} + +func (ctl *EventController) handleDecidedStrategyAfterChoose(signal *pb.ProcessManageSignal) (string, + common.RespCode, error) { if signal.ChangeStrategy == constant.ProcessDumpStrategyName && ctl.isSwitchingNic() && ctl.jobInfo.Framework == constant.MsFramework { // In order to correctly switch from the state machine of mindIO to the state machine for failure recovery, @@ -1069,10 +1066,6 @@ func (ctl *EventController) handleNotifyDecidedStrategy() (string, common.RespCo // it goes through the failure recovery state machine again. 
signal.ChangeStrategy = constant.ProcessExitStrategyName } - if err != nil { - hwlog.RunLog.Errorf("jobId=%s, get pod map err:%v", ctl.jobInfo.JobId, err) - return "", common.ServerInnerError, err - } if ctl.jobInfo.PlatFormMode && signal.ChangeStrategy == constant.ProcessRecoverStrategyName { hwlog.RunLog.Infof("start wait plat rankTable ready, jobId=%s, pgName=%s", ctl.jobInfo.JobId, ctl.jobInfo.PgName) @@ -1101,6 +1094,7 @@ func (ctl *EventController) handleNotifyDecidedStrategy() (string, common.RespCo if signal.ChangeStrategy == constant.ProcessRetryStrategyName { signal.FaultRanks = ctl.cacheRetryFault } + var err error signal.NodeRankIds, err = common.GetNodeRankIdsByFaultRanks(ctl.jobInfo.JobId, signal.FaultRanks) if err != nil { hwlog.RunLog.Errorf("jobId=%s, GetNodeRankIdsByFaultRanks err:%v", ctl.jobInfo.JobId, err) -- Gitee From 440366f492c3dece9bd4d70448416452cd9f4ae2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Fri, 22 Aug 2025 18:21:51 +0800 Subject: [PATCH 6/9] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91cfix=20dt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../clusterd/pkg/application/recover/controller.go | 2 +- .../pkg/application/recover/controller_test.go | 14 +++++++++++--- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/component/clusterd/pkg/application/recover/controller.go b/component/clusterd/pkg/application/recover/controller.go index 33ed0310a..6a1206736 100644 --- a/component/clusterd/pkg/application/recover/controller.go +++ b/component/clusterd/pkg/application/recover/controller.go @@ -1051,7 +1051,7 @@ func (ctl *EventController) handleNotifyDecidedStrategy() (string, common.RespCo var err error signal.ChangeStrategy, err = ctl.chooseStrategy() if err != nil { - hwlog.RunLog.Errorf("jobId=%s, get pod map err:%v", ctl.jobInfo.JobId, err) + hwlog.RunLog.Errorf("jobId=%s, chooseStrategy err:%v", 
ctl.jobInfo.JobId, err) return "", common.ServerInnerError, err } return ctl.handleDecidedStrategyAfterChoose(signal) diff --git a/component/clusterd/pkg/application/recover/controller_test.go b/component/clusterd/pkg/application/recover/controller_test.go index 7d1085daf..f13d8e284 100644 --- a/component/clusterd/pkg/application/recover/controller_test.go +++ b/component/clusterd/pkg/application/recover/controller_test.go @@ -310,7 +310,8 @@ func TestShouldDumpWhenOccurFault(t *testing.T) { func TestUpdateCacheFaultAndPod(t *testing.T) { ctl := EventController{ - faultPod: map[string]string{}, + faultPod: map[string]string{}, + faultPodVersion: make(map[string]string), } convey.Convey("updateCacheFaultAndPod", t, func() { faultRank := []*pb.FaultRank{{RankId: "1", FaultType: constant.NormalFaultType}} @@ -941,6 +942,8 @@ func TestHandleNotifyDecidedStrategy(t *testing.T) { convey.Convey("Testing handleNotifyDecidedStrategy", t, func() { jobInfo := newJobInfoWithStrategy(nil) serviceCtx := context.Background() + mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + defer mockGetNodeRankIdsByFaultRanks.Reset() ctl := NewEventController(jobInfo, keepAliveSeconds, serviceCtx) _, code, err := ctl.handleNotifyDecidedStrategy() convey.So(code, convey.ShouldEqual, common.OK) @@ -1570,8 +1573,9 @@ func TestNotifyFaultForUceFaultCase(t *testing.T) { Namespace: "test-namespace", RecoverConfig: common.RecoverConfig{PlatFormMode: true}, }, - faultPod: make(map[string]string), - uuid: "test-uuid", + faultPod: make(map[string]string), + faultPodVersion: make(map[string]string), + uuid: "test-uuid", } patches := gomonkey.ApplyFunc(hwlog.RunLog.Infof, func(format string, args ...interface{}) {}) @@ -1879,6 +1883,8 @@ func TestNotifyFaultForNormalFaultCase(t *testing.T) { func() (context.Context, chan *pb.ProcessManageSignal) { return context.Background(), make(chan *pb.ProcessManageSignal, 1) }) + 
mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + defer mockGetNodeRankIdsByFaultRanks.Reset() testPlatformModeWriteConfirmNormalFaultError(ctl) testPlatformModeSuccess(ctl) @@ -2941,6 +2947,8 @@ func TestHandleRestartFaultProcess(t *testing.T) { serviceCtx := context.Background() ctl := NewEventController(jobInfo, keepAliveSeconds, serviceCtx) + mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + defer mockGetNodeRankIdsByFaultRanks.Reset() patches := gomonkey.ApplyFunc(ctl.signalEnqueue, func(signal *pb.ProcessManageSignal) (string, common.RespCode, error) { return "", common.OK, nil -- Gitee From a7fbbe966aba2b909174056fe85857d309ee3182 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Fri, 22 Aug 2025 18:37:27 +0800 Subject: [PATCH 7/9] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91fix=20dt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../application/recover/controller_test.go | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/component/clusterd/pkg/application/recover/controller_test.go b/component/clusterd/pkg/application/recover/controller_test.go index f13d8e284..2bc0f0c2c 100644 --- a/component/clusterd/pkg/application/recover/controller_test.go +++ b/component/clusterd/pkg/application/recover/controller_test.go @@ -1883,8 +1883,6 @@ func TestNotifyFaultForNormalFaultCase(t *testing.T) { func() (context.Context, chan *pb.ProcessManageSignal) { return context.Background(), make(chan *pb.ProcessManageSignal, 1) }) - mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) - defer mockGetNodeRankIdsByFaultRanks.Reset() testPlatformModeWriteConfirmNormalFaultError(ctl) testPlatformModeSuccess(ctl) @@ -1911,6 +1909,8 @@ func 
testPlatformModeWriteConfirmNormalFaultError(ctl *EventController) { func testPlatformModeSuccess(ctl *EventController) { convey.Convey("When platform mode and all operations succeed", func() { + mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + defer mockGetNodeRankIdsByFaultRanks.Reset() ctl.jobInfo.PlatFormMode = true patches := gomonkey.ApplyPrivateMethod(ctl, "writeConfirmFaultAndWaitPlatResultFault", func(faults []*pb.FaultRank) ([]*pb.FaultRank, error) { @@ -1939,6 +1939,8 @@ func testPlatformModeSuccess(ctl *EventController) { func testNonPlatformModeSuccess(ctl *EventController) { convey.Convey("When non-platform mode and all operations succeed", func() { + mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + defer mockGetNodeRankIdsByFaultRanks.Reset() ctl.jobInfo.PlatFormMode = false patches := gomonkey.ApplyPrivateMethod(ctl, "updateCacheFaultAndPod", func() ([]*pb.FaultRank, []string, error) { @@ -2960,6 +2962,8 @@ func TestHandleRestartFaultProcess(t *testing.T) { defer patches.Reset() convey.Convey("choose recover-in-place strategy and write json success, "+ "should notify recover strategy", func() { + mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + defer mockGetNodeRankIdsByFaultRanks.Reset() signal := &pb.ProcessManageSignal{ChangeStrategy: constant.ProcessRecoverInPlaceStrategyName} _, code, _ := ctl.handleRestartFaultProcess(signal) convey.So(code, convey.ShouldEqual, common.OK) @@ -3017,3 +3021,25 @@ func TestCatchException(t *testing.T) { } }) } + +func TestSupportTargetStrategy(t *testing.T) { + convey.Convey("MindXConfigStrategies and agentReportStrategies contain recover strategy", t, func() { + jobInfo := newJobInfoWithStrategy([]string{ + constant.ProcessRecoverStrategyName, + constant.ProcessDumpStrategyName}) + ctl := &EventController{ + jobInfo: 
jobInfo, + agentReportStrategies: []string{constant.ProcessRecoverStrategyName}, + } + hasRecover := ctl.supportTargetStrategy(constant.ProcessRecoverStrategyName) + convey.So(hasRecover, convey.ShouldBeTrue) + + ctl.platStrategy = constant.ProcessRecoverStrategyName + hasRecover = ctl.supportTargetStrategy(constant.ProcessRetryStrategyName) + convey.So(hasRecover, convey.ShouldBeFalse) + + ctl.platStrategy = constant.ProcessRecoverStrategyName + hasRecover = ctl.supportTargetStrategy(constant.ProcessDumpStrategyName) + convey.So(hasRecover, convey.ShouldBeFalse) + }) +} -- Gitee From dadbcc38ef0088b808fe63f69b279011aec5391e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Fri, 22 Aug 2025 18:45:39 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91fix=20dt?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../clusterd/pkg/application/recover/controller_test.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/component/clusterd/pkg/application/recover/controller_test.go b/component/clusterd/pkg/application/recover/controller_test.go index 2bc0f0c2c..666fd35f1 100644 --- a/component/clusterd/pkg/application/recover/controller_test.go +++ b/component/clusterd/pkg/application/recover/controller_test.go @@ -1909,7 +1909,7 @@ func testPlatformModeWriteConfirmNormalFaultError(ctl *EventController) { func testPlatformModeSuccess(ctl *EventController) { convey.Convey("When platform mode and all operations succeed", func() { - mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByRankIds, []string{}, nil) defer mockGetNodeRankIdsByFaultRanks.Reset() ctl.jobInfo.PlatFormMode = true patches := gomonkey.ApplyPrivateMethod(ctl, "writeConfirmFaultAndWaitPlatResultFault", @@ -1939,7 +1939,7 
@@ func testPlatformModeSuccess(ctl *EventController) { func testNonPlatformModeSuccess(ctl *EventController) { convey.Convey("When non-platform mode and all operations succeed", func() { - mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByRankIds, []string{}, nil) defer mockGetNodeRankIdsByFaultRanks.Reset() ctl.jobInfo.PlatFormMode = false patches := gomonkey.ApplyPrivateMethod(ctl, "updateCacheFaultAndPod", @@ -2949,7 +2949,7 @@ func TestHandleRestartFaultProcess(t *testing.T) { serviceCtx := context.Background() ctl := NewEventController(jobInfo, keepAliveSeconds, serviceCtx) - mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) + mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByRankIds, []string{}, nil) defer mockGetNodeRankIdsByFaultRanks.Reset() patches := gomonkey.ApplyFunc(ctl.signalEnqueue, func(signal *pb.ProcessManageSignal) (string, common.RespCode, error) { @@ -2962,8 +2962,6 @@ func TestHandleRestartFaultProcess(t *testing.T) { defer patches.Reset() convey.Convey("choose recover-in-place strategy and write json success, "+ "should notify recover strategy", func() { - mockGetNodeRankIdsByFaultRanks := gomonkey.ApplyFuncReturn(common.GetNodeRankIdsByFaultRanks, []string{}, nil) - defer mockGetNodeRankIdsByFaultRanks.Reset() signal := &pb.ProcessManageSignal{ChangeStrategy: constant.ProcessRecoverInPlaceStrategyName} _, code, _ := ctl.handleRestartFaultProcess(signal) convey.So(code, convey.ShouldEqual, common.OK) -- Gitee From 34916de0b6f5a610302247ba36c2de1455a30b27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E9=B8=A3=E6=B2=BC?= Date: Fri, 22 Aug 2025 18:53:43 +0800 Subject: [PATCH 9/9] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91add=20comments?= MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../plugins/stop_train_plugin/stop_train_plugin.go | 9 ++++++++- .../framework_backend/manager/plugins/utils/get_msgs.go | 1 + 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go b/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go index c9d7e1c50..0bc4b8293 100644 --- a/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go +++ b/component/taskd/taskd/go/framework_backend/manager/plugins/stop_train_plugin/stop_train_plugin.go @@ -12,7 +12,7 @@ limitations under the License. */ -// Package stop_train_plugin for +// Package stop_train_plugin for stop train plugin package stop_train_plugin import ( @@ -29,6 +29,7 @@ import ( plugin_utils "taskd/framework_backend/manager/plugins/utils" ) +// StopTrainingPlugin stop train plugin define type StopTrainingPlugin struct { hasToken bool shot storage.SnapShot @@ -46,16 +47,19 @@ type signalInfo struct { ExtraParams string } +// New creates an object func New() infrastructure.ManagerPlugin { return &StopTrainingPlugin{ HasSendMessages: make(map[string]string), } } +// Name returns plugin name func (s *StopTrainingPlugin) Name() string { return constant.StopTrainPluginName } +// Predicate check whether apply token func (s *StopTrainingPlugin) Predicate(shot storage.SnapShot) (infrastructure.PredicateResult, error) { s.shot = shot s.signalInfo = nil @@ -82,10 +86,12 @@ func (s *StopTrainingPlugin) Predicate(shot storage.SnapShot) (infrastructure.Pr return infrastructure.PredicateResult{CandidateStatus: constant.UnselectStatus}, nil } +// Release releases token func (s *StopTrainingPlugin) Release() error { return nil } +// Handle handles stream events func (s *StopTrainingPlugin) Handle() (infrastructure.HandleResult, error) { hwlog.RunLog.Infof("plugin[%s] enter handle", 
s.Name()) s.hasToken = true @@ -105,6 +111,7 @@ func (s *StopTrainingPlugin) Handle() (infrastructure.HandleResult, error) { return infrastructure.HandleResult{Stage: constant.HandleStageProcess}, nil } +// PullMsg returns messages to other module func (s *StopTrainingPlugin) PullMsg() ([]infrastructure.Msg, error) { msgs := make([]infrastructure.Msg, 0) if s.signalInfo == nil { diff --git a/component/taskd/taskd/go/framework_backend/manager/plugins/utils/get_msgs.go b/component/taskd/taskd/go/framework_backend/manager/plugins/utils/get_msgs.go index 0291b7331..ba7f440f9 100644 --- a/component/taskd/taskd/go/framework_backend/manager/plugins/utils/get_msgs.go +++ b/component/taskd/taskd/go/framework_backend/manager/plugins/utils/get_msgs.go @@ -24,6 +24,7 @@ import ( "taskd/toolkit_backend/net/common" ) +// SignalInfo signal info define type SignalInfo struct { SignalType string Actions []string -- Gitee