From 34bb32e16f35e19e503225f1f8de91dff5342537 Mon Sep 17 00:00:00 2001 From: lijinghan Date: Fri, 15 Aug 2025 10:04:14 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E3=80=90taskd=E3=80=91=20job=E5=86=B2?= =?UTF-8?q?=E8=B0=83=E5=BA=A6=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../taskd/taskd/go/common/constant/const.go | 22 ++- .../manager/infrastructure/storage/type.go | 6 + .../job_rescheduling/job_rescheduling.go | 174 ++++++++++++++++++ .../manager/service/plugin_handler.go | 7 + .../manager/service/stream_handler.go | 7 + 5 files changed, 213 insertions(+), 3 deletions(-) create mode 100644 component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go diff --git a/component/taskd/taskd/go/common/constant/const.go b/component/taskd/taskd/go/common/constant/const.go index 799a81ffe..612038ae2 100644 --- a/component/taskd/taskd/go/common/constant/const.go +++ b/component/taskd/taskd/go/common/constant/const.go @@ -164,9 +164,10 @@ const ( // All cluster info type must be defined here const ( - ClusterRole = "Cluster" - ClusterDRank = "ClusterD" - TaskDRank = "TaskD" + ClusterRole = "Cluster" + ClusterDRank = "ClusterD" + TaskDRank = "TaskD" + ControllerRole = "Controller" ) // All cluster command must be defined here @@ -299,3 +300,18 @@ const ( // LocalProxyEnableOn local proxy enable value LocalProxyEnableOn = "on" ) + +const ( + // JobReschedulingPluginName name of job rescheduling plugin + JobReschedulingPluginName = "JobReschedulingPlugin" + // JobReschedulingStreamName name of job rescheduling stream + JobReschedulingStreamName = "JobReschedulingStream" + // SingalKillMaster singal kill master + SingalKillMaster = "killMaster" + // RestartController restart controller + RestartController = "restart_controller" + // DestroyController destroy controller + DestroyController = "destroy_controller" + // Actions actions + Actions = "actions" +) diff --git a/component/taskd/taskd/go/framework_backend/manager/infrastructure/storage/type.go b/component/taskd/taskd/go/framework_backend/manager/infrastructure/storage/type.go index 4cb936b17..0aa57bc0f 100644 --- a/component/taskd/taskd/go/framework_backend/manager/infrastructure/storage/type.go +++ b/component/taskd/taskd/go/framework_backend/manager/infrastructure/storage/type.go @@ -42,3 +42,9 @@ type MsgBody struct { Message string `json:"message"` Extension map[string]string `json:"extension"` } + +// AgentReportInfo agent report info +type AgentReportInfo struct { + FaultRanks []int `json:"fault_ranks"` + RestartTime int `json:"restart_time"` +} diff --git a/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go b/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go new file mode 100644 index 000000000..e2a047892 --- /dev/null +++ b/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go @@ -0,0 +1,174 @@ +/* Copyright(C) 2025. Huawei Technologies Co.,Ltd. All rights reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package jobrescheduling for taskd manager plugin +package jobrescheduling + +import ( + "ascend-common/common-utils/hwlog" + "taskd/common/constant" + "taskd/common/utils" + "taskd/framework_backend/manager/infrastructure" + "taskd/framework_backend/manager/infrastructure/storage" + "taskd/toolkit_backend/net/common" +) + +// JobReschedulingPlugin job rescheduling plugin +type JobReschedulingPlugin struct { + pullMsgs []infrastructure.Msg + faultOccur bool + processStatus string + agentStatus map[int]bool + killMaster bool +} + +var ( + agent0ExitMsg = infrastructure.Msg{ + Receiver: []string{common.AgentRole + "0"}, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: constant.ExitAgentCode, + }, + } +) + +// NewJobReschedulingPlugin new job rescheduling plugin +func NewJobReschedulingPlugin() infrastructure.ManagerPlugin { + return &JobReschedulingPlugin{ + pullMsgs: make([]infrastructure.Msg, 0), + faultOccur: false, + processStatus: "", + agentStatus: make(map[int]bool), + killMaster: false, + } +} + +// Name name of job rescheduling plugin +func (job *JobReschedulingPlugin) Name() string { + return constant.JobReschedulingPluginName +} + +// Handle handle job rescheduling plugin +func (job *JobReschedulingPlugin) Handle() (infrastructure.HandleResult, error) { + if job.killMaster { + job.processStatus = constant.HandleStageProcess + job.pullMsgs = append(job.pullMsgs, infrastructure.Msg{ + Receiver: []string{constant.ControllerRole}, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: 0, + Extension: map[string]string{ + constant.Actions: utils.ObjToString([]string{constant.DestroyController}), + }, + }, + }) + job.pullMsgs = append(job.pullMsgs, agent0ExitMsg) + return infrastructure.HandleResult{Stage: constant.HandleStageProcess}, nil + } + if !job.faultOccur { + hwlog.RunLog.Info("JobReschedulingPlugin not fault occur") + return infrastructure.HandleResult{ + Stage: constant.HandleStageFinal, + }, nil + } + if job.agentStatus[0] == true { + hwlog.RunLog.Info("JobReschedulingPlugin agent 0 exit") + job.resetPluginInfo() + return infrastructure.HandleResult{ + Stage: constant.HandleStageFinal, + }, nil + } + hwlog.RunLog.Info("JobReschedulingPlugin handle fault") + job.pullMsgs = append(job.pullMsgs, agent0ExitMsg) + job.processStatus = constant.HandleStageProcess + return infrastructure.HandleResult{Stage: constant.HandleStageProcess}, nil +} + +// Handle handle job rescheduling plugin +func (job *JobReschedulingPlugin) PullMsg() ([]infrastructure.Msg, error) { + msgs := job.pullMsgs + job.pullMsgs = make([]infrastructure.Msg, 0) + return msgs, nil +} + +// Predicate predicate job rescheduling plugin +func (job *JobReschedulingPlugin) Predicate(shot storage.SnapShot) (infrastructure.PredicateResult, error) { + if job.processStatus != "" { + hwlog.RunLog.Infof("JobReschedulingPlugin Predicate processStatus:%v", job.processStatus) + job.updatePluginInfo(shot) + return infrastructure.PredicateResult{PluginName: job.Name(), + CandidateStatus: constant.CandidateStatus, + PredicateStream: map[string]string{ + constant.JobReschedulingStreamName: "", + }}, nil + } + if shot.ClusterInfos.Clusters[constant.ClusterRole].Command[constant.SingalKillMaster] != "" { + job.killMaster = true + return infrastructure.PredicateResult{PluginName: job.Name(), + CandidateStatus: constant.CandidateStatus, + PredicateStream: map[string]string{ + constant.JobReschedulingStreamName: "", + }}, nil + } + + for _, agent := range shot.AgentInfos.Agents { + if agent.Status[constant.ReportFaultRank] != "" { + job.faultOccur = true + job.processStatus = constant.HandleStageInit + hwlog.RunLog.Infof("JobReschedulingPlugin candidate token, info:%v", agent.Status[constant.ReportFaultRank]) + return infrastructure.PredicateResult{PluginName: job.Name(), + CandidateStatus: constant.CandidateStatus, + PredicateStream: map[string]string{ + constant.JobReschedulingStreamName: "", + }}, nil + } + } + hwlog.RunLog.Info("JobReschedulingPlugin not fault occur") + return infrastructure.PredicateResult{ + PluginName: job.Name(), CandidateStatus: constant.UnselectStatus, PredicateStream: nil}, nil +} + +// Release release job rescheduling plugin +func (job *JobReschedulingPlugin) Release() error { + return nil +} + +func (job *JobReschedulingPlugin) resetPluginInfo() { + job.pullMsgs = make([]infrastructure.Msg, 0) + job.faultOccur = false + job.processStatus = "" + job.agentStatus = make(map[int]bool) + job.killMaster = false +} + +func (job *JobReschedulingPlugin) updatePluginInfo(shot storage.SnapShot) { + agenInfo, ok := shot.AgentInfos.Agents[common.AgentRole+"0"] + if !ok { + hwlog.RunLog.Errorf("JobReschedulingPlugin updatePluginInfo agent 0 not exist") + job.resetPluginInfo() + return + } + if agenInfo.Status[constant.Exit] != "" { + job.agentStatus[0] = true + } + if shot.ClusterInfos.Clusters[constant.ClusterRole].Command[constant.SingalKillMaster] != "" { + job.killMaster = true + } + for _, agent := range shot.AgentInfos.Agents { + if agent.Status[constant.ReportFaultRank] != "" { + job.faultOccur = true + break + } + } +} diff --git a/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go b/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go index 59f48f146..122dcfbf7 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go @@ -22,6 +22,7 @@ import ( "taskd/framework_backend/manager/infrastructure" "taskd/framework_backend/manager/infrastructure/storage" "taskd/framework_backend/manager/plugins/faultdig" + jobrescheduling "taskd/framework_backend/manager/plugins/job_rescheduling" "taskd/framework_backend/manager/plugins/om" ) @@ -52,6 +53,12 @@ func (p *PluginHandler) Init() error { hwlog.RunLog.Errorf("register plugin %s failed!", omPlugin.Name()) return fmt.Errorf("register plugin %s failed", omPlugin.Name()) } + jobReschedulingPlugin := jobrescheduling.NewJobReschedulingPlugin() + if err := p.Register(jobReschedulingPlugin.Name(), jobReschedulingPlugin); err != nil { + hwlog.RunLog.Errorf("register plugin %s failed!", jobReschedulingPlugin.Name()) + return fmt.Errorf("register plugin %s failed", jobReschedulingPlugin.Name()) + } + return nil } diff --git a/component/taskd/taskd/go/framework_backend/manager/service/stream_handler.go b/component/taskd/taskd/go/framework_backend/manager/service/stream_handler.go index 51d7c4725..c649e1347 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/stream_handler.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/stream_handler.go @@ -66,6 +66,13 @@ func (s *StreamHandler) Init() error { OmStream.GetName()) return err } + jobReschedulingStream := infrastructure.NewStream(constant.JobReschedulingStreamName, + map[string]int{constant.JobReschedulingPluginName: 1}) + if err := s.SetStream(jobReschedulingStream); err != nil { + hwlog.RunLog.Errorf("init stream handler failed: set stream %s failed", + jobReschedulingStream.GetName()) + return err + } return nil } -- Gitee From 5dadc02f569885b3a68accfabb158e4f346bef05 Mon Sep 17 00:00:00 2001 From: lijinghan Date: Fri, 15 Aug 2025 10:39:36 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E3=80=90taskd=E3=80=91agent=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E9=80=82=E9=85=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../framework/agent/base_agent/agent_network.py | 13 ++++++++++--- .../python/framework/agent/base_agent/base_agent.py | 4 ++-- .../python/framework/agent/ms_agent/ms_agent.py | 10 +++++----- .../python/framework/agent/pt_agent/pt_agent.py | 13 +++++++------ .../taskd/python/toolkit/constants/constants.py | 4 +++- 5 files changed, 27 insertions(+), 17 deletions(-) diff --git a/component/taskd/taskd/python/framework/agent/base_agent/agent_network.py b/component/taskd/taskd/python/framework/agent/base_agent/agent_network.py index f4b9571aa..1fbca3ebb 100644 --- a/component/taskd/taskd/python/framework/agent/base_agent/agent_network.py +++ b/component/taskd/taskd/python/framework/agent/base_agent/agent_network.py @@ -27,6 +27,7 @@ from taskd.python.utils.log import run_log from taskd.python.cython_api import cython_api from taskd.python.framework.common.type import MsgBody, MessageInfo, Position, DEFAULT_BIZTYPE from taskd.python.toolkit.constants.constants import SEND_RETRY_TIMES +from taskd.python.toolkit.constants import constants class AgentMessageManager(): @@ -84,6 +85,7 @@ class AgentMessageManager(): """ run_log.debug(f"agent send message: {message}") msg_json = json.dumps(asdict(message)).encode('utf-8') + self.lib.DestroyNetTool.argtypes = [ctypes.c_void_p] send_times = 0 self.lib.SyncSendMessage.argtypes = [ctypes.c_void_p, ctypes.c_char_p] self.lib.SyncSendMessage.restype = ctypes.c_int @@ -94,6 +96,9 @@ class AgentMessageManager(): result = self.lib.SyncSendMessage(self._network_instance, msg_json) if result == 0: run_log.info(f"agent send message success, msg: {message.uuid}") + if message.body.code == constants.EXITAGENTCODE: + run_log.info(f"agent send exit message, msg: {message.uuid}") + self.lib.DestroyNetTool(self._network_instance) break run_log.warning(f"agent send message failed, result: {result}") send_times += 1 @@ -106,6 +111,8 @@ class AgentMessageManager(): while True: self.lib.ReceiveMessageC.argtypes = [ctypes.c_void_p] self.lib.ReceiveMessageC.restype = ctypes.c_void_p + self.lib.FreeCMemory.argtypes = [ctypes.c_void_p] + self.lib.FreeCMemory.restype = None msg_ptr = self.lib.ReceiveMessageC(self._network_instance) if msg_ptr is None: continue @@ -117,9 +124,9 @@ class AgentMessageManager(): continue self.msg_queue.put(msg) self.lib.FreeCMemory(msg_ptr) - if msg.msg_type == "exit": - self.lib.DestroyNetwork(self._network_instance) - return + #if msg.msg_type == "exit": + # self.lib.DestroyNetwork(self._network_instance) + # return def get_network_instance(self): """ diff --git a/component/taskd/taskd/python/framework/agent/base_agent/base_agent.py b/component/taskd/taskd/python/framework/agent/base_agent/base_agent.py index dc3c20cce..edaccb0c7 100644 --- a/component/taskd/taskd/python/framework/agent/base_agent/base_agent.py +++ b/component/taskd/taskd/python/framework/agent/base_agent/base_agent.py @@ -81,10 +81,10 @@ class BaseAgent: except queue.Empty: run_log.debug('msg_queue is empty') return - self.command_map.get(item.MsgType)(item) + self.command_map.get(item.code)(item) def grace_exit(self, msg): - run_log.info(f'receive {msg.msg_type} command, start to grace exit workers') + run_log.info(f'receive {msg.code} command, start to grace exit workers') try: grace_exit_pids(self.pids) except Exception as e: diff --git a/component/taskd/taskd/python/framework/agent/ms_agent/ms_agent.py b/component/taskd/taskd/python/framework/agent/ms_agent/ms_agent.py index 23b94fc8f..7ed398985 100644 --- a/component/taskd/taskd/python/framework/agent/ms_agent/ms_agent.py +++ b/component/taskd/taskd/python/framework/agent/ms_agent/ms_agent.py @@ -50,7 +50,7 @@ class MsAgent(BaseAgent): self.command_map = { 'START': self.initialize_workers, 'STOP': self.stop_workers, - 'EXIT': self.exit_agent, + constants.EXITAGENTCODE: self.exit_agent, 'RESTART': self.restart_workers, 'GRACE_EXIT': self.grace_exit, } @@ -123,20 +123,20 @@ class MsAgent(BaseAgent): def initialize_workers(self, msg): - run_log.info(f'receive {msg.msg_type} command, start to initialize workers') + run_log.info(f'receive {msg.code} command, start to initialize workers') self._func_map.get('START_ALL_WORKER')() def stop_workers(self, msg): - run_log.info(f'receive {msg.msg_type} command, start to stop workers') + run_log.info(f'receive {msg.code} command, start to stop workers') self._func_map.get('KILL_WORKER')([constants.KILL_ALL_WORKERS]) def exit_agent(self, msg): - run_log.info(f'receive {msg.msg_type} command, start to exit agent') + run_log.info(f'receive {msg.code} command, start to exit agent') self._func_map.get('KILL_WORKER')([constants.KILL_ALL_WORKERS]) self.send_message_to_manager('STATUS', REPORT_CODE, AgentReportInfo()) exit(1) def restart_workers(self, msg): - run_log.info(f'receive {msg.msg_type} command, start to restart workers') + run_log.info(f'receive {msg.code} command, start to restart workers') self._func_map.get('KILL_WORKER')([constants.KILL_ALL_WORKERS]) self._func_map.get('START_ALL_WORKER')() diff --git a/component/taskd/taskd/python/framework/agent/pt_agent/pt_agent.py b/component/taskd/taskd/python/framework/agent/pt_agent/pt_agent.py index 9e3b6ea27..9d2305674 100644 --- a/component/taskd/taskd/python/framework/agent/pt_agent/pt_agent.py +++ b/component/taskd/taskd/python/framework/agent/pt_agent/pt_agent.py @@ -21,6 +21,7 @@ from taskd.python.utils.log import run_log from taskd.python.framework.agent.base_agent.agent_network import init_network_client from taskd.python.framework.agent.base_agent.base_agent import BaseAgent, REPORT_CODE from taskd.python.framework.common.type import AgentReportInfo +from taskd.python.toolkit.constants import constants try: from torch.distributed.elastic.agent.server.api import WorkerState, RunResult except ImportError: @@ -43,7 +44,7 @@ class PtAgent(BaseAgent): self.command_map = { 'START': self.initialize_workers, 'STOP': self.stop_workers, - 'EXIT': self.exit_agent, + constants.EXITAGENTCODE: self.exit_agent, 'RESTART': self.restart_workers, 'GRACE_EXIT': self.grace_exit, } @@ -97,24 +98,24 @@ class PtAgent(BaseAgent): return def initialize_workers(self, msg): - run_log.info(f'receive {msg.msg_type} command, restart time is {msg.extension},' + run_log.info(f'receive {msg.code} command, restart time is {msg.extension},' f' start to initialize workers') self.pt_instance._remaining_restarts = int(msg.extension) self._func_map.get('START_ALL_WORKER')(self.worker_group) def stop_workers(self, msg): - run_log.info(f'receive {msg.msg_type} command, start to stop workers') + run_log.info(f'receive {msg.code} command, start to stop workers') self._func_map.get('KILL_WORKER')(self.worker_group) self.worker_group.state = WorkerState.STOPPED def exit_agent(self, msg): - run_log.info(f'receive {msg.msg_type} command, start to exit agent') + run_log.info(f'receive {msg.code} command, start to exit agent') self._func_map.get('KILL_WORKER')(self.worker_group) - self.send_message_to_manager('STATUS', REPORT_CODE, AgentReportInfo()) + self.send_message_to_manager('STATUS', constants.EXITAGENTCODE, AgentReportInfo()) exit(1) def restart_workers(self, msg): - run_log.info(f'receive {msg.msg_type} command, start to restart workers, restart time is {msg.extension}') + run_log.info(f'receive {msg.code} command, start to restart workers, restart time is {msg.extension}') self.pt_instance._remaining_restarts = int(msg.extension) self._func_map.get('KILL_WORKER')(self.worker_group) self.worker_group.state = WorkerState.STOPPED diff --git a/component/taskd/taskd/python/toolkit/constants/constants.py b/component/taskd/taskd/python/toolkit/constants/constants.py index 17440fdc9..bef5e2d5e 100644 --- a/component/taskd/taskd/python/toolkit/constants/constants.py +++ b/component/taskd/taskd/python/toolkit/constants/constants.py @@ -107,4 +107,6 @@ STOP_TRAIN_PAUSE = "pause" SWITCH_NIC_DEFAULT_TIMEOUT = 600 SWITCH_NIC_MAX_TIMEOUT = 120 * 60 -HCCL_CONNECT_TIMEOUT = "HCCL_CONNECT_TIMEOUT" \ No newline at end of file +HCCL_CONNECT_TIMEOUT = "HCCL_CONNECT_TIMEOUT" + +EXITAGENTCODE = 203 \ No newline at end of file -- Gitee From 815d5242fd7d8dc7ce5d51fdd89e92ff8fbbaa8f Mon Sep 17 00:00:00 2001 From: lijinghan Date: Fri, 15 Aug 2025 11:08:30 +0800 Subject: [PATCH 3/4] fix1 --- .../job_rescheduling/job_rescheduling.go | 28 ++++++++++++------- .../agent/base_agent/agent_network.py | 8 +++--- .../framework/agent/base_agent/base_agent.py | 4 +-- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go b/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go index e2a047892..281ad7d84 100644 --- a/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go +++ b/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go @@ -82,7 +82,8 @@ func (job *JobReschedulingPlugin) Handle() (infrastructure.HandleResult, error) Stage: constant.HandleStageFinal, }, nil } - if job.agentStatus[0] == true { + + if value, ok := job.agentStatus[0]; ok && value == true { hwlog.RunLog.Info("JobReschedulingPlugin agent 0 exit") job.resetPluginInfo() return infrastructure.HandleResult{ @@ -113,13 +114,16 @@ func (job *JobReschedulingPlugin) Predicate(shot storage.SnapShot) (infrastructu constant.JobReschedulingStreamName: "", }}, nil } - if shot.ClusterInfos.Clusters[constant.ClusterRole].Command[constant.SingalKillMaster] != "" { - job.killMaster = true - return infrastructure.PredicateResult{PluginName: job.Name(), - CandidateStatus: constant.CandidateStatus, - PredicateStream: map[string]string{ - constant.JobReschedulingStreamName: "", - }}, nil + clusterInfo, ok := shot.ClusterInfos.Clusters[constant.ClusterRole] + if ok { + if clusterInfo.Command[constant.SingalKillMaster] != "" { + job.killMaster = true + return infrastructure.PredicateResult{PluginName: job.Name(), + CandidateStatus: constant.CandidateStatus, + PredicateStream: map[string]string{ + constant.JobReschedulingStreamName: "", + }}, nil + } } for _, agent := range shot.AgentInfos.Agents { @@ -162,9 +166,13 @@ func (job *JobReschedulingPlugin) updatePluginInfo(shot storage.SnapShot) { if agenInfo.Status[constant.Exit] != "" { job.agentStatus[0] = true } - if shot.ClusterInfos.Clusters[constant.ClusterRole].Command[constant.SingalKillMaster] != "" { - job.killMaster = true + clusterInfo, ok := shot.ClusterInfos.Clusters[constant.ClusterRole] + if ok { + if clusterInfo.Command[constant.SingalKillMaster] != "" { + job.killMaster = true + } } + for _, agent := range shot.AgentInfos.Agents { if agent.Status[constant.ReportFaultRank] != "" { job.faultOccur = true diff --git a/component/taskd/taskd/python/framework/agent/base_agent/agent_network.py b/component/taskd/taskd/python/framework/agent/base_agent/agent_network.py index 1fbca3ebb..8273742e4 100644 --- a/component/taskd/taskd/python/framework/agent/base_agent/agent_network.py +++ b/component/taskd/taskd/python/framework/agent/base_agent/agent_network.py @@ -79,7 +79,7 @@ class AgentMessageManager(): run_log.info(f"agent register: {msg}") self.send_message(msg) - def send_message(self, message: MessageInfo): + def send_message(self, message: MessageInfo, code: int = 0): """ Send message to taskd manager. """ @@ -96,7 +96,7 @@ class AgentMessageManager(): result = self.lib.SyncSendMessage(self._network_instance, msg_json) if result == 0: run_log.info(f"agent send message success, msg: {message.uuid}") - if message.body.code == constants.EXITAGENTCODE: + if code == constants.EXITAGENTCODE: run_log.info(f"agent send exit message, msg: {message.uuid}") self.lib.DestroyNetTool(self._network_instance) break @@ -212,7 +212,7 @@ def get_message_manager() -> AgentMessageManager: return AgentMessageManager.instance -def network_send_message(msg: MessageInfo): +def network_send_message(msg: MessageInfo, code: int = 0): """ Send message to taskd manager. """ @@ -220,7 +220,7 @@ def network_send_message(msg: MessageInfo): if msg_manager.get_network_instance() is None: run_log.warning("network instance is None") return - msg_manager.send_message(msg) + msg_manager.send_message(msg, code) def get_msg_network_instance(): diff --git a/component/taskd/taskd/python/framework/agent/base_agent/base_agent.py b/component/taskd/taskd/python/framework/agent/base_agent/base_agent.py index edaccb0c7..671705313 100644 --- a/component/taskd/taskd/python/framework/agent/base_agent/base_agent.py +++ b/component/taskd/taskd/python/framework/agent/base_agent/base_agent.py @@ -34,7 +34,7 @@ DEFAULT_DST = { } -REPORT_CODE = 601 +REPORT_CODE = 202 DEFAULT_MSG_TYPE = "DEFAULT" STATUS_MSG_TYPE = "STATUS" @@ -108,7 +108,7 @@ class BaseAgent: dst=DEFAULT_DST, body=body_json ) - network_send_message(msg_info) + network_send_message(msg_info, code) def check_network(self): time_cost = 0 -- Gitee From b27fab1b2ef962613853617f073eb50daaf8d4a7 Mon Sep 17 00:00:00 2001 From: lijinghan Date: Sat, 16 Aug 2025 11:01:25 +0800 Subject: [PATCH 4/4] [WIP]test --- .../plugins/job_rescheduling/job_rescheduling.go | 7 ++++--- .../manager/service/plugin_handler.go | 12 ++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go b/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go index 281ad7d84..68ca75ff0 100644 --- a/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go +++ b/component/taskd/taskd/go/framework_backend/manager/plugins/job_rescheduling/job_rescheduling.go @@ -18,7 +18,6 @@ package jobrescheduling import ( "ascend-common/common-utils/hwlog" "taskd/common/constant" - "taskd/common/utils" "taskd/framework_backend/manager/infrastructure" "taskd/framework_backend/manager/infrastructure/storage" "taskd/toolkit_backend/net/common" @@ -62,8 +61,9 @@ func (job *JobReschedulingPlugin) Name() string { // Handle handle job rescheduling plugin func (job *JobReschedulingPlugin) Handle() (infrastructure.HandleResult, error) { if job.killMaster { + hwlog.RunLog.Info("JobReschedulingPlugin Handle kill master") job.processStatus = constant.HandleStageProcess - job.pullMsgs = append(job.pullMsgs, infrastructure.Msg{ + /*job.pullMsgs = append(job.pullMsgs, infrastructure.Msg{ Receiver: []string{constant.ControllerRole}, Body: storage.MsgBody{ MsgType: constant.Action, @@ -72,7 +72,7 @@ func (job *JobReschedulingPlugin) Handle() (infrastructure.HandleResult, error) constant.Actions: utils.ObjToString([]string{constant.DestroyController}), }, }, - }) + })*/ job.pullMsgs = append(job.pullMsgs, agent0ExitMsg) return infrastructure.HandleResult{Stage: constant.HandleStageProcess}, nil } @@ -117,6 +117,7 @@ func (job *JobReschedulingPlugin) Predicate(shot storage.SnapShot) (infrastructu clusterInfo, ok := shot.ClusterInfos.Clusters[constant.ClusterRole] if ok { if clusterInfo.Command[constant.SingalKillMaster] != "" { + hwlog.RunLog.Info("JobReschedulingPlugin Predicate kill master") job.killMaster = true return infrastructure.PredicateResult{PluginName: job.Name(), CandidateStatus: constant.CandidateStatus, diff --git a/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go b/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go index 122dcfbf7..d490bc67a 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/plugin_handler.go @@ -17,8 +17,10 @@ package service import ( "fmt" + "time" "ascend-common/common-utils/hwlog" + "taskd/common/constant" "taskd/framework_backend/manager/infrastructure" "taskd/framework_backend/manager/infrastructure/storage" "taskd/framework_backend/manager/plugins/faultdig" @@ -41,6 +43,8 @@ type PluginHandler struct { Plugins map[string]infrastructure.ManagerPlugin } +var timestart = time.Now() + // Init register all plugin func (p *PluginHandler) Init() error { profilingPlugin := faultdig.NewProfilingPlugin() @@ -104,6 +108,14 @@ func (p *PluginHandler) Handle(pluginName string) (infrastructure.HandleResult, // Predicate execute the predicate function of all registered plugin func (p *PluginHandler) Predicate(snapshot *storage.SnapShot) []infrastructure.PredicateResult { var predicateResults []infrastructure.PredicateResult + currentTime := time.Now() + if currentTime.Unix()-timestart.Unix() > 30 { + command := make(map[string]string) + command[constant.SingalKillMaster] = "1" + snapshot.ClusterInfos.Clusters[constant.ClusterRole] = &storage.ClusterInfo{ + Command: command, + } + } for _, plugin := range p.Plugins { result, err := plugin.Predicate(*snapshot) if err != nil { -- Gitee