diff --git a/msmonitor/dynolog_npu/cli/src/commands/npumonitor.rs b/msmonitor/dynolog_npu/cli/src/commands/npumonitor.rs index 442ee8c45eae2a46d697a89b1d9b7633fe855a61..239145ec2e976e225305a44d484319ea625c0e57 100644 --- a/msmonitor/dynolog_npu/cli/src/commands/npumonitor.rs +++ b/msmonitor/dynolog_npu/cli/src/commands/npumonitor.rs @@ -8,6 +8,7 @@ pub struct NpuMonitorConfig { pub npu_monitor_stop: bool, pub report_interval_s: u32, pub mspti_activity_kind: String, + pub log_file: String } impl NpuMonitorConfig { @@ -16,11 +17,13 @@ impl NpuMonitorConfig { r#"NPU_MONITOR_START={} NPU_MONITOR_STOP={} REPORT_INTERVAL_S={} -MSPTI_ACTIVITY_KIND={}"#, +MSPTI_ACTIVITY_KIND={} +NPU_MONITOR_LOG_FILE={}"#, self.npu_monitor_start, self.npu_monitor_stop, self.report_interval_s, - self.mspti_activity_kind + self.mspti_activity_kind, + self.log_file ) } } diff --git a/msmonitor/dynolog_npu/cli/src/main.rs b/msmonitor/dynolog_npu/cli/src/main.rs index d6ee4ebcaa9e34f57a594ecfeb07b6574ed1b121..a04594c24bf83d1a4118a7676f244154acd3ed81 100644 --- a/msmonitor/dynolog_npu/cli/src/main.rs +++ b/msmonitor/dynolog_npu/cli/src/main.rs @@ -263,6 +263,9 @@ enum Command { /// MSPTI collect activity kind #[clap(long, value_parser = parse_mspti_activity_kinds, default_value = "Marker")] mspti_activity_kind: String, + /// Log file for NPU monitor. + #[clap(long, default_value = "")] + log_file: String, }, /// Pause dcgm profiling. This enables running tools like Nsight compute and avoids conflicts. 
DcgmPause { @@ -795,12 +798,14 @@ fn main() -> Result<()> { npu_monitor_stop, report_interval_s, mspti_activity_kind, + log_file, } => { let npu_mon_config = NpuMonitorConfig { npu_monitor_start, npu_monitor_stop, report_interval_s, - mspti_activity_kind + mspti_activity_kind, + log_file }; npumonitor::run_npumonitor(client, npu_mon_config) } diff --git a/msmonitor/plugin/CMakeLists.txt b/msmonitor/plugin/CMakeLists.txt index a7d6916b1415612b0a3e8dcb323b8cd91e346d5f..6795fa89d091f478f36760a0028b2a191826f8a2 100644 --- a/msmonitor/plugin/CMakeLists.txt +++ b/msmonitor/plugin/CMakeLists.txt @@ -15,6 +15,7 @@ set(ENV{PROJECT_ROOT_PATH} "${CMAKE_SOURCE_DIR}") include(utils) find_package(glog MODULE REQUIRED) find_package(nlohmannjson MODULE REQUIRED) +find_package(sqlite3 MODULE REQUIRED) include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/ipc_monitor @@ -35,6 +36,7 @@ set(SOURCES bindings.cpp ${IPC_SOURCES} ${SECUREC_SOURCES} + ${sqlite3_SOURCES} ) add_library(IPCMonitor MODULE ${SOURCES}) @@ -64,9 +66,9 @@ target_link_options(IPCMonitor PRIVATE -s ) -if(${CMAKE_BUILD_TYPE} STREQUAL "Debug") - add_compile_options(-O0 -g) - add_link_options(-O0 -g) -else() - add_compile_options(-D_FORITFY_SOURCE=2 -O2) -endif() +set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0 -g") +set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g") +set(CMAKE_EXE_LINKER_FLAGS_DEBUG "${CMAKE_EXE_LINKER_FLAGS_DEBUG} -O0 -g") + +set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -D_FORTIFY_SOURCE=2 -O2") +set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -D_FORTIFY_SOURCE=2 -O2") diff --git a/msmonitor/plugin/IPCMonitor/utils.py b/msmonitor/plugin/IPCMonitor/utils.py index 2b549c376b4abba15473183ca779834aa9f60848..9656b1ef73934df2e38b3b1ea577e5b9d2c84922 100644 --- a/msmonitor/plugin/IPCMonitor/utils.py +++ b/msmonitor/plugin/IPCMonitor/utils.py @@ -29,7 +29,7 @@ def get_pytorch_rank_id() -> Optional[int]: if rank_id is not None and not isinstance(rank_id, int): rank_id = int(rank_id) 
except Exception as ex: - raise RuntimeError(f"Get rank id failed in pytorch failed: {str(ex)}") from ex + raise RuntimeError(f"Get rank id failed in pytorch: {str(ex)}") from ex return rank_id @@ -77,7 +77,7 @@ def get_mindspore_rank_id() -> Optional[int]: if rank_id is not None and not isinstance(rank_id, int): rank_id = int(rank_id) except Exception as ex: - raise RuntimeError(f"Get rank id failed in mindspore failed: {str(ex)}") from ex + raise RuntimeError(f"Get rank id failed in mindspore: {str(ex)}") from ex return rank_id diff --git a/msmonitor/plugin/bindings.cpp b/msmonitor/plugin/bindings.cpp index 7b0caea52930a7655bccb9294b2083943b2a77ed..545c6c5f62550425f963f7f11616f8f333c86ae4 100644 --- a/msmonitor/plugin/bindings.cpp +++ b/msmonitor/plugin/bindings.cpp @@ -34,7 +34,4 @@ PYBIND11_MODULE(IPCMonitor_C, m) { m.def("finalize_dyno", []() -> void { dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->FinalizeDyno(); }); - m.def("set_parallel_group_info", [](std::string parallel_group_info) -> void { - dynolog_npu::ipc_monitor::SetParallelGroupInfo(parallel_group_info); - }, py::arg("parallel_group_info")); } diff --git a/msmonitor/plugin/cmake/Findsqlite3.cmake b/msmonitor/plugin/cmake/Findsqlite3.cmake new file mode 100644 index 0000000000000000000000000000000000000000..afc8c73adc7b8ace75c5b193b9c9832683b6c644 --- /dev/null +++ b/msmonitor/plugin/cmake/Findsqlite3.cmake @@ -0,0 +1,22 @@ +set(PACKAGE_VERSION 3.50.3) + +set(PKG_NAME sqlite3) +set(DOWNLOAD_PATH "$ENV{PROJECT_ROOT_PATH}/third_party") +set(DIR_NAME "${DOWNLOAD_PATH}/sqlite-amalgamation-3500300") + +if (NOT ${PKG_NAME}_FOUND) + +download_opensource_pkg(${PKG_NAME} + DOWNLOAD_PATH ${DOWNLOAD_PATH} +) + +file(GLOB SQLITE3_SRC "${DIR_NAME}/sqlite3.c") +if (NOT SQLITE3_SRC) + message(FATAL_ERROR "Failed to get sqlite3 source code.") +endif() + +set(${PKG_NAME}_SOURCES ${SQLITE3_SRC}) +include_directories(${DIR_NAME}) +set(${PKG_NAME}_FOUND TRUE) + +endif() diff --git 
a/msmonitor/plugin/cmake/config.ini b/msmonitor/plugin/cmake/config.ini index 5303523816bf0b601c679745ae90402f28e7e083..f7b2eaa4e96bfec6a3b39b75b0b2667962b04e5a 100644 --- a/msmonitor/plugin/cmake/config.ini +++ b/msmonitor/plugin/cmake/config.ini @@ -2,4 +2,7 @@ url = https://gitee.com/mirrors/glog.git [nlohmannjson] -url = https://gitee.com/mirrors/nlohmann-json.git \ No newline at end of file +url = https://gitee.com/mirrors/nlohmann-json.git + +[sqlite3] +url = https://sqlite.org/2025/sqlite-amalgamation-3500300.zip \ No newline at end of file diff --git a/msmonitor/plugin/ipc_monitor/InputParser.cpp b/msmonitor/plugin/ipc_monitor/InputParser.cpp index 7d43eb976a16c63c1868443e18305850f19ff394..0558f00530106ae4df340ad2608863056ca8e06f 100644 --- a/msmonitor/plugin/ipc_monitor/InputParser.cpp +++ b/msmonitor/plugin/ipc_monitor/InputParser.cpp @@ -25,14 +25,7 @@ const std::string MSPTI_ACTIVITY_KIND_KEY = "MSPTI_ACTIVITY_KIND"; const std::string REPORT_INTERVAL_S_KEY = "REPORT_INTERVAL_S"; const std::string NPU_MONITOR_START_KEY = "NPU_MONITOR_START"; const std::string NPU_MONITOR_STOP_KEY = "NPU_MONITOR_STOP"; - -const std::unordered_set cfgMap { - "MSPTI_ACTIVITY_KIND", - "REPORT_INTERVAL_S", - "NPU_MONITOR_START", - "NPU_MONITOR_STOP", - "REQUEST_TRACE_ID" -}; +const std::string NPU_MONITOR_SAVE_PATH = "NPU_MONITOR_LOG_FILE"; const std::unordered_map kindStrMap { {"Marker", MSPTI_ACTIVITY_KIND_MARKER}, @@ -62,7 +55,7 @@ std::set str2Kinds(const std::string& kindStrs) MsptiMonitorCfg InputParser::DynoLogGetOpts(std::unordered_map& cmd) { if (!cmd.count(NPU_MONITOR_START_KEY)) { - return {{MSPTI_ACTIVITY_KIND_INVALID}, 0, false, false, false}; + return {{MSPTI_ACTIVITY_KIND_INVALID}, 0, false, false, false, ""}; } auto activityKinds = str2Kinds(cmd[MSPTI_ACTIVITY_KIND_KEY]); uint32_t reportTimes = 0; @@ -71,7 +64,7 @@ MsptiMonitorCfg InputParser::DynoLogGetOpts(std::unordered_map -#include +#include #include -#include +#include "mspti.h" +#include "singleton.h" 
namespace dynolog_npu { namespace ipc_monitor { @@ -30,15 +31,15 @@ struct MsptiMonitorCfg { bool monitorStart; bool monitorStop; bool isMonitor; + std::string savePath; }; -class InputParser : public dynolog_npu::ipc_monitor::Singleton { +class InputParser : public Singleton { public: MsptiMonitorCfg DynoLogGetOpts(std::unordered_map& cmd); }; } // namespace ipc_monitor } // namespace dynolog_npu - -#endif \ No newline at end of file +#endif // INPUT_PARSER_H diff --git a/msmonitor/plugin/ipc_monitor/TimerTask.h b/msmonitor/plugin/ipc_monitor/TimerTask.h index 7ddc5d28ada0dd9ae255ef4748d58f8cd410190a..f47e4901d76d15965a71e2a3a7ec0d54547c4e80 100644 --- a/msmonitor/plugin/ipc_monitor/TimerTask.h +++ b/msmonitor/plugin/ipc_monitor/TimerTask.h @@ -30,7 +30,7 @@ public: TimerTask(const std::string& name, int interval) : interval(interval), name(name), manual_trigger(false), running(false) {} - ~TimerTask() + virtual ~TimerTask() { Stop(); } @@ -58,7 +58,7 @@ public: void Stop() { if (!running) { - LOG(ERROR) << name << "Timer task is not running."; + LOG(WARNING) << name << "Timer task is not running."; return; } @@ -77,6 +77,7 @@ public: virtual void InitResource() {}; virtual void ReleaseResource() {}; virtual void ExecuteTask() = 0; + bool IsRunning() { return running.load(); } private: // 定时任务线程函数 void TaskRun() @@ -112,7 +113,6 @@ private: std::atomic running; std::thread taskThread; }; - -} -} -#endif \ No newline at end of file +} // namespace ipc_monitor +} // namespace dynolog_npu +#endif // TIMER_TASK_H diff --git a/msmonitor/plugin/ipc_monitor/db/Connection.cpp b/msmonitor/plugin/ipc_monitor/db/Connection.cpp new file mode 100644 index 0000000000000000000000000000000000000000..a142c6ee1beb187e1d3858a541950ecd9e79fa52 --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/Connection.cpp @@ -0,0 +1,228 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "db/Connection.h" +#include "utils.h" + +namespace { +constexpr int32_t TIMEOUT = INT32_MAX; +const std::string CREATE_TABLE = "CREATE TABLE"; +const std::string CREATE_INDEX = "CREATE INDEX"; +const std::string DROP_TABLE = "DROP TABLE"; +const std::string UPDATE = "UPDATE"; +const std::string DELETE = "DELETE"; +const std::string CHECK = "CHECK"; +} + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +Connection::Connection(const std::string &path) +{ + auto rc = sqlite3_open(path.c_str(), &db_); + if (rc != SQLITE_OK) { + LOG(ERROR) << "Open database failed: " << rc << ", msg: " << sqlite3_errmsg(db_); + sqlite3_close_v2(db_); + db_ = nullptr; + } else { + sqlite3_exec(db_, "PRAGMA synchronous=OFF;", nullptr, nullptr, nullptr); + } +} + +Connection::~Connection() +{ + if (stmt_) { + sqlite3_finalize(stmt_); + } + if (db_) { + auto rc = sqlite3_close(db_); + if (rc != SQLITE_OK) { + LOG(ERROR) << "Close database failed: " << rc << ", msg: " << sqlite3_errmsg(db_); + sqlite3_close_v2(db_); + } + db_ = nullptr; + } +} + +bool Connection::ExecuteSql(const std::string &sql, const std::string &sqlType) +{ + CHAR_PTR errMsg{nullptr}; + sqlite3_busy_timeout(db_, TIMEOUT); + auto rc = sqlite3_exec(db_, sql.c_str(), nullptr, nullptr, &errMsg); + if (rc != SQLITE_OK) { + if (sqlType == CHECK) { + LOG(WARNING) << "Execute sql failed: " << rc << ", type: " << sqlType << ", msg: " << 
errMsg; + } else { + LOG(ERROR) << "Execute sql failed: " << rc << ", type: " << sqlType << ", msg: " << errMsg; + } + sqlite3_free(errMsg); + return false; + } + return true; +} + +bool Connection::CheckTableExists(const std::string &tableName) +{ + std::string sql = "SELECT COUNT(1) FROM " + tableName; + return ExecuteSql(sql, CHECK); +} + +bool Connection::ExecuteCreateTable(const std::string &sql) +{ + return ExecuteSql(sql, CREATE_TABLE); +} + +bool Connection::ExecuteCreateIndex(const std::string &sql) +{ + return ExecuteSql(sql, CREATE_INDEX); +} + +bool Connection::ExecuteDropTable(const std::string &sql) +{ + return ExecuteSql(sql, DROP_TABLE); +} + +bool Connection::ExecuteUpdate(const std::string &sql) +{ + return ExecuteSql(sql, UPDATE); +} + +bool Connection::ExecuteDelete(const std::string &sql) +{ + return ExecuteSql(sql, DELETE); +} + +std::vector Connection::ExecuteGetTableColumns(const std::string &tableName) +{ + std::vector columns; + std::string sql = "PRAGMA table_info(" + tableName + ")"; + sqlite3_busy_timeout(db_, TIMEOUT); + auto rc = sqlite3_prepare_v2(db_, sql.c_str(), -1, &stmt_, nullptr); + if (rc != SQLITE_OK) { + LOG(ERROR) << "Execute sql failed: " << rc << ", msg: " << sqlite3_errmsg(db_); + return columns; + } + while (sqlite3_step(stmt_) == SQLITE_ROW) { + std::string name, type; + GetColumn(name); + GetColumn(type); + columns.emplace_back(name, type); + index_ = 0; + } + return columns; +} + +bool Connection::InsertCmd(const std::string &tableName, uint32_t colNum) +{ + std::string sql = "INSERT INTO " + tableName + " VALUES ("; + for (uint32_t i = 0; i < colNum; ++i) { + sql += "?"; + if (i < colNum - 1) { + sql += ", "; + } + } + sql += ")"; + sqlite3_busy_timeout(db_, TIMEOUT); + auto rc = sqlite3_prepare_v2(db_, sql.c_str(), -1, &stmt_, nullptr); + if (rc != SQLITE_OK) { + LOG(ERROR) << "Execute sql failed: " << rc << ", msg: " << sqlite3_errmsg(db_); + return false; + } + return true; +} + +bool Connection::QueryCmd(const 
std::string &sql) +{ + auto rc = sqlite3_prepare_v2(db_, sql.c_str(), -1, &stmt_, nullptr); + if (rc != SQLITE_OK) { + LOG(ERROR) << "Execute sql failed: " << rc << ", msg: " << sqlite3_errmsg(db_); + return false; + } + return true; +} + +bool Connection::BindParameters(int32_t value) +{ + return sqlite3_bind_int(stmt_, ++index_, value) == SQLITE_OK; +} + +bool Connection::BindParameters(uint32_t value) +{ + return sqlite3_bind_int64(stmt_, ++index_, value) == SQLITE_OK; +} + +bool Connection::BindParameters(int64_t value) +{ + return sqlite3_bind_int64(stmt_, ++index_, value) == SQLITE_OK; +} + +bool Connection::BindParameters(uint64_t value) +{ + return sqlite3_bind_int64(stmt_, ++index_, value) == SQLITE_OK; +} + +bool Connection::BindParameters(double value) +{ + return sqlite3_bind_double(stmt_, ++index_, value) == SQLITE_OK; +} + +bool Connection::BindParameters(std::string value) +{ + return sqlite3_bind_text(stmt_, ++index_, value.c_str(), -1, SQLITE_TRANSIENT) == SQLITE_OK; +} + +void Connection::GetColumn(uint16_t &value) +{ + value = static_cast(sqlite3_column_int(stmt_, ++index_)); +} + +void Connection::GetColumn(int32_t &value) +{ + value = sqlite3_column_int(stmt_, ++index_); +} + +void Connection::GetColumn(uint32_t &value) +{ + value = static_cast(sqlite3_column_int64(stmt_, ++index_)); +} + +void Connection::GetColumn(int64_t &value) +{ + value = sqlite3_column_int64(stmt_, ++index_); +} + +void Connection::GetColumn(uint64_t &value) +{ + value = static_cast(sqlite3_column_int64(stmt_, ++index_)); +} + +void Connection::GetColumn(double &value) +{ + value = sqlite3_column_double(stmt_, ++index_); +} + +void Connection::GetColumn(std::string &value) +{ + const unsigned char *text = sqlite3_column_text(stmt_, ++index_); + if (text == nullptr) { + value.clear(); + } else { + value = std::string(ReinterpretConvert(text)); + } +} +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu diff --git 
a/msmonitor/plugin/ipc_monitor/db/Connection.h b/msmonitor/plugin/ipc_monitor/db/Connection.h new file mode 100644 index 0000000000000000000000000000000000000000..93f939863759d038e977192aed67763e58ec7fdf --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/Connection.h @@ -0,0 +1,208 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef IPC_MONITOR_DB_CONNECTION_H +#define IPC_MONITOR_DB_CONNECTION_H + +#include +#include +#include +#include + +#include +#include + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +using CHAR_PTR = char*; +struct TableColumn { + std::string name; + std::string type; + bool isPrimaryKey = false; + + TableColumn(const std::string &name, const std::string &type, bool isPrimaryKey = false) + : name(name), type(type), isPrimaryKey(isPrimaryKey) {} + + std::string ToString() const + { + return name + " " + type + (isPrimaryKey ? 
" PRIMARY KEY" : ""); + } + + bool operator==(const TableColumn &other) const + { + return (name == other.name) && (type == other.type); + } +}; + +template +struct IndexSequence {}; + +template +struct IndexSequenceMaker : IndexSequenceMaker {}; + +template +struct IndexSequenceMaker<0, S...> { + using type = IndexSequence; +}; + +template +using MakeIndexSequence = typename IndexSequenceMaker::type; + +class Connection { +public: + explicit Connection(const std::string &path); + ~Connection(); + bool CheckTableExists(const std::string &tableName); + bool ExecuteSql(const std::string &sql, const std::string &sqlType); + bool ExecuteCreateTable(const std::string &sql); + bool ExecuteCreateIndex(const std::string &sql); + bool ExecuteDropTable(const std::string &sql); + template + bool ExecuteInsert(const std::string &tableName, const std::vector> &data); + template + bool ExecuteQuery(const std::string &sql, std::vector> &result); + bool ExecuteUpdate(const std::string &sql); + bool ExecuteDelete(const std::string &sql); + std::vector ExecuteGetTableColumns(const std::string &tableName); + +private: + bool InsertCmd(const std::string &tableName, uint32_t colNum); + bool BindParameters(int32_t value); + bool BindParameters(uint32_t value); + bool BindParameters(int64_t value); + bool BindParameters(uint64_t value); + bool BindParameters(double value); + bool BindParameters(std::string value); + template + void ExecuteInsertHelper(T &row, IndexSequence); + template + int ExecuteInsertHelperHerlper(T t); + template + void InsertRow(T &row); + + bool QueryCmd(const std::string &sql); + void GetColumn(uint16_t &value); + void GetColumn(int32_t &value); + void GetColumn(uint32_t &value); + void GetColumn(int64_t &value); + void GetColumn(uint64_t &value); + void GetColumn(double &value); + void GetColumn(std::string &value); + template + void ExecuteQueryHelper(T &row, IndexSequence); + template + int ExecuteQueryHelperHelper(T &t); + template + void GetRow(T &row); + 
+private: + int index_{0}; + sqlite3 *db_{nullptr}; + sqlite3_stmt *stmt_{nullptr}; +}; + +template +int Connection::ExecuteInsertHelperHerlper(T t) +{ + return BindParameters(t) ? 0 : -1; +} + +template +void Connection::ExecuteInsertHelper(T &row, IndexSequence) +{ + std::initializer_list {(ExecuteInsertHelperHerlper(std::get(row)), 0)...}; +} + +template +void Connection::InsertRow(T &row) +{ + using TupleType = typename std::decay::type; + ExecuteInsertHelper(row, MakeIndexSequence::value>{}); +} + +template +int Connection::ExecuteQueryHelperHelper(T &t) +{ + GetColumn(t); + return 0; +} + +template +void Connection::ExecuteQueryHelper(T &row, IndexSequence) +{ + std::initializer_list {(ExecuteQueryHelperHelper(std::get(row)), 0)...}; +} + +template +void Connection::GetRow(T &row) +{ + using TupleType = typename std::decay::type; + ExecuteQueryHelper(row, MakeIndexSequence::value>{}); +} + +template +bool Connection::ExecuteInsert(const std::string &tableName, const std::vector> &data) +{ + uint32_t colNum = sizeof...(Args); + sqlite3_exec(db_, "BEGIN", nullptr, nullptr, nullptr); + if (!InsertCmd(tableName, colNum)) { + return false; + } + for (const auto &row : data) { + index_ = 0; + sqlite3_reset(stmt_); + InsertRow(row); + auto rc = sqlite3_step(stmt_); + if (rc != SQLITE_DONE) { + LOG(ERROR) << "ExecuteInsert failed: " << rc << ", msg: " << sqlite3_errmsg(db_) << ", insert failed"; + if (sqlite3_exec(db_, "ROLLBACK", nullptr, nullptr, nullptr) != SQLITE_OK) { + LOG(ERROR) << "ExecuteInsert failed: " << rc << ", rollback failed"; + } + return false; + } + } + sqlite3_exec(db_, "COMMIT", nullptr, nullptr, nullptr); + return true; +} + +template +bool Connection::ExecuteQuery(const std::string &sql, std::vector> &result) +{ + if (!QueryCmd(sql)) { + return false; + } + while(true) { + auto rc = sqlite3_step(stmt_); + if (rc != SQLITE_ROW) { + if (rc != SQLITE_DONE) { + LOG(ERROR) << "ExecuteQuery failed: " << rc << ", msg: " << sqlite3_errmsg(db_) << ", 
query failed"; + return false; + } + break; + } + index_ = -1; + std::tuple row; + GetRow(row); + result.emplace_back(row); + } + return true; +} +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu + +#endif // IPC_MONITOR_DB_CONNECTION_H diff --git a/msmonitor/plugin/ipc_monitor/db/DBConstant.h b/msmonitor/plugin/ipc_monitor/db/DBConstant.h new file mode 100644 index 0000000000000000000000000000000000000000..41220390eb17a2eb80b60a67b5648833d5c5c261 --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/DBConstant.h @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef IPC_MONITOR_DB_CONSTANT_H +#define IPC_MONITOR_DB_CONSTANT_H + +#include + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +const std::string SQL_TEXT_TYPE = "TEXT"; +const std::string SQL_INT_TYPE = "INTEGER"; +const std::string SQL_REAL_TYPE = "REAL"; +const std::string SQL_NUMERIC_TYPE = "NUMERIC"; + +const std::string TABLE_STRING_IDS = "STRING_IDS"; +const std::string TABLE_CANN_API = "CANN_API"; +const std::string TABLE_TASK = "TASK"; +const std::string TABLE_COMPUTE_TASK_INFO = "COMPUTE_TASK_INFO"; +const std::string TABLE_COMMUNICATION_OP = "COMMUNICATION_OP"; +const std::string TABLE_MSTX = "MSTX_EVENTS"; +const std::string TABLE_MSTX_EVENT_TYPE = "ENUM_MSTX_EVENT_TYPE"; +const std::string TABLE_HCCL_DATA_TYPE = "ENUM_HCCL_DATA_TYPE"; +const std::string TABLE_API_TYPE = "ENUM_API_TYPE"; +const std::string TABLE_HOST_INFO = "HOST_INFO"; +const std::string TABLE_RANK_DEVICE_MAP = "RANK_DEVICE_MAP"; +const std::string TABLE_META_DATA = "META_DATA"; + +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu + +#endif // IPC_MONITOR_DB_CONSTANT_H diff --git a/msmonitor/plugin/ipc_monitor/db/DBRunner.cpp b/msmonitor/plugin/ipc_monitor/db/DBRunner.cpp new file mode 100644 index 0000000000000000000000000000000000000000..171597615c2f3cc049defa64d8588c722c4c3561 --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/DBRunner.cpp @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "db/DBRunner.h" +#include + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +namespace { +std::string GetColumnsString(const std::vector &columns) +{ + std::vector columnStrings(columns.size()); + std::transform(columns.begin(), columns.end(), columnStrings.begin(), [](const TableColumn &column) { + return column.ToString(); + }); + return join(columnStrings, ","); +} +} + +bool DBRunner::CheckTableExists(const std::string &tableName) const +{ + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + return false; + } + return conn->CheckTableExists(tableName); +} + +bool DBRunner::CreateTable(const std::string &tableName, const std::vector &columns) const +{ + if (tableName.empty()) { + LOG(ERROR) << "Create table failed, table name is empty"; + return false; + } + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + return false; + } + LOG(INFO) << "Create table " << tableName; + std::string columnsString = GetColumnsString(columns); + std::string sql = "CREATE TABLE IF NOT EXISTS " + tableName + " (" + columnsString + ")"; + if (!conn->ExecuteCreateTable(sql)) { + LOG(ERROR) << "Create table " << tableName << " failed"; + return false; + } + LOG(INFO) << "Create table " << tableName << " success"; + return true; +} + +bool DBRunner::CreateIndex(const std::string &tableName, const std::string &indexName, + const std::vector &colNames) const +{ + if (tableName.empty() || indexName.empty() || colNames.empty()) { + LOG(ERROR) << "Create index failed, table name or index name or column name is empty"; + return false; + } + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + return false; + } + LOG(INFO) << "Create index " << indexName << " on table " << tableName; + std::string valueStr = join(colNames, ","); + std::string sql 
= "CREATE INDEX IF NOT EXISTS " + indexName + " ON " + tableName + " (" + valueStr + ")"; + if (!conn->ExecuteCreateIndex(sql)) { + LOG(ERROR) << "Create index " << indexName << " on table " << tableName << " failed, sql: " << sql; + return false; + } + LOG(INFO) << "Create index " << indexName << " on table " << tableName << " success"; + return true; +} + +bool DBRunner::DropTable(const std::string &tableName) const +{ + if (tableName.empty()) { + LOG(ERROR) << "Drop table failed, table name is empty"; + return false; + } + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + return false; + } + LOG(INFO) << "Drop table " << tableName; + std::string sql = "DROP TABLE " + tableName; + if (!conn->ExecuteDropTable(sql)) { + LOG(ERROR) << "Drop table " << tableName << " failed"; + return false; + } + LOG(INFO) << "Drop table " << tableName << " success"; + return true; +} + +bool DBRunner::DeleteData(const std::string &sql) const +{ + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + return false; + } + LOG(INFO) << "Delete data, sql: " << sql; + if (!conn->ExecuteDelete(sql)) { + LOG(ERROR) << "Delete data failed, sql: " << sql; + return false; + } + LOG(INFO) << "Delete data success, sql: " << sql; + return true; +} + +bool DBRunner::UpdateData(const std::string &sql) const +{ + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + return false; + } + LOG(INFO) << "Update data, sql: " << sql; + if (!conn->ExecuteUpdate(sql)) { + LOG(ERROR) << "Update data failed, sql: " << sql; + return false; + } + LOG(INFO) << "Update data success, sql: " << sql; + return true; +} + +std::vector DBRunner::GetTableColumns(const std::string &tableName) const +{ + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + return {}; + } + LOG(INFO) << "Get table columns, table name: " << tableName; + auto cols = 
conn->ExecuteGetTableColumns(tableName); + if (cols.empty()) { + LOG(ERROR) << "Get table columns failed, table name: " << tableName; + return cols; + } + LOG(INFO) << "Get table columns success, table name: " << tableName; + return cols; +} +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu diff --git a/msmonitor/plugin/ipc_monitor/db/DBRunner.h b/msmonitor/plugin/ipc_monitor/db/DBRunner.h new file mode 100644 index 0000000000000000000000000000000000000000..f9619ce48c587faa3b869e3fcf5fcd88de076ba1 --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/DBRunner.h @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef IPC_MONITOR_DB_RUNNER_H +#define IPC_MONITOR_DB_RUNNER_H +#include "db/Connection.h" +#include "utils.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +class DBRunner { +public: + explicit DBRunner(const std::string &dbPath): path_(dbPath) {}; + ~DBRunner() = default; + bool CheckTableExists(const std::string &tableName) const; + bool CreateTable(const std::string &tableName, const std::vector &cols) const; + bool CreateIndex(const std::string &tableName, const std::string &indexName, + const std::vector &colNames) const; + bool DropTable(const std::string &tableName) const; + template + bool InsertData(const std::string &tableName, const std::vector> &data) const; + bool DeleteData(const std::string &sql) const; + template + bool QueryData(const std::string &sql, std::vector> &result) const; + bool UpdateData(const std::string &sql) const; + std::vector GetTableColumns(const std::string &tableName) const; +private: + std::string path_; +}; + +template +bool DBRunner::InsertData(const std::string &tableName, const std::vector> &data) const +{ + if (tableName.empty()) { + LOG(ERROR) << "Table name is empty"; + return false; + } + LOG(INFO) << "Start insert data to " << tableName; + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + LOG(ERROR) << "Create connection for " << tableName << " failed"; + return false; + } + if (!conn->ExecuteInsert(tableName, data)) { + LOG(ERROR) << "Insert data to " << tableName << " failed"; + return false; + } + LOG(INFO) << "Insert data to " << tableName << " success"; + return true; +} + +template +bool DBRunner::QueryData(const std::string &sql, std::vector> &result) const +{ + LOG(INFO) << "Start query data"; + std::shared_ptr conn{nullptr}; + MakeSharedPtr(conn, path_); + if (conn == nullptr) { + LOG(ERROR) << "Create connection failed: " << sql; + return false; + } + if (!conn->ExecuteQuery(sql, result)) { + LOG(ERROR) << "Query data failed: " << sql; + 
return false; + } + LOG(INFO) << "Query data success: " << sql; + return true; +} +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu + +#endif // IPC_MONITOR_DB_RUNNER_H diff --git a/msmonitor/plugin/ipc_monitor/db/DataBase.cpp b/msmonitor/plugin/ipc_monitor/db/DataBase.cpp new file mode 100644 index 0000000000000000000000000000000000000000..f82c1d28f56cc758c4a21dd27aa0b01faef521b5 --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/DataBase.cpp @@ -0,0 +1,149 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "db/DataBase.h" +#include "db/DBConstant.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +namespace { +const TableColumns STRING_IDS = { + {"id", SQL_INT_TYPE}, + {"value", SQL_TEXT_TYPE} +}; + +const TableColumns ENUM_TABLE = { + {"id", SQL_INT_TYPE, true}, + {"name", SQL_TEXT_TYPE} +}; + +const TableColumns META_DATA = { + {"name", SQL_TEXT_TYPE}, + {"value", SQL_TEXT_TYPE} +}; + +const TableColumns HOST_INFO = { + {"hostUid", SQL_TEXT_TYPE}, + {"hostName", SQL_TEXT_TYPE} +}; + +const TableColumns RANK_DEVICE_MAP = { + {"rankId", SQL_INT_TYPE}, + {"deviceId", SQL_INT_TYPE} +}; + +const TableColumns CANN_API = { + {"startNs", SQL_INT_TYPE}, + {"endNs", SQL_INT_TYPE}, + {"type", SQL_INT_TYPE}, + {"globalTid", SQL_INT_TYPE}, + {"connectionId", SQL_INT_TYPE}, + {"name", SQL_INT_TYPE} +}; + +const TableColumns TASK = { + {"startNs", SQL_INT_TYPE}, + {"endNs", SQL_INT_TYPE}, + {"deviceId", SQL_INT_TYPE}, + {"connectionId", SQL_INT_TYPE}, + {"globalTaskId", SQL_INT_TYPE}, + {"globalPid", SQL_INT_TYPE}, + {"taskType", SQL_INT_TYPE}, + {"contextId", SQL_INT_TYPE}, + {"streamId", SQL_INT_TYPE}, + {"taskId", SQL_INT_TYPE}, + {"modelId", SQL_INT_TYPE} +}; + +const TableColumns COMPUTE_TASK_INFO = { + {"name", SQL_INT_TYPE}, + {"globalTaskId", SQL_INT_TYPE}, + {"blockDim", SQL_INT_TYPE}, + {"mixBlockDim", SQL_INT_TYPE}, + {"taskType", SQL_INT_TYPE}, + {"opType", SQL_INT_TYPE}, + {"inputFormats", SQL_INT_TYPE}, + {"inputDataTypes", SQL_INT_TYPE}, + {"inputShapes", SQL_INT_TYPE}, + {"outputFormats", SQL_INT_TYPE}, + {"outputDataTypes", SQL_INT_TYPE}, + {"outputShapes", SQL_INT_TYPE}, + {"attrInfo", SQL_INT_TYPE}, + {"opState", SQL_INT_TYPE}, + {"hf32Eligible", SQL_INT_TYPE} +}; + +const TableColumns COMMUNICATION_OP = { + {"opName", SQL_INT_TYPE}, + {"startNs", SQL_INT_TYPE}, + {"endNs", SQL_INT_TYPE}, + {"connectionId", SQL_INT_TYPE}, + {"groupName", SQL_INT_TYPE}, + {"opId", SQL_INT_TYPE}, + {"relay", SQL_INT_TYPE}, + {"retry", 
SQL_INT_TYPE}, + {"dataType", SQL_INT_TYPE}, + {"algType", SQL_INT_TYPE}, + {"count", SQL_NUMERIC_TYPE}, + {"opType", SQL_INT_TYPE} +}; + +const TableColumns MSTX = { + {"startNs", SQL_INT_TYPE}, + {"endNs", SQL_INT_TYPE}, + {"eventType", SQL_INT_TYPE}, + {"rangeId", SQL_INT_TYPE}, + {"category", SQL_INT_TYPE}, + {"message", SQL_INT_TYPE}, + {"globalTid", SQL_INT_TYPE}, + {"endGlobalTid", SQL_INT_TYPE}, + {"domainId", SQL_INT_TYPE}, + {"connectionId", SQL_INT_TYPE} +}; +} // namespace + +TableColumns Database::GetTableCols(const std::string &tableName) +{ + auto iter = tableColumns_.find(tableName); + if (iter == tableColumns_.end()) { + LOG(ERROR) << "Table " << tableName << " is not found"; + return {}; + } + return iter->second; +} + +MsMonitorDB::MsMonitorDB() +{ + dbName_ = "msmonitor.db"; + tableColumns_ = { + {TABLE_STRING_IDS, STRING_IDS}, + {TABLE_COMMUNICATION_OP, COMMUNICATION_OP}, + {TABLE_HCCL_DATA_TYPE, ENUM_TABLE}, + {TABLE_MSTX, MSTX}, + {TABLE_MSTX_EVENT_TYPE, ENUM_TABLE}, + {TABLE_API_TYPE, ENUM_TABLE}, + {TABLE_CANN_API, CANN_API}, + {TABLE_TASK, TASK}, + {TABLE_COMPUTE_TASK_INFO, COMPUTE_TASK_INFO}, + {TABLE_META_DATA, META_DATA}, + {TABLE_HOST_INFO, HOST_INFO}, + {TABLE_RANK_DEVICE_MAP, RANK_DEVICE_MAP} + }; +} +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu diff --git a/msmonitor/plugin/ipc_monitor/db/DataBase.h b/msmonitor/plugin/ipc_monitor/db/DataBase.h new file mode 100644 index 0000000000000000000000000000000000000000..851d67a19c3dc2c473c0056c25597e67b907024a --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/db/DataBase.h @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2025-2025. Huawei Technologies Co., Ltd. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef IPC_MONITOR_DB_BASE_H +#define IPC_MONITOR_DB_BASE_H + +#include +#include "db/Connection.h" + +namespace dynolog_npu { +namespace ipc_monitor { +namespace db { +using TableColumns = std::vector; + +class Database { +public: + Database() = default; + virtual ~Database() = default; + void SetDBName(std::string dbName) { dbName_ = std::move(dbName); } + std::string GetDBName() const { return dbName_; } + TableColumns GetTableCols(const std::string &tableName); +protected: + std::string dbName_; + std::unordered_map tableColumns_; +}; + +class MsMonitorDB : public Database { +public: + MsMonitorDB(); +}; +} // namespace db +} // namespace ipc_monitor +} // namespace dynolog_npu + +#endif // IPC_MONITOR_DB_BASE_H diff --git a/msmonitor/plugin/ipc_monitor/utils.cpp b/msmonitor/plugin/ipc_monitor/utils.cpp index d3b0ddd8926b54133dbb60b23b5e800a982eeecb..e7278fda9c6108f297f15f6dbb82f50a927901a3 100644 --- a/msmonitor/plugin/ipc_monitor/utils.cpp +++ b/msmonitor/plugin/ipc_monitor/utils.cpp @@ -49,8 +49,6 @@ std::string IntToHexStr(T number) } } // namespace -static std::string gParallelGroupInfo; - std::unordered_map submoduleMap = { {SubModule::IPC, "IPC"}, }; @@ -110,10 +108,9 @@ std::string formatErrorCode(SubModule submodule, ErrCode errorCode) int32_t GetProcessId() { - static thread_local int32_t pid = 0; - if (pid == 0) { - pid = static_cast(getpid()); - } + static int32_t pid = []() -> int32_t { + return static_cast(getpid()); + }(); return pid; } @@ -455,16 +452,6 @@ int GetRankId() return rankId; } -void 
SetParallelGroupInfo(std::string parallelGroupInfo) -{ - gParallelGroupInfo = std::move(parallelGroupInfo); -} - -std::string GetParallelGroupInfo() -{ - return gParallelGroupInfo; -} - uint64_t CalcHashId(const std::string &data) { static const uint32_t UINT32_BITS = 32; diff --git a/msmonitor/plugin/ipc_monitor/utils.h b/msmonitor/plugin/ipc_monitor/utils.h index d41b1b720b16ed6ecb4515a8761015d6e557a65e..8e699fb275e3e16668c5c95590cfa6d378eb725c 100644 --- a/msmonitor/plugin/ipc_monitor/utils.h +++ b/msmonitor/plugin/ipc_monitor/utils.h @@ -104,9 +104,7 @@ auto groupby(const Container& vec, KeyFunc keyFunc) } int GetRankId(); -void SetParallelGroupInfo(std::string parallelGroupInfo); uint64_t CalcHashId(const std::string &data); -std::string GetParallelGroupInfo(); std::string GetHostName(); std::string GetHostUid(); bool CreateMsmonitorLogPath(std::string& path);