From e5c62af3194589763b281cba2a1114d6b2cd324e Mon Sep 17 00:00:00 2001 From: lirui238 <2396601465@qq.com> Date: Mon, 16 Jun 2025 10:47:04 +0800 Subject: [PATCH 1/8] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91[taskd]=20cluster-refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../go/framework_backend/manager/manager.go | 250 ++-------------- .../manager/service/clusterd.go | 268 ++++++++++++++++++ 2 files changed, 287 insertions(+), 231 deletions(-) create mode 100644 component/taskd/taskd/go/framework_backend/manager/service/clusterd.go diff --git a/component/taskd/taskd/go/framework_backend/manager/manager.go b/component/taskd/taskd/go/framework_backend/manager/manager.go index 1992e91b9..b7d6753ee 100644 --- a/component/taskd/taskd/go/framework_backend/manager/manager.go +++ b/component/taskd/taskd/go/framework_backend/manager/manager.go @@ -18,22 +18,14 @@ package manager import ( "context" "fmt" - "io" - "sync/atomic" + "taskd/common/constant" + "taskd/framework_backend/manager/service" "time" - "github.com/google/uuid" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - "ascend-common/common-utils/hwlog" - "clusterd/pkg/interface/grpc/profiling" - "clusterd/pkg/interface/grpc/recover" - "taskd/common/constant" "taskd/common/utils" "taskd/framework_backend/manager/application" "taskd/framework_backend/manager/infrastructure/storage" - "taskd/toolkit_backend/net/common" ) // ClusterInfo define the information from the cluster @@ -72,20 +64,12 @@ func NewTaskDManager(config Config) *BaseManager { // BaseManager the class taskd manager backend type BaseManager struct { Config - BusinessHandler *application.BusinessStreamProcessor - MsgHd *application.MsgHandler - svcCtx context.Context - cancelFunc context.CancelFunc - profilingFromClusterD atomic.Bool + BusinessHandler *application.BusinessStreamProcessor + MsgHd *application.MsgHandler + svcCtx context.Context + cancelFunc context.CancelFunc } -const ( - roleTaskd = "taskd" - maxRegRetryTime = 60 - maxWaitTime = 60 - waitGapTime = 1 -) - // Init base manger func (m *BaseManager) Init() error { if err := utils.InitHwLogger("manager.log", context.Background()); err != nil { @@ -96,15 +80,12 @@ func (m *BaseManager) Init() error { m.svcCtx, m.cancelFunc = context.WithCancel(context.Background()) m.MsgHd = application.NewMsgHandler() m.MsgHd.Start(m.svcCtx) - m.BusinessHandler = application.NewBusinessStreamProcessor(m.MsgHd) if err := m.BusinessHandler.Init(); err != nil { hwlog.RunLog.Errorf("business handler init failed, err: %v", err) return err } - go m.registerClusterD(0) - go m.watchProfilingCmdChange() - + m.clusterdhandle() hwlog.RunLog.Info("manager init success!") return nil } @@ -147,214 +128,21 @@ func (m *BaseManager) Service(snapshot *storage.SnapShot) error { return nil } -func (m *BaseManager) registerClusterD(retryTime time.Duration) { - if retryTime >= maxRegRetryTime { - hwlog.RunLog.Error("init clusterd connect meet max retry time") - return - } - time.Sleep(retryTime * time.Second) - addr, err := utils.GetClusterdAddr() - if err != nil { - hwlog.RunLog.Errorf("get clusterd address err: %v", err) - return - } - hwlog.RunLog.Infof("get clusterd addr %v", addr) - conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - hwlog.RunLog.Errorf("init clusterd connect err: %v", err) - m.registerClusterD(retryTime + 1) - return - } - - go m.subscribeProfiling(conn, 0) 
- go m.subscribeSwitchNic(conn) -} - -func (m *BaseManager) subscribeSwitchNic(conn *grpc.ClientConn) { - client := pb.NewRecoverClient(conn) - clientInfo := &pb.ClientInfo{ - JobId: m.JobId, - Role: roleTaskd, - } - for { - exit, wTime := m.listenSignal(client, clientInfo, waitGapTime) - if exit { - hwlog.RunLog.Info("taskd exit, stop subscribe clusterd fault info") - break - } - time.Sleep(time.Duration(wTime) * time.Second) - if wTime > maxWaitTime { - wTime = 1 - } - } -} - -func (m *BaseManager) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInfo, wTime int) (bool, int) { - stream, err := client.SubscribeNotifySwitch(m.svcCtx, clientInfo) - if err != nil { - hwlog.RunLog.Errorf("register Clusterd notify switch fail, err: %v", err) - return false, wTime + waitGapTime - } - for { - select { - case <-m.svcCtx.Done(): - hwlog.RunLog.Info("taskd exit, stop subscribe clusterd fault info") - return true, 0 - case <-stream.Context().Done(): - hwlog.RunLog.Error("server stream abnormal interruption, register again") - return false, wTime + waitGapTime - default: - responseMsg, recvErr := stream.Recv() - if recvErr == io.EOF { - hwlog.RunLog.Info("stream EOF, register again") - return false, waitGapTime - } - if recvErr != nil { - hwlog.RunLog.Error(recvErr) - continue - } - hwlog.RunLog.Infof("receive switch nic info: %v", responseMsg) - globalOps := responseMsg.GetOp() - globalRanks := responseMsg.GetRankID() - m.enqueueSwitchNic(globalRanks, globalOps) - } - } -} - -func (m *BaseManager) enqueueSwitchNic(ranks []string, ops []bool) { - rankStr := utils.ObjToString(ranks) - opStr := utils.ObjToString(ops) - msg := map[string]string{ - constant.GlobalRankKey: rankStr, - constant.GlobalOpKey: opStr, - constant.SwitchJobID: m.JobId, - } - message := storage.BaseMessage{ - Header: storage.MsgHeader{ - BizType: "default", - Uuid: uuid.New().String(), - Src: &common.Position{ - Role: constant.ClusterRole, - ServerRank: constant.ClusterDRank, - }, - Timestamp: time.Now(), - }, - Body: storage.MsgBody{ - MsgType: constant.Action, - Code: constant.SwitchNicCode, - Extension: msg, - }, - } - err := m.MsgHd.MsgQueue.Enqueue(message) - if err != nil { - hwlog.RunLog.Errorf("enqueue switch msg err %v", err) - return - } - hwlog.RunLog.Infof("enqueue switch msg %v", msg) -} - -func (m *BaseManager) subscribeProfiling(conn *grpc.ClientConn, retryTime time.Duration) { - m.profilingFromClusterD.Store(false) - if retryTime >= maxRegRetryTime { - hwlog.RunLog.Error("register Cluster profiling meet max retry time") - return - } - time.Sleep(retryTime * time.Second) - traceClient := profiling.NewTrainingDataTraceClient(conn) - stream, err := traceClient.SubscribeDataTraceSwitch(m.svcCtx, &profiling.ProfilingClientInfo{ - JobId: m.JobId, - Role: roleTaskd, - }) - if err != nil { - hwlog.RunLog.Errorf("register Cluster profiling fail, err: %v", err) - go m.subscribeProfiling(conn, retryTime+1) - return - } - m.profilingFromClusterD.Store(true) - for { - select { - case <-m.svcCtx.Done(): - hwlog.RunLog.Info("taskd exit, stop subscribe clusterd fault info") - return - case <-stream.Context().Done(): - hwlog.RunLog.Info("client stream exit, stop subscribe profiling info and re-register") - go m.subscribeProfiling(conn, retryTime+1) - return - default: - responseMsg, recvErr := stream.Recv() - if recvErr != nil { - hwlog.RunLog.Error(recvErr) - } else { - hwlog.RunLog.Infof("receive profiling info: %v", responseMsg) - profilingMsg := responseMsg.GetProfilingSwitch() - // notify framework receive profiling 
msg - domainSwitch := utils.PfSwitchToPfDomainSwitch(convertProfilingMsg(profilingMsg)) - m.enqueueProfilingSwitch(domainSwitch, constant.ClusterDRank) - } +func (m *BaseManager) clusterHandle() { + foundClusterd := false + for _, info := range m.ClusterInfos { + if info.Role == constant.ClusterDRank { + m.clusterdhandle() + foundClusterd = true } } -} - -func (m *BaseManager) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichServer string) { - message := storage.BaseMessage{ - Header: storage.MsgHeader{ - BizType: "default", - Uuid: uuid.New().String(), - Src: &common.Position{ - Role: constant.ClusterRole, - ServerRank: whichServer, - }, - Timestamp: time.Now(), - }, - Body: storage.MsgBody{ - MsgType: constant.Action, - Code: utils.ProfilingCmdToBizCode(cmd), - }, - } - err := m.MsgHd.MsgQueue.Enqueue(message) - if err != nil { - hwlog.RunLog.Infof("%s enqueue profiling cmd %v err %v", whichServer, cmd, err) - return + if !foundClusterd { + m.clusterdhandle() } - hwlog.RunLog.Infof("%s enqueue profiling cmd %v", whichServer, cmd) } -func (m *BaseManager) watchProfilingCmdChange() { - hwlog.RunLog.Info("begin watch ProfilingSwitchFilePath...") - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - select { - case <-m.svcCtx.Done(): - hwlog.RunLog.Info("end watch ProfilingSwitchFilePath...") - return - case <-ticker.C: - if m.profilingFromClusterD.Load() { - hwlog.RunLog.Infof("manager register clusterd, donot watch profiling file.") - return - } - m.getProfilingFromFile() - } - } -} - -func (m *BaseManager) getProfilingFromFile() { - profilingSwitch, err := utils.GetProfilingSwitch(constant.ProfilingSwitchFilePath) - if err != nil { - hwlog.RunLog.Errorf("GetProfilingSwitch err: %v", err) - return - } - domainSwitch := utils.PfSwitchToPfDomainSwitch(profilingSwitch) - m.enqueueProfilingSwitch(domainSwitch, constant.TaskDRank) -} - -func convertProfilingMsg(profilingSwitchData *profiling.ProfilingSwitch) constant.ProfilingSwitch { - profilingSwitch := constant.ProfilingSwitch{ - CommunicationOperator: profilingSwitchData.CommunicationOperator, - Step: profilingSwitchData.Step, - SaveCheckpoint: profilingSwitchData.SaveCheckpoint, - FP: profilingSwitchData.FP, - DataLoader: profilingSwitchData.DataLoader, - } - return profilingSwitch +func (m *BaseManager) clusterdhandle() { + clusterdHandler := service.NewClusterdHandler(m.JobId, m.svcCtx) + go clusterdHandler.RegisterClusterd(0) + go clusterdHandler.WatchProfilingCmdChange() } diff --git a/component/taskd/taskd/go/framework_backend/manager/service/clusterd.go b/component/taskd/taskd/go/framework_backend/manager/service/clusterd.go new file mode 100644 index 000000000..43690650c --- /dev/null +++ b/component/taskd/taskd/go/framework_backend/manager/service/clusterd.go @@ -0,0 +1,268 @@ +/* Copyright(C) 2025. Huawei Technologies Co.,Ltd. All rights reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package service is to provide other service tools, i.e. 
clusterd +package service + +import ( + "context" + "io" + "sync/atomic" + "time" + + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "ascend-common/common-utils/hwlog" + "clusterd/pkg/interface/grpc/profiling" + pb "clusterd/pkg/interface/grpc/recover" + "taskd/common/constant" + "taskd/common/utils" + "taskd/framework_backend/manager/infrastructure/storage" + "taskd/toolkit_backend/net/common" +) + +const ( + roleTaskd = "taskd" + maxRegRetryTime = 60 + maxWaitTime = 60 + waitGapTime = 1 +) + +type Handler struct { + jobId string + svcCtx context.Context + profilingFromClusterD atomic.Bool +} + +func NewClusterdHandler(jobId string, svcCtx context.Context) *Handler { + return &Handler{ + jobId: jobId, + svcCtx: svcCtx, + profilingFromClusterD: atomic.Bool{}, + } +} + +func (m *Handler) RegisterClusterd(retryTime time.Duration) { + if retryTime >= maxRegRetryTime { + hwlog.RunLog.Error("init clusterd connect meet max retry time") + return + } + time.Sleep(retryTime * time.Second) + addr, err := utils.GetClusterdAddr() + if err != nil { + hwlog.RunLog.Errorf("get clusterd address err: %v", err) + return + } + hwlog.RunLog.Infof("get clusterd addr %v", addr) + conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + hwlog.RunLog.Errorf("init clusterd connect err: %v", err) + m.RegisterClusterd(retryTime + 1) + return + } + + go m.subscribeProfiling(conn, 0) + go m.subscribeSwitchNic(conn) +} + +func (m *Handler) subscribeSwitchNic(conn *grpc.ClientConn) { + client := pb.NewRecoverClient(conn) + clientInfo := &pb.ClientInfo{ + JobId: m.jobId, + Role: roleTaskd, + } + for { + exit, wTime := m.listenSignal(client, clientInfo, waitGapTime) + if exit { + hwlog.RunLog.Info("taskd exit, stop subscribe clusterd fault info") + break + } + time.Sleep(time.Duration(wTime) * time.Second) + if wTime > maxWaitTime { + wTime = 1 + } + } +} + +func (m *Handler) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInfo, wTime int) (bool, int) { + stream, err := client.SubscribeNotifySwitch(m.svcCtx, clientInfo) + if err != nil { + hwlog.RunLog.Errorf("register Clusterd notify switch fail, err: %v", err) + return false, wTime + waitGapTime + } + for { + select { + case <-m.svcCtx.Done(): + hwlog.RunLog.Info("taskd exit, stop subscribe clusterd fault info") + return true, 0 + case <-stream.Context().Done(): + hwlog.RunLog.Error("server stream abnormal interruption, register again") + return false, wTime + waitGapTime + default: + responseMsg, recvErr := stream.Recv() + if recvErr == io.EOF { + hwlog.RunLog.Info("stream EOF, register again") + return false, waitGapTime + } + if recvErr != nil { + hwlog.RunLog.Error(recvErr) + continue + } + hwlog.RunLog.Infof("receive switch nic info: %v", responseMsg) + globalOps := responseMsg.GetOp() + globalRanks := responseMsg.GetRankID() + m.enqueueSwitchNic(globalRanks, globalOps) + } + } +} + +func (m *Handler) enqueueSwitchNic(ranks []string, ops []bool) { + rankStr := utils.ObjToString(ranks) + opStr := utils.ObjToString(ops) + msg := map[string]string{ + constant.GlobalRankKey: rankStr, + constant.GlobalOpKey: opStr, + constant.SwitchJobID: m.jobId, + } + message := storage.BaseMessage{ + Header: storage.MsgHeader{ + BizType: "default", + Uuid: uuid.New().String(), + Src: &common.Position{ + Role: constant.ClusterRole, + ServerRank: constant.ClusterDRank, + }, + Timestamp: time.Now(), + }, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: 
constant.SwitchNicCode, + Extension: msg, + }, + } + err := m.MsgHd.MsgQueue.Enqueue(message) + if err != nil { + hwlog.RunLog.Errorf("enqueue switch msg err %v", err) + return + } + hwlog.RunLog.Infof("enqueue switch msg %v", msg) +} + +func (m *Handler) subscribeProfiling(conn *grpc.ClientConn, retryTime time.Duration) { + m.profilingFromClusterD.Store(false) + if retryTime >= maxRegRetryTime { + hwlog.RunLog.Error("register Cluster profiling meet max retry time") + return + } + time.Sleep(retryTime * time.Second) + traceClient := profiling.NewTrainingDataTraceClient(conn) + stream, err := traceClient.SubscribeDataTraceSwitch(m.svcCtx, &profiling.ProfilingClientInfo{ + JobId: m.jobId, + Role: roleTaskd, + }) + if err != nil { + hwlog.RunLog.Errorf("register Cluster profiling fail, err: %v", err) + go m.subscribeProfiling(conn, retryTime+1) + return + } + m.profilingFromClusterD.Store(true) + for { + select { + case <-m.svcCtx.Done(): + hwlog.RunLog.Info("taskd exit, stop subscribe clusterd fault info") + return + case <-stream.Context().Done(): + hwlog.RunLog.Info("client stream exit, stop subscribe profiling info and re-register") + go m.subscribeProfiling(conn, retryTime+1) + return + default: + responseMsg, recvErr := stream.Recv() + if recvErr != nil { + hwlog.RunLog.Error(recvErr) + } else { + hwlog.RunLog.Infof("receive profiling info: %v", responseMsg) + profilingMsg := responseMsg.GetProfilingSwitch() + // notify framework receive profiling msg + domainSwitch := utils.PfSwitchToPfDomainSwitch(convertProfilingMsg(profilingMsg)) + m.enqueueProfilingSwitch(domainSwitch, constant.ClusterDRank) + } + } + } +} + +func (m *Handler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichServer string) { + message := storage.BaseMessage{ + Header: storage.MsgHeader{ + BizType: "default", + Uuid: uuid.New().String(), + Src: &common.Position{ + Role: constant.ClusterRole, + ServerRank: whichServer, + }, + Timestamp: time.Now(), + }, + Body: storage.MsgBody{ + MsgType: constant.Action, + Code: utils.ProfilingCmdToBizCode(cmd), + }, + } + err := m.MsgHd.MsgQueue.Enqueue(message) + if err != nil { + hwlog.RunLog.Infof("%s enqueue profiling cmd %v err %v", whichServer, cmd, err) + return + } + hwlog.RunLog.Infof("%s enqueue profiling cmd %v", whichServer, cmd) +} + +func (m *Handler) WatchProfilingCmdChange() { + hwlog.RunLog.Info("begin watch ProfilingSwitchFilePath...") + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { + select { + case <-m.svcCtx.Done(): + hwlog.RunLog.Info("end watch ProfilingSwitchFilePath...") + return + case <-ticker.C: + if m.profilingFromClusterD.Load() { + hwlog.RunLog.Infof("manager register clusterd, donot watch profiling file.") + return + } + m.getProfilingFromFile() + } + } +} + +func (m *Handler) getProfilingFromFile() { + profilingSwitch, err := utils.GetProfilingSwitch(constant.ProfilingSwitchFilePath) + if err != nil { + hwlog.RunLog.Errorf("GetProfilingSwitch err: %v", err) + return + } + domainSwitch := utils.PfSwitchToPfDomainSwitch(profilingSwitch) + m.enqueueProfilingSwitch(domainSwitch, constant.TaskDRank) +} + +func convertProfilingMsg(profilingSwitchData *profiling.ProfilingSwitch) constant.ProfilingSwitch { + profilingSwitch := constant.ProfilingSwitch{ + CommunicationOperator: profilingSwitchData.CommunicationOperator, + Step: profilingSwitchData.Step, + SaveCheckpoint: profilingSwitchData.SaveCheckpoint, + FP: profilingSwitchData.FP, + DataLoader: profilingSwitchData.DataLoader, + } + return profilingSwitch +} -- Gitee 
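
Patch 1 above moves the clusterd stream handling into service/clusterd.go. The core of that code is a re-subscribe loop: each gRPC stream is re-opened after a failure with a wait that grows by waitGapTime seconds and resets once it passes maxWaitTime. Below is a minimal, self-contained Go sketch of that loop shape; recvStream and the open callback are illustrative stand-ins, not part of the patch, for the generated Recover/TrainingDataTrace stream clients.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"time"
)

const (
	gap     = time.Second      // waitGapTime in the patch
	maxWait = 60 * time.Second // maxWaitTime in the patch
)

// recvStream is a hypothetical stand-in for a generated gRPC stream client;
// only Recv matters for the re-subscribe pattern sketched here.
type recvStream interface {
	Recv() (string, error)
}

// subscribe keeps the stream alive until ctx is cancelled, re-opening it after
// failures with a linearly growing wait that wraps around at maxWait.
func subscribe(ctx context.Context, open func(context.Context) (recvStream, error)) {
	wait := gap
	for {
		if ctx.Err() != nil {
			return // caller cancelled: stop re-registering
		}
		stream, err := open(ctx)
		if err == nil {
			for {
				msg, recvErr := stream.Recv()
				if errors.Is(recvErr, io.EOF) {
					wait = gap // server closed cleanly: re-register promptly
					break
				}
				if recvErr != nil {
					break // abnormal break: back off, then re-register
				}
				fmt.Println("received:", msg)
			}
		}
		time.Sleep(wait)
		if wait += gap; wait > maxWait {
			wait = gap
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	subscribe(ctx, func(context.Context) (recvStream, error) {
		return nil, errors.New("clusterd not reachable") // placeholder opener
	})
}

The same shape covers both the switch-nic and the profiling subscriptions in the patch; only the handling of the received payload differs.
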
From c7bdcf9b63eb2fb81e2c0d8f48187c4a5e7466ab Mon Sep 17 00:00:00 2001 From: lirui238 <2396601465@qq.com> Date: Mon, 16 Jun 2025 11:38:01 +0800 Subject: [PATCH 2/8] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91[taskd]=20cluster-refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- component/taskd/taskd/go/backend_api.go | 2 +- .../taskd/taskd/go/common/constant/type.go | 26 ++++++++ .../taskd/taskd/go/common/utils/utils.go | 1 - .../go/framework_backend/manager/manager.go | 44 +++---------- .../manager/service/{ => handler}/clusterd.go | 61 ++++++++++++------- .../manager/service/handler/type.go | 36 +++++++++++ 6 files changed, 111 insertions(+), 59 deletions(-) rename component/taskd/taskd/go/framework_backend/manager/service/{ => handler}/clusterd.go (81%) create mode 100644 component/taskd/taskd/go/framework_backend/manager/service/handler/type.go diff --git a/component/taskd/taskd/go/backend_api.go b/component/taskd/taskd/go/backend_api.go index 54d07f333..e90585f99 100644 --- a/component/taskd/taskd/go/backend_api.go +++ b/component/taskd/taskd/go/backend_api.go @@ -113,7 +113,7 @@ func StepOut() C.int { // //export InitTaskdManager func InitTaskdManager(configStr *C.char) C.int { - var config manager.Config + var config constant.Config if err := json.Unmarshal([]byte(C.GoString(configStr)), &config); err != nil { return C.int(1) } diff --git a/component/taskd/taskd/go/common/constant/type.go b/component/taskd/taskd/go/common/constant/type.go index 48342e5e2..1882d5339 100644 --- a/component/taskd/taskd/go/common/constant/type.go +++ b/component/taskd/taskd/go/common/constant/type.go @@ -45,3 +45,29 @@ type ProfilingSwitch struct { type ProfilingWorkerState struct { state string } + +// ClusterInfo define the information from the cluster +type ClusterInfo struct { + // IP indicate cluster server ip + Ip string `json:"ip"` + // Port indicate cluster server port + Port string `json:"port"` + // Name indicate cluster server service name + Name string `json:"name"` + // Role + Role string `json:"role"` +} + +// Config define the configuration of manager +type Config struct { + // JobId indicate the id of the job where the manager is located + JobId string `json:"job_id"` + // NodeNums indicate the number of nodes where the manager is located + NodeNums int `json:"node_nums"` + // ProcPerNode indicate the number of business processes where the manager's job is located + ProcPerNode int `json:"proc_per_node"` + // PluginDir indicate the plugin dir + PluginDir string `json:"plugin_dir"` + // ClusterInfos indicate the information of cluster + ClusterInfos []ClusterInfo `json:"cluster_infos"` +} diff --git a/component/taskd/taskd/go/common/utils/utils.go b/component/taskd/taskd/go/common/utils/utils.go index 3325fd559..c43b00e75 100644 --- a/component/taskd/taskd/go/common/utils/utils.go +++ b/component/taskd/taskd/go/common/utils/utils.go @@ -255,7 +255,6 @@ func GetClusterdAddr() (string, error) { parsedIP := net.ParseIP(ipFromEnv) if parsedIP == nil { return "", fmt.Errorf("%s is NOT a valid IP address", ipFromEnv) - } return ipFromEnv + constant.ClusterdPort, nil } diff --git a/component/taskd/taskd/go/framework_backend/manager/manager.go b/component/taskd/taskd/go/framework_backend/manager/manager.go index b7d6753ee..97c8807fc 100644 --- a/component/taskd/taskd/go/framework_backend/manager/manager.go +++ b/component/taskd/taskd/go/framework_backend/manager/manager.go @@ -18,44 +18,18 @@ package manager 
import ( "context" "fmt" - "taskd/common/constant" - "taskd/framework_backend/manager/service" "time" "ascend-common/common-utils/hwlog" + "taskd/common/constant" "taskd/common/utils" "taskd/framework_backend/manager/application" "taskd/framework_backend/manager/infrastructure/storage" + "taskd/framework_backend/manager/service/handler" ) -// ClusterInfo define the information from the cluster -type ClusterInfo struct { - // IP indicate cluster server ip - Ip string `json:"ip"` - // Port indicate cluster server port - Port string `json:"port"` - // Name indicate cluster server service name - Name string `json:"name"` - // Role - Role string `json:"role"` -} - -// Config define the configuration of manager -type Config struct { - // JobId indicate the id of the job where the manager is located - JobId string `json:"job_id"` - // NodeNums indicate the number of nodes where the manager is located - NodeNums int `json:"node_nums"` - // ProcPerNode indicate the number of business processes where the manager's job is located - ProcPerNode int `json:"proc_per_node"` - // PluginDir indicate the plugin dir - PluginDir string `json:"plugin_dir"` - // ClusterInfos indicate the information of cluster - ClusterInfos []ClusterInfo `json:"cluster_infos"` -} - // NewTaskDManager return taskd manager instance -func NewTaskDManager(config Config) *BaseManager { +func NewTaskDManager(config constant.Config) *BaseManager { return &BaseManager{ Config: config, } @@ -63,7 +37,7 @@ func NewTaskDManager(config Config) *BaseManager { // BaseManager the class taskd manager backend type BaseManager struct { - Config + constant.Config BusinessHandler *application.BusinessStreamProcessor MsgHd *application.MsgHandler svcCtx context.Context @@ -85,7 +59,7 @@ func (m *BaseManager) Init() error { hwlog.RunLog.Errorf("business handler init failed, err: %v", err) return err } - m.clusterdhandle() + m.clusterHandle() hwlog.RunLog.Info("manager init success!") return nil } @@ -132,17 +106,17 @@ func (m *BaseManager) clusterHandle() { foundClusterd := false for _, info := range m.ClusterInfos { if info.Role == constant.ClusterDRank { - m.clusterdhandle() + m.clusterdHandle(info) foundClusterd = true } } if !foundClusterd { - m.clusterdhandle() + m.clusterdHandle(constant.ClusterInfo{}) } } -func (m *BaseManager) clusterdhandle() { - clusterdHandler := service.NewClusterdHandler(m.JobId, m.svcCtx) +func (m *BaseManager) clusterdHandle(info constant.ClusterInfo) { + clusterdHandler := handler.NewClusterdHandler(m.JobId, m.svcCtx, m.MsgHd.MsgQueue, info) go clusterdHandler.RegisterClusterd(0) go clusterdHandler.WatchProfilingCmdChange() } diff --git a/component/taskd/taskd/go/framework_backend/manager/service/clusterd.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go similarity index 81% rename from component/taskd/taskd/go/framework_backend/manager/service/clusterd.go rename to component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go index 43690650c..f06792f03 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/clusterd.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go @@ -12,12 +12,14 @@ limitations under the License. */ -// Package service is to provide other service tools, i.e. clusterd -package service +// Package handler is to provide cluster handler, i.e. 
clusterd, taskd +package handler import ( "context" + "fmt" "io" + "net" "sync/atomic" "time" @@ -41,27 +43,30 @@ const ( waitGapTime = 1 ) -type Handler struct { - jobId string - svcCtx context.Context - profilingFromClusterD atomic.Bool -} - -func NewClusterdHandler(jobId string, svcCtx context.Context) *Handler { - return &Handler{ - jobId: jobId, - svcCtx: svcCtx, +func NewClusterdHandler( + jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) *ClusterdHandler { + return &ClusterdHandler{ + Handler: &Handler{ + jobId: jobId, + svcCtx: svcCtx, + queue: queue, + clusterInfo: info, + }, profilingFromClusterD: atomic.Bool{}, } } -func (m *Handler) RegisterClusterd(retryTime time.Duration) { +func (m *ClusterdHandler) RegisterClusterd(retryTime time.Duration) { if retryTime >= maxRegRetryTime { hwlog.RunLog.Error("init clusterd connect meet max retry time") return } time.Sleep(retryTime * time.Second) - addr, err := utils.GetClusterdAddr() + addr, err := m.getAddr() + if err != nil { + hwlog.RunLog.Errorf("get address from cluster info err: %v, try get address from env variable", err) + } + addr, err = utils.GetClusterdAddr() if err != nil { hwlog.RunLog.Errorf("get clusterd address err: %v", err) return @@ -78,7 +83,7 @@ func (m *Handler) RegisterClusterd(retryTime time.Duration) { go m.subscribeSwitchNic(conn) } -func (m *Handler) subscribeSwitchNic(conn *grpc.ClientConn) { +func (m *ClusterdHandler) subscribeSwitchNic(conn *grpc.ClientConn) { client := pb.NewRecoverClient(conn) clientInfo := &pb.ClientInfo{ JobId: m.jobId, @@ -97,7 +102,7 @@ func (m *Handler) subscribeSwitchNic(conn *grpc.ClientConn) { } } -func (m *Handler) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInfo, wTime int) (bool, int) { +func (m *ClusterdHandler) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInfo, wTime int) (bool, int) { stream, err := client.SubscribeNotifySwitch(m.svcCtx, clientInfo) if err != nil { hwlog.RunLog.Errorf("register Clusterd notify switch fail, err: %v", err) @@ -129,7 +134,7 @@ func (m *Handler) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInf } } -func (m *Handler) enqueueSwitchNic(ranks []string, ops []bool) { +func (m *ClusterdHandler) enqueueSwitchNic(ranks []string, ops []bool) { rankStr := utils.ObjToString(ranks) opStr := utils.ObjToString(ops) msg := map[string]string{ @@ -153,7 +158,7 @@ func (m *Handler) enqueueSwitchNic(ranks []string, ops []bool) { Extension: msg, }, } - err := m.MsgHd.MsgQueue.Enqueue(message) + err := m.queue.Enqueue(message) if err != nil { hwlog.RunLog.Errorf("enqueue switch msg err %v", err) return @@ -161,7 +166,7 @@ func (m *Handler) enqueueSwitchNic(ranks []string, ops []bool) { hwlog.RunLog.Infof("enqueue switch msg %v", msg) } -func (m *Handler) subscribeProfiling(conn *grpc.ClientConn, retryTime time.Duration) { +func (m *ClusterdHandler) subscribeProfiling(conn *grpc.ClientConn, retryTime time.Duration) { m.profilingFromClusterD.Store(false) if retryTime >= maxRegRetryTime { hwlog.RunLog.Error("register Cluster profiling meet max retry time") @@ -219,7 +224,7 @@ func (m *Handler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichS Code: utils.ProfilingCmdToBizCode(cmd), }, } - err := m.MsgHd.MsgQueue.Enqueue(message) + err := m.queue.Enqueue(message) if err != nil { hwlog.RunLog.Infof("%s enqueue profiling cmd %v err %v", whichServer, cmd, err) return @@ -227,7 +232,7 @@ func (m *Handler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichS 
hwlog.RunLog.Infof("%s enqueue profiling cmd %v", whichServer, cmd) } -func (m *Handler) WatchProfilingCmdChange() { +func (m *ClusterdHandler) WatchProfilingCmdChange() { hwlog.RunLog.Info("begin watch ProfilingSwitchFilePath...") ticker := time.NewTicker(time.Second) defer ticker.Stop() @@ -246,7 +251,7 @@ func (m *Handler) WatchProfilingCmdChange() { } } -func (m *Handler) getProfilingFromFile() { +func (m *ClusterdHandler) getProfilingFromFile() { profilingSwitch, err := utils.GetProfilingSwitch(constant.ProfilingSwitchFilePath) if err != nil { hwlog.RunLog.Errorf("GetProfilingSwitch err: %v", err) @@ -256,6 +261,18 @@ func (m *Handler) getProfilingFromFile() { m.enqueueProfilingSwitch(domainSwitch, constant.TaskDRank) } +func (m *ClusterdHandler) getAddr() (string, error) { + res := m.clusterInfo.Ip + ":" + m.clusterInfo.Port + if m.clusterInfo.Ip == "" || m.clusterInfo.Port == "" { + return "", fmt.Errorf("invalid ip:port %s", res) + } + parsedIP := net.ParseIP(m.clusterInfo.Ip) + if parsedIP == nil { + return "", fmt.Errorf("%s is NOT a valid IP address", m.clusterInfo.Ip) + } + return res, nil +} + func convertProfilingMsg(profilingSwitchData *profiling.ProfilingSwitch) constant.ProfilingSwitch { profilingSwitch := constant.ProfilingSwitch{ CommunicationOperator: profilingSwitchData.CommunicationOperator, diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go new file mode 100644 index 000000000..a4150f523 --- /dev/null +++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go @@ -0,0 +1,36 @@ +/* Copyright(C) 2025. Huawei Technologies Co.,Ltd. All rights reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package handler is to provide cluster handler, i.e. 
clusterd, taskd +package handler + +import ( + "context" + "sync/atomic" + + "taskd/common/constant" + "taskd/framework_backend/manager/infrastructure/storage" +) + +type Handler struct { + jobId string + svcCtx context.Context + queue *storage.MsgQueue + clusterInfo constant.ClusterInfo +} + +type ClusterdHandler struct { + *Handler + profilingFromClusterD atomic.Bool +} -- Gitee From 016c68b53463d5d6e997e105a60be797be38d159 Mon Sep 17 00:00:00 2001 From: lirui238 <2396601465@qq.com> Date: Mon, 16 Jun 2025 14:28:29 +0800 Subject: [PATCH 3/8] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91[taskd]=20cluster-refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../go/framework_backend/manager/manager.go | 3 +- .../framework_backend/manager/manager_test.go | 162 ++++++++++++++++++ .../manager/service/handler/clusterd.go | 13 +- .../manager/service/handler/type.go | 7 + 4 files changed, 180 insertions(+), 5 deletions(-) create mode 100644 component/taskd/taskd/go/framework_backend/manager/manager_test.go diff --git a/component/taskd/taskd/go/framework_backend/manager/manager.go b/component/taskd/taskd/go/framework_backend/manager/manager.go index 97c8807fc..c49b92cb2 100644 --- a/component/taskd/taskd/go/framework_backend/manager/manager.go +++ b/component/taskd/taskd/go/framework_backend/manager/manager.go @@ -117,6 +117,5 @@ func (m *BaseManager) clusterHandle() { func (m *BaseManager) clusterdHandle(info constant.ClusterInfo) { clusterdHandler := handler.NewClusterdHandler(m.JobId, m.svcCtx, m.MsgHd.MsgQueue, info) - go clusterdHandler.RegisterClusterd(0) - go clusterdHandler.WatchProfilingCmdChange() + clusterdHandler.Handle() } diff --git a/component/taskd/taskd/go/framework_backend/manager/manager_test.go b/component/taskd/taskd/go/framework_backend/manager/manager_test.go new file mode 100644 index 000000000..70d4c3d7a --- /dev/null +++ b/component/taskd/taskd/go/framework_backend/manager/manager_test.go @@ -0,0 +1,162 @@ +/* Copyright(C) 2025. Huawei Technologies Co.,Ltd. All rights reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +// Package manager for taskd manager backend +package manager + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + + "github.com/agiledragon/gomonkey/v2" + "github.com/smartystreets/goconvey/convey" + + "ascend-common/common-utils/hwlog" + "taskd/common/constant" + "taskd/common/utils" + "taskd/framework_backend/manager/application" + "taskd/framework_backend/manager/infrastructure/storage" + "taskd/framework_backend/manager/service/handler" +) + +const ( + JobId = "JobId" +) + +// TestMain test main +func TestMain(m *testing.M) { + if err := setup(); err != nil { + return + } + code := m.Run() + fmt.Printf("exit_code = %v\n", code) +} + +func setup() error { + return initLog() +} + +func initLog() error { + logConfig := &hwlog.LogConfig{ + OnlyToStdout: true, + } + if err := hwlog.InitRunLogger(logConfig, context.Background()); err != nil { + fmt.Printf("init hwlog failed, %v\n", err) + return err + } + return nil +} + +func getManager() *BaseManager { + config := constant.Config{ + JobId: JobId, + ClusterInfos: []constant.ClusterInfo{ + { + Role: constant.ClusterDRank, + }, + }, + } + manager := NewTaskDManager(config) + manager.MsgHd = application.NewMsgHandler() + manager.BusinessHandler = application.NewBusinessStreamProcessor(manager.MsgHd) + return manager +} + +func TestNewTaskDManager(t *testing.T) { + manager := getManager() + convey.ShouldEqual(manager.JobId, JobId) +} + +func TestBaseManagerInit(t *testing.T) { + patches := gomonkey.NewPatches() + defer patches.Reset() + patches.ApplyFunc(utils.InitHwLogger, func(string, context.Context) error { + return nil + }) + msgHd := application.NewMsgHandler() + patches.ApplyFunc(application.NewMsgHandler, func() *application.MsgHandler { + return msgHd + }) + calledStart := atomic.Bool{} + patches.ApplyMethod(msgHd, "Start", func(*application.MsgHandler, context.Context) { + calledStart.Store(true) + }) + calledClusterHandle := atomic.Bool{} + manager := getManager() + patches.ApplyPrivateMethod(manager, "clusterHandle", func(*BaseManager) { + calledClusterHandle.Store(true) + }) + err := manager.Init() + convey.ShouldBeNil(err) + convey.ShouldBeTrue(calledClusterHandle.Load()) + convey.ShouldBeTrue(calledStart.Load()) +} + +func TestBaseManagerStart(t *testing.T) { + patches := gomonkey.NewPatches() + defer patches.Reset() + manager := getManager() + calledInit := false + calledProcess := false + patches.ApplyMethod(manager, "Init", func(*BaseManager) error { + calledInit = true + return nil + }).ApplyMethod(manager, "Process", func(*BaseManager) error { + calledProcess = true + return nil + }) + err := manager.Start() + convey.ShouldBeNil(err) + convey.ShouldBeTrue(calledInit) + convey.ShouldBeTrue(calledProcess) +} + +func TestBaseManagerService(t *testing.T) { + manager := getManager() + patches := gomonkey.NewPatches() + defer patches.Reset() + calledAllocateToken := false + calledStreamRun := false + gomonkey.ApplyMethod(manager.BusinessHandler, "AllocateToken", + func(*application.BusinessStreamProcessor, *storage.SnapShot) { + calledAllocateToken = true + }).ApplyMethod(manager.BusinessHandler, "StreamRun", + func(*application.BusinessStreamProcessor) error { + calledStreamRun = true + return nil + }) + err := manager.Service(&storage.SnapShot{}) + convey.ShouldBeNil(err) + convey.ShouldBeTrue(calledAllocateToken) + convey.ShouldBeTrue(calledStreamRun) +} + +func TestBaseManagerClusterHandle(t *testing.T) { + manager := getManager() + patches := gomonkey.NewPatches() + defer patches.Reset() + clusterdHandler := 
handler.NewClusterdHandler( + manager.JobId, manager.svcCtx, manager.MsgHd.MsgQueue, manager.ClusterInfos[0]) + calledHandle := false + patches.ApplyFunc(handler.NewClusterdHandler, func( + string, context.Context, *storage.MsgQueue, constant.ClusterInfo) *handler.ClusterdHandler { + return clusterdHandler + }).ApplyMethod(clusterdHandler, "Handle", func(*handler.ClusterdHandler) { + calledHandle = true + }) + manager.clusterHandle() + convey.ShouldBeTrue(calledHandle) +} diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go index f06792f03..281380f21 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go @@ -43,6 +43,7 @@ const ( waitGapTime = 1 ) +// NewClusterdHandler return clusterd handler func NewClusterdHandler( jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) *ClusterdHandler { return &ClusterdHandler{ @@ -56,7 +57,13 @@ func NewClusterdHandler( } } -func (m *ClusterdHandler) RegisterClusterd(retryTime time.Duration) { +// Handle Clusterd +func (m *ClusterdHandler) Handle() { + go m.registerClusterd(0) + go m.watchProfilingCmdChange() +} + +func (m *ClusterdHandler) registerClusterd(retryTime time.Duration) { if retryTime >= maxRegRetryTime { hwlog.RunLog.Error("init clusterd connect meet max retry time") return @@ -75,7 +82,7 @@ func (m *ClusterdHandler) RegisterClusterd(retryTime time.Duration) { conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { hwlog.RunLog.Errorf("init clusterd connect err: %v", err) - m.RegisterClusterd(retryTime + 1) + m.registerClusterd(retryTime + 1) return } @@ -232,7 +239,7 @@ func (m *Handler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichS hwlog.RunLog.Infof("%s enqueue profiling cmd %v", whichServer, cmd) } -func (m *ClusterdHandler) WatchProfilingCmdChange() { +func (m *ClusterdHandler) watchProfilingCmdChange() { hwlog.RunLog.Info("begin watch ProfilingSwitchFilePath...") ticker := time.NewTicker(time.Second) defer ticker.Stop() diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go index a4150f523..b65f6d333 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go @@ -23,6 +23,12 @@ import ( "taskd/framework_backend/manager/infrastructure/storage" ) +// Handle clusters +type Handle interface { + Handle() +} + +// Handler of clusters type Handler struct { jobId string svcCtx context.Context @@ -30,6 +36,7 @@ type Handler struct { clusterInfo constant.ClusterInfo } +// ClusterdHandler handler of cluseterd type ClusterdHandler struct { *Handler profilingFromClusterD atomic.Bool -- Gitee From b1d497210a40c2968a8f81431a20f955310f60a9 Mon Sep 17 00:00:00 2001 From: lirui238 <2396601465@qq.com> Date: Mon, 16 Jun 2025 15:10:44 +0800 Subject: [PATCH 4/8] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91[taskd]=20cluster-refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../go/framework_backend/manager/manager.go | 11 ++++-- .../framework_backend/manager/manager_test.go | 12 +++--- 
.../manager/service/handler/clusterd.go | 37 +++++++++++-------- .../manager/service/handler/methods.go | 33 +++++++++++++++++ .../manager/service/handler/type.go | 13 ++++--- 5 files changed, 76 insertions(+), 30 deletions(-) create mode 100644 component/taskd/taskd/go/framework_backend/manager/service/handler/methods.go diff --git a/component/taskd/taskd/go/framework_backend/manager/manager.go b/component/taskd/taskd/go/framework_backend/manager/manager.go index c49b92cb2..bc3c468be 100644 --- a/component/taskd/taskd/go/framework_backend/manager/manager.go +++ b/component/taskd/taskd/go/framework_backend/manager/manager.go @@ -111,11 +111,16 @@ func (m *BaseManager) clusterHandle() { } } if !foundClusterd { - m.clusterdHandle(constant.ClusterInfo{}) + m.clusterdHandle(constant.ClusterInfo{Role: constant.ClusterDRank}) } } func (m *BaseManager) clusterdHandle(info constant.ClusterInfo) { - clusterdHandler := handler.NewClusterdHandler(m.JobId, m.svcCtx, m.MsgHd.MsgQueue, info) - clusterdHandler.Handle() + clusterHandler, err := handler.InitHandler(m.JobId, m.svcCtx, m.MsgHd.MsgQueue, info) + if err != nil { + hwlog.RunLog.Error(err) + return + } + err = clusterHandler.Handle() + hwlog.RunLog.Error(err) } diff --git a/component/taskd/taskd/go/framework_backend/manager/manager_test.go b/component/taskd/taskd/go/framework_backend/manager/manager_test.go index 70d4c3d7a..f6f3f3919 100644 --- a/component/taskd/taskd/go/framework_backend/manager/manager_test.go +++ b/component/taskd/taskd/go/framework_backend/manager/manager_test.go @@ -148,15 +148,17 @@ func TestBaseManagerClusterHandle(t *testing.T) { manager := getManager() patches := gomonkey.NewPatches() defer patches.Reset() - clusterdHandler := handler.NewClusterdHandler( + clusterdHandler, err := handler.InitHandler( manager.JobId, manager.svcCtx, manager.MsgHd.MsgQueue, manager.ClusterInfos[0]) calledHandle := false - patches.ApplyFunc(handler.NewClusterdHandler, func( - string, context.Context, *storage.MsgQueue, constant.ClusterInfo) *handler.ClusterdHandler { - return clusterdHandler - }).ApplyMethod(clusterdHandler, "Handle", func(*handler.ClusterdHandler) { + patches.ApplyFunc(handler.InitHandler, func( + string, context.Context, *storage.MsgQueue, constant.ClusterInfo) (handler.Handler, error) { + return clusterdHandler, err + }).ApplyMethod(clusterdHandler, "Handle", func(*handler.ClusterdHandler) error { calledHandle = true + return nil }) manager.clusterHandle() convey.ShouldBeTrue(calledHandle) + convey.ShouldBeNil(err) } diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go index 281380f21..9bad0cec5 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go @@ -47,7 +47,7 @@ const ( func NewClusterdHandler( jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) *ClusterdHandler { return &ClusterdHandler{ - Handler: &Handler{ + BaseHandler: &BaseHandler{ jobId: jobId, svcCtx: svcCtx, queue: queue, @@ -58,17 +58,7 @@ func NewClusterdHandler( } // Handle Clusterd -func (m *ClusterdHandler) Handle() { - go m.registerClusterd(0) - go m.watchProfilingCmdChange() -} - -func (m *ClusterdHandler) registerClusterd(retryTime time.Duration) { - if retryTime >= maxRegRetryTime { - hwlog.RunLog.Error("init clusterd connect meet max retry time") - return - } - 
time.Sleep(retryTime * time.Second) +func (m *ClusterdHandler) Handle() error { addr, err := m.getAddr() if err != nil { hwlog.RunLog.Errorf("get address from cluster info err: %v, try get address from env variable", err) @@ -76,16 +66,31 @@ func (m *ClusterdHandler) registerClusterd(retryTime time.Duration) { addr, err = utils.GetClusterdAddr() if err != nil { hwlog.RunLog.Errorf("get clusterd address err: %v", err) - return + return fmt.Errorf("get clusterd address err: %v", err) } hwlog.RunLog.Infof("get clusterd addr %v", addr) + go m.registerClusterd(addr, 0) + go m.watchProfilingCmdChange() + return nil +} + +// SendMsg to Clusterd +func (m *ClusterdHandler) SendMsg(msg string, msgType string) error { + return nil +} + +func (m *ClusterdHandler) registerClusterd(addr string, retryTime time.Duration) { + if retryTime >= maxRegRetryTime { + hwlog.RunLog.Error("init clusterd connect meet max retry time") + return + } + time.Sleep(retryTime * time.Second) conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { hwlog.RunLog.Errorf("init clusterd connect err: %v", err) - m.registerClusterd(retryTime + 1) + m.registerClusterd(addr, retryTime+1) return } - go m.subscribeProfiling(conn, 0) go m.subscribeSwitchNic(conn) } @@ -215,7 +220,7 @@ func (m *ClusterdHandler) subscribeProfiling(conn *grpc.ClientConn, retryTime ti } } -func (m *Handler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichServer string) { +func (m *ClusterdHandler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichServer string) { message := storage.BaseMessage{ Header: storage.MsgHeader{ BizType: "default", diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/methods.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/methods.go new file mode 100644 index 000000000..34e10c57a --- /dev/null +++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/methods.go @@ -0,0 +1,33 @@ +/* Copyright(C) 2025. Huawei Technologies Co.,Ltd. All rights reserved. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +// Package handler is to provide cluster handler, i.e. 
clusterd, taskd +package handler + +import ( + "context" + "fmt" + + "taskd/common/constant" + "taskd/framework_backend/manager/infrastructure/storage" +) + +// InitHandler return handler according to Role of ClusterInfo +func InitHandler( + jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) (Handler, error) { + if info.Role == constant.ClusterDRank { + return NewClusterdHandler(jobId, svcCtx, queue, info), nil + } + return nil, fmt.Errorf("init handler for %v failed", info) +} diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go index b65f6d333..7292e89a8 100644 --- a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go +++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go @@ -23,13 +23,14 @@ import ( "taskd/framework_backend/manager/infrastructure/storage" ) -// Handle clusters -type Handle interface { - Handle() +// Handler interface of clusters +type Handler interface { + Handle() error + SendMsg(msg string, msgType string) error } -// Handler of clusters -type Handler struct { +// BaseHandler of clusters +type BaseHandler struct { jobId string svcCtx context.Context queue *storage.MsgQueue @@ -38,6 +39,6 @@ type Handler struct { // ClusterdHandler handler of cluseterd type ClusterdHandler struct { - *Handler + *BaseHandler profilingFromClusterD atomic.Bool } -- Gitee From e690dc740974c48773adf8f6da6925f293ec8288 Mon Sep 17 00:00:00 2001 From: lirui238 <2396601465@qq.com> Date: Mon, 16 Jun 2025 16:08:46 +0800 Subject: [PATCH 5/8] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91[taskd]=20cluster-refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../manager/application/businessStream.go | 4 + .../go/framework_backend/manager/manager.go | 5 +- .../framework_backend/manager/manager_test.go | 14 ++- .../manager/service/handler/clusterd.go | 100 ++++++++++++------ .../handler/{methods.go => factory.go} | 24 ++++- .../manager/service/handler/type.go | 9 +- 6 files changed, 111 insertions(+), 45 deletions(-) rename component/taskd/taskd/go/framework_backend/manager/service/handler/{methods.go => factory.go} (67%) diff --git a/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go b/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go index 2f8d80987..4270e1d3b 100644 --- a/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go +++ b/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go @@ -27,6 +27,7 @@ import ( "taskd/framework_backend/manager/infrastructure" "taskd/framework_backend/manager/infrastructure/storage" "taskd/framework_backend/manager/service" + "taskd/framework_backend/manager/service/handler" "taskd/toolkit_backend/net/common" ) @@ -145,6 +146,9 @@ func (b *BusinessStreamProcessor) DistributeMsg(msgs []infrastructure.Msg) error b.DistributedMsgToMgr(msg) continue } + if handler.DistributeMsg(msg, receiver) { + continue + } sendMsg, err := json.Marshal(msg.Body) if err != nil { hwlog.RunLog.Errorf("business handler send msg marshal failed, err: %v", err) diff --git a/component/taskd/taskd/go/framework_backend/manager/manager.go b/component/taskd/taskd/go/framework_backend/manager/manager.go index bc3c468be..f0db9158c 100644 --- a/component/taskd/taskd/go/framework_backend/manager/manager.go +++ 
b/component/taskd/taskd/go/framework_backend/manager/manager.go
@@ -121,6 +121,7 @@ func (m *BaseManager) clusterdHandle(info constant.ClusterInfo) {
 		hwlog.RunLog.Error(err)
 		return
 	}
-	err = clusterHandler.Handle()
-	hwlog.RunLog.Error(err)
+	if err = clusterHandler.Handle(); err != nil {
+		hwlog.RunLog.Error(err)
+	}
 }
diff --git a/component/taskd/taskd/go/framework_backend/manager/manager_test.go b/component/taskd/taskd/go/framework_backend/manager/manager_test.go
index f6f3f3919..d8925a858 100644
--- a/component/taskd/taskd/go/framework_backend/manager/manager_test.go
+++ b/component/taskd/taskd/go/framework_backend/manager/manager_test.go
@@ -144,21 +144,25 @@ func TestBaseManagerService(t *testing.T) {
 	convey.ShouldBeTrue(calledStreamRun)
 }
 
+type mockHandler struct{}
+
+func (m *mockHandler) Handle() error { return nil }
+
+func (m *mockHandler) SendMsg(msg storage.MsgBody) error { return nil }
+
 func TestBaseManagerClusterHandle(t *testing.T) {
 	manager := getManager()
 	patches := gomonkey.NewPatches()
 	defer patches.Reset()
-	clusterdHandler, err := handler.InitHandler(
-		manager.JobId, manager.svcCtx, manager.MsgHd.MsgQueue, manager.ClusterInfos[0])
 	calledHandle := false
+	m := &mockHandler{}
 	patches.ApplyFunc(handler.InitHandler, func(
 		string, context.Context, *storage.MsgQueue, constant.ClusterInfo) (handler.Handler, error) {
-		return clusterdHandler, err
-	}).ApplyMethod(clusterdHandler, "Handle", func(*handler.ClusterdHandler) error {
+		return m, nil
+	}).ApplyMethod(m, "Handle", func(*mockHandler) error {
 		calledHandle = true
 		return nil
 	})
 	manager.clusterHandle()
 	convey.ShouldBeTrue(calledHandle)
-	convey.ShouldBeNil(err)
 }
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go
index 9bad0cec5..beffdfb9d 100644
--- a/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go
+++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go
@@ -37,17 +37,17 @@ import (
 )
 
 const (
-	roleTaskd       = "taskd"
-	maxRegRetryTime = 60
-	maxWaitTime     = 60
-	waitGapTime     = 1
+	roleTaskd        = "taskd"
+	maxRegRetryTime  = 60
+	maxSendRetryTime = 3
+	maxWaitTime      = 60
+	waitGapTime      = 1
 )
 
-// NewClusterdHandler return clusterd handler
-func NewClusterdHandler(
-	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) *ClusterdHandler {
-	return &ClusterdHandler{
-		BaseHandler: &BaseHandler{
+func newClusterdHandler(
+	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) *clusterdHandler {
+	return &clusterdHandler{
+		baseHandler: &baseHandler{
 			jobId:       jobId,
 			svcCtx:      svcCtx,
 			queue:       queue,
@@ -58,15 +58,10 @@ func NewClusterdHandler(
 }
 
 // Handle Clusterd
-func (m *ClusterdHandler) Handle() error {
+func (m *clusterdHandler) Handle() error {
 	addr, err := m.getAddr()
 	if err != nil {
-		hwlog.RunLog.Errorf("get address from cluster info err: %v, try get address from env variable", err)
-	}
-	addr, err = utils.GetClusterdAddr()
-	if err != nil {
-		hwlog.RunLog.Errorf("get clusterd address err: %v", err)
-		return fmt.Errorf("get clusterd address err: %v", err)
+		return fmt.Errorf("get clusterd addr err: %v", err)
 	}
 	hwlog.RunLog.Infof("get clusterd addr %v", addr)
 	go m.registerClusterd(addr, 0)
@@ -75,11 +70,52 @@ func (m *ClusterdHandler) Handle() error {
 }
 
 // SendMsg to Clusterd
-func (m *ClusterdHandler) SendMsg(msg string, msgType string) error {
+func (m *clusterdHandler) SendMsg(msg storage.MsgBody) error {
+	if msg.MsgType == constant.SwitchNic {
+		return m.sendSwitchNicStatusRetry(msg)
+	}
+	return fmt.Errorf("cannot handle msg %v", msg)
+}
+
+// sendSwitchNicStatusRetry reports the switch nic result to clusterd, retrying on failure
+func (m *clusterdHandler) sendSwitchNicStatusRetry(msg storage.MsgBody) error {
+	var status bool
+	switch msg.Code {
+	case 0:
+		status = false
+	case 1:
+		status = true
+	default:
+		return fmt.Errorf("invalid status %d of SwitchNic", msg.Code)
+	}
+	sendSucc := false
+	var err error
+	for retryTime := time.Duration(0); retryTime < maxSendRetryTime; retryTime++ {
+		time.Sleep(retryTime * time.Second)
+		var addr string
+		// assign with "=" rather than ":=" so the last error survives the loop for the final report
+		addr, err = m.getAddr()
+		if err != nil {
+			return fmt.Errorf("send switch nic status err: %v", err)
+		}
+		var conn *grpc.ClientConn
+		conn, err = grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+		if err != nil {
+			hwlog.RunLog.Errorf("init clusterd connect err: %v", err)
+			continue
+		}
+		client := pb.NewRecoverClient(conn)
+		_, err = client.ReplySwitchNicResult(context.TODO(), &pb.SwitchResult{Result: status, JobId: m.jobId})
+		// release the per-attempt connection before checking the RPC result
+		if closeErr := conn.Close(); closeErr != nil {
+			hwlog.RunLog.Errorf("close clusterd connect err: %v", closeErr)
+		}
+		if err != nil {
+			hwlog.RunLog.Errorf("reply SwitchNicResult err: %v", err)
+			continue
+		}
+		sendSucc = true
+		break
+	}
+	if !sendSucc {
+		return fmt.Errorf("ReplySwitchNicResult failed, last err: %v", err)
+	}
 	return nil
 }
 
-func (m *ClusterdHandler) registerClusterd(addr string, retryTime time.Duration) {
+func (m *clusterdHandler) registerClusterd(addr string, retryTime time.Duration) {
 	if retryTime >= maxRegRetryTime {
 		hwlog.RunLog.Error("init clusterd connect meet max retry time")
 		return
@@ -95,7 +131,7 @@ func (m *ClusterdHandler) registerClusterd(addr string, retryTime time.Duration)
 	go m.subscribeSwitchNic(conn)
 }
 
-func (m *ClusterdHandler) subscribeSwitchNic(conn *grpc.ClientConn) {
+func (m *clusterdHandler) subscribeSwitchNic(conn *grpc.ClientConn) {
 	client := pb.NewRecoverClient(conn)
 	clientInfo := &pb.ClientInfo{
 		JobId: m.jobId,
@@ -114,7 +150,7 @@ func (m *ClusterdHandler) subscribeSwitchNic(conn *grpc.ClientConn) {
 	}
 }
 
-func (m *ClusterdHandler) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInfo, wTime int) (bool, int) {
+func (m *clusterdHandler) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInfo, wTime int) (bool, int) {
 	stream, err := client.SubscribeNotifySwitch(m.svcCtx, clientInfo)
 	if err != nil {
 		hwlog.RunLog.Errorf("register Clusterd notify switch fail, err: %v", err)
@@ -146,7 +182,7 @@ func (m *ClusterdHandler) listenSignal(client pb.RecoverClient, clientInfo *pb.C
 	}
 }
 
-func (m *ClusterdHandler) enqueueSwitchNic(ranks []string, ops []bool) {
+func (m *clusterdHandler) enqueueSwitchNic(ranks []string, ops []bool) {
 	rankStr := utils.ObjToString(ranks)
 	opStr := utils.ObjToString(ops)
 	msg := map[string]string{
@@ -178,7 +214,7 @@ func (m *ClusterdHandler) enqueueSwitchNic(ranks []string, ops []bool) {
 	hwlog.RunLog.Infof("enqueue switch msg %v", msg)
 }
 
-func (m *ClusterdHandler) subscribeProfiling(conn *grpc.ClientConn, retryTime time.Duration) {
+func (m *clusterdHandler) subscribeProfiling(conn *grpc.ClientConn, retryTime time.Duration) {
 	m.profilingFromClusterD.Store(false)
 	if retryTime >= maxRegRetryTime {
 		hwlog.RunLog.Error("register Cluster profiling meet max retry time")
@@ -220,7 +256,7 @@ func (m *ClusterdHandler) subscribeProfiling(conn *grpc.ClientConn, retryTime ti
 	}
 }
 
-func (m *ClusterdHandler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichServer string) {
+func (m *clusterdHandler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichServer string) {
 	message := storage.BaseMessage{
 		Header: storage.MsgHeader{
 			BizType: "default",
@@ -244,7 +280,7 @@ func (m *ClusterdHandler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd
 	hwlog.RunLog.Infof("%s enqueue profiling cmd %v", whichServer, cmd)
 }
 
-func (m *ClusterdHandler) watchProfilingCmdChange() {
+func (m *clusterdHandler) watchProfilingCmdChange() {
 	hwlog.RunLog.Info("begin watch ProfilingSwitchFilePath...")
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
@@ -263,7 +299,7 @@ func (m *ClusterdHandler) watchProfilingCmdChange() {
 	}
 }
 
-func (m *ClusterdHandler) getProfilingFromFile() {
+func (m *clusterdHandler) getProfilingFromFile() {
 	profilingSwitch, err := utils.GetProfilingSwitch(constant.ProfilingSwitchFilePath)
 	if err != nil {
 		hwlog.RunLog.Errorf("GetProfilingSwitch err: %v", err)
@@ -273,16 +309,16 @@ func (m *ClusterdHandler) getProfilingFromFile() {
 	m.enqueueProfilingSwitch(domainSwitch, constant.TaskDRank)
 }
 
-func (m *ClusterdHandler) getAddr() (string, error) {
+func (m *clusterdHandler) getAddr() (string, error) {
 	res := m.clusterInfo.Ip + ":" + m.clusterInfo.Port
-	if m.clusterInfo.Ip == "" || m.clusterInfo.Port == "" {
-		return "", fmt.Errorf("invalid ip:port %s", res)
+	if m.clusterInfo.Ip != "" && m.clusterInfo.Port != "" && net.ParseIP(m.clusterInfo.Ip) != nil {
+		return res, nil
 	}
-	parsedIP := net.ParseIP(m.clusterInfo.Ip)
-	if parsedIP == nil {
-		return "", fmt.Errorf("%s is NOT a valid IP address", m.clusterInfo.Ip)
+	addr, err := utils.GetClusterdAddr()
+	if err != nil {
+		return "", fmt.Errorf("get address from %v and env err: %v", m.clusterInfo, err)
 	}
-	return res, nil
+	return addr, nil
 }
 
 func convertProfilingMsg(profilingSwitchData *profiling.ProfilingSwitch) constant.ProfilingSwitch {
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/methods.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/factory.go
similarity index 67%
rename from component/taskd/taskd/go/framework_backend/manager/service/handler/methods.go
rename to component/taskd/taskd/go/framework_backend/manager/service/handler/factory.go
index 34e10c57a..f9427f547 100644
--- a/component/taskd/taskd/go/framework_backend/manager/service/handler/methods.go
+++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/factory.go
@@ -19,15 +19,37 @@ import (
 	"context"
 	"fmt"
 
+	"ascend-common/common-utils/hwlog"
 	"taskd/common/constant"
+	"taskd/framework_backend/manager/infrastructure"
 	"taskd/framework_backend/manager/infrastructure/storage"
 )
 
+var (
+	clusterd Handler
+)
+
 // InitHandler return handler according to Role of ClusterInfo
 func InitHandler(
 	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) (Handler, error) {
 	if info.Role == constant.ClusterDRank {
-		return NewClusterdHandler(jobId, svcCtx, queue, info), nil
+		clusterd = newClusterdHandler(jobId, svcCtx, queue, info)
+		return clusterd, nil
 	}
 	return nil, fmt.Errorf("init handler for %v failed", info)
 }
+
+// DistributeMsg to cluster according to receiver
+func DistributeMsg(msg infrastructure.Msg, receiver string) bool {
+	if receiver == constant.ClusterDRank {
+		if clusterd == nil {
+			hwlog.RunLog.Debugf("clusterd handler is nil")
+			return false
+		}
+		go func() {
+			if err := clusterd.SendMsg(msg.Body); err != nil {
+				hwlog.RunLog.Error(err)
+			}
+		}()
+	}
+	return false
+}
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go b/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go
index 7292e89a8..b22c6826a 100644
--- a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go
+++ b/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go
@@ -26,19 +26,18 @@ import (
 // Handler interface of clusters
 type Handler interface {
 	Handle() error
-	SendMsg(msg string, msgType string) error
+	SendMsg(msg storage.MsgBody) error
 }
 
 // BaseHandler of clusters
-type BaseHandler struct {
+type baseHandler struct {
 	jobId       string
 	svcCtx      context.Context
 	queue       *storage.MsgQueue
 	clusterInfo constant.ClusterInfo
 }
 
-// ClusterdHandler handler of cluseterd
-type ClusterdHandler struct {
-	*BaseHandler
+type clusterdHandler struct {
+	*baseHandler
 	profilingFromClusterD atomic.Bool
 }
-- 
Gitee
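
Note on PATCH 5/8: the patch hides clusterd-specific wiring behind a small Handler interface plus a package factory, so the manager only calls Handle to start the connection workflow and SendMsg to report a result, and the business stream asks DistributeMsg whether a receiver is served in-process. The sketch below is a self-contained, runnable reduction of that shape; the lowercase names and the plain string body are illustrative stand-ins for InitHandler and storage.MsgBody, not the shipped code, and the dispatch here returns true on success, which is the contract the series settles on in PATCH 8/8.

    package main

    import "fmt"

    // Handler mirrors the two-method interface from the patch: Handle starts
    // the cluster-side workflow, SendMsg pushes a result back to that cluster.
    type Handler interface {
        Handle() error
        SendMsg(body string) error
    }

    type clusterdHandler struct{ jobId string }

    func (c *clusterdHandler) Handle() error {
        fmt.Println("registering job", c.jobId, "with clusterd")
        return nil
    }

    func (c *clusterdHandler) SendMsg(body string) error {
        fmt.Println("sending to clusterd:", body)
        return nil
    }

    // registered plays the role of the factory's package-level reference,
    // so later dispatch calls can reuse the handler created at init time.
    var registered Handler

    func initHandler(role, jobId string) (Handler, error) {
        if role == "clusterd" {
            registered = &clusterdHandler{jobId: jobId}
            return registered, nil
        }
        return nil, fmt.Errorf("init handler for role %q failed", role)
    }

    // distributeMsg reports whether the message was taken by a cluster handler.
    func distributeMsg(receiver, body string) bool {
        if receiver != "clusterd" || registered == nil {
            return false
        }
        if err := registered.SendMsg(body); err != nil {
            fmt.Println("send err:", err)
        }
        return true
    }

    func main() {
        if _, err := initHandler("clusterd", "job-0"); err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println("dispatched:", distributeMsg("clusterd", "switch-nic result ok"))
    }

The payoff of the indirection is visible in the manager test above: a four-line mockHandler replaces the real clusterd connection, with no gRPC involved.
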
retry time") - return +func (m *clusterdHandler) subscribeProfiling(conn *grpc.ClientConn) { + var stream profiling.TrainingDataTrace_SubscribeDataTraceSwitchClient + var err error + var succ bool + for retryTime := time.Duration(0); retryTime < maxRegRetryTime; retryTime++ { + m.profilingFromClusterD.Store(false) + time.Sleep(retryTime * time.Second) + traceClient := profiling.NewTrainingDataTraceClient(conn) + stream, err = traceClient.SubscribeDataTraceSwitch(m.svcCtx, &profiling.ProfilingClientInfo{ + JobId: m.jobId, + Role: roleTaskd, + }) + if err != nil { + hwlog.RunLog.Errorf("register Cluster profiling fail, err: %v", err) + continue + } + if !m.recvProfiling(stream) { + break + } } - time.Sleep(retryTime * time.Second) - traceClient := profiling.NewTrainingDataTraceClient(conn) - stream, err := traceClient.SubscribeDataTraceSwitch(m.svcCtx, &profiling.ProfilingClientInfo{ - JobId: m.jobId, - Role: roleTaskd, - }) - if err != nil { - hwlog.RunLog.Errorf("register Cluster profiling fail, err: %v", err) - go m.subscribeProfiling(conn, retryTime+1) + if !succ { + hwlog.RunLog.Error("register Cluster profiling meet max retry time") return } +} + +func (m *clusterdHandler) recvProfiling(stream profiling.TrainingDataTrace_SubscribeDataTraceSwitchClient) bool { m.profilingFromClusterD.Store(true) for { select { case <-m.svcCtx.Done(): hwlog.RunLog.Info("taskd exit, stop subscribe clusterd fault info") - return + return false case <-stream.Context().Done(): hwlog.RunLog.Info("client stream exit, stop subscribe profiling info and re-register") - go m.subscribeProfiling(conn, retryTime+1) - return + return true default: responseMsg, recvErr := stream.Recv() if recvErr != nil { -- Gitee From d158d139ebd675fb2e090551591177987d6c42c7 Mon Sep 17 00:00:00 2001 From: lirui238 <2396601465@qq.com> Date: Mon, 16 Jun 2025 16:57:13 +0800 Subject: [PATCH 7/8] =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?= =?UTF-8?q?=E6=98=8E=E3=80=91[taskd]=20cluster-refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../manager/application/businessStream.go | 4 +- .../go/framework_backend/manager/manager.go | 6 +-- .../framework_backend/manager/manager_test.go | 6 +-- .../service/{handler => adaptor}/clusterd.go | 38 +++++++++---------- .../service/{handler => adaptor}/factory.go | 14 +++---- .../service/{handler => adaptor}/type.go | 15 ++++---- 6 files changed, 41 insertions(+), 42 deletions(-) rename component/taskd/taskd/go/framework_backend/manager/service/{handler => adaptor}/clusterd.go (90%) rename component/taskd/taskd/go/framework_backend/manager/service/{handler => adaptor}/factory.go (82%) rename component/taskd/taskd/go/framework_backend/manager/service/{handler => adaptor}/type.go (80%) diff --git a/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go b/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go index 4270e1d3b..587391c71 100644 --- a/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go +++ b/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go @@ -27,7 +27,7 @@ import ( "taskd/framework_backend/manager/infrastructure" "taskd/framework_backend/manager/infrastructure/storage" "taskd/framework_backend/manager/service" - "taskd/framework_backend/manager/service/handler" + "taskd/framework_backend/manager/service/adaptor" "taskd/toolkit_backend/net/common" ) @@ -146,7 +146,7 @@ func (b *BusinessStreamProcessor) DistributeMsg(msgs 
From d158d139ebd675fb2e090551591177987d6c42c7 Mon Sep 17 00:00:00 2001
From: lirui238 <2396601465@qq.com>
Date: Mon, 16 Jun 2025 16:57:13 +0800
Subject: =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?=
 =?UTF-8?q?=E6=98=8E=E3=80=91[taskd]=20cluster-refactor?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../manager/application/businessStream.go     |  4 +-
 .../go/framework_backend/manager/manager.go   |  6 +--
 .../framework_backend/manager/manager_test.go |  6 +--
 .../service/{handler => adaptor}/clusterd.go  | 38 +++++++++----------
 .../service/{handler => adaptor}/factory.go   | 14 +++----
 .../service/{handler => adaptor}/type.go      | 15 ++++----
 6 files changed, 41 insertions(+), 42 deletions(-)
 rename component/taskd/taskd/go/framework_backend/manager/service/{handler => adaptor}/clusterd.go (90%)
 rename component/taskd/taskd/go/framework_backend/manager/service/{handler => adaptor}/factory.go (82%)
 rename component/taskd/taskd/go/framework_backend/manager/service/{handler => adaptor}/type.go (80%)

diff --git a/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go b/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go
index 4270e1d3b..587391c71 100644
--- a/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go
+++ b/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go
@@ -27,7 +27,7 @@ import (
 	"taskd/framework_backend/manager/infrastructure"
 	"taskd/framework_backend/manager/infrastructure/storage"
 	"taskd/framework_backend/manager/service"
-	"taskd/framework_backend/manager/service/handler"
+	"taskd/framework_backend/manager/service/adaptor"
 	"taskd/toolkit_backend/net/common"
 )
 
@@ -146,7 +146,7 @@ func (b *BusinessStreamProcessor) DistributeMsg(msgs []infrastructure.Msg) error
 			b.DistributedMsgToMgr(msg)
 			continue
 		}
-		if handler.DistributeMsg(msg, receiver) {
+		if adaptor.DistributeMsg(msg, receiver) {
 			continue
 		}
 		sendMsg, err := json.Marshal(msg.Body)
diff --git a/component/taskd/taskd/go/framework_backend/manager/manager.go b/component/taskd/taskd/go/framework_backend/manager/manager.go
index f0db9158c..5f140c178 100644
--- a/component/taskd/taskd/go/framework_backend/manager/manager.go
+++ b/component/taskd/taskd/go/framework_backend/manager/manager.go
@@ -25,7 +25,7 @@ import (
 	"taskd/common/utils"
 	"taskd/framework_backend/manager/application"
 	"taskd/framework_backend/manager/infrastructure/storage"
-	"taskd/framework_backend/manager/service/handler"
+	"taskd/framework_backend/manager/service/adaptor"
 )
 
 // NewTaskDManager return taskd manager instance
@@ -116,12 +116,12 @@ func (m *BaseManager) clusterHandle() {
 }
 
 func (m *BaseManager) clusterdHandle(info constant.ClusterInfo) {
-	clusterHandler, err := handler.InitHandler(m.JobId, m.svcCtx, m.MsgHd.MsgQueue, info)
+	clusterAdaptor, err := adaptor.InitAdaptor(m.JobId, m.svcCtx, m.MsgHd.MsgQueue, info)
 	if err != nil {
 		hwlog.RunLog.Error(err)
 		return
 	}
-	if err = clusterHandler.Handle(); err != nil {
+	if err = clusterAdaptor.Handle(); err != nil {
 		hwlog.RunLog.Error(err)
 	}
 }
diff --git a/component/taskd/taskd/go/framework_backend/manager/manager_test.go b/component/taskd/taskd/go/framework_backend/manager/manager_test.go
index d8925a858..1ebf167ee 100644
--- a/component/taskd/taskd/go/framework_backend/manager/manager_test.go
+++ b/component/taskd/taskd/go/framework_backend/manager/manager_test.go
@@ -29,7 +29,7 @@ import (
 	"taskd/common/utils"
 	"taskd/framework_backend/manager/application"
 	"taskd/framework_backend/manager/infrastructure/storage"
-	"taskd/framework_backend/manager/service/handler"
+	"taskd/framework_backend/manager/service/adaptor"
 )
 
 const (
@@ -156,8 +156,8 @@ func TestBaseManagerClusterHandle(t *testing.T) {
 	defer patches.Reset()
 	calledHandle := false
 	m := &mockHandler{}
-	patches.ApplyFunc(handler.InitHandler, func(
-		string, context.Context, *storage.MsgQueue, constant.ClusterInfo) (handler.Handler, error) {
+	patches.ApplyFunc(adaptor.InitAdaptor, func(
+		string, context.Context, *storage.MsgQueue, constant.ClusterInfo) (adaptor.Adaptor, error) {
 		return m, nil
 	}).ApplyMethod(m, "Handle", func(*mockHandler) error {
 		calledHandle = true
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/clusterd.go
similarity index 90%
rename from component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go
rename to component/taskd/taskd/go/framework_backend/manager/service/adaptor/clusterd.go
index bcd805032..3c0161fee 100644
--- a/component/taskd/taskd/go/framework_backend/manager/service/handler/clusterd.go
+++ b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/clusterd.go
@@ -12,8 +12,8 @@
 limitations under the License.
 */
 
-// Package handler is to provide cluster handler, i.e. clusterd, taskd
-package handler
+// Package adaptor provides cluster adaptors, i.e. clusterd, taskd
+package adaptor
 
 import (
 	"context"
@@ -44,10 +44,10 @@ const (
 	waitGapTime      = 1
 )
 
-func newClusterdHandler(
-	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) *clusterdHandler {
-	return &clusterdHandler{
-		baseHandler: &baseHandler{
+func newClusterdAdaptor(
+	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) *clusterdAdaptor {
+	return &clusterdAdaptor{
+		baseAdaptor: &baseAdaptor{
 			jobId:       jobId,
 			svcCtx:      svcCtx,
 			queue:       queue,
@@ -58,7 +58,7 @@ func newClusterdHandler(
 }
 
 // Handle Clusterd
-func (m *clusterdHandler) Handle() error {
+func (m *clusterdAdaptor) Handle() error {
 	addr, err := m.getAddr()
 	if err != nil {
 		return fmt.Errorf("get clusterd addr err: %v", err)
@@ -70,14 +70,14 @@ func (m *clusterdHandler) Handle() error {
 }
 
 // SendMsg to Clusterd
-func (m *clusterdHandler) SendMsg(msg storage.MsgBody) error {
+func (m *clusterdAdaptor) SendMsg(msg storage.MsgBody) error {
 	if msg.MsgType == constant.SwitchNic {
 		return m.sendSwitchNicStatusRetry(msg)
 	}
 	return fmt.Errorf("cannot handle msg %v", msg)
 }
 
 // sendSwitchNicStatusRetry reports the switch nic result to clusterd, retrying on failure
-func (m *clusterdHandler) sendSwitchNicStatusRetry(msg storage.MsgBody) error {
+func (m *clusterdAdaptor) sendSwitchNicStatusRetry(msg storage.MsgBody) error {
 	var status bool
 	switch msg.Code {
 	case 0:
@@ -115,7 +115,7 @@ func (m *clusterdHandler) sendSwitchNicStatusRetry(msg storage.MsgBody) error {
 	return nil
 }
 
-func (m *clusterdHandler) registerClusterd(addr string) {
+func (m *clusterdAdaptor) registerClusterd(addr string) {
 	var conn *grpc.ClientConn
 	var err error
 	var succ bool
@@ -137,7 +137,7 @@ func (m *clusterdHandler) registerClusterd(addr string) {
 	go m.subscribeSwitchNic(conn)
 }
 
-func (m *clusterdHandler) subscribeSwitchNic(conn *grpc.ClientConn) {
+func (m *clusterdAdaptor) subscribeSwitchNic(conn *grpc.ClientConn) {
 	client := pb.NewRecoverClient(conn)
 	clientInfo := &pb.ClientInfo{
 		JobId: m.jobId,
@@ -156,7 +156,7 @@ func (m *clusterdHandler) subscribeSwitchNic(conn *grpc.ClientConn) {
 	}
 }
 
-func (m *clusterdHandler) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInfo, wTime int) (bool, int) {
+func (m *clusterdAdaptor) listenSignal(client pb.RecoverClient, clientInfo *pb.ClientInfo, wTime int) (bool, int) {
 	stream, err := client.SubscribeNotifySwitch(m.svcCtx, clientInfo)
 	if err != nil {
 		hwlog.RunLog.Errorf("register Clusterd notify switch fail, err: %v", err)
@@ -188,7 +188,7 @@ func (m *clusterdHandler) listenSignal(client pb.RecoverClient, clientInfo *pb.C
 	}
 }
 
-func (m *clusterdHandler) enqueueSwitchNic(ranks []string, ops []bool) {
+func (m *clusterdAdaptor) enqueueSwitchNic(ranks []string, ops []bool) {
 	rankStr := utils.ObjToString(ranks)
 	opStr := utils.ObjToString(ops)
 	msg := map[string]string{
@@ -220,7 +220,7 @@ func (m *clusterdHandler) enqueueSwitchNic(ranks []string, ops []bool) {
 	hwlog.RunLog.Infof("enqueue switch msg %v", msg)
 }
 
-func (m *clusterdHandler) subscribeProfiling(conn *grpc.ClientConn) {
+func (m *clusterdAdaptor) subscribeProfiling(conn *grpc.ClientConn) {
 	var stream profiling.TrainingDataTrace_SubscribeDataTraceSwitchClient
 	var err error
 	var succ bool
@@ -246,7 +246,7 @@ func (m *clusterdHandler) subscribeProfiling(conn *grpc.ClientConn) {
 	}
 }
 
-func (m *clusterdHandler) recvProfiling(stream profiling.TrainingDataTrace_SubscribeDataTraceSwitchClient) bool {
+func (m *clusterdAdaptor) recvProfiling(stream profiling.TrainingDataTrace_SubscribeDataTraceSwitchClient) bool {
 	m.profilingFromClusterD.Store(true)
 	for {
 		select {
@@ -271,7 +271,7 @@ func (m *clusterdHandler) recvProfiling(stream profiling.TrainingDataTrace_Subsc
 	}
 }
 
-func (m *clusterdHandler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichServer string) {
+func (m *clusterdAdaptor) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd, whichServer string) {
 	message := storage.BaseMessage{
 		Header: storage.MsgHeader{
 			BizType: "default",
@@ -295,7 +295,7 @@ func (m *clusterdHandler) enqueueProfilingSwitch(cmd constant.ProfilingDomainCmd
 	hwlog.RunLog.Infof("%s enqueue profiling cmd %v", whichServer, cmd)
 }
 
-func (m *clusterdHandler) watchProfilingCmdChange() {
+func (m *clusterdAdaptor) watchProfilingCmdChange() {
 	hwlog.RunLog.Info("begin watch ProfilingSwitchFilePath...")
 	ticker := time.NewTicker(time.Second)
 	defer ticker.Stop()
@@ -314,7 +314,7 @@ func (m *clusterdHandler) watchProfilingCmdChange() {
 	}
 }
 
-func (m *clusterdHandler) getProfilingFromFile() {
+func (m *clusterdAdaptor) getProfilingFromFile() {
 	profilingSwitch, err := utils.GetProfilingSwitch(constant.ProfilingSwitchFilePath)
 	if err != nil {
 		hwlog.RunLog.Errorf("GetProfilingSwitch err: %v", err)
@@ -324,7 +324,7 @@ func (m *clusterdHandler) getProfilingFromFile() {
 	m.enqueueProfilingSwitch(domainSwitch, constant.TaskDRank)
 }
 
-func (m *clusterdHandler) getAddr() (string, error) {
+func (m *clusterdAdaptor) getAddr() (string, error) {
 	res := m.clusterInfo.Ip + ":" + m.clusterInfo.Port
 	if m.clusterInfo.Ip != "" && m.clusterInfo.Port != "" && net.ParseIP(m.clusterInfo.Ip) != nil {
 		return res, nil
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/factory.go b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/factory.go
similarity index 82%
rename from component/taskd/taskd/go/framework_backend/manager/service/handler/factory.go
rename to component/taskd/taskd/go/framework_backend/manager/service/adaptor/factory.go
index f9427f547..3eb88ccd3 100644
--- a/component/taskd/taskd/go/framework_backend/manager/service/handler/factory.go
+++ b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/factory.go
@@ -12,8 +12,8 @@
 limitations under the License.
 */
 
-// Package handler is to provide cluster handler, i.e. clusterd, taskd
-package handler
+// Package adaptor provides cluster adaptors, i.e. clusterd, taskd
+package adaptor
 
 import (
 	"context"
@@ -26,14 +26,14 @@ import (
 )
 
 var (
-	clusterd Handler
+	clusterd Adaptor
 )
 
-// InitHandler return handler according to Role of ClusterInfo
-func InitHandler(
-	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) (Handler, error) {
+// InitAdaptor returns an adaptor according to the Role of ClusterInfo
+func InitAdaptor(
+	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) (Adaptor, error) {
 	if info.Role == constant.ClusterDRank {
-		clusterd = newClusterdHandler(jobId, svcCtx, queue, info)
+		clusterd = newClusterdAdaptor(jobId, svcCtx, queue, info)
 		return clusterd, nil
 	}
 	return nil, fmt.Errorf("init handler for %v failed", info)
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/type.go
similarity index 80%
rename from component/taskd/taskd/go/framework_backend/manager/service/handler/type.go
rename to component/taskd/taskd/go/framework_backend/manager/service/adaptor/type.go
index b22c6826a..69957d52e 100644
--- a/component/taskd/taskd/go/framework_backend/manager/service/handler/type.go
+++ b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/type.go
@@ -12,8 +12,8 @@
 limitations under the License.
 */
 
-// Package handler is to provide cluster handler, i.e. clusterd, taskd
-package handler
+// Package adaptor provides cluster adaptors, i.e. clusterd, taskd
+package adaptor
 
 import (
 	"context"
@@ -23,21 +23,20 @@ import (
 	"taskd/framework_backend/manager/infrastructure/storage"
 )
 
-// Handler interface of clusters
-type Handler interface {
+// Adaptor interface of clusters
+type Adaptor interface {
 	Handle() error
 	SendMsg(msg storage.MsgBody) error
 }
 
-// BaseHandler of clusters
-type baseHandler struct {
+type baseAdaptor struct {
 	jobId       string
 	svcCtx      context.Context
 	queue       *storage.MsgQueue
 	clusterInfo constant.ClusterInfo
 }
 
-type clusterdHandler struct {
-	*baseHandler
+type clusterdAdaptor struct {
+	*baseAdaptor
 	profilingFromClusterD atomic.Bool
 }
-- 
Gitee
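
Note on PATCH 7/8: this is a wholesale rename (package handler to adaptor, Handler to Adaptor, clusterdHandler to clusterdAdaptor) plus an exported-to-unexported sweep. A cheap guard for this kind of refactor, not present in the series but worth considering, is a blank-identifier interface assertion: it compiles to nothing at runtime yet breaks the build if a rename leaves an implementation out of the interface. A self-contained illustration with local stand-in types, not the taskd package's own:

    package main

    import "fmt"

    // Adaptor matches the renamed interface's shape; the concrete type below
    // is a stand-in for clusterdAdaptor (and, after PATCH 8/8, taskdAdaptor).
    type Adaptor interface {
        Handle() error
        SendMsg(msg string) error
    }

    type clusterdAdaptor struct{}

    func (c *clusterdAdaptor) Handle() error { return nil }

    func (c *clusterdAdaptor) SendMsg(msg string) error {
        fmt.Println("->", msg)
        return nil
    }

    // The assertion: if a later rename drops or changes a method, this line
    // turns the silent interface mismatch into a compile error.
    var _ Adaptor = (*clusterdAdaptor)(nil)

    func main() {
        fmt.Println("clusterdAdaptor still satisfies Adaptor")
    }
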
From 6d14f9558d04079f2e630817d28046812ab34ff6 Mon Sep 17 00:00:00 2001
From: lirui238 <2396601465@qq.com>
Date: Mon, 16 Jun 2025 18:02:14 +0800
Subject: =?UTF-8?q?=E3=80=90=E4=BF=AE=E6=94=B9=E8=AF=B4?=
 =?UTF-8?q?=E6=98=8E=E3=80=91[taskd]=20cluster-refactor?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../taskd/taskd/go/common/constant/const.go   |  3 +
 .../manager/application/businessStream.go     | 19 +------
 .../manager/application/msghandler.go         | 11 ----
 .../manager/application/msghandler_test.go    | 22 -------
 .../go/framework_backend/manager/manager.go   | 13 ++++-
 .../manager/service/adaptor/factory.go        | 24 ++++----
 .../manager/service/adaptor/taskd.go          | 57 +++++++++++++++++++
 .../manager/service/adaptor/type.go           |  4 ++
 8 files changed, 89 insertions(+), 64 deletions(-)
 create mode 100644 component/taskd/taskd/go/framework_backend/manager/service/adaptor/taskd.go

diff --git a/component/taskd/taskd/go/common/constant/const.go b/component/taskd/taskd/go/common/constant/const.go
index 2aa72e3df..e713bbc1f 100644
--- a/component/taskd/taskd/go/common/constant/const.go
+++ b/component/taskd/taskd/go/common/constant/const.go
@@ -131,6 +131,9 @@ const (
 	ProfilingAllOnCmdCode = 711
 )
 
+// DefaultBizType is the net tool default biz type
+const DefaultBizType = "default"
+
 // RequestChanNum message handler chan number
 const RequestChanNum = 100
 
diff --git a/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go b/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go
index 587391c71..68625c9fe 100644
--- a/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go
+++ b/component/taskd/taskd/go/framework_backend/manager/application/businessStream.go
@@ -20,8 +20,6 @@ import (
 	"fmt"
 	"strings"
 
-	"github.com/google/uuid"
-
 	"ascend-common/common-utils/hwlog"
 	"taskd/common/constant"
 	"taskd/framework_backend/manager/infrastructure"
@@ -142,10 +140,6 @@ func (b *BusinessStreamProcessor) DistributeMsg(msgs []infrastructure.Msg) error
 			continue
 		}
 		for _, receiver := range msg.Receiver {
-			if receiver == common.MgrRole {
-				b.DistributedMsgToMgr(msg)
-				continue
-			}
 			if adaptor.DistributeMsg(msg, receiver) {
 				continue
 			}
@@ -160,17 +154,6 @@ func (b *BusinessStreamProcessor) DistributeMsg(msgs []infrastructure.Msg) error
 	return nil
 }
 
-// DistributedMsgToMgr distributed message to manager
-func (b *BusinessStreamProcessor) DistributedMsgToMgr(msg infrastructure.Msg) {
-	b.MsgHandler.SendMsgToMgr(uuid.New().String(), constant.DefaultDomainName,
-		&common.Position{
-			Role:        common.MgrRole,
-			ServerRank:  "0",
-			ProcessRank: "-1",
-		}, msg.Body)
-	hwlog.RunLog.Debugf("business handler send msg %v to mgr", msg.Body)
-}
-
 // DistributedMsgToOthers distributed message to others
 func (b *BusinessStreamProcessor) DistributedMsgToOthers(receiver string, sendMsg []byte) {
 	var dst *common.Position
@@ -189,6 +172,6 @@ func (b *BusinessStreamProcessor) DistributedMsgToOthers(receiver string, sendMs
 			return
 		}
 	}
-	b.MsgHandler.SendMsgUseGrpc(constant.DefaultDomainName, string(sendMsg), dst)
+	b.MsgHandler.SendMsgUseGrpc(constant.DefaultBizType, string(sendMsg), dst)
 	hwlog.RunLog.Debugf("business handler send msg %s to others", string(sendMsg))
 }
diff --git a/component/taskd/taskd/go/framework_backend/manager/application/msghandler.go b/component/taskd/taskd/go/framework_backend/manager/application/msghandler.go
index 18d4c1611..7ada603eb 100644
--- a/component/taskd/taskd/go/framework_backend/manager/application/msghandler.go
+++ b/component/taskd/taskd/go/framework_backend/manager/application/msghandler.go
@@ -35,7 +35,6 @@ import (
 type MsgHandlerInterface interface {
 	GetDataPool() *storage.DataPool
 	SendMsgUseGrpc(msgType string, msgBody string, dst *common.Position)
-	SendMsgToMgr(uuid string, bizType string, src *common.Position, msgBody storage.MsgBody)
 }
 
 // MsgHandler receive, send, process and store message info
@@ -176,16 +175,6 @@ func (mhd *MsgHandler) SendMsgUseGrpc(msgType string, msgBody string, dst *commo
 	}
 }
 
-// SendMsgToMgr send message into manager message queue
-func (mhd *MsgHandler) SendMsgToMgr(uuid string, bizType string, src *common.Position, msgBody storage.MsgBody) {
-	data := mhd.MsgQueue.NewMsg(uuid, bizType, src, msgBody)
-	err := mhd.MsgQueue.Enqueue(data)
-	if err != nil {
-		hwlog.RunLog.Errorf("enqueue failed: %v", err)
-		mhd.SendMsgUseGrpc(bizType, err.Error(), src)
-	}
-}
-
 // GetDataPool return data pool
 func (mhd *MsgHandler) GetDataPool() *storage.DataPool {
 	return mhd.DataPool
diff --git a/component/taskd/taskd/go/framework_backend/manager/application/msghandler_test.go b/component/taskd/taskd/go/framework_backend/manager/application/msghandler_test.go
index d1addc0bf..757d8d3ad 100644
--- a/component/taskd/taskd/go/framework_backend/manager/application/msghandler_test.go
+++ b/component/taskd/taskd/go/framework_backend/manager/application/msghandler_test.go
@@ -27,7 +27,6 @@ import (
 	"github.com/smartystreets/goconvey/convey"
 
 	"ascend-common/common-utils/hwlog"
-	"taskd/common/constant"
 	"taskd/framework_backend/manager/infrastructure/storage"
 	"taskd/framework_backend/manager/service"
 	"taskd/toolkit_backend/net"
@@ -137,24 +136,3 @@ func TestSendMsgUseGrpc(t *testing.T) {
 		convey.So(req.Dst, convey.ShouldEqual, testDst)
 	})
 }
-
-// TestSendMsgToMgr test manager send msg enqueue
-func TestSendMsgToMgr(t *testing.T) {
-	convey.Convey("TestSendMsgToMgr manager send msg enqueue success", t, func() {
-		mhd := NewMsgHandler()
-		testSrc := &common.Position{Role: common.WorkerRole}
-		oldLength := len(mhd.MsgQueue.Queue)
-		mhd.SendMsgToMgr("test-uuid", "test-type", testSrc, storage.MsgBody{})
-		convey.So(oldLength+1, convey.ShouldEqual, len(mhd.MsgQueue.Queue))
-	})
-	convey.Convey("TestSendMsgToMgr manager send msg enqueue fail", t, func() {
-		mhd := &MsgHandler{
-			Sender: &service.MsgSender{RequestChan: make(chan service.SendGrpcMsg, constant.RequestChanNum)},
-			MsgQueue: &storage.MsgQueue{Queue: make([]storage.BaseMessage, constant.MaxMsgQueueLength),
-				Mutex: sync.Mutex{}},
-		}
-		testSrc := &common.Position{Role: common.WorkerRole}
-		mhd.SendMsgToMgr("test-uuid", "test-type", testSrc, storage.MsgBody{})
-		convey.So(len(mhd.MsgQueue.Queue), convey.ShouldEqual, constant.MaxMsgQueueLength)
-	})
-}
diff --git a/component/taskd/taskd/go/framework_backend/manager/manager.go b/component/taskd/taskd/go/framework_backend/manager/manager.go
index 5f140c178..50ada6cca 100644
--- a/component/taskd/taskd/go/framework_backend/manager/manager.go
+++ b/component/taskd/taskd/go/framework_backend/manager/manager.go
@@ -104,18 +104,25 @@ func (m *BaseManager) Service(snapshot *storage.SnapShot) error {
 
 func (m *BaseManager) clusterHandle() {
 	foundClusterd := false
+	foundTaskd := false
 	for _, info := range m.ClusterInfos {
+		m.clusterAdaptor(info)
 		if info.Role == constant.ClusterDRank {
-			m.clusterdHandle(info)
 			foundClusterd = true
 		}
+		if info.Role == constant.TaskDRank {
+			foundTaskd = true
+		}
 	}
 	if !foundClusterd {
-		m.clusterdHandle(constant.ClusterInfo{Role: constant.ClusterDRank})
+		m.clusterAdaptor(constant.ClusterInfo{Role: constant.ClusterDRank})
+	}
+	if !foundTaskd {
+		m.clusterAdaptor(constant.ClusterInfo{Role: constant.TaskDRank})
 	}
 }
 
-func (m *BaseManager) clusterdHandle(info constant.ClusterInfo) {
+func (m *BaseManager) clusterAdaptor(info constant.ClusterInfo) {
 	clusterAdaptor, err := adaptor.InitAdaptor(m.JobId, m.svcCtx, m.MsgHd.MsgQueue, info)
 	if err != nil {
 		hwlog.RunLog.Error(err)
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/adaptor/factory.go b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/factory.go
index 3eb88ccd3..babe4c66e 100644
--- a/component/taskd/taskd/go/framework_backend/manager/service/adaptor/factory.go
+++ b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/factory.go
@@ -25,31 +25,35 @@ import (
 	"taskd/framework_backend/manager/infrastructure/storage"
 )
 
-var (
-	clusterd Adaptor
-)
+var adaptorMap map[string]Adaptor
 
 // InitAdaptor returns an adaptor according to the Role of ClusterInfo
 func InitAdaptor(
 	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) (Adaptor, error) {
+	if adaptorMap == nil {
+		adaptorMap = make(map[string]Adaptor)
+	}
 	if info.Role == constant.ClusterDRank {
-		clusterd = newClusterdAdaptor(jobId, svcCtx, queue, info)
+		clusterd := newClusterdAdaptor(jobId, svcCtx, queue, info)
+		adaptorMap[constant.ClusterDRank] = clusterd
 		return clusterd, nil
 	}
+	if info.Role == constant.TaskDRank {
+		taskd := newTaskdAdaptor(jobId, svcCtx, queue, info)
+		adaptorMap[constant.TaskDRank] = taskd
+		return taskd, nil
+	}
 	return nil, fmt.Errorf("init handler for %v failed", info)
 }
 
 // DistributeMsg to cluster according to receiver
 func DistributeMsg(msg infrastructure.Msg, receiver string) bool {
-	if receiver == constant.ClusterDRank {
-		if clusterd == nil {
-			hwlog.RunLog.Debugf("clusterd handler is nil")
-			return false
-		}
+	if clusterAdaptor, found := adaptorMap[receiver]; found {
 		go func() {
-			if err := clusterd.SendMsg(msg.Body); err != nil {
+			if err := clusterAdaptor.SendMsg(msg.Body); err != nil {
 				hwlog.RunLog.Error(err)
 			}
 		}()
+		return true
 	}
 	return false
 }
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/adaptor/taskd.go b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/taskd.go
new file mode 100644
index 000000000..bff0fbefb
--- /dev/null
+++ b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/taskd.go
@@ -0,0 +1,57 @@
+/* Copyright(C) 2025. Huawei Technologies Co.,Ltd. All rights reserved.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+// Package adaptor provides cluster adaptors, i.e. clusterd, taskd
+package adaptor
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/google/uuid"
+
+	"taskd/common/constant"
+	"taskd/framework_backend/manager/infrastructure/storage"
+	"taskd/toolkit_backend/net/common"
+)
+
+// newTaskdAdaptor returns a taskd adaptor that loops messages back into the manager queue
+func newTaskdAdaptor(
+	jobId string, svcCtx context.Context, queue *storage.MsgQueue, info constant.ClusterInfo) *taskdAdaptor {
+	return &taskdAdaptor{
+		baseAdaptor: &baseAdaptor{
+			jobId:       jobId,
+			svcCtx:      svcCtx,
+			queue:       queue,
+			clusterInfo: info,
+		},
+	}
+}
+
+// Handle needs no connection setup for the local taskd role
+func (t *taskdAdaptor) Handle() error {
+	return nil
+}
+
+// SendMsg enqueues msg into the manager message queue
+func (t *taskdAdaptor) SendMsg(msg storage.MsgBody) error {
+	newMsg := t.queue.NewMsg(uuid.New().String(), constant.DefaultBizType,
+		&common.Position{
+			Role:        common.MgrRole,
+			ServerRank:  "0",
+			ProcessRank: "-1",
+		}, msg)
+	err := t.queue.Enqueue(newMsg)
+	if err != nil {
+		return fmt.Errorf("send msg to taskd err: %v", err)
+	}
+	return nil
+}
diff --git a/component/taskd/taskd/go/framework_backend/manager/service/adaptor/type.go b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/type.go
index 69957d52e..2f4ef2f80 100644
--- a/component/taskd/taskd/go/framework_backend/manager/service/adaptor/type.go
+++ b/component/taskd/taskd/go/framework_backend/manager/service/adaptor/type.go
@@ -40,3 +40,7 @@ type clusterdAdaptor struct {
 	*baseAdaptor
 	profilingFromClusterD atomic.Bool
 }
+
+type taskdAdaptor struct {
+	*baseAdaptor
+}
-- 
Gitee
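
Note on PATCH 8/8: the patch generalizes the single clusterd variable into adaptorMap, registers a taskd adaptor that enqueues messages locally instead of sending them over gRPC, and makes DistributeMsg finally return true when some adaptor claims the receiver. One point the series leaves open is synchronization: adaptorMap is written by InitAdaptor and read by DistributeMsg, which may run on different goroutines. The sketch below reproduces the registry pattern with an RWMutex added as a defensive assumption; register, distribute, and printAdaptor are illustrative names, and the real code dispatches asynchronously in a goroutine:

    package main

    import (
        "fmt"
        "sync"
    )

    type Adaptor interface{ SendMsg(msg string) error }

    type printAdaptor struct{ name string }

    func (p *printAdaptor) SendMsg(msg string) error {
        fmt.Printf("%s <- %s\n", p.name, msg)
        return nil
    }

    // registry mirrors adaptorMap; the mutex is an addition so that
    // registration and dispatch can safely run on different goroutines.
    var (
        mu       sync.RWMutex
        registry = map[string]Adaptor{}
    )

    func register(role string, a Adaptor) {
        mu.Lock()
        defer mu.Unlock()
        registry[role] = a
    }

    // distribute returns true only when an adaptor owns the receiver role,
    // matching the contract DistributeMsg settles on in this patch.
    func distribute(receiver, msg string) bool {
        mu.RLock()
        a, found := registry[receiver]
        mu.RUnlock()
        if !found {
            return false
        }
        if err := a.SendMsg(msg); err != nil {
            fmt.Println("send err:", err)
        }
        return true
    }

    func main() {
        register("clusterd", &printAdaptor{name: "clusterd"})
        register("taskd", &printAdaptor{name: "taskd"})
        fmt.Println(distribute("clusterd", "switch nic ok")) // true
        fmt.Println(distribute("unknown", "dropped"))        // false
    }

With the taskd adaptor registered, the old MgrRole special case in the business stream becomes redundant, which is why the patch can delete DistributedMsgToMgr and SendMsgToMgr along with their test.
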