From 82b40257bb321bf142adf21a0e54c5979e3084fc Mon Sep 17 00:00:00 2001 From: Gallium Date: Thu, 27 Feb 2025 16:09:52 +0800 Subject: [PATCH 1/2] metric_process --- .../plugin/ipc_monitor/DynoLogNpuMonitor.cpp | 6 +-- .../plugin/ipc_monitor/InputParser.cpp | 2 +- .../plugin/ipc_monitor/MsptiMonitor.cpp | 26 +---------- .../plugin/metric/MetricKernelProcess.cpp | 17 +++++++ .../plugin/metric/MetricKernelProcess.h | 21 +++++++++ dynolog_npu/plugin/metric/MetricManager.cpp | 44 +++++++++++++++++++ dynolog_npu/plugin/metric/MetricManager.h | 25 +++++++++++ dynolog_npu/plugin/metric/MetricProcessBase.h | 19 ++++++++ 8 files changed, 132 insertions(+), 28 deletions(-) create mode 100644 dynolog_npu/plugin/metric/MetricKernelProcess.cpp create mode 100644 dynolog_npu/plugin/metric/MetricKernelProcess.h create mode 100644 dynolog_npu/plugin/metric/MetricManager.cpp create mode 100644 dynolog_npu/plugin/metric/MetricManager.h create mode 100644 dynolog_npu/plugin/metric/MetricProcessBase.h diff --git a/dynolog_npu/plugin/ipc_monitor/DynoLogNpuMonitor.cpp b/dynolog_npu/plugin/ipc_monitor/DynoLogNpuMonitor.cpp index 4aea4b8aa1..d94190872b 100644 --- a/dynolog_npu/plugin/ipc_monitor/DynoLogNpuMonitor.cpp +++ b/dynolog_npu/plugin/ipc_monitor/DynoLogNpuMonitor.cpp @@ -25,7 +25,7 @@ bool DynoLogNpuMonitor::Init() ErrCode DynoLogNpuMonitor::DealMonitorReq(const MsptiMonitorCfg& cmd) { if (cmd.monitorStart && !msptiMonitor_.IsStarted()) { - PRINT_INFO("Start Mspti Monitor thread to collect, reportTimes: %s, enableActivity: %s", reportTimes, enableActivity); + PRINT_INFO("Start Mspti Monitor thread to collect, reportTimes: %s, enableActivity: %s"); msptiMonitor_.Start(); } @@ -40,7 +40,7 @@ ErrCode DynoLogNpuMonitor::DealMonitorReq(const MsptiMonitorCfg& cmd) } } msptiMonitor_.SetFlushInterval(cmd.reportIntervals); - return ErrCode.SUC; + return ErrCode::SUC; } std::string DynoLogNpuMonitor::Poll() @@ -63,7 +63,7 @@ void DynoLogNpuMonitor::EnableMsptiMonitor(std::unordered_mapDynoLogGetOpts(cfg_map); if (cmd.isMonitor) { auto ans = DealMonitorReq(cmd); - if (ans != ErrCode.SUC) { + if (ans != ErrCode::SUC) { PRINT_ERROR("deal monitor request fail, because" + IPC_ERROR(ans)); } } diff --git a/dynolog_npu/plugin/ipc_monitor/InputParser.cpp b/dynolog_npu/plugin/ipc_monitor/InputParser.cpp index 7a1f0d20b3..b936b3e574 100644 --- a/dynolog_npu/plugin/ipc_monitor/InputParser.cpp +++ b/dynolog_npu/plugin/ipc_monitor/InputParser.cpp @@ -42,7 +42,7 @@ std::vector str2Kinds(const std::string& kindStrs) if (kind == kindStrMap.end()) { return {MSPTI_ACTIVITY_KIND_INVALID}; } - ans.push_back(kind); + ans.push_back(kind->second); } return ans; } diff --git a/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp b/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp index 81a8fdc190..419202b0a1 100644 --- a/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp +++ b/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp @@ -1,5 +1,6 @@ #include "MsptiMonitor.h" +#include "metric/MetricManager.h" #include #include @@ -287,30 +288,7 @@ void MsptiMonitor::BufferConsume(msptiActivity *record) if (record == nullptr) { return; } - std::string message; - if (record->kind == MSPTI_ACTIVITY_KIND_MARKER) { - msptiActivityMarker *marker = ReinterpretConvert(record); - message = ConstructMarkerMessage(*marker); - } else if (record->kind == MSPTI_ACTIVITY_KIND_KERNEL) { - msptiActivityKernel *kernel = ReinterpretConvert(record); - message = ConstructKernelMessage(*kernel); - } else if (record->kind == MSPTI_ACTIVITY_KIND_HCCL) { - msptiActivityHccl *hccl = ReinterpretConvert(record); - message = ConstructHcclMessage(*hccl); - } else if (record->kind == MSPTI_ACTIVITY_KIND_API) { - msptiActivityApi *api = ReinterpretConvert(record); - message = ConstructApiMessage(*api); - } else if (record->kind == MSPTI_ACTIVITY_KIND_MEMORY) { - msptiActivityMemory *memory = ReinterpretConvert(record); - message = ConstructMemoryMessage(*memory); - } else if (record->kind == MSPTI_ACTIVITY_KIND_MEMSET) { - msptiActivityMemset *memset = ReinterpretConvert(record); - message = ConstructMemsetMessage(*memset); - } else if (record->kind == MSPTI_ACTIVITY_KIND_MEMCPY) { - msptiActivityMemcpy *memcpy = ReinterpretConvert(record); - message = ConstructMemcpyMessage(*memcpy); - } - SendMessage(message); + MetricManger::GetInstance().consumeMsptiData(record); } void MsptiMonitor::SendMessage(const std::string &message) diff --git a/dynolog_npu/plugin/metric/MetricKernelProcess.cpp b/dynolog_npu/plugin/metric/MetricKernelProcess.cpp new file mode 100644 index 0000000000..015ef49b35 --- /dev/null +++ b/dynolog_npu/plugin/metric/MetricKernelProcess.cpp @@ -0,0 +1,17 @@ +#include + + +namespace dynolog_npu { +namespace metric { + +void MetricKernelProcess::consumeMsptiData(msptiActivity *record) +{ + +} + +void MetricKernelProcess::aggregatedData() +{ + +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/metric/MetricKernelProcess.h b/dynolog_npu/plugin/metric/MetricKernelProcess.h new file mode 100644 index 0000000000..4aafa5a752 --- /dev/null +++ b/dynolog_npu/plugin/metric/MetricKernelProcess.h @@ -0,0 +1,21 @@ +#include "MetricProcessBase.h" + +namespace dynolog_npu { +namespace metric { + +class KernelMetric { + +public: + std::string series(); +} + +class MetricKernelProcess: MetricProcessBase +{ +private: + /* data */ +public: + override void consumeMsptiData(msptiActivity *record); + override void aggregatedData(); +}; +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/metric/MetricManager.cpp b/dynolog_npu/plugin/metric/MetricManager.cpp new file mode 100644 index 0000000000..692dcd4dc2 --- /dev/null +++ b/dynolog_npu/plugin/metric/MetricManager.cpp @@ -0,0 +1,44 @@ +#include "MetricManager.cpp" + +namespace dynolog_npu { +namespace metric { + + +ErrCode MetricManager::consumeMsptiData(msptiActivity *record) +{ + if (!kindSwitch[record-kind]) { + return ErrCode.PERMISSION; + } + metricProcessBase metricProcess = metrics[record->kind]; + consumeStatus_[record->kind] = true; + metricProcess.consumeMsptiData(); + consumeStatus_[record->kind] = false; + return ErrCode.PERMISSION; +} + +void MetricManager::setReportInterval(uint32_t intervalTimes) +{ + ReportInterval_.store(intervalTimes); +} + +void MetricManager::Run() +{ + while (true) + { + std::unique_lock lock(cvMtx_); + cv_.wait_for(lock, std::chrono::seconds(flushInterval_.load()), + [&]() { return checkFlush_.load() || !start_.load();}); + sendMetricMsg(); + } +} + +void sendMetricMsg() +{ + for (int i = 0; i < MSPTI_ACTIVITY_KIND_COUNT; i++) { + if (kindSwitchs[i]) { + metric_process.sendMessage(); + } + } +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/metric/MetricManager.h b/dynolog_npu/plugin/metric/MetricManager.h new file mode 100644 index 0000000000..ddf3656a09 --- /dev/null +++ b/dynolog_npu/plugin/metric/MetricManager.h @@ -0,0 +1,25 @@ +#include +#include + +#include "ipc_monitor/utils.h" +#include "ipc_monitor/singleton.h" +#include "ipc_monitor/mspti.h" + +namespace dynolog_npu { +namespace metric { +class MetricManager: ipc_monitor::Singleton +{ +private: + std::vector> kindSwitchs_; + std::vector> consumeStatus_; + std::atomic ReportInterval_; + std::vector> metrics; +public: + MetricManager(/* args */) = default; + ~MetricManager() = default; + ErrCode consumeMsptiData(msptiActivity *record); + void setReportInterval(uint32_t intervalTimes); + void sendMetricMsg(); +}; +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/metric/MetricProcessBase.h b/dynolog_npu/plugin/metric/MetricProcessBase.h new file mode 100644 index 0000000000..f6c1714793 --- /dev/null +++ b/dynolog_npu/plugin/metric/MetricProcessBase.h @@ -0,0 +1,19 @@ +#include "mspti.h" + + +namespace dynolog_npu { +namespace metric { +class MetricProcessBase +{ +public: + void sendMessage(std::string msg); + virtual void consumeMsptiData(msptiActivity *record); + virtual void aggregatedData(); +}; + +void MetricProcessBase::sendMessage(std::string msg) +{ + +} +} +} \ No newline at end of file -- Gitee From c15fad3b10f0e3735c27ce995cb7943c524173f5 Mon Sep 17 00:00:00 2001 From: Gallium Date: Thu, 27 Feb 2025 16:15:50 +0800 Subject: [PATCH 2/2] metric --- .../dynolog_npu/dynolog/src/Metric.cpp | 37 +++++++++ .../dynolog/src/tracing/IPCMonitor.cpp | 14 ++++ .../plugin/ipc_monitor/InputParser.cpp | 1 + .../plugin/ipc_monitor/MsptiMonitor.cpp | 5 +- dynolog_npu/plugin/ipc_monitor/TimerTask.h | 79 +++++++++++++++++++ .../ipc_monitor/metric/MetricApiProcess.cpp | 47 +++++++++++ .../ipc_monitor/metric/MetricApiProcess.h | 35 ++++++++ .../metric/MetricKernelProcess.cpp | 43 ++++++++++ .../ipc_monitor/metric/MetricKernelProcess.h | 34 ++++++++ .../ipc_monitor/metric/MetricManager.cpp | 60 ++++++++++++++ .../{ => ipc_monitor}/metric/MetricManager.h | 28 ++++--- .../ipc_monitor/metric/MetricMarkProcess.cpp | 32 ++++++++ .../ipc_monitor/metric/MetricMarkProcess.h | 34 ++++++++ .../ipc_monitor/metric/MetricProcessBase.h | 63 +++++++++++++++ dynolog_npu/plugin/ipc_monitor/utils.h | 19 +++++ .../plugin/metric/MetricKernelProcess.cpp | 17 ---- .../plugin/metric/MetricKernelProcess.h | 21 ----- dynolog_npu/plugin/metric/MetricManager.cpp | 44 ----------- dynolog_npu/plugin/metric/MetricProcessBase.h | 19 ----- dynolog_npu/plugin/setup.py | 3 +- 20 files changed, 523 insertions(+), 112 deletions(-) create mode 100644 dynolog_npu/dynolog_npu/dynolog/src/Metric.cpp create mode 100644 dynolog_npu/plugin/ipc_monitor/TimerTask.h create mode 100644 dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.cpp create mode 100644 dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.h create mode 100644 dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.cpp create mode 100644 dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.h create mode 100644 dynolog_npu/plugin/ipc_monitor/metric/MetricManager.cpp rename dynolog_npu/plugin/{ => ipc_monitor}/metric/MetricManager.h (39%) create mode 100644 dynolog_npu/plugin/ipc_monitor/metric/MetricMarkProcess.cpp create mode 100644 dynolog_npu/plugin/ipc_monitor/metric/MetricMarkProcess.h create mode 100644 dynolog_npu/plugin/ipc_monitor/metric/MetricProcessBase.h delete mode 100644 dynolog_npu/plugin/metric/MetricKernelProcess.cpp delete mode 100644 dynolog_npu/plugin/metric/MetricKernelProcess.h delete mode 100644 dynolog_npu/plugin/metric/MetricManager.cpp delete mode 100644 dynolog_npu/plugin/metric/MetricProcessBase.h diff --git a/dynolog_npu/dynolog_npu/dynolog/src/Metric.cpp b/dynolog_npu/dynolog_npu/dynolog/src/Metric.cpp new file mode 100644 index 0000000000..9b8f22ecac --- /dev/null +++ b/dynolog_npu/dynolog_npu/dynolog/src/Metric.cpp @@ -0,0 +1,37 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. + +#include "dynolog/src/Metrics.h" + +#include +#include + +namespace dynolog { + +const std::vector getAllMetrics() { + static std::vector metrics_ = { + {.name = "kindName", + .type = MetricType::Ratio, + .desc = "Fraction of total CPU time spend on user or system mode."}, + {.name = "duration", + .type = MetricType::Ratio, + .desc = "Fraction of total CPU time spent in user mode."}, + {.name = "timestamp", + .type = MetricType::Ratio, + .desc = "Fraction of total CPU time spent in system mode."}, + {.name = "deviceId", + .type = MetricType::Ratio, + .desc = "Fraction of total CPU time spent in idle mode."}, + }; + return metrics_; +} + +// These metrics are dynamic per network drive +const std::vector getNetworkMetrics() { + static std::vector metrics_ = {}; + return metrics_; +} + +} // namespace dynolog \ No newline at end of file diff --git a/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp b/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp index c2b6625dbd..fddebc367b 100644 --- a/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp +++ b/dynolog_npu/dynolog_npu/dynolog/src/tracing/IPCMonitor.cpp @@ -73,6 +73,7 @@ void IPCMonitor::processMsg(std::unique_ptr msg) { msg->metadata.type, kLibkinetoRequest.data(), kLibkinetoRequest.size()) == 0) { + getLibkinetoOnDemandRequest(std::move(msg)); } else { LOG(ERROR) << "TYPE UNKOWN: " << msg->metadata.type; } @@ -83,6 +84,18 @@ void tracing::IPCMonitor::setLogger(std::unique_ptr logger) logger_ = std::move(logger); } +void LogData(nlohmann::json result) +{ + if (result["kind"] == "API") { + auto timestamp = result["timestamp"].get(); + logger_->logInt("timestamp", timestamp); + auto duration = result["duration"].get(); + logger_->logFloat("duration", duration); + auto deviceId = result["deviceId"].get(); + logger_->logInt("deviceId", deviceId); + } +} + void IPCMonitor::processDataMsg(std::unique_ptr msg) { if (!data_ipc_manager_) { @@ -96,6 +109,7 @@ void IPCMonitor::processDataMsg(std::unique_ptr msg) std::string message = std::string((char*)msg->buf.get(), msg->metadata.size); try { nlohmann::json result = nlohmann::json::parse(message); + LogData(result); LOG(INFO) << "Received data message : " << result; } catch (nlohmann::json::parse_error&) { LOG(ERROR) << "Error parsing message = " << message; diff --git a/dynolog_npu/plugin/ipc_monitor/InputParser.cpp b/dynolog_npu/plugin/ipc_monitor/InputParser.cpp index b936b3e574..c261245474 100644 --- a/dynolog_npu/plugin/ipc_monitor/InputParser.cpp +++ b/dynolog_npu/plugin/ipc_monitor/InputParser.cpp @@ -26,6 +26,7 @@ const std::unordered_set cfgMap { const std::unordered_map kindStrMap { {"Marker", MSPTI_ACTIVITY_KIND_MARKER}, + {"Kernel", MSPTI_ACTIVITY_KIND_KERNEL}, {"API", MSPTI_ACTIVITY_KIND_API}, {"Hccl", MSPTI_ACTIVITY_KIND_HCCL}, {"Memory", MSPTI_ACTIVITY_KIND_MEMORY}, diff --git a/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp b/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp index 3e31f27199..40120bc6dd 100644 --- a/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp +++ b/dynolog_npu/plugin/ipc_monitor/MsptiMonitor.cpp @@ -186,6 +186,7 @@ void MsptiMonitor::Uninit() if (!start_.load()) { return; } + metric::MetricManager::GetInstance()->Stop(); start_.store(false); cv_.notify_one(); Thread::Stop(); @@ -200,6 +201,7 @@ void MsptiMonitor::EnableActivity(msptiActivityKind kind) } else { std::cout << "MsptiMonitor::EnableActivity failed, kind: " << static_cast(kind) << std::endl; } + metric::MetricManager::GetInstance()->EnableKindSwitch_(kind); } } @@ -222,6 +224,7 @@ void MsptiMonitor::SetFlushInterval(uint32_t interval) if (start_.load()) { cv_.notify_one(); } + metric::MetricManager::GetInstance()->SetReportInterval(interval); } bool MsptiMonitor::IsStarted() @@ -325,7 +328,7 @@ void MsptiMonitor::BufferConsume(msptiActivity *record) if (record == nullptr) { return; } - MetricManger::GetInstance().consumeMsptiData(record); + metric::MetricManager::GetInstance()->ConsumeMsptiData(record); } void MsptiMonitor::SendMessage(const std::string &message) diff --git a/dynolog_npu/plugin/ipc_monitor/TimerTask.h b/dynolog_npu/plugin/ipc_monitor/TimerTask.h new file mode 100644 index 0000000000..3965a0c486 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/TimerTask.h @@ -0,0 +1,79 @@ +#ifndef TIMER_TASK_H +#define TIMER_TASK_H + +#include +#include +#include +#include +#include +#include + +namespace dynolog_npu { +namespace ipc_monitor { +class TimerTask { +public: + TimerTask(int interval) + : name(name), interval(interval), running(false), manual_trigger(false) {} + + ~TimerTask() + { + Stop(); + } + + void Run() { + if (running) { + std::cerr << "Timer task is already running." << std::endl; + return; + } + running = true; + taskThread = std::thread(&TimerTask::timerTask, this); + } + + void Trigger() { + std::unique_lock lock(cv_mutex); + manual_trigger = true; + cv.notify_one(); + } + + // 停止定时任务 + void Stop() { + if (!running) { + std::cerr << "Timer task is not running." << std::endl; + return; + } + + running = false; + cv.notify_one(); + taskThread.join(); + } + +private: + // 定时任务线程函数 + void TaskRun() { + std::cout << name << " Timer task started." << std::endl; + while (running) { + cv_.wait_for(lock. std::chrono::seconds(3), [&] {return manual_trigger || !running;}); + if (!running) { + break; + } + if (manual_trigger) { + manual_trigger = false; + } else if (running) { + ExecuteTask(); + } + } + std::cout << name << " Timer task stopped." << std::endl; + } + + int interval; + std::string name; + std::condition_variable cv; + std::mutex cv_mutex; + bool manual_trigger; + std::atomic running; + std::thread taskThread; +}; + +} +} +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.cpp b/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.cpp new file mode 100644 index 0000000000..71e7c5f38a --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.cpp @@ -0,0 +1,47 @@ +#include "metric/MetricAPIProcess.h" +#include "utils.h" +#include "numeric" +#include + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +std::string ApiMetric::seriesToJson() +{ + nlohmann::json jsonMsg; + jsonMsg["kind"] = "msptiActivityApi"; + jsonMsg["deviceId"] = -1; + jsonMsg["duration"] = duration; + jsonMsg["timestamp"] = timestamp; + return jsonMsg.dump(); +} + +void MetricApiProcess::ConsumeMsptiData(msptiActivity *record) +{ + msptiActivityApi* apiData = ReinterpretConvert(record); + msptiActivityApi* tmp = ReinterpretConvert(MsptiMalloc(sizeof(msptiActivityApi)), 8); + memcpy(ptr, apiData, sizeof(msptiActivityApi)); + records.emplace_back(ptr); +} + +std::string MetricApiProcess::AggregatedData() +{ + ApiMetric apiMetric{}; + auto ans = std::accumulate(records.begin(), records.end(), 0, + [](int acc, std::shared_ptr api) { + return acc + api->end - api->start; + }); + apiMetric.duration = ans; + apiMetric.deviceId = -1; + apiMetric.timesatmp = getCurrentTimestamp(); + return apiMetric.seriesToJson(); +} + +void MetricApiProcess::Clear() +{ + records.clear(); +} +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.h new file mode 100644 index 0000000000..8d79b2feea --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricApiProcess.h @@ -0,0 +1,35 @@ +#ifndef METRIC_API_PROCESS_H +#define METRIC_API_PROCESS_H + +#include +#include +#include "metric/MetricProcessBase.h" + + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +struct ApiMetric { + std::string kindName; + double duration; + uint32_t timestamp; + uint32_t deviceId; +public: + std::string seriesToJson(); +}; + +class MetricApiProcess: public MetricProcessBase +{ +private: + std::vector> records; +public: + void ConsumeMsptiData(msptiActivity *record) override; + std::string AggregatedData() override; + void Clear() override; +}; +} +} +} + +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.cpp b/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.cpp new file mode 100644 index 0000000000..717a87447a --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.cpp @@ -0,0 +1,43 @@ +#include "MetricKernelProcess.h" + + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +void MetricKernelProcess::ConsumeMsptiData(msptiActivity *record) +{ + msptiActivityKernel* kernel = ReinterpretConvert(record); + msptiActivityKernel* ptr = ReinterpretConvert(MsptiMalloc(sizeof(msptiActivityKernel), 8)); + memcpy(ptr, kernel, sizeof(msptiActivityKernel)); + records.emplace_back(ptr); +} + +std::string MetricKernelProcess::AggregatedData() +{ + auto deviceId2KernelData = groupby(records, [](std::shared_ptr kernel) { + return kernel->ds.deviceId; + }); + for (auto& pair : deviceId2KernelData) { + auto deviceId = pair->first; + auto kernelData = pair->second; + KernelMetric kernelMetric{}; + auto ans = std::accumulate(kernelData.begin(), kernelData.end(), 0, + [](int acc, std::shared_ptr kernel) { + return acc + kernel->end - kernel->start; + }); + kernelMetric.duration = ans; + kernelMetric.name = "msptiActivityKernel"; + kernelMetric.deviceId = deviceId; + kernelMetric.timestamp = getCurrentTimestamp(); + } + return kernelMetric.seriesToJson(); +} + +void MetricKernelProcess::Clear() +{ + records.clear(); +} +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.h new file mode 100644 index 0000000000..42b6d1aa9b --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricKernelProcess.h @@ -0,0 +1,34 @@ +#ifndef METRIC_KERNEL_PROCESS_H +#define METRIC_KERNEL_PROCESS_H + +#include +#include "metric/MetricProcessBase.h" + + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +class KernelMetric { + std::string name; + double duration; + double timestamp; + uint32_t deviceId; +public: + std::string seriesToJson(); +}; + +class MetricKernelProcess: public MetricProcessBase +{ +private: + std::vector> records; +public: + void ConsumeMsptiData(msptiActivity *record) override; + std::string AggregatedData() override; + void Clear() override; +}; +} +} +} + +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.cpp b/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.cpp new file mode 100644 index 0000000000..2eae5e1fed --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.cpp @@ -0,0 +1,60 @@ +#include "metric/MetricManager.h" +#include "MetricKernelProcess.h" +#include "MetricMarkProcess.h" +#include "MetricAPIProcess.h" + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +MetricManager::MetricManager(): TimerTask("MetricManager", 60), +kindSwitchs_(MSPTI_ACTIVITY_KIND_COUNT), consumeStatus_(MSPTI_ACTIVITY_KIND_COUNT), metrics(MSPTI_ACTIVITY_KIND_COUNT) { + metrics[MSPTI_ACTIVITY_KIND_KERNEL] = std::make_shared(); + metrics[MSPTI_ACTIVITY_KIND_API] = std::make_shared(); + metrics[MSPTI_ACTIVITY_KIND_MARKER] = std::make_shared(); +} + +ErrCode MetricManager::ConsumeMsptiData(msptiActivity *record) +{ + if (!kindSwitchs_[record->kind]) { + return ErrCode::PERMISSION; + } + auto metricProcess = metrics[record->kind]; + consumeStatus_[record->kind] = true; + metricProcess->consumeMsptiData(); + consumeStatus_[record->kind] = false; + return ErrCode::SUC; +} + +void MetricManager::SetReportInterval(uint32_t intervalTimes) +{ + if (ReportInterval_.load() != intervalTimes) { + // 这里需要添加一次发送请求 + SendMetricMsg(); + SetInterval(intervalTimes); + ReportInterval_.store(intervalTimes); + } +} + +void MetricManager::ExecuteTask() +{ + SendMetricMsg(); +} + +void MetricManager::SendMetricMsg() +{ + for (int i = 0; i < MSPTI_ACTIVITY_KIND_COUNT; i++) { + if (kindSwitchs_[i].load()) { + metrics[i]->SendMessage(metrics[i]->AggregatedData()); + metrics[i]->Clear(); + } + } +} + +void MetricManager::EnableKindSwitch_(msptiActivityKind kind) +{ + kindSwitchs_[kind] = true; +} +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/metric/MetricManager.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.h similarity index 39% rename from dynolog_npu/plugin/metric/MetricManager.h rename to dynolog_npu/plugin/ipc_monitor/metric/MetricManager.h index ddf3656a09..d57bab1ea9 100644 --- a/dynolog_npu/plugin/metric/MetricManager.h +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricManager.h @@ -1,13 +1,19 @@ +#ifndef METRIC_MANAGER_H +#define METRIC_MANAGER_H + #include #include -#include "ipc_monitor/utils.h" -#include "ipc_monitor/singleton.h" -#include "ipc_monitor/mspti.h" +#include "utils.h" +#include "singleton.h" +#include "mspti.h" +#include "TimerTask.h" +#include "metric/MetricProcessBase.h" namespace dynolog_npu { +namespace ipc_monitor { namespace metric { -class MetricManager: ipc_monitor::Singleton +class MetricManager: public ipc_monitor::Singleton, public TimerTask { private: std::vector> kindSwitchs_; @@ -15,11 +21,15 @@ private: std::atomic ReportInterval_; std::vector> metrics; public: - MetricManager(/* args */) = default; + MetricManager(); ~MetricManager() = default; - ErrCode consumeMsptiData(msptiActivity *record); - void setReportInterval(uint32_t intervalTimes); - void sendMetricMsg(); + ErrCode ConsumeMsptiData(msptiActivity *record); + void SetReportInterval(uint32_t intervalTimes); + void SendMetricMsg(); + void ExecuteTask() override; + void EnableKindSwitch_(msptiActivityKind kind); }; } -} \ No newline at end of file +} +} +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricMarkProcess.cpp b/dynolog_npu/plugin/ipc_monitor/metric/MetricMarkProcess.cpp new file mode 100644 index 0000000000..62b6dbe7ee --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricMarkProcess.cpp @@ -0,0 +1,32 @@ +#include "metric/MetricProcessBase.h" + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +void MetricMarkerProcess::ConsumeMsptiData(msptiActivity *record) +{ + msptiActivityMarker* marker = ReinterpretConvert(record); + msptiActivityMarker* ptr = ReinterpretConvert(MsptiMalloc(sizeof(msptiActivityMarker), 8)); + memcpy(ptr, marker, sizeof(msptiActivityMarker)); + records.emplace_back(ptr); +} + +std::string MetricMarkerProcess::AggregatedData() +{ + auto afterFilter = filter(records, [](std::shared_ptr data) { + return data->domain == "communcation"; + }); + + auto deviceId2Data = groupBy(records, [](std::shared_ptr data) { + return data-> + }); +} + +void MetricMarkerProcess::Clear() +{ + records.Clear(); +} +} +} +} \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricMarkProcess.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricMarkProcess.h new file mode 100644 index 0000000000..b3cbc73d58 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricMarkProcess.h @@ -0,0 +1,34 @@ +#ifndef METRIC_MARK_PROCESS_H +#define METRIC_MARK_PROCESS_H + +#include +#include "metric/MetricProcessBase.h" + +namespace dynolog_npu { +namespace ipc_monitor{ +namespace metric { + +class KernelMetric { + std::string name; + std::string domain; + double duration; + double timestamp; + uint32 deviceId; +public: + std::string seriesToJson(); +} + +class MetricKernelProcess: MetricProcessBase +{ +private: + std::vector> records;; +public: + void ConsumeMsptiData(msptiActivity *record) override; + void AggregatedData() override; + void Clear() override; +}; +} +} +} + +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/metric/MetricProcessBase.h b/dynolog_npu/plugin/ipc_monitor/metric/MetricProcessBase.h new file mode 100644 index 0000000000..7150d4ebd9 --- /dev/null +++ b/dynolog_npu/plugin/ipc_monitor/metric/MetricProcessBase.h @@ -0,0 +1,63 @@ +#ifndef METRIC_PROCESS_BASE_H +#define METRIC_PROCESS_BASE_H + +#include + +#include "DynoLogNpuMonitor.h" +#include "NpuIpcClient.h" +#include "mspti.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace metric { +class MetricProcessBase +{ +public: + void SendMessage(std::string message) + { + if (message.empty()) { + std::cout << "MsptiMonitor::SendMessage message is empty" << std::endl; + return; + } + static const std::string destName = DYNO_IPC_NAME + "_data"; + static const int maxRetry = 5, retryWaitTimeUs = 1000; + auto msg = Message::ConstructStrMessage(message, MSG_TYPE_DATA); + if (!msg) { + std::cout << "MsptiMonitor::ConstructStrMessage failed, message: " << message << std::endl; + return; + } + auto ipcClient = DynoLogNpuMonitor::GetInstance()->GetIpcClient(); + if (!ipcClient) { + std::cout << "DynoLogNpuMonitor ipcClient is nullptr" << std::endl; + return; + } + if (!ipcClient->SyncSendMessage(*msg, destName, maxRetry, retryWaitTimeUs)) { + std::cout << "send mspti message failed: " << message << std::endl; + perror("send mspti message failed"); + } + } + + virtual void ConsumeMsptiData(msptiActivity *record) = 0; + virtual std::string AggregatedData() = 0; + virtual void Clear() = 0; + + void *MsptiMalloc(size_t size, size_t alignment) + { + if (alignment > 0) { + size = (size + alignment - 1) / alignment * alignment; + } + #if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L + void *ptr = nullptr; + if (posix_memalign(&ptr, alignment, size) != 0) { + ptr = nullptr; + } + return ptr; + #else + return malloc(size); + #endif + } +}; +} +} +} +#endif \ No newline at end of file diff --git a/dynolog_npu/plugin/ipc_monitor/utils.h b/dynolog_npu/plugin/ipc_monitor/utils.h index 7a0ef7ec25..73fc78911c 100644 --- a/dynolog_npu/plugin/ipc_monitor/utils.h +++ b/dynolog_npu/plugin/ipc_monitor/utils.h @@ -58,7 +58,26 @@ inline T ReinterpretConvert(V ptr) { return reinterpret_cast(ptr); } +template +std::unordered_map> groupby(const std::vector& vec, KeyFunc keyFunc) { + std::unordered_map> grouped; + for (const auto& item : vec) { + grouped[keyFunc(item)].push_back(item); + } + return grouped; +} +template +auto filter(const Container& container, Predicate predicate) { + Container result; + result.reserve(container.size()); + for (const auto& item : container) { + if (predicate(item)) { + result.push_back(item); + } + } + return result; +} } // namespace ipc_monitor } // namespace dynolog_npu diff --git a/dynolog_npu/plugin/metric/MetricKernelProcess.cpp b/dynolog_npu/plugin/metric/MetricKernelProcess.cpp deleted file mode 100644 index 015ef49b35..0000000000 --- a/dynolog_npu/plugin/metric/MetricKernelProcess.cpp +++ /dev/null @@ -1,17 +0,0 @@ -#include - - -namespace dynolog_npu { -namespace metric { - -void MetricKernelProcess::consumeMsptiData(msptiActivity *record) -{ - -} - -void MetricKernelProcess::aggregatedData() -{ - -} -} -} \ No newline at end of file diff --git a/dynolog_npu/plugin/metric/MetricKernelProcess.h b/dynolog_npu/plugin/metric/MetricKernelProcess.h deleted file mode 100644 index 4aafa5a752..0000000000 --- a/dynolog_npu/plugin/metric/MetricKernelProcess.h +++ /dev/null @@ -1,21 +0,0 @@ -#include "MetricProcessBase.h" - -namespace dynolog_npu { -namespace metric { - -class KernelMetric { - -public: - std::string series(); -} - -class MetricKernelProcess: MetricProcessBase -{ -private: - /* data */ -public: - override void consumeMsptiData(msptiActivity *record); - override void aggregatedData(); -}; -} -} \ No newline at end of file diff --git a/dynolog_npu/plugin/metric/MetricManager.cpp b/dynolog_npu/plugin/metric/MetricManager.cpp deleted file mode 100644 index 692dcd4dc2..0000000000 --- a/dynolog_npu/plugin/metric/MetricManager.cpp +++ /dev/null @@ -1,44 +0,0 @@ -#include "MetricManager.cpp" - -namespace dynolog_npu { -namespace metric { - - -ErrCode MetricManager::consumeMsptiData(msptiActivity *record) -{ - if (!kindSwitch[record-kind]) { - return ErrCode.PERMISSION; - } - metricProcessBase metricProcess = metrics[record->kind]; - consumeStatus_[record->kind] = true; - metricProcess.consumeMsptiData(); - consumeStatus_[record->kind] = false; - return ErrCode.PERMISSION; -} - -void MetricManager::setReportInterval(uint32_t intervalTimes) -{ - ReportInterval_.store(intervalTimes); -} - -void MetricManager::Run() -{ - while (true) - { - std::unique_lock lock(cvMtx_); - cv_.wait_for(lock, std::chrono::seconds(flushInterval_.load()), - [&]() { return checkFlush_.load() || !start_.load();}); - sendMetricMsg(); - } -} - -void sendMetricMsg() -{ - for (int i = 0; i < MSPTI_ACTIVITY_KIND_COUNT; i++) { - if (kindSwitchs[i]) { - metric_process.sendMessage(); - } - } -} -} -} \ No newline at end of file diff --git a/dynolog_npu/plugin/metric/MetricProcessBase.h b/dynolog_npu/plugin/metric/MetricProcessBase.h deleted file mode 100644 index f6c1714793..0000000000 --- a/dynolog_npu/plugin/metric/MetricProcessBase.h +++ /dev/null @@ -1,19 +0,0 @@ -#include "mspti.h" - - -namespace dynolog_npu { -namespace metric { -class MetricProcessBase -{ -public: - void sendMessage(std::string msg); - virtual void consumeMsptiData(msptiActivity *record); - virtual void aggregatedData(); -}; - -void MetricProcessBase::sendMessage(std::string msg) -{ - -} -} -} \ No newline at end of file diff --git a/dynolog_npu/plugin/setup.py b/dynolog_npu/plugin/setup.py index c6d42cf984..a6fb747b49 100644 --- a/dynolog_npu/plugin/setup.py +++ b/dynolog_npu/plugin/setup.py @@ -23,8 +23,9 @@ BASE_DIR = os.path.dirname(os.path.realpath(__file__)) ext_modules = [ Pybind11Extension( "IPCMonitor", # Name of the Python module - sources=["bindings.cpp"] + list(glob("ipc_monitor/*.cpp")), # Source files + sources=["bindings.cpp"] + list(glob("ipc_monitor/*.cpp")) + list(glob("ipc_monitor/metric/*.cpp")), # Source files include_dirs=[os.path.join(BASE_DIR, "ipc_monitor"), # Include directories + os.path.join(BASE_DIR, "ipc_monitor/metric"), os.path.join(os.path.dirname(BASE_DIR), "third_party", "dynolog", "third_party", "json", "single_include")], extra_compile_args=["-std=c++14", "-fPIC", "-fstack-protector-all", "-fno-strict-aliasing", "-fno-common", -- Gitee