From 355abbc91573ab234a151bd549219757e12c674c Mon Sep 17 00:00:00 2001 From: mei-feiyao <1332490378@qq.com> Date: Tue, 26 Aug 2025 17:07:21 +0800 Subject: [PATCH] write data --- msmonitor/dynolog_npu/dynolog/src/Main.cpp | 12 +- .../dynolog/src/tracing/IPCMonitor.cpp | 22 +++- .../dynolog/src/tracing/IPCMonitor.h | 104 +++++++++++++++++- 3 files changed, 134 insertions(+), 4 deletions(-) diff --git a/msmonitor/dynolog_npu/dynolog/src/Main.cpp b/msmonitor/dynolog_npu/dynolog/src/Main.cpp index a24bb802a..684d967b4 100644 --- a/msmonitor/dynolog_npu/dynolog/src/Main.cpp +++ b/msmonitor/dynolog_npu/dynolog/src/Main.cpp @@ -33,6 +33,11 @@ #include "dynolog/src/DynologTensorBoardLogger.h" #endif +DEFINE_string( + export_path, + "", + "Path to export metrics. If empty, metrics will not be exported to file."); + using namespace dynolog; using json = nlohmann::json; namespace hbt = facebook::hbt; @@ -66,6 +71,11 @@ DEFINE_bool( false, "Enabled GPU monitorng, currently supports NVIDIA GPUs."); DEFINE_bool(enable_perf_monitor, false, "Enable heartbeat perf monitoring."); +DEFINE_int32( + export_cnt, + 40, + "Number of log entries to export in each log file." +); std::unique_ptr getLogger(const std::string& scribe_category = "") { @@ -190,7 +200,7 @@ int main(int argc, char** argv) if (FLAGS_enable_ipc_monitor) { LOG(INFO) << "Starting IPC Monitor"; - ipcmon = std::make_unique(); + ipcmon = std::make_unique(FLAGS_export_path, FLAGS_export_cnt); ipcmon->setLogger(std::move(getLogger())); ipcmon_thread = std::make_unique([&ipcmon]() { ipcmon->loop(); }); diff --git a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp index b8ccd7aa0..3b03d6059 100644 --- a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp +++ b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp @@ -26,13 +26,30 @@ const std::string kLibkinetoRequest = "req"; const std::string kLibkinetoContext = "ctxt"; const std::string kLibkinetoData = "data"; -IPCMonitor::IPCMonitor(const std::string& ipc_fabric_name) +IPCMonitor::IPCMonitor(const std::string &exportPath, int32_t exportCnt, const std::string& ipc_fabric_name) { ipc_manager_ = FabricManager::factory(ipc_fabric_name); data_ipc_manager_ = FabricManager::factory(ipc_fabric_name + "_data"); // below ensures singleton exists LOG(INFO) << "Kineto config manager : active processes = " << LibkinetoConfigManager::getInstance()->processCount("0"); + if (!exportPath.empty()) { + exportPath_ = exportPath; + if (exportCnt <= 0) { + LOG(WARNING) << "Invalid export count, using default value of 10"; + exportCnt = 20; + } + LOG(INFO) << "Exporting metrics to file : " << exportPath << ", export count: " << exportCnt; + loggerGenerator_ = std::make_unique(exportPath, exportCnt); + loggerGenerator_->start(); + } +} + +IPCMonitor::~IPCMonitor() +{ + if (loggerGenerator_) { + loggerGenerator_->stop(); + } } void IPCMonitor::loop() @@ -90,6 +107,9 @@ void tracing::IPCMonitor::setLogger(std::unique_ptr logger) void IPCMonitor::LogData(const nlohmann::json& result) { + if (!exportPath_.empty()) { + return; // No logging if exportPath is set + } auto timestamp = result["timestamp"].get(); logger_->logUint("timestamp", timestamp); auto duration = result["duration"].get(); diff --git a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h index cbc59fd2b..a3bac9790 100644 --- a/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h +++ b/msmonitor/dynolog_npu/dynolog/src/tracing/IPCMonitor.h @@ -6,6 +6,13 @@ #pragma once #include +#include +#include +#include +#include +#include +#include +#include // Use glog for FabricManager.h #define USE_GOOGLE_LOG @@ -16,11 +23,100 @@ namespace dynolog { namespace tracing { +class LoggerGenerator { +public: + LoggerGenerator(const std::string& exportFilePath, + const int &logcnt) : logFilePath_(exportFilePath), running_(false), logCnt_(logcnt) {} + + void start() { + running_ = true; + std::thread([this]() { + int cnt = 0; + while (running_) { + auto starttime = std::chrono::steady_clock::now(); + { + std::lock_guard lock(mutex_); + std::ofstream logFile(logFilePath_, std::ios::trunc); + if (!logFile) { + std::cerr << "Failed to open log file: " << logFilePath_ << std::endl; + continue; + } + generateLog(logFile, cnt); + } + auto endtime = std::chrono::steady_clock::now(); + auto eplashedMs = std::chrono::duration_cast(endtime - starttime).count(); + auto sleepTime = intervalMs_ - eplashedMs; + + if (sleepTime > 0) { + std::this_thread::sleep_for(std::chrono::milliseconds(sleepTime)); + } + cnt++; + } + }).detach(); + } + + void stop() { + running_ = false; + } +private: + std::atomic running_; + std::mutex mutex_; + std::string logFilePath_; + int logCnt_; + const int intervalMs_ = 60000; // 1 minute + + inline int GetDuration(int step) { + if (step <= 1000) { + return 1000; + } else if (step <= 2000) { + return 1000 + (step - 1000) * 3; + } else { + return 4000; + } + } + + void generateLog(std::ofstream& logFile, const int cnt) { + int start = 1 + cnt * logCnt_; + int end = logCnt_ + cnt * logCnt_; + int64_t timestamp = std::chrono::system_clock::now().time_since_epoch().count() / 1000000; + for (int i = start; i <= end; ++i) { + int duration = GetDuration(i); + logFile << "mct{name=\"" << i + << "\", src=\"ht\", pid=\"" << 100 << "\"} " + << duration << " " << timestamp << "\n"; + logFile << "mct{name=\"" << i + << "\", src=\"de\", did=\"" << 0 << "\"} " + << duration << " " << timestamp << "\n"; + logFile << "mct{name=\"" << i + << "\", src=\"ht\", pid=\"" << 101 << "\"} " + << duration << " " << timestamp << "\n"; + logFile << "mct{name=\"" << i + << "\", src=\"de\", did=\"" << 1 << "\"} " + << duration << " " << timestamp << "\n"; + logFile << "mct{name=\"" << i + << "\", src=\"ht\", pid=\"" << 102 << "\"} " + << duration << " " << timestamp << "\n"; + logFile << "mct{name=\"" << i + << "\", src=\"de\", did=\"" << 2 << "\"} " + << duration << " " << timestamp << "\n"; + logFile << "mct{name=\"" << i + << "\", src=\"ht\", pid=\"" << 103 << "\"} " + << duration << " " << timestamp << "\n"; + logFile << "mct{name=\"" << i + << "\", src=\"de\", did=\"" << 3 << "\"} " + << duration << " " << timestamp << "\n"; + timestamp += 20; + } + } +}; + class IPCMonitor { public: using FabricManager = dynolog::ipcfabric::FabricManager; - IPCMonitor(const std::string& ipc_fabric_name = "dynolog"); - virtual ~IPCMonitor() {} + IPCMonitor(const std::string &exportPath = "", + int32_t exportCnt = 20, + const std::string& ipc_fabric_name = "dynolog"); + virtual ~IPCMonitor(); void loop(); void dataLoop(); @@ -37,8 +133,12 @@ public: std::unique_ptr data_ipc_manager_; std::unique_ptr logger_; + std::unique_ptr loggerGenerator_; + // friend class test_case_name##_##test_name##_Test friend class IPCMonitorTest_LibkinetoRegisterAndOndemandTest_Test; +private: + std::string exportPath_; }; } // namespace tracing -- Gitee