diff --git a/msmonitor/README.md b/msmonitor/README.md index 4ff0ae80006274c828665b269f09fe0a16389f2b..f38851159b9a55bc672fed12469d4d4f65583a2d 100644 --- a/msmonitor/README.md +++ b/msmonitor/README.md @@ -75,6 +75,9 @@ dyno --certs-dir /home/client_certs nputrace --start-step 10 --iterations 2 --ac ## 📖 特性介绍 ⚠️ 由于底层资源限制,npumonitor功能和nputrace不能同时开启。 + +1. 执行 dyno 命令后,响应结果里有一个 ‘response’ 的json字符串。该字符串中的 ‘commandStatus’ 字段用于标识命令是否生效:‘effective’ 表示命令会生效,‘ineffective’ 表示命令无效。其他字段均为 dynolog 的原生字段。 + ### 📈 npumonitor特性 npumonitor特性为用户提供轻量化监控关键指标的能力,npumonitor基于[MSPTI](https://www.hiascend.com/document/detail/zh/mindstudio/81RC1/T&ITools/Profiling/atlasprofiling_16_0021.html)开发,用户可以通过npumonitor查看模型运行时的计算、通信算子执行耗时。 具体使用方式请参考[npumonitor使用方式](./docs/npumonitor.md),MindSpore框架下使用方式请参考[MindSpore框架下msMonitor的使用方法](./docs/mindspore_adapter.md)。 diff --git a/msmonitor/dynolog_npu/dynolog/src/LibkinetoConfigManager.cpp b/msmonitor/dynolog_npu/dynolog/src/LibkinetoConfigManager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9fcbb141a90846bd7c5f4c22779807f13e70208d --- /dev/null +++ b/msmonitor/dynolog_npu/dynolog/src/LibkinetoConfigManager.cpp @@ -0,0 +1,361 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +#include "dynolog/src/LibkinetoConfigManager.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "hbt/src/common/System.h" +#ifdef __linux__ +#include +#endif + +namespace dynolog { + +namespace { + +const int VerboseLevel = 2; +constexpr std::chrono::seconds kKeepAliveTimeSecs(60); +constexpr char kConfigFile[] = "/etc/libkineto.conf"; + +inline void setThreadName(const std::string& name) { +#ifdef __linux__ + constexpr size_t kMaxBuff = 16; + std::array buff; + std::size_t len = name.copy(buff.begin(), 0, kMaxBuff - 1); + buff[len] = '\0'; + ::prctl(PR_SET_NAME, buff.begin(), 0, 0, 0); +#endif +} + +} // namespace + +static std::string addTraceIdToConfigString( + const std::string& trace_id, + const std::string& config) { + const std::string kTraceIdIdentifier = "REQUEST_TRACE_ID"; + return fmt::format( + R"( + {} + {}={})", + config, + kTraceIdIdentifier, + trace_id); +} + +static std::string generateTraceId(int32_t pid) { + // Hostname + PID + timestamp should be a unique trace id in the context + // of this code's execution + std::string str_trace_id = fmt::format( + "{}:{}:{}", facebook::hbt::getHostName(), pid, std::time(nullptr)); + std::size_t hashed_trace_id = std::hash{}(str_trace_id); + return std::to_string(hashed_trace_id); +} + +LibkinetoConfigManager::LibkinetoConfigManager() { + managerThread_ = new std::thread(&LibkinetoConfigManager::start, this); +} + +LibkinetoConfigManager::~LibkinetoConfigManager() { + stopFlag_ = true; + managerCondVar_.notify_one(); + managerThread_->join(); + delete managerThread_; + managerThread_ = nullptr; +} + +std::shared_ptr LibkinetoConfigManager::getInstance() { + static auto instance = std::make_shared(); + return instance; +} + +void LibkinetoConfigManager::start() { + setThreadName("kinetoConfigMgr"); + // Periodically clean the job table and check base config changes. + // If a libkineto instance hasn't contacted us for a while, remove it. + LOG(INFO) << "Starting LibkinetoConfigManager runloop"; + while (true) { + refreshBaseConfig(); + std::unique_lock lock(mutex_); + managerCondVar_.wait_for(lock, kKeepAliveTimeSecs); + if (stopFlag_) { + break; + } + runGc(); + } +} + +// return "" on errors. Otherwise a config string. +static std::string readConfigFromConfigFile(const char* filename) { + // Read whole file into a string. + std::ifstream file(filename); + if (!file) { + return ""; + } + std::string conf; + try { + conf.assign( + std::istreambuf_iterator(file), std::istreambuf_iterator()); + } catch (std::exception& e) { + LOG(ERROR) << "Error in reading libkineto config from config file: " + << e.what(); + } + return conf; +} + +void LibkinetoConfigManager::refreshBaseConfig() { + auto cfg = readConfigFromConfigFile(kConfigFile); + if (!cfg.empty() && cfg != baseConfig_) { + std::lock_guard guard(mutex_); + baseConfig_ = cfg; + } +} + +void LibkinetoConfigManager::runGc() { + auto t = std::chrono::system_clock::now(); + int job_count = jobs_.size(); + for (auto job_it = jobs_.begin(); job_it != jobs_.end();) { + auto& procs = job_it->second; + for (auto proc_it = procs.begin(); proc_it != procs.end();) { + struct LibkinetoProcess& proc = proc_it->second; + if ((t - proc.lastRequestTime) > kKeepAliveTimeSecs) { + LOG(INFO) << fmt::format( + "Stopped tracking process ({}) from job {}", + fmt::join(proc_it->first, ","), + job_it->first); + onProcessCleanup(proc_it->first); + proc_it = procs.erase(proc_it); + } else { + proc_it++; + } + } + if (procs.empty()) { + LOG(INFO) << "Stopped tracking job " << job_it->first; + jobInstancesPerGpu_.erase(job_it->first); + job_it = jobs_.erase(job_it); + } else { + job_it++; + } + } + if (job_count != jobs_.size()) { + LOG(INFO) << "Tracked jobs: " << jobs_.size(); + } +} + +int32_t LibkinetoConfigManager::registerLibkinetoContext( + const std::string& jobId, + int32_t pid, + int32_t gpu) { + std::lock_guard guard(mutex_); + auto& instances = jobInstancesPerGpu_[jobId][gpu]; + instances.insert(pid); + LOG(INFO) << fmt::format("Registered process ({}) for job {}.", pid, jobId); + return instances.size(); +} + +// Called by libkineto instances periodically. +// In addition to returning a configuration string if one is found, +// register the jobId and set of pids with the config manager. +// This is how we keep track of running instances of libkineto. +// LibkinetoConfigManager::run() periodically scans the table +// for processes no longer calling this function and removes them. +std::string LibkinetoConfigManager::obtainOnDemandConfig( + const std::string& jobId, + const std::vector& pids, + int32_t configType) { + VLOG(VerboseLevel) << fmt::format( + "obtainOnDemandConfig({}, ({}), {})", + jobId, + fmt::join(pids, ","), + configType); + std::string ret; + std::set pids_set(pids.begin(), pids.end()); + std::lock_guard guard(mutex_); + + auto _emplace_result = jobs_[jobId].emplace(pids_set, LibkinetoProcess{}); + const auto& it = _emplace_result.first; + bool newProcess = _emplace_result.second; + struct LibkinetoProcess& process = it->second; + + if (newProcess) { + // First time - intialize! + // 'pids' is an ordered ancestor list starting with the + // child (leaf) process, i.e. the one making this request. + // Keep a copy of this pid so that clients can know which + // pids are being profiled. + process.pid = pids[0]; // Remember child (leaf) process + LOG(INFO) << fmt::format( + "Registered process ({}) for job {}.", fmt::join(pids, ", "), jobId); + + onRegisterProcess(pids_set); + } + if ((configType & int(LibkinetoConfigType::EVENTS)) && + !process.eventProfilerConfig.empty()) { + ret += process.eventProfilerConfig + "\n"; + process.eventProfilerConfig.clear(); + } + + if ((configType & int(LibkinetoConfigType::ACTIVITIES)) && + !process.activityProfilerConfig.empty()) { + ret += process.activityProfilerConfig + "\n"; + process.activityProfilerConfig.clear(); + } + // Track last request time so we know which libkineto instances + // are currently active. + process.lastRequestTime = std::chrono::system_clock::now(); + return ret; +} + +void LibkinetoConfigManager::setOnDemandConfigForProcess( + GpuProfilerResult& res, + LibkinetoProcess& process, + const std::string& config, + int32_t configType /* LibkinetoConfigType */, + int32_t limit) { + res.processesMatched.push_back(process.pid); + + if (res.eventProfilersTriggered.size() < limit && + (configType & int(LibkinetoConfigType::EVENTS))) { + if (process.eventProfilerConfig.empty()) { + process.eventProfilerConfig = config; + res.eventProfilersTriggered.push_back(process.pid); + } else { + res.eventProfilersBusy++; + } + } + if (res.activityProfilersTriggered.size() < limit && + (configType & int(LibkinetoConfigType::ACTIVITIES))) { + if (process.activityProfilerConfig.empty()) { + preCheckOnDemandConfig(process); + + std::string trace_id = generateTraceId(process.pid); + std::string updatedConfig = addTraceIdToConfigString(trace_id, config); + + res.activityProfilersTriggered.push_back(process.pid); + process.activityProfilerConfig = updatedConfig; + res.traceIds.push_back(trace_id); + + LOG(INFO) << " PID: " << process.pid << ", Trace Id: " << trace_id; + } else { + res.activityProfilersBusy++; + } + } +} + +// Called by clients to control one or more libkineto instances. +// The config is any legal libkineto on-demand config (see wiki). +// Set config type to indicate whether this request is for +// event profiling, activity profiling or both. +// The limit argument is used when the job uses multiple processes or +// the pid is a parent pid of multiple processes with libkineto. +// For example, when specifying a pid with 8 child processes, +// the limit argument can be used to profile 2 of those. +GpuProfilerResult LibkinetoConfigManager::setOnDemandConfig( + const std::string& jobId, + const std::set& pids, + const std::string& config, + int32_t configType /* LibkinetoConfigType */, + int32_t limit) { + LOG(INFO) << fmt::format( + + "Initiating on-demand GPU profiling for job ID {}, pids [{}]", + jobId, + fmt::join(pids, ",")); + + GpuProfilerResult res; + res.activityProfilersBusy = 0; + res.eventProfilersBusy = 0; + + size_t nPids = pids.size(); + // For backwards compatibility with older versions of the dyno CLI, + // there are two conditions under which all processes should be traced: + // 1. target PIDs are empty + // 2. target PIDs contain a single PID, 0. + // As older versions of the CLI are phased out, 2) will no longer need to be + // accounted for. + bool traceAllPids = nPids == 0 || (nPids == 1 && *pids.begin() == 0); + { + std::lock_guard guard(mutex_); + if (auto it = jobs_.find(jobId); it != jobs_.end()) { + auto& processes = it->second; + for (auto& pair : processes) { + for (const auto& pid : pair.first) { + // Trace the process if we find a match or target pids is empty. + if (traceAllPids || pids.find(pid) != pids.end()) { + auto& process = pair.second; + setOnDemandConfigForProcess( + res, process, config, configType, limit); + // the user could provide multiple pids that belong to the same the + // LibkientoProcess object, so we break after the first match for + // the LibkinetoProcess. + break; + } + } + } + if (res.activityProfilersTriggered.size() > 0) { + onSetOnDemandConfig(pids); + } + } + } + + LOG(INFO) << "On-demand request: " << res.processesMatched.size() + << " matching processes"; + if (configType & int(LibkinetoConfigType::EVENTS)) { + LOG(INFO) << "Installed event profiler config for " + << res.eventProfilersTriggered.size() << " process(es) " << "(" + << res.eventProfilersBusy << " busy)"; + } + if (configType & int(LibkinetoConfigType::ACTIVITIES)) { + LOG(INFO) << "Installed activity profiler config for " + << res.activityProfilersTriggered.size() << " process(es) " << "(" + << res.activityProfilersBusy << " busy)"; + } + return res; +} + +int LibkinetoConfigManager::processCount(const std::string& jobId) const { + int count = 0; + std::lock_guard guard(mutex_); + auto it = jobs_.find(jobId); + if (it != jobs_.end()) { + count = it->second.size(); + } + LOG(INFO) << "Process count for job ID " << jobId << ": " << count; + return count; +} + +void LibkinetoConfigManager::updateNpuStatus( + const std::string& jobId, + int32_t pid, + int32_t status, + const std::string& msgType) { + // jobId, pid为预留参数,目前无用 + std::lock_guard guard(mutex_); + if (msgType == kLibkinetoTraceStatus) { + npuTraceStatus_ = status; + } else if (msgType == kLibkinetoMonitorStatus) { + npuMonitorStatus_ = status; + } +} + +int32_t LibkinetoConfigManager::getNpuTraceStatus() +{ + std::lock_guard guard(mutex_); + return npuTraceStatus_; +} + +int32_t LibkinetoConfigManager::getNpuMonitorStatus() +{ + std::lock_guard guard(mutex_); + return npuMonitorStatus_; +} + +} // namespace dynolog diff --git a/msmonitor/dynolog_npu/dynolog/src/LibkinetoConfigManager.h b/msmonitor/dynolog_npu/dynolog/src/LibkinetoConfigManager.h new file mode 100644 index 0000000000000000000000000000000000000000..437609ea0492a32624c2202777e48fe821248840 --- /dev/null +++ b/msmonitor/dynolog_npu/dynolog/src/LibkinetoConfigManager.h @@ -0,0 +1,110 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "dynolog/src/LibkinetoTypes.h" + +namespace dynolog { + +const std::string kLibkinetoTraceStatus = "npuTraceStatus"; +const std::string kLibkinetoMonitorStatus = "npuMonitorStatus"; + +class LibkinetoConfigManager { + public: + LibkinetoConfigManager(); + virtual ~LibkinetoConfigManager(); + + int32_t + registerLibkinetoContext(const std::string& jobId, int32_t pid, int32_t gpu); + static std::shared_ptr getInstance(); + + std::string getBaseConfig() { + std::lock_guard guard(mutex_); + return baseConfig_; + } + + std::string obtainOnDemandConfig( + const std::string& jobId, + const std::vector& pids, + int32_t configType); + + GpuProfilerResult setOnDemandConfig( + const std::string& jobId, + const std::set& pids, + const std::string& config, + int32_t configType, + int32_t limit); + + void updateNpuStatus(const std::string& jobId, int32_t pid, int32_t status, const std::string& msgType); + int32_t getNpuTraceStatus(); + int32_t getNpuMonitorStatus(); + + // Return the number of active libkineto processes + // with the given Chronos / Tangram Job Id + int processCount(const std::string& jobId) const; + + protected: + struct LibkinetoProcess { + int32_t pid; + std::chrono::system_clock::time_point lastRequestTime; + std::string eventProfilerConfig; + std::string activityProfilerConfig; + }; + + // A few callbacks for additional instrumentation. + virtual void onRegisterProcess(const std::set& /*pids*/) {} + + virtual void preCheckOnDemandConfig(const LibkinetoProcess& /*process*/) {} + + virtual void onSetOnDemandConfig(const std::set& /*pids*/) {} + + virtual void onProcessCleanup(const std::set& /*pids*/) {} + + // Map of pid ancestry -> LibkinetoProcess + using ProcessMap = std::map, LibkinetoProcess>; + std::map jobs_; + + // Map of gpu id -> pids + using InstancesPerGpuMap = std::map>; + // Job id -> InstancesPerGpu + std::map jobInstancesPerGpu_; + mutable std::mutex mutex_; + + void setOnDemandConfigForProcess( + GpuProfilerResult& res, + LibkinetoProcess& process, + const std::string& config, + int32_t configType, + int32_t limit); + + private: + // Garbage collection and config refresh - periodically clean up + // data from terminated processes. + void start(); + void runGc(); + void refreshBaseConfig(); + + std::string baseConfig_; + std::thread* managerThread_{nullptr}; + std::atomic_bool stopFlag_{false}; + std::condition_variable managerCondVar_; + int32_t npuTraceStatus_ = 0; + int32_t npuMonitorStatus_ = 0; + // mutable std::mutex mutex_; TODO make private again +}; + +} // namespace dynolog diff --git a/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServerInl.h b/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServerInl.h new file mode 100644 index 0000000000000000000000000000000000000000..b51d973e4ff3f7d3f4434f79bac186640f8f0e79 --- /dev/null +++ b/msmonitor/dynolog_npu/dynolog/src/rpc/SimpleJsonServerInl.h @@ -0,0 +1,150 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +#pragma once + +#include +#include +#include +#include "dynolog/src/rpc/SimpleJsonServer.h" + +namespace dynolog { + +template +class SimpleJsonServer : public SimpleJsonServerBase { + public: + explicit SimpleJsonServer(std::shared_ptr handler, int port) + : SimpleJsonServerBase(port), handler_(std::move(handler)) {} + + ~SimpleJsonServer() {} + + std::string processOneImpl(const std::string& request) override; + + private: + std::shared_ptr handler_; +}; + +// convert to json and validate the request message +// the request should contain : +// { "fn" : "" +// .. +// } + +nlohmann::json toJson(const std::string& message) { + using json = nlohmann::json; + json result; + if (message.empty()) { + return result; + } + try { + result = json::parse(message); + } catch (json::parse_error&) { + LOG(ERROR) << "Error parsing message = " << message; + return result; + } + + if (result.empty() || !result.is_object()) { + LOG(ERROR) + << "Request message should not be empty and should be json object."; + return json(); + } + + if (!result.contains("fn")) { + LOG(ERROR) << "Request must contain a 'fn' field for the RPC call " + << " request json = " << result.dump(); + return json(); + } + + return result; +} + +std::string GetCommandStatus(const std::string& configStr) +{ + auto npuTraceStatus = LibkinetoConfigManager::getInstance()->getNpuTraceStatus(); + auto npuMonitorStatus = LibkinetoConfigManager::getInstance()->getNpuMonitorStatus(); + std::string prefix = "NPU_MONITOR_START"; + if (configStr.compare(0, prefix.size(), prefix) == 0) { + if (npuTraceStatus == 1) { + return "ineffective"; + } + else if (npuTraceStatus == 0) { + return "effective"; + } + else { + return "unknown"; + } + } else { + if (npuMonitorStatus == 1) { + return "ineffective"; + } + else if (npuMonitorStatus == 0) { + return "effective"; + } + else { + return "unknown"; + } + } +} + +template +std::string SimpleJsonServer::processOneImpl( + const std::string& request_str) { + using json = nlohmann::json; + json request = toJson(request_str); + json response; + + if (request.empty()) { + LOG(ERROR) << "Failed parsing request, continuing ..."; + return ""; + } + + if (request["fn"] == "getStatus") { + response["status"] = handler_->getStatus(); + } else if (request["fn"] == "getVersion") { + response["version"] = handler_->getVersion(); + } else if (request["fn"] == "setKinetOnDemandRequest") { + if (!request.contains("config") || !request.contains("pids")) { + response["status"] = "failed"; + } else { + try { + std::string config = request.value("config", ""); + std::vector pids = request.at("pids").get>(); + std::set pids_set{pids.begin(), pids.end()}; // TODO directly convert? + + int job_id = request.value("job_id", 0); + int process_limit = request.value("process_limit", 1000); + auto result = handler_->setKinetOnDemandRequest(job_id, pids_set, config, process_limit); + auto commandStatus = GetCommandStatus(config); + response["commandStatus"] = commandStatus; + response["processesMatched"] = result.processesMatched; + response["eventProfilersTriggered"] = result.eventProfilersTriggered; + response["activityProfilersTriggered"] = result.activityProfilersTriggered; + response["eventProfilersBusy"] = result.eventProfilersBusy; + response["activityProfilersBusy"] = result.activityProfilersBusy; + } catch (const std::exception& ex) { + LOG(ERROR) << "setKinetOnDemandRequest: parsing exception = " << ex.what(); + response["status"] = fmt::format("failed with exception = {}", ex.what()); + } + } + } else if (request["fn"] == "dcgmProfPause") { + if (!request.contains("duration_s")) { + response["status"] = "failed"; + } else { + int duration_s = request.value("duration_s", 300); + bool result = handler_->dcgmProfPause(duration_s); + response["status"] = result; + } + } else if (request["fn"] == "dcgmProfResume") { + bool result = handler_->dcgmProfResume(); + response["status"] = result; + } else { + LOG(ERROR) << "Unknown RPC call = " << request["fn"]; + return ""; + } + + return response.dump(); +} + +} // namespace dynolog diff --git a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp index b8ccd7aa023c146f12187473179838875287a05d..096c03716d96ae84218d3c1293099aa22b3e3484 100644 --- a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp +++ b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp @@ -78,6 +78,18 @@ void IPCMonitor::processMsg(std::unique_ptr msg) kLibkinetoRequest.data(), kLibkinetoRequest.size()) == 0) { getLibkinetoOnDemandRequest(std::move(msg)); + } else if ( + memcmp( // NOLINT(facebook-security-vulnerable-memcmp) + msg->metadata.type, + kLibkinetoTraceStatus.data(), + kLibkinetoTraceStatus.size()) == 0) { + updateLibkinetoStatus(std::move(msg), kLibkinetoTraceStatus); + } else if ( + memcmp( // NOLINT(facebook-security-vulnerable-memcmp) + msg->metadata.type, + kLibkinetoMonitorStatus.data(), + kLibkinetoMonitorStatus.size()) == 0) { + updateLibkinetoStatus(std::move(msg), kLibkinetoMonitorStatus); } else { LOG(ERROR) << "TYPE UNKOWN: " << msg->metadata.type; } @@ -188,5 +200,22 @@ void IPCMonitor::registerLibkinetoContext( return; } +void IPCMonitor::updateLibkinetoStatus( + std::unique_ptr msg, const std::string& msgType) +{ + struct NpuStatus { + int32_t status; + pid_t pid; + int64_t jobId; + }; + NpuStatus* status = (NpuStatus*)msg->buf.get(); + try { + LibkinetoConfigManager::getInstance()->updateNpuStatus( + std::to_string(status->jobId), status->pid, status->status, msgType); + } catch (const std::runtime_error& ex) { + LOG(ERROR) << "Kineto config manager exception when updateNpuStatus: " << ex.what(); + } +} + } // namespace tracing } // namespace dynolog diff --git a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h index cbc59fd2bbc796fd835a56117e9a42b195feae5d..b9c2c2ebc205c5af56a83c11fe822931ab044674 100644 --- a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h +++ b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h @@ -30,6 +30,7 @@ public: virtual void processDataMsg(std::unique_ptr msg); void getLibkinetoOnDemandRequest(std::unique_ptr msg); void registerLibkinetoContext(std::unique_ptr msg); + void updateLibkinetoStatus(std::unique_ptr msg, const std::string& msgType); void setLogger(std::unique_ptr logger); void LogData(const nlohmann::json& result); diff --git a/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py index 2b389355453a2a0b980d60dc8cc63b0ccbd5a9b4..b2ceea962c0cd6ed47c1fe5cf1f10aca493f4dfb 100644 --- a/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py +++ b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py @@ -44,3 +44,7 @@ class PyDynamicMonitorProxy: @classmethod def finalize_dyno(cls): ipcMonitor_C_module.finalize_dyno() + + @classmethod + def update_profiler_status(cls, status: dict): + ipcMonitor_C_module.update_profiler_status(status) \ No newline at end of file diff --git a/msmonitor/plugin/README.md b/msmonitor/plugin/README.md index b82d2a7507139802ef733076765f9923bdb7559c..cac84ea8e45644d2a0dc45edc18be156a620b978 100644 --- a/msmonitor/plugin/README.md +++ b/msmonitor/plugin/README.md @@ -18,6 +18,9 @@ __PyDynamicMonitorProxy接口说明__: * `finalize_dyno` 释放msmonitor中相关资源、线程 * input: None * return: None +* `update_profiler_status` 上报profiler_status + * input: status(Dict[str,str]) + * return: None ## 安装方式 ### 1. 通过shell脚本一键安装 diff --git a/msmonitor/plugin/bindings.cpp b/msmonitor/plugin/bindings.cpp index b678056853657bd0d64b5caff8b79433fcc798d7..794b49767b9748b8011746fca712273c8eab3515 100644 --- a/msmonitor/plugin/bindings.cpp +++ b/msmonitor/plugin/bindings.cpp @@ -37,4 +37,7 @@ PYBIND11_MODULE(IPCMonitor_C, m) { m.def("set_cluster_config_data", [](const std::unordered_map& cluster_config) -> void { dynolog_npu::ipc_monitor::MsptiMonitor::GetInstance()->SetClusterConfigData(cluster_config); }, py::arg("cluster_config")); + m.def("update_profiler_status", [](std::unordered_map& status) -> void { + dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->UpdateProfilerStatus(status); + }, py::arg("status")); } diff --git a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.cpp b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.cpp index 8e0fe68f8c3ea8dfe5d9bef2e619bab45c080557..a8b8220a9a362ae3f1e67e99a425c466f4d1e63e 100644 --- a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.cpp +++ b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.cpp @@ -124,6 +124,7 @@ void DynoLogNpuMonitor::EnableMsptiMonitor(std::unordered_map(MsptiMonitor::GetInstance()->IsStarted()), MSG_TYPE_MONITOR_STATUS); } } @@ -131,5 +132,15 @@ void DynoLogNpuMonitor::Finalize() { MsptiMonitor::GetInstance()->Uninit(); } + +void DynoLogNpuMonitor::UpdateNpuStatus(int32_t status, const std::string& msgType) +{ + bool res = ipcClient_.SendNpuStatus(status, msgType); + if (res) { + LOG(INFO) << "Send npu status successfully"; + } else { + LOG(WARNING) << "Send npu status failed"; + } +} } // namespace ipc_monitor } // namespace dynolog_npu diff --git a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h index 50ea01723a1fcc86f4dc53e64fbb7aa463c6be95..3641925db49fbbbe89f3931bff2f3c4831b66c38 100644 --- a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h +++ b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h @@ -34,6 +34,7 @@ public: std::string Poll() override; void EnableMsptiMonitor(std::unordered_map& cfg_map); void Finalize(); + void UpdateNpuStatus(int32_t status, const std::string& msgType); void SetNpuId(int id) override { npuId_ = id; diff --git a/msmonitor/plugin/ipc_monitor/NpuIpcClient.cpp b/msmonitor/plugin/ipc_monitor/NpuIpcClient.cpp index 89639e4d004b5bc1aed157f12566997615b6789f..4bd425ed12ff538527c9b53a4c28f25f380add3b 100644 --- a/msmonitor/plugin/ipc_monitor/NpuIpcClient.cpp +++ b/msmonitor/plugin/ipc_monitor/NpuIpcClient.cpp @@ -45,6 +45,27 @@ bool IpcClient::RegisterInstance(int32_t npu) return true; } +bool IpcClient::SendNpuStatus(int32_t status, const std::string& msgType) +{ + NpuStatus npuStatus{ + .status = status, + .pid = GetProcessId(), + .jobId = JOB_ID, + }; + std::unique_ptr message = Message::ConstructMessage(npuStatus, msgType); + try { + if (!SyncSendMessage(*message, DYNO_IPC_NAME)) { + LOG(WARNING) << "Failed to send msmonitor status for pid " << npuStatus.pid << " with dyno"; + return false; + } + } catch (const std::exception &e) { + LOG(WARNING) << "Error when SyncSendMessage: " << e.what(); + return false; + } + LOG(INFO) << "Send msmonitor status for pid " << npuStatus.pid << " for dynolog success!"; + return true; +} + std::string IpcClient::IpcClientNpuConfig() { auto size = pids_.size(); diff --git a/msmonitor/plugin/ipc_monitor/NpuIpcClient.h b/msmonitor/plugin/ipc_monitor/NpuIpcClient.h index 4b4937bd6886c169faa2cfe76aeaf1ed10c85592..42cfcccf6da2a23355ffb4afc4e115c82c696173 100644 --- a/msmonitor/plugin/ipc_monitor/NpuIpcClient.h +++ b/msmonitor/plugin/ipc_monitor/NpuIpcClient.h @@ -34,6 +34,8 @@ constexpr const int MAX_SLEEP_US = 10000; const std::string DYNO_IPC_NAME = "dynolog"; const std::string MSG_TYPE_REQUEST = "req"; const std::string MSG_TYPE_CONTEXT = "ctxt"; +const std::string MSG_TYPE_TRACE_STATUS = "npuTraceStatus"; +const std::string MSG_TYPE_MONITOR_STATUS = "npuMonitorStatus"; const std::string MSG_TYPE_DATA = "data"; struct NpuRequest { @@ -49,6 +51,12 @@ struct NpuContext { int64_t jobId; }; +struct NpuStatus { + int32_t status; + pid_t pid; + int64_t jobId; +}; + struct Metadata { size_t size = 0; char type[TYPE_SIZE] = ""; @@ -135,6 +143,7 @@ public: IpcClient() = default; bool Init(); bool RegisterInstance(int32_t npu); + bool SendNpuStatus(int32_t npuTraceStatus, const std::string& msgType); std::string IpcClientNpuConfig(); bool SyncSendMessage(const Message &message, const std::string &destName, int numRetry = 10, int seepTimeUs = 10000); diff --git a/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h b/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h index 4836b29301d14c18d10f0462b3a00f909e9fd1d0..a3e8105312baf9bac52e1eae8f4cf706a5787547 100644 --- a/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h +++ b/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h @@ -55,6 +55,19 @@ public: { DynoLogNpuMonitor::GetInstance()->Finalize(); } + + void UpdateProfilerStatus(std::unordered_map& status) + { + int32_t npuTraceStatus = 0; + auto it = status.find("profiler_status"); + if (it != status.end() && !it->second.empty()) { + Str2Int32(npuTraceStatus, it->second); + } else { + LOG(ERROR) << "Missing key 'profiler_status'."; + return; + } + DynoLogNpuMonitor::GetInstance()->UpdateNpuStatus(npuTraceStatus, MSG_TYPE_TRACE_STATUS); + } private: MonitorBase *monitor_ = nullptr; }; diff --git a/msmonitor/plugin/ipc_monitor/utils.cpp b/msmonitor/plugin/ipc_monitor/utils.cpp index 7c5c592494d6a9df7755385c533d757bb06c9ddf..d5025f09ebcab11ae3382434eed8f0841c524389 100644 --- a/msmonitor/plugin/ipc_monitor/utils.cpp +++ b/msmonitor/plugin/ipc_monitor/utils.cpp @@ -232,6 +232,26 @@ bool Str2Uint32(uint32_t& dest, const std::string& str) return true; } +bool Str2Int32(int32_t& dest, const std::string& str) +{ + if (str.empty()) { + LOG(ERROR) << "Str to int32 failed, input string is null"; + return false; + } + size_t pos = 0; + try { + dest = static_cast(std::stol(str, &pos)); + } catch(...) { + LOG(ERROR) << "Str to int32 failed, input string is " << str; + return false; + } + if (pos != str.size()) { + LOG(ERROR) << "Str to int32 failed, input string is " << str; + return false; + } + return true; +} + bool Str2Bool(bool& dest, const std::string& str) { std::string lower_str = str; diff --git a/msmonitor/plugin/ipc_monitor/utils.h b/msmonitor/plugin/ipc_monitor/utils.h index 8e699fb275e3e16668c5c95590cfa6d378eb725c..8a0d93ce42536acc9c214ab2231803c45a298bf8 100644 --- a/msmonitor/plugin/ipc_monitor/utils.h +++ b/msmonitor/plugin/ipc_monitor/utils.h @@ -36,6 +36,7 @@ std::vector> GetPidCommandPairsofAncestors(); std::string getCurrentTimestamp(); uint64_t getCurrentTimestamp64(); bool Str2Uint32(uint32_t& dest, const std::string& str); +bool Str2Int32(int32_t& dest, const std::string& str); bool Str2Bool(bool& dest, const std::string& str); std::string& trim(std::string& str); std::vector split(const std::string& str, char delimiter);