diff --git a/dynolog_npu/dynolog_npu/dynolog/src/Metric.cpp b/dynolog_npu/dynolog_npu/dynolog/src/Metric.cpp new file mode 100644 index 0000000000000000000000000000000000000000..9b8f22ecac657f425bdc2a520b96e69f506d581e --- /dev/null +++ b/dynolog_npu/dynolog_npu/dynolog/src/Metric.cpp @@ -0,0 +1,37 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +#include "dynolog/src/Metrics.h" + +#include +#include + +namespace dynolog { + +const std::vector getAllMetrics() { + static std::vector metrics_ = { + {.name = "kindName", + .type = MetricType::Ratio, + .desc = "Fraction of total CPU time spend on user or system mode."}, + {.name = "duration", + .type = MetricType::Ratio, + .desc = "Fraction of total CPU time spent in user mode."}, + {.name = "timestamp", + .type = MetricType::Ratio, + .desc = "Fraction of total CPU time spent in system mode."}, + {.name = "deviceId", + .type = MetricType::Ratio, + .desc = "Fraction of total CPU time spent in idle mode."}, + }; + return metrics_; +} + +// These metrics are dynamic per network drive +const std::vector getNetworkMetrics() { + static std::vector metrics_ = {}; + return metrics_; +} + +} // namespace dynolog \ No newline at end of file diff --git a/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp b/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp index c2b6625dbd2f84d9e0a2e08223716f2979d904f2..aba89a59f553b520a88f5c04da1c215c2f7c0de7 100644 --- a/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp +++ b/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp @@ -73,6 +73,7 @@ void IPCMonitor::processMsg(std::unique_ptr msg) { msg->metadata.type, kLibkinetoRequest.data(), kLibkinetoRequest.size()) == 0) { + getLibkinetoOnDemandRequest(std::move(msg)); } else { LOG(ERROR) << "TYPE UNKOWN: " << msg->metadata.type; } @@ -83,6 +84,17 @@ void tracing::IPCMonitor::setLogger(std::unique_ptr logger) logger_ = std::move(logger); } +void IPCMonitor::LogData(const nlohmann::json& result) +{ + auto timestamp = result["timestamp"].get(); + logger_->logInt("timestamp", timestamp); + auto duration = result["duration"].get(); + logger_->logFloat("duration", duration); + auto deviceId = result["deviceId"].get(); + logger_->logInt("deviceId", deviceId); + logger_->finalize(); +} + void IPCMonitor::processDataMsg(std::unique_ptr msg) { if (!data_ipc_manager_) { @@ -97,6 +109,7 @@ void IPCMonitor::processDataMsg(std::unique_ptr msg) try { nlohmann::json result = nlohmann::json::parse(message); LOG(INFO) << "Received data message : " << result; + LogData(result); } catch (nlohmann::json::parse_error&) { LOG(ERROR) << "Error parsing message = " << message; return; diff --git a/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.h b/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.h index 14c5125e61182154e5ab09134b570cb50410ecac..1dc0cd2345fd7d7e556bc5c95361206e0fe2d7f2 100644 --- a/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.h +++ b/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.h @@ -31,6 +31,7 @@ class IPCMonitor { void getLibkinetoOnDemandRequest(std::unique_ptr msg); void registerLibkinetoContext(std::unique_ptr msg); void setLogger(std::unique_ptr logger); + void LogData(const nlohmann::json& result); std::unique_ptr ipc_manager_; std::unique_ptr data_ipc_manager_; diff --git a/dynolog_npu/plugin/ipc_monitor/InputParser.cpp b/dynolog_npu/plugin/ipc_monitor/InputParser.cpp index 6f77b2ba86a6b835cef07f564c3dc57f7e77d71e..bc77d33f1ae2f029e2fe548f8f9d9a7f5a594935 100644 --- a/dynolog_npu/plugin/ipc_monitor/InputParser.cpp +++ b/dynolog_npu/plugin/ipc_monitor/InputParser.cpp @@ -21,6 +21,7 @@ const std::unordered_set cfgMap { const std::unordered_map kindStrMap { {"Marker", MSPTI_ACTIVITY_KIND_MARKER}, + {"Kernel", MSPTI_ACTIVITY_KIND_KERNEL}, {"API", MSPTI_ACTIVITY_KIND_API}, {"Hccl", MSPTI_ACTIVITY_KIND_HCCL}, {"Memory", MSPTI_ACTIVITY_KIND_MEMORY}, diff --git a/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp b/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp index 693a91b2eeeb59beb5bd3b50db9fd5b48c5f6cb3..e141ae047bace714f6c4025094c6c4e92eda1944 100644 --- a/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp +++ b/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp @@ -4,7 +4,9 @@ #include #include #include + #include "DynoLogNpuMonitor.h" +#include "MetricManager.h" namespace { constexpr size_t ALIGN_SIZE = 8; @@ -34,113 +36,6 @@ void MsptiFree(uint8_t *ptr) free(ptr); } } - -std::string ConstructMarkerMessage(const msptiActivityMarker& marker) -{ - nlohmann::json jsonMsg; - jsonMsg["kind"] = "Marker"; - jsonMsg["name"] = marker.name; - jsonMsg["flag"] = marker.flag; - jsonMsg["sourceKind"] = marker.sourceKind; - jsonMsg["timestamp"] = marker.timestamp; - jsonMsg["id"] = marker.id; - jsonMsg["objectId"]["processId"] = marker.objectId.pt.processId; - jsonMsg["objectId"]["threadId"] = marker.objectId.pt.threadId; - return jsonMsg.dump(); -} - -std::string ConstructKernelMessage(const msptiActivityKernel& kernel) -{ - nlohmann::json jsonMsg; - jsonMsg["kind"] = "Kernel"; - jsonMsg["name"] = kernel.name; - jsonMsg["start"] = kernel.start; - jsonMsg["end"] = kernel.end; - jsonMsg["deviceId"] = kernel.ds.deviceId; - jsonMsg["streamId"] = kernel.ds.streamId; - return jsonMsg.dump(); -} - -std::string ConstructHcclMessage(const msptiActivityHccl& hccl) -{ - nlohmann::json jsonMsg; - jsonMsg["kind"] = "Hccl"; - jsonMsg["name"] = hccl.name; - jsonMsg["commName"] = hccl.commName; - jsonMsg["start"] = hccl.start; - jsonMsg["end"] = hccl.end; - jsonMsg["bandWidth"] = hccl.bandWidth; - jsonMsg["deviceId"] = hccl.ds.deviceId; - jsonMsg["streamId"] = hccl.ds.streamId; - return jsonMsg.dump(); -} - -std::string ConstructApiMessage(const msptiActivityApi& api) -{ - nlohmann::json jsonMsg; - jsonMsg["kind"] = "Api"; - jsonMsg["name"] = api.name; - jsonMsg["start"] = api.start; - jsonMsg["end"] = api.end; - jsonMsg["processId"] = api.pt.processId; - jsonMsg["threadId"] = api.pt.threadId; - return jsonMsg.dump(); -} - -std::string ConstructMemoryMessage(const msptiActivityMemory& memory) -{ - nlohmann::json jsonMsg; - jsonMsg["kind"] = "Memory"; - jsonMsg["optype"] = (memory.memoryOperationType == MSPTI_ACTIVITY_MEMORY_OPERATION_TYPE_ALLOCATATION? "alloc" : "free"); - jsonMsg["memoryKind"] = static_cast(memory.memoryKind); - jsonMsg["start"] = memory.start; - jsonMsg["end"] = memory.end; - jsonMsg["address"] = memory.address; - jsonMsg["bytes"] = memory.bytes; - jsonMsg["processId"] = memory.processId; - jsonMsg["deviceId"] = memory.deviceId; - jsonMsg["streamId"] = memory.streamId; - return jsonMsg.dump(); -} - -std::string ConstructMemsetMessage(const msptiActivityMemset& memset) -{ - nlohmann::json jsonMsg; - jsonMsg["kind"] = "Memset"; - jsonMsg["value"] = memset.value; - jsonMsg["bytes"] = memset.bytes; - jsonMsg["start"] = memset.start; - jsonMsg["end"] = memset.end; - jsonMsg["deviceId"] = memset.deviceId; - jsonMsg["streamId"] = memset.streamId; - jsonMsg["isAsync"] = static_cast(memset.isAsync); - return jsonMsg.dump(); -} - -std::string ConstructMemcpyMessage(const msptiActivityMemcpy& memcpy) -{ - static const std::unordered_map memcpyKindMap = { - {MSPTI_ACTIVITY_MEMCPY_KIND_UNKNOWN, "UNKNOWN"}, - {MSPTI_ACTIVITY_MEMCPY_KIND_HTOH, "HTOH"}, - {MSPTI_ACTIVITY_MEMCPY_KIND_HTOD, "HTOD"}, - {MSPTI_ACTIVITY_MEMCPY_KIND_DTOH, "DTOH"}, - {MSPTI_ACTIVITY_MEMCPY_KIND_DTOD, "DTOD"}, - {MSPTI_ACTIVITY_MEMCPY_KIND_DEFAULT, "DEFAULT"} - }; - - auto memcpyKindIter = memcpyKindMap.find(memcpy.copyKind); - auto memcpyKind = (memcpyKindIter != memcpyKindMap.end() ? memcpyKindIter->second : "INVALID"); - nlohmann::json jsonMsg; - jsonMsg["kind"] = "Memcpy"; - jsonMsg["memcpyKind"] = memcpyKind; - jsonMsg["bytes"] = memcpy.bytes; - jsonMsg["start"] = memcpy.start; - jsonMsg["end"] = memcpy.end; - jsonMsg["deviceId"] = memcpy.deviceId; - jsonMsg["streamId"] = memcpy.streamId; - jsonMsg["isAsync"] = static_cast(memcpy.isAsync); - return jsonMsg.dump(); -} } namespace dynolog_npu { @@ -168,6 +63,7 @@ void MsptiMonitor::Start() return; } start_.store(true); + metric::MetricManager::GetInstance()->Run(); LOG(INFO) << "MsptiMonitor start successfully"; } @@ -189,6 +85,7 @@ void MsptiMonitor::Uninit() if (!start_.load()) { return; } + metric::MetricManager::GetInstance()->Stop(); start_.store(false); cv_.notify_one(); Thread::Stop(); @@ -203,6 +100,7 @@ void MsptiMonitor::EnableActivity(msptiActivityKind kind) } else { LOG(ERROR) << "MsptiMonitor enableActivity failed, kind: " << static_cast(kind); } + metric::MetricManager::GetInstance()->EnableKindSwitch_(kind); } } @@ -225,6 +123,7 @@ void MsptiMonitor::SetFlushInterval(uint32_t interval) if (start_.load()) { cv_.notify_one(); } + metric::MetricManager::GetInstance()->SetReportInterval(interval); } bool MsptiMonitor::IsStarted() @@ -336,30 +235,7 @@ void MsptiMonitor::BufferConsume(msptiActivity *record) if (record == nullptr) { return; } - std::string message; - if (record->kind == MSPTI_ACTIVITY_KIND_MARKER) { - msptiActivityMarker *marker = ReinterpretConvert(record); - message = ConstructMarkerMessage(*marker); - } else if (record->kind == MSPTI_ACTIVITY_KIND_KERNEL) { - msptiActivityKernel *kernel = ReinterpretConvert(record); - message = ConstructKernelMessage(*kernel); - } else if (record->kind == MSPTI_ACTIVITY_KIND_HCCL) { - msptiActivityHccl *hccl = ReinterpretConvert(record); - message = ConstructHcclMessage(*hccl); - } else if (record->kind == MSPTI_ACTIVITY_KIND_API) { - msptiActivityApi *api = ReinterpretConvert(record); - message = ConstructApiMessage(*api); - } else if (record->kind == MSPTI_ACTIVITY_KIND_MEMORY) { - msptiActivityMemory *memory = ReinterpretConvert(record); - message = ConstructMemoryMessage(*memory); - } else if (record->kind == MSPTI_ACTIVITY_KIND_MEMSET) { - msptiActivityMemset *memset = ReinterpretConvert(record); - message = ConstructMemsetMessage(*memset); - } else if (record->kind == MSPTI_ACTIVITY_KIND_MEMCPY) { - msptiActivityMemcpy *memcpy = ReinterpretConvert(record); - message = ConstructMemcpyMessage(*memcpy); - } - SendMessage(message); + metric::MetricManager::GetInstance()->ConsumeMsptiData(record); } void MsptiMonitor::SendMessage(const std::string &message) diff --git a/dynolog_npu/plugin/ipc_monitor/TimerTask.h b/dynolog_npu/plugin/ipc_monitor/TimerTask.h new file mode 100644 index 0000000000000000000000000000000000000000..bdfa5c2e71c596c2bfcb0a3ff306cc32221d5e09 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/TimerTask.h @@ -0,0 +1,91 @@ +#ifndef TIMER_TASK_H +#define TIMER_TASK_H + +#include +#include +#include +#include +#include +#include +#include + +namespace dynolog_npu { +namespace ipc_monitor { +class TimerTask { +public: + TimerTask(const std::string& name, int interval) + : interval(interval), name(name), manual_trigger(false),running(false) {} + + ~TimerTask() + { + Stop(); + } + + void Run() + { + if (running) { + LOG(ERROR) << name << " Timer task is already running."; + return; + } + running = true; + taskThread = std::thread(&TimerTask::TaskRun, this); + } + + void Trigger() + { + std::unique_lock lock(cv_mutex); + manual_trigger = true; + cv.notify_one(); + } + + // 停止定时任务 + void Stop() + { + if (!running) { + LOG(ERROR) << name << "Timer task is not running."; + return; + } + + running = false; + cv.notify_one(); + taskThread.join(); + } + + void SetInterval(int intervalTimes) + { + interval.store(intervalTimes); + } + + virtual void ExecuteTask() = 0; +private: + // 定时任务线程函数 + void TaskRun() + { + LOG(INFO) << name << " Timer task started."; + while (running) { + std::unique_lock lock(cv_mutex); + cv.wait_for(lock, std::chrono::seconds(interval.load()), [&] {return manual_trigger || !running;}); + if (!running) { + break; + } + if (manual_trigger) { + manual_trigger = false; + } else if (running) { + ExecuteTask(); + } + } + LOG(INFO) << name << " Timer task stopped."; + } + + std::atomic interval; + std::string name; + std::condition_variable cv; + std::mutex cv_mutex; + std::atomic manual_trigger; + std::atomic running; + std::thread taskThread; +}; + +} +} +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.cpp b/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1e4b18103fdb1ca5c66949c2289dc96e18ad1aa2 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.cpp @@ -0,0 +1,67 @@ +#include "MetricApiProcess.h" + +#include +#include + +#include "utils.h" + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +std::string ApiMetric::seriesToJson() +{ + nlohmann::json jsonMsg; + jsonMsg["kind"] = "msptiActivityApi"; + jsonMsg["deviceId"] = -1; + jsonMsg["duration"] = duration; + jsonMsg["timestamp"] = timestamp; + return jsonMsg.dump(); +} + +void MetricApiProcess::ConsumeMsptiData(msptiActivity *record) +{ + msptiActivityApi* apiData = ReinterpretConvert(record); + msptiActivityApi* tmp = ReinterpretConvert(MsptiMalloc(sizeof(msptiActivityApi), 8)); + memcpy(tmp, apiData, sizeof(msptiActivityApi)); + { + std::unique_lock lock(dataMutex); + records.emplace_back(tmp); + } + +} + +std::vector MetricApiProcess::AggregatedData() +{ + std::vector> copyRecords; + { + std::unique_lock lock(dataMutex); + copyRecords = std::move(records); + records.clear(); + } + ApiMetric apiMetric{}; + auto ans = std::accumulate(copyRecords.begin(), copyRecords.end(), 0, + [](int acc, std::shared_ptr api) { + return acc + api->end - api->start; + }); + apiMetric.duration = ans; + apiMetric.deviceId = -1; + apiMetric.timestamp = getCurrentTimestamp64(); + return {apiMetric}; +} + +void MetricApiProcess::SendProcessMessage() +{ + auto afterAggregated = AggregatedData(); + for (auto& metric: afterAggregated) { + SendMessage(metric.seriesToJson()); + } +} + +void MetricApiProcess::Clear() +{ + records.clear(); +} +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.h new file mode 100644 index 0000000000000000000000000000000000000000..01e6864e0108fba0ccba5c9a6b61c6bdc04e57eb --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.h @@ -0,0 +1,37 @@ +#ifndef METRIC_API_PROCESS_H +#define METRIC_API_PROCESS_H + +#include +#include +#include "MetricProcessBase.h" + + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +struct ApiMetric { + double duration; + uint64_t timestamp; + uint32_t deviceId; +public: + std::string seriesToJson(); +}; + +class MetricApiProcess: public MetricProcessBase +{ +public: + MetricApiProcess() = default; + void ConsumeMsptiData(msptiActivity *record) override; + std::vector AggregatedData(); + void SendProcessMessage() override; + void Clear() override; +private: + std::mutex dataMutex; + std::vector> records; +}; +} +} +} + +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.cpp b/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.cpp new file mode 100644 index 0000000000000000000000000000000000000000..71f29a7c470ee3953fa2b28c843fbc268da89975 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.cpp @@ -0,0 +1,67 @@ +#include "MetricKernelProcess.h" + +#include + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +std::string KernelMetric::seriesToJson() +{ + nlohmann::json jsonMsg; + jsonMsg["kind"] = "msptiActivityKernel"; + jsonMsg["deviceId"] = deviceId; + jsonMsg["duration"] = duration; + jsonMsg["timestamp"] = timestamp; + return jsonMsg.dump(); +} + +void MetricKernelProcess::ConsumeMsptiData(msptiActivity *record) +{ + msptiActivityKernel* kernel = ReinterpretConvert(record); + msptiActivityKernel* ptr = ReinterpretConvert(MsptiMalloc(sizeof(msptiActivityKernel), 8)); + memcpy(ptr, kernel, sizeof(msptiActivityKernel)); + { + std::unique_lock lock(dataMutex); + records.emplace_back(ptr); + } +} + +std::vector MetricKernelProcess::AggregatedData() +{ + std::vector> copyRecords; + { + std::unique_lock lock(dataMutex); + copyRecords = std::move(records); + records.clear(); + } + if (copyRecords.empty()) { + return {}; + } + auto deviceId = copyRecords[0]->ds.deviceId; + KernelMetric kernelMetric{}; + auto ans = std::accumulate(copyRecords.begin(), copyRecords.end(), 0, + [](int acc, std::shared_ptr kernel) { + return acc + kernel->end - kernel->start; + }); + kernelMetric.duration = ans; + kernelMetric.deviceId = deviceId; + kernelMetric.timestamp = getCurrentTimestamp64(); + return {kernelMetric}; +} + +void MetricKernelProcess::SendProcessMessage() +{ + auto afterAggregated = AggregatedData(); + for (auto& metric: afterAggregated) { + SendMessage(metric.seriesToJson()); + } +} + +void MetricKernelProcess::Clear() +{ + records.clear(); +} +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.h new file mode 100644 index 0000000000000000000000000000000000000000..406f8a5516a72df92a176fc4875347448d566f50 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.h @@ -0,0 +1,36 @@ +#ifndef METRIC_KERNEL_PROCESS_H +#define METRIC_KERNEL_PROCESS_H + +#include +#include "MetricProcessBase.h" + + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +struct KernelMetric { + double duration; + uint64_t timestamp; + uint32_t deviceId; +public: + std::string seriesToJson(); +}; + +class MetricKernelProcess: public MetricProcessBase +{ +public: + MetricKernelProcess() = default; + void ConsumeMsptiData(msptiActivity *record) override; + std::vector AggregatedData(); + void SendProcessMessage() override; + void Clear() override; +private: + std::mutex dataMutex; + std::vector> records; +}; +} +} +} + +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.cpp b/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0f47d7cbeab46fd92ed4e21554d5869b665ace3e --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.cpp @@ -0,0 +1,61 @@ +#include "MetricManager.h" +#include "MetricKernelProcess.h" +#include "MetricApiProcess.h" +#include "MetricMemCpyProcess.h" + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +constexpr int DEFALUT_REPORT_INTERVAL = 60; + +MetricManager::MetricManager(): TimerTask("MetricManager", DEFALUT_REPORT_INTERVAL), +kindSwitchs_(MSPTI_ACTIVITY_KIND_COUNT), consumeStatus_(MSPTI_ACTIVITY_KIND_COUNT){ + metrics.resize(MSPTI_ACTIVITY_KIND_COUNT); + metrics[MSPTI_ACTIVITY_KIND_KERNEL] = std::make_shared(); + metrics[MSPTI_ACTIVITY_KIND_API] = std::make_shared(); + metrics[MSPTI_ACTIVITY_KIND_MEMCPY] = std::make_shared(); +} + +ErrCode MetricManager::ConsumeMsptiData(msptiActivity *record) +{ + if (!kindSwitchs_[record->kind]) { + return ErrCode::PERMISSION; + } + auto metricProcess = metrics[record->kind]; + consumeStatus_[record->kind] = true; + metricProcess->ConsumeMsptiData(record); + consumeStatus_[record->kind] = false; + return ErrCode::SUC; +} + +void MetricManager::SetReportInterval(uint32_t intervalTimes) +{ + if (reportInterval_.load() != intervalTimes) { + SendMetricMsg(); + SetInterval(intervalTimes); + reportInterval_.store(intervalTimes); + } +} + +void MetricManager::ExecuteTask() +{ + SendMetricMsg(); +} + +void MetricManager::SendMetricMsg() +{ + for (int i = 0; i < MSPTI_ACTIVITY_KIND_COUNT; i++) { + if (kindSwitchs_[i].load()) { + metrics[i]->SendProcessMessage(); + } + } +} + +void MetricManager::EnableKindSwitch_(msptiActivityKind kind) +{ + kindSwitchs_[kind] = true; +} +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.h new file mode 100644 index 0000000000000000000000000000000000000000..84b2acfeb74def9fcb2cffef10e3d8417de9b45c --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.h @@ -0,0 +1,35 @@ +#ifndef METRIC_MANAGER_H +#define METRIC_MANAGER_H + +#include +#include + +#include "utils.h" +#include "singleton.h" +#include "mspti.h" +#include "TimerTask.h" +#include "MetricProcessBase.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace metric { +class MetricManager: public ipc_monitor::Singleton, public TimerTask +{ +private: + std::vector> kindSwitchs_; + std::vector> consumeStatus_; + std::atomic reportInterval_; + std::vector> metrics; +public: + MetricManager(); + ~MetricManager() = default; + ErrCode ConsumeMsptiData(msptiActivity *record); + void SetReportInterval(uint32_t intervalTimes); + void SendMetricMsg(); + void ExecuteTask() override; + void EnableKindSwitch_(msptiActivityKind kind); +}; +} +} +} +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricMemCpyProcess.cpp b/dynolog_npu/plugin/ipc_monitor/metric/MetricMemCpyProcess.cpp new file mode 100644 index 0000000000000000000000000000000000000000..836790245289ab95db920d0c1c1c4c206db33628 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricMemCpyProcess.cpp @@ -0,0 +1,67 @@ +#include "MetricMemCpyProcess.h" + +#include + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +std::string MemCpyMetric::seriesToJson() +{ + nlohmann::json jsonMsg; + jsonMsg["kind"] = "msptiActivityMemcpy"; + jsonMsg["deviceId"] = deviceId; + jsonMsg["duration"] = duration; + jsonMsg["timestamp"] = timestamp; + return jsonMsg.dump(); +} + +void MetricMemCpyProcess::ConsumeMsptiData(msptiActivity *record) +{ + msptiActivityMemcpy* kernel = ReinterpretConvert(record); + msptiActivityMemcpy* ptr = ReinterpretConvert(MsptiMalloc(sizeof(msptiActivityMemcpy), 8)); + memcpy(ptr, kernel, sizeof(msptiActivityMemcpy)); + { + std::unique_lock lock(dataMutex); + records.emplace_back(ptr); + } +} + +std::vector MetricMemCpyProcess::AggregatedData() +{ + std::vector> copyRecords; + { + std::unique_lock lock(dataMutex); + copyRecords = std::move(records); + records.clear(); + } + if (copyRecords.empty()) { + return {}; + } + auto deviceId = copyRecords[0]->deviceId; + MemCpyMetric memCpyMetric{}; + auto ans = std::accumulate(copyRecords.begin(), copyRecords.end(), 0, + [](int acc, std::shared_ptr memcpy) { + return acc + memcpy->end - memcpy->start; + }); + memCpyMetric.duration = ans; + memCpyMetric.deviceId = deviceId; + memCpyMetric.timestamp = getCurrentTimestamp64(); + return {memCpyMetric}; +} + +void MetricMemCpyProcess::SendProcessMessage() +{ + auto afterAggregated = AggregatedData(); + for (auto& metric: afterAggregated) { + SendMessage(metric.seriesToJson()); + } +} + +void MetricMemCpyProcess::Clear() +{ + records.clear(); +} +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricMemCpyProcess.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricMemCpyProcess.h new file mode 100644 index 0000000000000000000000000000000000000000..727de81b527be2b21c1e7e8051009e23aca529aa --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricMemCpyProcess.h @@ -0,0 +1,35 @@ +#ifndef METRIC_MEMCPY_PROCESS_H +#define METRIC_MEMCPY_PROCESS_H + +#include +#include "MetricProcessBase.h" + + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +struct MemCpyMetric { + double duration; + uint64_t timestamp; + uint32_t deviceId; +public: + std::string seriesToJson(); +}; + +class MetricMemCpyProcess: public MetricProcessBase +{ +public: + void ConsumeMsptiData(msptiActivity *record) override; + std::vector AggregatedData(); + void SendProcessMessage() override; + void Clear() override; +private: + std::mutex dataMutex; + std::vector> records; +}; +} +} +} + +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricProcessBase.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricProcessBase.h new file mode 100644 index 0000000000000000000000000000000000000000..3b66f4490523f7bab2c77d69eaeeb720b77ada74 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricProcessBase.h @@ -0,0 +1,62 @@ +#ifndef METRIC_PROCESS_BASE_H +#define METRIC_PROCESS_BASE_H + +#include +#include + +#include "DynoLogNpuMonitor.h" +#include "NpuIpcClient.h" +#include "mspti.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace metric { +class MetricProcessBase +{ +public: + void SendMessage(std::string message) + { + if (message.empty()) { + LOG(ERROR) << "SendMessage message is empty"; + return; + } + static const std::string destName = DYNO_IPC_NAME + "_data"; + static const int maxRetry = 5, retryWaitTimeUs = 1000; + auto msg = Message::ConstructStrMessage(message, MSG_TYPE_DATA); + if (!msg) { + LOG(ERROR) << "ConstructStrMessage failed, message: " << message; + return; + } + auto ipcClient = DynoLogNpuMonitor::GetInstance()->GetIpcClient(); + if (!ipcClient) { + LOG(ERROR) << "DynoLogNpuMonitor ipcClient is nullptr"; + return; + } + if (!ipcClient->SyncSendMessage(*msg, destName, maxRetry, retryWaitTimeUs)) { + LOG(ERROR) << "send mspti message failed: " << message; + } + } + virtual void ConsumeMsptiData(msptiActivity *record) = 0; + virtual void Clear() = 0; + virtual void SendProcessMessage() = 0; + + void *MsptiMalloc(size_t size, size_t alignment) + { + if (alignment > 0) { + size = (size + alignment - 1) / alignment * alignment; + } + #if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L + void *ptr = nullptr; + if (posix_memalign(&ptr, alignment, size) != 0) { + ptr = nullptr; + } + return ptr; + #else + return malloc(size); + #endif + } +}; +} +} +} +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/utils.cpp b/dynolog_npu/plugin/ipc_monitor/utils.cpp index fce7103f4896b112e058961c0cce27901f70bab1..3b4d44a46cfb5b6664de30cd20672212bda57d38 100644 --- a/dynolog_npu/plugin/ipc_monitor/utils.cpp +++ b/dynolog_npu/plugin/ipc_monitor/utils.cpp @@ -48,6 +48,14 @@ std::string getCurrentTimestamp() return oss.str(); } +uint64_t getCurrentTimestamp64() +{ + auto now = std::chrono::system_clock::now(); + auto micros = std::chrono::duration_cast(now.time_since_epoch()); + auto milli_time = std::chrono::duration_cast(micros).count(); + return milli_time; +} + std::string formatErrorCode(SubModule submodule, ErrCode errorCode) { std::ostringstream oss; diff --git a/dynolog_npu/plugin/ipc_monitor/utils.h b/dynolog_npu/plugin/ipc_monitor/utils.h index 728fcb2608fa835f649675fe9a325e956114bbdf..d3ceb4d647104d80e8b13ecf0759133dc0d6563e 100644 --- a/dynolog_npu/plugin/ipc_monitor/utils.h +++ b/dynolog_npu/plugin/ipc_monitor/utils.h @@ -5,7 +5,7 @@ #include #include #include - +#include namespace dynolog_npu { namespace ipc_monitor { @@ -16,6 +16,7 @@ std::vector GetPids(); std::pair GetParentPidAndCommand(int32_t pid); std::vector> GetPidCommandPairsofAncestors(); std::string getCurrentTimestamp(); +uint64_t getCurrentTimestamp64(); bool Str2Uint32(uint32_t& dest, const std::string& str); bool Str2Bool(bool& dest, const std::string& str); std::string& trim(std::string& str); @@ -49,7 +50,28 @@ template inline T ReinterpretConvert(V ptr) { return reinterpret_cast(ptr); } +template +auto groupby(const Container& vec, KeyFunc keyFunc) { + using KeyType = decltype(keyFunc(*vec.begin())); + using ValueType = typename Container::value_type; + std::unordered_map> grouped; + for (const auto& item : vec) { + grouped[keyFunc(item)].push_back(item); + } + return grouped; +} +template +auto filter(const Container& container, Predicate predicate) { + Container result; + result.reserve(container.size()); + for (const auto& item : container) { + if (predicate(item)) { + result.push_back(item); + } + } + return result; +} } // namespace ipc_monitor } // namespace dynolog_npu #endif // IPC_MONITOR_UTILS_H diff --git a/dynolog_npu/plugin/setup.py b/dynolog_npu/plugin/setup.py index 28e558b2f20bfd4cf55fc088da762a1b8e62e79f..c4b726c543eace0a79e23f29e01ac55b967606e8 100644 --- a/dynolog_npu/plugin/setup.py +++ b/dynolog_npu/plugin/setup.py @@ -27,8 +27,11 @@ GLOG_LIB_PATH = os.path.join(DYNOLOG_PATH, "build", "third_party", "glog") ext_modules = [ Pybind11Extension( "IPCMonitor", # Name of the Python module - sources=["bindings.cpp"] + list(glob("ipc_monitor/*.cpp")), # Source files + sources=["bindings.cpp"] + + list(glob("ipc_monitor/*.cpp")) + + list(glob("ipc_monitor/metric/*.cpp")), # Source files include_dirs=[os.path.join(BASE_DIR, "ipc_monitor"), # Include directories + os.path.join(BASE_DIR, "ipc_monitor/metric"), JSON_INC_PATH, GLOG_INC_PATH, GLOG_LIB_PATH], extra_compile_args=["-std=c++14", "-fPIC", "-fstack-protector-all", "-fno-strict-aliasing", "-fno-common", "-fvisibility=hidden", "-fvisibility-inlines-hidden", "-Wfloat-equal", "-Wextra", "-O2"],