diff --git a/msmonitor/dynolog_npu/CMakeLists.txt b/msmonitor/dynolog_npu/CMakeLists.txt index 2aeddf3d667645f8739ef3b7aa6b55c0780ad8f3..1d43c3fed0144ff2e8f25f5d232088eba4dc715a 100644 --- a/msmonitor/dynolog_npu/CMakeLists.txt +++ b/msmonitor/dynolog_npu/CMakeLists.txt @@ -11,9 +11,6 @@ OFF) option(USE_PROMETHEUS "Enable logging to prometheus, this requires prometheus-cpp to be installed on the system" OFF) -option(USE_TENSORBOARD "Enable logging to tensorboard, this requires -protobuf to be installed on the system" -ON) if(USE_PROMETHEUS) find_package(prometheus-cpp CONFIG REQUIRED) diff --git a/msmonitor/dynolog_npu/dynolog/src/Main.cpp b/msmonitor/dynolog_npu/dynolog/src/Main.cpp index a24bb802ab478905894df725eb015a23cef8d629..7d210aea82fd85508c94cc8d6561db74da246398 100644 --- a/msmonitor/dynolog_npu/dynolog/src/Main.cpp +++ b/msmonitor/dynolog_npu/dynolog/src/Main.cpp @@ -33,6 +33,11 @@ #include "dynolog/src/DynologTensorBoardLogger.h" #endif +DEFINE_string( + export_path, + "", + "Path to export metrics. 
If empty, metrics will not be exported to file."); + using namespace dynolog; using json = nlohmann::json; namespace hbt = facebook::hbt; @@ -191,6 +196,9 @@ int main(int argc, char** argv) if (FLAGS_enable_ipc_monitor) { LOG(INFO) << "Starting IPC Monitor"; ipcmon = std::make_unique(); + if (!FLAGS_export_path.empty() && !ipcmon->CreateExportPath(FLAGS_export_path)) { /* an empty flag means file export is disabled (see the flag help), so only a non-empty path that cannot be prepared aborts startup */ + return -1; + } ipcmon->setLogger(std::move(getLogger())); ipcmon_thread = std::make_unique([&ipcmon]() { ipcmon->loop(); }); diff --git a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp index b8ccd7aa023c146f12187473179838875287a05d..adecae416806ffb8396754653aad2752fa281ac8 100644 --- a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp +++ b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp @@ -8,12 +8,17 @@ #include #include #include +#include +#include +#include #include #include #include #include #include #include +#include +#include #include "dynolog/src/LibkinetoConfigManager.h" #include "dynolog/src/ipcfabric/Utils.h" @@ -25,6 +30,8 @@ constexpr int kDataMsgSleepUs = 1000; const std::string kLibkinetoRequest = "req"; const std::string kLibkinetoContext = "ctxt"; const std::string kLibkinetoData = "data"; +const mode_t DATA_FILE_AUTHORITY = 0640; +const mode_t DATA_DIR_AUTHORITY = 0750; IPCMonitor::IPCMonitor(const std::string& ipc_fabric_name) { @@ -35,6 +42,82 @@ IPCMonitor::IPCMonitor(const std::string& ipc_fabric_name) << LibkinetoConfigManager::getInstance()->processCount("0"); } +IPCMonitor::~IPCMonitor() +{ + if (loggerGenerator_) { + loggerGenerator_->Stop(); + } +} + +std::string IPCMonitor::DirName(const std::string &path) +{ + std::string tmpPath = std::string(path.begin(), path.end()); + char *cPath = dirname(const_cast(tmpPath.data())); + return cPath ? 
std::string(cPath) : ""; +} + +bool IPCMonitor::IsFileExist(const std::string &path) +{ + return access(path.c_str(), F_OK) == 0; +} + +bool IPCMonitor::IsDir(const std::string &path) +{ + struct stat st{}; + int ret = stat(path.c_str(), &st); + if (ret != 0) { + return false; + } + return S_ISDIR(st.st_mode); +} + +bool IPCMonitor::CreateDir(const std::string &path) +{ + if (IsFileExist(path)) { + return IsDir(path); + } + size_t pos = path.find_first_not_of('/'); /* start after any leading '/': for an absolute path the first prefix would otherwise be "", and mkdir("") fails with ENOENT, rejecting every absolute directory that does not exist yet */ + while ((pos = path.find_first_of('/', pos)) != std::string::npos) { + std::string subPath = path.substr(0, pos++); + if (IsFileExist(subPath)) { + if (IsDir(subPath)) { + continue; + } else { + return false; + } + } + if (mkdir(subPath.c_str(), DATA_DIR_AUTHORITY) != 0) { + if (errno != EEXIST) { + return false; + } + } + } + auto ret = mkdir(path.c_str(), DATA_DIR_AUTHORITY); + return (ret == 0 || errno == EEXIST); +} + +bool IPCMonitor::CreateExportPath(const std::string &path) +{ + if (path.empty() || path.size() >= PATH_MAX) { + LOG(ERROR) << "Export path to save data is invalid! 
Please input a valid path."; + return false; + } + if (!CreateDir(DirName(path))) { + LOG(ERROR) << "Create dir failed, path: " << DirName(path); + return false; + } + int fd = open(path.c_str(), O_CREAT | O_WRONLY | O_TRUNC, DATA_FILE_AUTHORITY); + if (fd < 0 || close(fd) != 0) { + LOG(ERROR) << "Create file failed, path: " << path << ", errno: " << errno; + return false; + } + LOG(INFO) << "Create export path successfully, path: " << path; + exportPath_ = path; + loggerGenerator_ = std::make_unique(exportPath_); + loggerGenerator_->Start(); + return true; +} + void IPCMonitor::loop() { while (ipc_manager_) { @@ -90,23 +173,29 @@ void tracing::IPCMonitor::setLogger(std::unique_ptr logger) void IPCMonitor::LogData(const nlohmann::json& result) { - auto timestamp = result["timestamp"].get(); - logger_->logUint("timestamp", timestamp); - auto duration = result["duration"].get(); - logger_->logUint("duration", duration); - auto deviceId = result["deviceId"].get(); - logger_->logStr("deviceId", std::to_string(deviceId)); - auto kind = result["kind"].get(); - logger_->logStr("kind", kind); - if (result.contains("domain") && result["domain"].is_string()) { - auto domain = result["domain"].get(); - logger_->logStr("domain", domain); - } - if (result.contains("name") && result["name"].is_string()) { + if (!exportPath_.empty()) { + struct MarkerData data; + data.name = result["name"].get(); + data.pid = result["pid"].get(); + data.deviceId = result["did"].get(); + data.start = result["hst"].get(); + data.end = result["hed"].get(); + data.deviceStart = result["dst"].get(); /* MarkerData declares deviceStart/deviceEnd; it has no devStart/devEnd members */ + data.deviceEnd = result["ded"].get(); + loggerGenerator_->Push(data); + } else { + auto timestamp = result["timestamp"].get(); + logger_->logUint("timestamp", timestamp); + auto duration = result["duration"].get(); + logger_->logUint("duration", duration); + auto deviceId = result["deviceId"].get(); + logger_->logUint("deviceId", deviceId); auto name = result["name"].get(); logger_->logStr("name", name); + 
auto sourceKind = result["sourceKind"].get(); + logger_->logStr("sourceKind", sourceKind); + logger_->finalize(); } - logger_->finalize(); } void IPCMonitor::processDataMsg(std::unique_ptr msg) diff --git a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h index cbc59fd2bbc796fd835a56117e9a42b195feae5d..a17c6be6ac9b06dfcf4edc45ea8d649bc7a78cc1 100644 --- a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h +++ b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h @@ -6,6 +6,12 @@ #pragma once #include +#include +#include +#include +#include +#include +#include // Use glog for FabricManager.h #define USE_GOOGLE_LOG @@ -16,15 +22,110 @@ namespace dynolog { namespace tracing { +struct MarkerData { + std::string name; + uint64_t start; + uint64_t end; + uint64_t deviceStart; + uint64_t deviceEnd; + uint32_t deviceId; + uint32_t pid; +}; + +class ExportLogger { +public: + explicit ExportLogger(const std::string &path) : path_(path) + { + running_.store(false); + tryNum_.store(0); + } + + ~ExportLogger() + { + Stop(); /* join the worker before the members it reads (path_, mtx_, data_) are destroyed */ + } + + void Start() + { + running_.store(true); + + worker_ = std::thread([this]() { /* kept as a member instead of detached: the lambda captures this, so the thread must not outlive the object */ + const size_t MAX_DATA_SIZE = 30; + const size_t MAX_TRY_NUM = 10; + while (running_.load() && tryNum_.load() < MAX_TRY_NUM) { + std::ofstream outFile(path_, std::ios::trunc); + if (!outFile) { + LOG(ERROR) << "Open export file failed, path: " << path_; + tryNum_.fetch_add(1); + continue; + } + std::vector batch; + auto starttime = std::chrono::steady_clock::now(); + { + std::lock_guard lock(mtx_); + size_t cnt = std::min(data_.size(), MAX_DATA_SIZE); + if (cnt > 0) { + batch.insert(batch.end(), data_.begin(), data_.begin() + cnt); + data_.erase(data_.begin(), data_.begin() + cnt); + } + } + if (!batch.empty()) { + for (const auto &item : batch) { + outFile << "mct{name=\"" << item.name + << "\", src=\"ht\", pid=\"" << item.pid << "\"} " + << (item.end - item.start) << " " << item.start << "\n"; + outFile << "mct{name=\"" << item.name + << "\", 
src=\"de\", did=\"" << item.deviceId << "\"} " + << (item.deviceEnd - item.deviceStart) << " " << item.deviceStart << "\n"; + } + outFile.flush(); + } + outFile.close(); + auto endtime = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(endtime - starttime).count(); + auto sleepTime = std::max(60000 - duration, 0L); + + for (decltype(sleepTime) slept = 0; slept < sleepTime && running_.load(); slept += 100) { /* sleep in 100 ms slices so Stop() is noticed promptly instead of after the full interval */ + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + } + }); + } + + void Stop() + { + running_.store(false); + if (worker_.joinable()) { /* safe to call repeatedly: joinable() is false once the worker has been joined or was never started */ + worker_.join(); + } + } + + void Push(MarkerData &data) + { + std::lock_guard lock(mtx_); + data_.emplace_back(std::move(data)); + } + +private: + std::atomic running_; + std::atomic tryNum_; + std::string path_; + std::mutex mtx_; + std::vector data_; + std::thread worker_; +}; + class IPCMonitor { public: using FabricManager = dynolog::ipcfabric::FabricManager; IPCMonitor(const std::string& ipc_fabric_name = "dynolog"); - virtual ~IPCMonitor() {} + virtual ~IPCMonitor(); void loop(); void dataLoop(); + bool CreateExportPath(const std::string &path); + public: virtual void processMsg(std::unique_ptr msg); virtual void processDataMsg(std::unique_ptr msg); @@ -37,8 +138,17 @@ public: std::unique_ptr data_ipc_manager_; std::unique_ptr logger_; + std::unique_ptr loggerGenerator_; + // friend class test_case_name##_##test_name##_Test friend class IPCMonitorTest_LibkinetoRegisterAndOndemandTest_Test; +private: + std::string exportPath_; +private: + static std::string DirName(const std::string &path); + static bool IsFileExist(const std::string &path); + static bool IsDir(const std::string &path); + static bool CreateDir(const std::string &path); }; } // namespace tracing diff --git a/msmonitor/plugin/ipc_monitor/metric/MetricMarkProcess.cpp b/msmonitor/plugin/ipc_monitor/metric/MetricMarkProcess.cpp index 458be03f3cdd9b6f959fb2fe1c4e579f1f46b045..484cba4012ad47aef2e6c9607e73e8e3d95ce47d 100644 --- a/msmonitor/plugin/ipc_monitor/metric/MetricMarkProcess.cpp +++ 
b/msmonitor/plugin/ipc_monitor/metric/MetricMarkProcess.cpp @@ -31,12 +31,13 @@ constexpr size_t COMPLETE_RANGE_DATA_SIZE = 4; std::string MarkMetric::seriesToJson() { nlohmann::json jsonMsg; - jsonMsg["kind"] = "Marker"; - jsonMsg["deviceId"] = deviceId; - jsonMsg["domain"] = domain; jsonMsg["name"] = name; - jsonMsg["duration"] = duration; - jsonMsg["timestamp"] = timestamp; + jsonMsg["pid"] = pid; + jsonMsg["did"] = deviceId; + jsonMsg["hst"] = start; + jsonMsg["hed"] = end; + jsonMsg["dst"] = devStart; + jsonMsg["ded"] = devEnd; return jsonMsg.dump(); } @@ -55,6 +56,7 @@ bool MetricMarkProcess::TransMarkData2Range(const std::vectortimestamp; rangemarkData.name = activityMarker->name; + rangemarkData.pid = activityMarker->objectId.pt.processId; } } if (activityMarker->flag == MSPTI_ACTIVITY_FLAG_MARKER_END_WITH_DEVICE) { @@ -65,15 +67,7 @@ bool MetricMarkProcess::TransMarkData2Range(const std::vectorid; - std::string domainName = "default"; - auto it = domainMsg.find(markId); - if (it != domainMsg.end()) { - domainName = *it->second; - } - rangemarkData.domain = domainName; id2Marker.erase(markId); - domainMsg.erase(markId); return true; } @@ -89,12 +83,6 @@ void MetricMarkProcess::ConsumeMsptiData(msptiActivity *record) { std::unique_lock lock(dataMutex); records.emplace_back(std::move(tmp)); - if (markerData->flag == MSPTI_ACTIVITY_FLAG_MARKER_START_WITH_DEVICE && - markerData->sourceKind == MSPTI_ACTIVITY_SOURCE_KIND_HOST) { - std::string domainStr = markerData->domain; - auto markId = markerData->id; - domainMsg.emplace(markId, std::make_shared(domainStr)); - } } } @@ -111,31 +99,23 @@ std::vector MetricMarkProcess::AggregatedData() } std::vector rangeDatas; for (auto pair = id2Marker.rbegin(); pair != id2Marker.rend(); ++pair) { - auto markId = pair->first; - auto markDatas = pair->second; RangeMarkData rangeMark{}; - if (TransMarkData2Range(markDatas, rangeMark)) { + if (TransMarkData2Range(pair->second, rangeMark)) { rangeDatas.emplace_back(rangeMark); } 
} std::vector ans; - MarkMetric hostMarkMetric{}; - MarkMetric devMarkMetric{}; + MarkMetric metric{}; /* value-initialize, as the removed hostMarkMetric{} / devMarkMetric{} did */ for (const auto& data : rangeDatas) { - hostMarkMetric.name = data.name; - hostMarkMetric.domain = data.domain; - hostMarkMetric.deviceId = -1; - hostMarkMetric.duration = data.end - data.start; - hostMarkMetric.timestamp = data.start; - ans.emplace_back(hostMarkMetric); - - devMarkMetric.name = data.name; - devMarkMetric.domain = data.domain; - devMarkMetric.deviceId = data.deviceId; - devMarkMetric.duration = data.deviceEnd - data.deviceStart; - devMarkMetric.timestamp = data.deviceStart; - ans.emplace_back(devMarkMetric); + metric.name = data.name; + metric.start = data.start; + metric.end = data.end; + metric.devStart = data.deviceStart; + metric.devEnd = data.deviceEnd; + metric.deviceId = data.deviceId; + metric.pid = data.pid; + ans.emplace_back(metric); } return ans; } diff --git a/msmonitor/plugin/ipc_monitor/metric/MetricMarkProcess.h b/msmonitor/plugin/ipc_monitor/metric/MetricMarkProcess.h index 35c9ddf700e349283e6b1b1880dffe2fed5a4090..e0518e17af8bdd02883d403eee15a870a690242c 100644 --- a/msmonitor/plugin/ipc_monitor/metric/MetricMarkProcess.h +++ b/msmonitor/plugin/ipc_monitor/metric/MetricMarkProcess.h @@ -27,23 +27,25 @@ namespace metric { struct MarkMetric { std::string name; - std::string domain; - uint64_t duration; - uint64_t timestamp; + uint64_t start{0}; + uint64_t end{0}; + uint64_t devStart{0}; /* initialized like start/end so a default-constructed MarkMetric never serializes indeterminate values */ + uint64_t devEnd{0}; uint32_t deviceId; + uint32_t pid{0}; + public: std::string seriesToJson(); }; struct RangeMarkData { std::string name; - std::string domain; - uint64_t duration; uint64_t start{0}; uint64_t end{0}; uint64_t deviceStart{0}; uint64_t deviceEnd{0}; uint32_t deviceId; + uint32_t pid{0}; };