From 6320635fbfd3681adafff59511c0e4b0d41d4847 Mon Sep 17 00:00:00 2001 From: wjchuee Date: Tue, 17 Jun 2025 18:10:58 +0800 Subject: [PATCH 01/16] Ant msmonitor plugin dev --- msmonitor/plugin/CMakeLists.txt | 19 +++-- msmonitor/plugin/IPCMonitor/__init__.py | 16 ++++ .../IPCMonitor/dynamic_monitor_proxy.py | 39 ++++++++++ msmonitor/plugin/IPCMonitor/singleton.py | 25 +++++++ msmonitor/plugin/IPCMonitor/utils.py | 34 +++++++++ msmonitor/plugin/bindings.cpp | 23 ++++-- msmonitor/plugin/build.sh | 2 +- msmonitor/plugin/cmake/Findglog.cmake | 42 +++++++++++ msmonitor/plugin/cmake/Findnlohmannjson.cmake | 18 +++++ msmonitor/plugin/cmake/config.ini | 5 ++ msmonitor/plugin/cmake/download_opensource.sh | 73 +++++++++++++++++++ msmonitor/plugin/cmake/utils.cmake | 25 +++++++ .../ipc_monitor/PyDynamicMonitorProxy.h | 4 +- msmonitor/plugin/ipc_monitor/utils.cpp | 12 ++- msmonitor/plugin/ipc_monitor/utils.h | 1 + msmonitor/plugin/setup.py | 9 ++- 16 files changed, 322 insertions(+), 25 deletions(-) create mode 100644 msmonitor/plugin/IPCMonitor/__init__.py create mode 100644 msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py create mode 100644 msmonitor/plugin/IPCMonitor/singleton.py create mode 100644 msmonitor/plugin/IPCMonitor/utils.py create mode 100644 msmonitor/plugin/cmake/Findglog.cmake create mode 100644 msmonitor/plugin/cmake/Findnlohmannjson.cmake create mode 100644 msmonitor/plugin/cmake/config.ini create mode 100644 msmonitor/plugin/cmake/download_opensource.sh create mode 100644 msmonitor/plugin/cmake/utils.cmake diff --git a/msmonitor/plugin/CMakeLists.txt b/msmonitor/plugin/CMakeLists.txt index 9abfa9a951..502abf673a 100644 --- a/msmonitor/plugin/CMakeLists.txt +++ b/msmonitor/plugin/CMakeLists.txt @@ -10,14 +10,17 @@ set(CMAKE_CXX_EXTENSIONS OFF) find_package(pybind11 REQUIRED) find_package(Python REQUIRED COMPONENTS Interpreter Development) +set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake") +set(ENV{PROJECT_ROOT_PATH} "${CMAKE_SOURCE_DIR}") +include(utils) +find_package(glog MODULE REQUIRED) +find_package(nlohmannjson MODULE REQUIRED) + include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/ipc_monitor ${CMAKE_CURRENT_SOURCE_DIR}/ipc_monitor/metric ${CMAKE_CURRENT_SOURCE_DIR}/ipc_monitor/mspti_monitor ${CMAKE_CURRENT_SOURCE_DIR}/third_party/securec/include - ${DYNOLOG_PATH}/third_party/glog/src - ${DYNOLOG_PATH}/build/third_party/glog - ${DYNOLOG_PATH}/third_party/json/single_include ) file(GLOB_RECURSE IPC_SOURCES @@ -38,18 +41,18 @@ add_library(IPCMonitor MODULE ${SOURCES}) set_target_properties(IPCMonitor PROPERTIES - OUTPUT_NAME IPCMonitor + OUTPUT_NAME IPCMonitor_C + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_INSTALL_PREFIX}/IPCMonitor/lib64 PREFIX "" ) target_link_libraries(IPCMonitor PRIVATE pybind11::module pthread + ${glog_LIBRARIES} ${CMAKE_CURRENT_SOURCE_DIR}/stub/libmspti.so ) -target_link_libraries(IPCMonitor PRIVATE ${DYNOLOG_PATH}/build/third_party/glog/libglog.a) - target_compile_options(IPCMonitor PRIVATE -fPIC -fstack-protector-all @@ -62,7 +65,3 @@ target_link_options(IPCMonitor PRIVATE -Wl,-z,relro,-z,now,-z,noexecstack -s ) - -install(TARGETS IPCMonitor - DESTINATION ${CMAKE_INSTALL_PREFIX}/python-package -) diff --git a/msmonitor/plugin/IPCMonitor/__init__.py b/msmonitor/plugin/IPCMonitor/__init__.py new file mode 100644 index 0000000000..bd4cb27a88 --- /dev/null +++ b/msmonitor/plugin/IPCMonitor/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .dynamic_monitor_proxy import PyDynamicMonitorProxy diff --git a/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py new file mode 100644 index 0000000000..358f629366 --- /dev/null +++ b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py @@ -0,0 +1,39 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import os +import importlib +from .singleton import Singleton + +so_path = os.path.join(os.path.dirname(__file__), "lib64") +sys.path.append(os.path.realpath(so_path)) +ipcMonitor_C_module = importlib.import_module("IPCMonitor_C") + + +@Singleton +class PyDynamicMonitorProxy: + + def init_dyno(self, npuId: int): + return ipcMonitor_C_module.init_dyno(npuId) + + def poll_dyno(self): + return ipcMonitor_C_module.poll_dyno() + + def enable_dyno_npu_monitor(self, config_map: dict): + ipcMonitor_C_module.enable_dyno_npu_monitor(config_map) + + def finalize_dyno(self): + ipcMonitor_C_module.finalize_dyno() diff --git a/msmonitor/plugin/IPCMonitor/singleton.py b/msmonitor/plugin/IPCMonitor/singleton.py new file mode 100644 index 0000000000..6386c2ab66 --- /dev/null +++ b/msmonitor/plugin/IPCMonitor/singleton.py @@ -0,0 +1,25 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class Singleton(object): + def __init__(self, cls): + self._cls = cls + self._instance = {} + + def __call__(self): + if self._cls not in self._instance: + self._instance[self._cls] = self._cls() + return self._instance[self._cls] diff --git a/msmonitor/plugin/IPCMonitor/utils.py b/msmonitor/plugin/IPCMonitor/utils.py new file mode 100644 index 0000000000..9c61c70241 --- /dev/null +++ b/msmonitor/plugin/IPCMonitor/utils.py @@ -0,0 +1,34 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import warnings +import torch + + +def get_rank_id() -> int: + """Get rank id.""" + try: + rank_id = os.environ.get('RANK') + if rank_id is None and torch.distributed.is_available() and torch.distributed.is_initialized(): + rank_id = torch.distributed.get_rank() + if rank_id is None: + rank_id = -1 + elif not isinstance(rank_id, int): + rank_id = int(rank_id) + except Exception as ex: + warnings.warn(f"Get rank id failed: {str(ex)}, rank_id will be set to -1 !") + rank_id = -1 + return rank_id diff --git a/msmonitor/plugin/bindings.cpp b/msmonitor/plugin/bindings.cpp index 626e72157e..e146ae232a 100644 --- a/msmonitor/plugin/bindings.cpp +++ b/msmonitor/plugin/bindings.cpp @@ -19,11 +19,18 @@ namespace py = pybind11; -PYBIND11_MODULE(IPCMonitor, m) { - py::class_(m, "PyDynamicMonitorProxy") - .def(py::init<>()) - .def("init_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::InitDyno, py::arg("npuId")) - .def("poll_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::PollDyno) - .def("enable_dyno_npu_monitor", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::EnableMsptiMonitor, py::arg("cfg_map")) - .def("finalize_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::FinalizeDyno); -} \ No newline at end of file + +PYBIND11_MODULE(IPCMonitor_C, m) { + m.def("init_dyno", [](int npuId) -> bool { + return dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->InitDyno(npuId); + }, py::arg("npuId")); + m.def("poll_dyno", []() -> std::string { + return dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->PollDyno(); + }); + m.def("enable_dyno_npu_monitor", [](std::unordered_map& config_map) -> void { + dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->EnableMsptiMonitor(config_map); + }, py::arg("config_map")); + m.def("finalize_dyno", []() -> void { + dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->FinalizeDyno(); + }); +} diff --git a/msmonitor/plugin/build.sh b/msmonitor/plugin/build.sh index ec20536715..939aaa2baf 100644 --- a/msmonitor/plugin/build.sh +++ b/msmonitor/plugin/build.sh @@ -21,4 +21,4 @@ fi # pip install whl echo "pip install ${files}" -pip install ${files} \ No newline at end of file +pip install ${files} diff --git a/msmonitor/plugin/cmake/Findglog.cmake b/msmonitor/plugin/cmake/Findglog.cmake new file mode 100644 index 0000000000..770cdbebc6 --- /dev/null +++ b/msmonitor/plugin/cmake/Findglog.cmake @@ -0,0 +1,42 @@ +set(PACKAGE_VERSION 0.6.0) + +set(PKG_NAME glog) +set(DOWNLOAD_PATH "$ENV{PROJECT_ROOT_PATH}/third_party") +set(GIT_TAG "v0.6.0") +set(DIR_NAME "${DOWNLOAD_PATH}/glog") + +if (NOT ${PKG_NAME}_FOUND) + +download_opensource_pkg(${PKG_NAME} + GIT_TAG ${GIT_TAG} + DOWNLOAD_PATH ${DOWNLOAD_PATH} +) + +execute_process( + WORKING_DIRECTORY ${DIR_NAME} + COMMAND cmake -S . -B build -G "Unix Makefiles" -DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_PREFIX=${DIR_NAME}/install -DWITH_GFLAGS=OFF -DWITH_GTEST=OFF -DWITH_SYMBOLIZE=OFF -DCMAKE_POLICY_VERSION_MINIMUM=3.5 + RESULT_VARIABLE RESULT +) +if (NOT RESULT EQUAL 0) + message(FATAL_ERROR "Failed to build glog. ${RESULT}") +endif() + +execute_process( + WORKING_DIRECTORY ${DIR_NAME} + COMMAND cmake --build build --target install + RESULT_VARIABLE RESULT +) +if (NOT RESULT EQUAL 0) + message(FATAL_ERROR "Failed to build glog. ${RESULT}") +endif() + +file(GLOB GLOG_LIB "${DIR_NAME}/install/lib/libglog.a") +if (NOT GLOG_LIB) + message(FATAL_ERROR "Failed to build glog.") +endif() + +set(${PKG_NAME}_LIBRARIES ${GLOG_LIB}) +include_directories(${DIR_NAME}/install/include) +set(${PKG_NAME}_FOUND TRUE) + +endif() diff --git a/msmonitor/plugin/cmake/Findnlohmannjson.cmake b/msmonitor/plugin/cmake/Findnlohmannjson.cmake new file mode 100644 index 0000000000..a657cc3acc --- /dev/null +++ b/msmonitor/plugin/cmake/Findnlohmannjson.cmake @@ -0,0 +1,18 @@ +set(PACKAGE_VERSION 3.12.0) + +set(PKG_NAME nlohmannjson) +set(DOWNLOAD_PATH "$ENV{PROJECT_ROOT_PATH}/third_party") +set(GIT_TAG "v3.12.0") +set(DIR_NAME "${DOWNLOAD_PATH}/nlohmann-json") + +if (NOT ${PKG_NAME}_FOUND) + +download_opensource_pkg(${PKG_NAME} + GIT_TAG ${GIT_TAG} + DOWNLOAD_PATH ${DOWNLOAD_PATH} +) + +include_directories(${DIR_NAME}/include) +set(${PKG_NAME}_FOUND TRUE) + +endif() diff --git a/msmonitor/plugin/cmake/config.ini b/msmonitor/plugin/cmake/config.ini new file mode 100644 index 0000000000..5303523816 --- /dev/null +++ b/msmonitor/plugin/cmake/config.ini @@ -0,0 +1,5 @@ +[glog] +url = https://gitee.com/mirrors/glog.git + +[nlohmannjson] +url = https://gitee.com/mirrors/nlohmann-json.git \ No newline at end of file diff --git a/msmonitor/plugin/cmake/download_opensource.sh b/msmonitor/plugin/cmake/download_opensource.sh new file mode 100644 index 0000000000..d04e59b445 --- /dev/null +++ b/msmonitor/plugin/cmake/download_opensource.sh @@ -0,0 +1,73 @@ +#!/bin/bash + +if [ "$#" -lt 2 ]; then + echo "Usage: $0 [ ]" + exit 1 +fi + +pkg_name=$1 +path=$2 + +if [ "$#" -ge 3 ]; then + tag=$3 +fi + +url=$(awk -F " = " '/\['${pkg_name}'\]/{a=1}a==1&&$1~/url/{print $2;exit}' config.ini) +lib_path=$MSTT_LIB_PATH +if [ -n "$lib_path" ]; then + url=${lib_path}$(echo $url | awk -F '/' -v OFS='/' '{print $5,$8}') +fi +if [[ ! $url = https* ]]; then + echo "The URL of $pkg_name is illegal." + exit 1 +fi + +echo "Start to download ${url}..." + +if [ ! -d "$path" ]; then + echo "The specified path does not exist: $path" + exit 1 +fi +cd ${path} + +extension=$(echo "${url}" | awk -F'[./]' '{print $NF}') +if [[ "${extension}" == "gz" || "${extension}" == "zip" ]]; then + fullname="${path}/$(basename "${url}")" + if [[ -e ${fullname} ]]; then + echo "Source ${fullname} is exists, will not download again." + else + curl -L "${url}" -o ${fullname} -k + if [ $? -eq 0 ]; then + echo "Download successful: ${url}" + else + echo "Download failed: ${url}" + exit 1 + fi + fi + + if [[ "${extension}" == "gz" ]]; then + tar -zxvf ${fullname} -C ./ -n > /dev/null + elif [[ "${extension}" == "zip" ]]; then + unzip -n ${fullname} -d ./ > /dev/null + fi +elif [[ "${extension}" == "git" ]]; then + repository="$(basename ${url} .git)" + if [[ -e ${repository} ]]; then + echo "Source ${repository} is exists, will not clone again." + else + if [[ -z "${tag}" ]]; then + git clone ${url} + else + git clone ${url} -b "${tag}" + fi + if [ $? -eq 0 ]; then + echo "Download successful: ${url}" + else + echo "Download failed: ${url}" + exit 1 + fi + fi +else + echo "Unknow url ${url}" + exit 1 +fi diff --git a/msmonitor/plugin/cmake/utils.cmake b/msmonitor/plugin/cmake/utils.cmake new file mode 100644 index 0000000000..3d815d2685 --- /dev/null +++ b/msmonitor/plugin/cmake/utils.cmake @@ -0,0 +1,25 @@ + +function(download_opensource_pkg pkg_name) + message("start to download ${pkg_name}...") + set(options) + set(oneValueArgs GIT_TAG DOWNLOAD_PATH DIR_NAME BUILD_CMD) + set(multiValueArgs PATCHES) + cmake_parse_arguments(PKG "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + + if (NOT PKG_DOWNLOAD_PATH) + set(PKG_DOWNLOAD_PATH "${CMAKE_SOURCE_DIR}/../third_party") + endif() + file(MAKE_DIRECTORY ${PKG_DOWNLOAD_PATH}) + + execute_process( + WORKING_DIRECTORY $ENV{PROJECT_ROOT_PATH}/cmake + COMMAND bash download_opensource.sh ${pkg_name} ${PKG_DOWNLOAD_PATH} ${PKG_GIT_TAG} + RESULT_VARIABLE RESULT + ) + if (NOT RESULT EQUAL 0) + message(FATAL_ERROR "Failed to download ${pkg_name}(${RESULT}).") + endif() + if (PKG_BUILD_CMD) + execute_process(COMMAND bash -c "cd ${PKG_DOWNLOAD_PATH}/${DIR_NAME};${PKG_BUILD_CMD}") + endif() +endfunction() diff --git a/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h b/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h index 03aa1d0810..4836b29301 100644 --- a/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h +++ b/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h @@ -23,7 +23,9 @@ namespace dynolog_npu { namespace ipc_monitor { -class PyDynamicMonitorProxy { +class PyDynamicMonitorProxy : public Singleton { + friend class Singleton; + public: PyDynamicMonitorProxy() = default; bool InitDyno(int npuId) diff --git a/msmonitor/plugin/ipc_monitor/utils.cpp b/msmonitor/plugin/ipc_monitor/utils.cpp index 23f749842a..772a1b5c6c 100644 --- a/msmonitor/plugin/ipc_monitor/utils.cpp +++ b/msmonitor/plugin/ipc_monitor/utils.cpp @@ -31,6 +31,7 @@ #include #include #include +#include namespace dynolog_npu { namespace ipc_monitor { @@ -414,7 +415,16 @@ bool PathUtils::DirPathCheck(const std::string& path) return true; } -bool CreateMsmonitorLogPath(std::string& path) +int GetRankId() +{ + static int rankId = []() -> int { + pybind11::gil_scoped_acquire gil; + return pybind11::module::import("IPCMonitor.utils").attr("get_rank_id")().cast(); + }(); + return rankId; +} + +bool CreateMsmonitorLogPath(std::string &path) { const char* logPathEnvVal = getenv("MSMONITOR_LOG_PATH"); std::string logPath; diff --git a/msmonitor/plugin/ipc_monitor/utils.h b/msmonitor/plugin/ipc_monitor/utils.h index da517ba9df..5d5375d304 100644 --- a/msmonitor/plugin/ipc_monitor/utils.h +++ b/msmonitor/plugin/ipc_monitor/utils.h @@ -87,6 +87,7 @@ auto groupby(const Container& vec, KeyFunc keyFunc) return grouped; } +int GetRankId(); bool CreateMsmonitorLogPath(std::string& path); struct PathUtils { diff --git a/msmonitor/plugin/setup.py b/msmonitor/plugin/setup.py index 2e257a48ad..91144e3a3c 100644 --- a/msmonitor/plugin/setup.py +++ b/msmonitor/plugin/setup.py @@ -18,7 +18,7 @@ import sys import subprocess import pybind11 -from setuptools import setup, Extension +from setuptools import setup, Extension, find_namespace_packages from setuptools.command.build_ext import build_ext @@ -43,7 +43,6 @@ class CMakeBuild(build_ext): '-DPYTHON_EXECUTABLE=' + sys.executable, '-DCMAKE_PREFIX_PATH=' + pybind11.get_cmake_dir(), '-DCMAKE_INSTALL_PREFIX=' + ext_dir, - '-DDYNOLOG_PATH=' + os.path.join(os.path.dirname(BASE_DIR), "third_party", "dynolog"), '-DCMAKE_BUILD_TYPE=' + cfg ] @@ -54,7 +53,7 @@ class CMakeBuild(build_ext): if not os.path.exists(self.build_temp): os.makedirs(self.build_temp) subprocess.check_call(['cmake', ext.sourcedir] + cmake_args, cwd=self.build_temp, env=env) - subprocess.check_call(['cmake', '--build', '.', '--target', 'install', '-j', '8'] + build_args, + subprocess.check_call(['cmake', '--build', '.', '-j', '8'] + build_args, cwd=self.build_temp) BASE_DIR = os.path.dirname(os.path.realpath(__file__)) @@ -62,7 +61,9 @@ BASE_DIR = os.path.dirname(os.path.realpath(__file__)) setup( name="msmonitor_plugin", version="0.1", - description="msMonitor plugins", + description="msMonitor plugin", + packages=find_namespace_packages(include=["IPCMonitor*"]), + include_package_data=True, ext_modules=[CMakeExtension('IPCMonitor')], cmdclass=dict(build_ext=CMakeBuild), install_requires=["pybind11"], -- Gitee From eb07a31331f85ca9e10fd3a290869963c02cd756 Mon Sep 17 00:00:00 2001 From: wjchuee Date: Thu, 12 Jun 2025 14:39:18 +0800 Subject: [PATCH 02/16] Ant dyno start_now option --- .../dynolog_npu/cli/src/commands/nputrace.rs | 9 ++++-- msmonitor/dynolog_npu/cli/src/main.rs | 31 +++++++++++-------- msmonitor/plugin/cmake/Findglog.cmake | 4 +-- 3 files changed, 27 insertions(+), 17 deletions(-) diff --git a/msmonitor/dynolog_npu/cli/src/commands/nputrace.rs b/msmonitor/dynolog_npu/cli/src/commands/nputrace.rs index ef84b3038d..a9fc563963 100644 --- a/msmonitor/dynolog_npu/cli/src/commands/nputrace.rs +++ b/msmonitor/dynolog_npu/cli/src/commands/nputrace.rs @@ -11,6 +11,7 @@ pub enum NpuTraceTriggerConfig { }, IterationBased { start_step: u64, + start: bool, iterations: i64, }, } @@ -27,12 +28,14 @@ impl NpuTraceTriggerConfig { ), NpuTraceTriggerConfig::IterationBased { start_step, + start, iterations, } => format!( r#"PROFILE_START_ITERATION=0 PROFILE_START_STEP={} +PROFILE_START={} ACTIVITIES_ITERATIONS={}"#, - start_step, iterations + start_step, start, iterations ), } } @@ -196,14 +199,16 @@ mod test { ACTIVITIES_DURATION_MSECS=1000"# ); - let trigger_config = NpuTraceTriggerConfig::IterationBased { + let trigger_config = NpuTraceTriggerConfig::IterationBased { profile_start_step: 1000, + profile_start: false, iterations: 1000, }; assert_eq!( trigger_config.config(), r#"PROFILE_START_ITERATION=0 PROFILE_START_STEP=1000 +PROFILE_START=false ACTIVITIES_ITERATIONS=1000"# ); } diff --git a/msmonitor/dynolog_npu/cli/src/main.rs b/msmonitor/dynolog_npu/cli/src/main.rs index 2bd85a7963..c43dcd798f 100644 --- a/msmonitor/dynolog_npu/cli/src/main.rs +++ b/msmonitor/dynolog_npu/cli/src/main.rs @@ -78,7 +78,7 @@ fn parse_mspti_activity_kinds(src: &str) -> Result{ return Err(format!("Invalid MSPTI activity kind: {}, Possible values: {:?}.]", kind, allowed_values)); } } - + Ok(src.to_string()) } @@ -175,6 +175,9 @@ enum Command { /// Number of steps to start profile. #[clap(long, default_value_t = 0)] start_step: u64, + /// Whether to start profile now. + #[clap(long, action)] + start: bool, /// Max number of processes to profile. #[clap(long, default_value_t = 3)] process_limit: u32, @@ -285,12 +288,12 @@ fn verify_certificate(cert_der: &[u8], is_root_cert: bool) -> Result<()> { // 检查证书签名算法 let sig_alg = cert.signature_algorithm.algorithm; - + // 定义不安全的算法 OID let md2_rsa = oid!(1.2.840.113549.1.1.2); // MD2 with RSA let md5_rsa = oid!(1.2.840.113549.1.1.4); // MD5 with RSA let sha1_rsa = oid!(1.2.840.113549.1.1.5); // SHA1 with RSA - + // 检查是否使用不安全的算法 if sig_alg == md2_rsa || sig_alg == md5_rsa || sig_alg == sha1_rsa { return Err(io::Error::new( @@ -428,7 +431,7 @@ fn is_cert_revoked(cert_der: &[u8], crl_path: &PathBuf) -> Result { let crl_data = read_to_string(crl_path)?; let (_, pem) = pem::parse_x509_pem(crl_data.as_bytes()) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("Failed to parse CRL PEM: {:?}", e)))?; - + // 解析 CRL let (_, crl) = CertificateRevocationList::from_der(&pem.contents) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("Failed to parse CRL: {:?}", e)))?; @@ -472,7 +475,7 @@ fn is_cert_revoked(cert_der: &[u8], crl_path: &PathBuf) -> Result { for revoked in crl.iter_revoked_certificates() { let revoked_serial = revoked.user_certificate.to_bigint() .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "Failed to convert revoked certificate serial to BigInt"))?; - + if revoked_serial == cert_serial { return Ok(true); } @@ -486,7 +489,7 @@ enum DynoClient { } fn create_dyno_client( - host: &str, + host: &str, port: u16, certs_dir: &str, ) -> Result { @@ -507,7 +510,7 @@ fn create_dyno_client( } fn create_dyno_client_with_no_certs( - host: &str, + host: &str, port: u16, ) -> Result { let addr = (host, port) @@ -519,7 +522,7 @@ fn create_dyno_client_with_no_certs( } fn create_dyno_client_with_certs( - host: &str, + host: &str, port: u16, config: &ClientConfigPath, ) -> Result> { @@ -549,7 +552,7 @@ fn create_dyno_client_with_certs( let cert_file = File::open(&config.cert_path)?; let mut cert_reader = BufReader::new(cert_file); let certs = rustls_pemfile::certs(&mut cert_reader)?; - + // 检查客户端证书的基本要求 for cert in &certs { verify_certificate(cert, false)?; // 验证客户端证书 @@ -587,7 +590,7 @@ fn create_dyno_client_with_certs( println!("Loading client key from: {}", config.key_path.display()); let key_file = File::open(&config.key_path)?; let mut key_reader = BufReader::new(key_file); - + // 检查私钥是否加密 let mut key_data = Vec::new(); key_reader.read_to_end(&mut key_data)?; @@ -600,10 +603,10 @@ fn create_dyno_client_with_certs( let mut password = prompt_password("Please enter the certificate password: ")?; let pkey = PKey::private_key_from_pem_passphrase(&key_data, password.as_bytes()) .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("Failed to decrypt private key: {}", e)))?; - + // 清除密码 password.clear(); - + // 返回私钥 vec![pkey.private_key_to_der() .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("Failed to convert private key to DER: {}", e)))?] @@ -612,7 +615,7 @@ fn create_dyno_client_with_certs( let mut key_reader = BufReader::new(File::open(&config.key_path)?); rustls_pemfile::pkcs8_private_keys(&mut key_reader)? }; - + if keys.is_empty() { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -703,6 +706,7 @@ fn main() -> Result<()> { iterations, profile_start_time, start_step, + start, process_limit, record_shapes, profile_memory, @@ -728,6 +732,7 @@ fn main() -> Result<()> { let trigger_config = if iterations > 0 { NpuTraceTriggerConfig::IterationBased { start_step, + start, iterations, } } else { diff --git a/msmonitor/plugin/cmake/Findglog.cmake b/msmonitor/plugin/cmake/Findglog.cmake index 770cdbebc6..bbebee6ee2 100644 --- a/msmonitor/plugin/cmake/Findglog.cmake +++ b/msmonitor/plugin/cmake/Findglog.cmake @@ -14,7 +14,7 @@ download_opensource_pkg(${PKG_NAME} execute_process( WORKING_DIRECTORY ${DIR_NAME} - COMMAND cmake -S . -B build -G "Unix Makefiles" -DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_PREFIX=${DIR_NAME}/install -DWITH_GFLAGS=OFF -DWITH_GTEST=OFF -DWITH_SYMBOLIZE=OFF -DCMAKE_POLICY_VERSION_MINIMUM=3.5 + COMMAND cmake -S . -B build -G "Unix Makefiles" -DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_PREFIX=${DIR_NAME}/install -DCMAKE_INSTALL_LIBDIR=${DIR_NAME}/install/lib64 -DWITH_GFLAGS=OFF -DWITH_GTEST=OFF -DWITH_SYMBOLIZE=OFF -DCMAKE_POLICY_VERSION_MINIMUM=3.5 RESULT_VARIABLE RESULT ) if (NOT RESULT EQUAL 0) @@ -30,7 +30,7 @@ if (NOT RESULT EQUAL 0) message(FATAL_ERROR "Failed to build glog. ${RESULT}") endif() -file(GLOB GLOG_LIB "${DIR_NAME}/install/lib/libglog.a") +file(GLOB GLOG_LIB "${DIR_NAME}/install/lib64/libglog.a") if (NOT GLOG_LIB) message(FATAL_ERROR "Failed to build glog.") endif() -- Gitee From 374b293580255a4f6bc667f17f3cd978a4088946 Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Fri, 20 Jun 2025 16:29:22 +0800 Subject: [PATCH 03/16] mspti monitor support stream output to file --- .../IPCMonitor/dynamic_monitor_proxy.py | 6 + msmonitor/plugin/bindings.cpp | 9 +- .../plugin/ipc_monitor/DynoLogNpuMonitor.h | 4 +- .../ipc_monitor/PyDynamicMonitorProxy.h | 13 + .../mspti_monitor/MsptiMonitor.cpp | 22 +- .../ipc_monitor/mspti_monitor/MsptiMonitor.h | 4 + .../mspti_monitor/StreamOutput.cpp | 224 ++++++++++++++++++ .../ipc_monitor/mspti_monitor/StreamOutput.h | 42 ++++ msmonitor/plugin/ipc_monitor/utils.cpp | 26 +- 9 files changed, 339 insertions(+), 11 deletions(-) create mode 100644 msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp create mode 100644 msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h diff --git a/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py index 358f629366..97ea71121e 100644 --- a/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py +++ b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py @@ -37,3 +37,9 @@ class PyDynamicMonitorProxy: def finalize_dyno(self): ipcMonitor_C_module.finalize_dyno() + + def start_mspti_monitor(self, flush_interval=30): + ipcMonitor_C_module.start_mspti_monitor(flush_interval) + + def stop_mspti_monitor(self): + ipcMonitor_C_module.stop_mspti_monitor() diff --git a/msmonitor/plugin/bindings.cpp b/msmonitor/plugin/bindings.cpp index e146ae232a..3ac6ab522c 100644 --- a/msmonitor/plugin/bindings.cpp +++ b/msmonitor/plugin/bindings.cpp @@ -20,6 +20,7 @@ namespace py = pybind11; + PYBIND11_MODULE(IPCMonitor_C, m) { m.def("init_dyno", [](int npuId) -> bool { return dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->InitDyno(npuId); @@ -33,4 +34,10 @@ PYBIND11_MODULE(IPCMonitor_C, m) { m.def("finalize_dyno", []() -> void { dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->FinalizeDyno(); }); -} + m.def("start_mspti_monitor", [](int flush_interval) { + dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->StartMsptiMonitor(flush_interval); + }); + m.def("stop_mspti_monitor", []() -> void { + dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->StopMsptiMonitor(); + }); +} \ No newline at end of file diff --git a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h index 5ffec3bd96..5a4cfcbf50 100644 --- a/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h +++ b/msmonitor/plugin/ipc_monitor/DynoLogNpuMonitor.h @@ -44,15 +44,15 @@ public: { return &ipcClient_; } + MsptiMonitor msptiMonitor_; private: bool isInitialized_ = false; int32_t npuId_ = 0; IpcClient ipcClient_; - MsptiMonitor msptiMonitor_; }; } // namespace ipc_monitor } // namespace dynolog_npu -#endif // DYNOLOG_NPU_MONITOR_H +#endif // DYNOLOG_NPU_MONITOR_H \ No newline at end of file diff --git a/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h b/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h index 4836b29301..cffd8aabe4 100644 --- a/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h +++ b/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h @@ -55,6 +55,19 @@ public: { DynoLogNpuMonitor::GetInstance()->Finalize(); } + + void StartMsptiMonitor(int flush_interval = 30) { + auto monitor = DynoLogNpuMonitor::GetInstance(); + monitor->msptiMonitor_.EnableActivity(MSPTI_ACTIVITY_KIND_KERNEL); + monitor->msptiMonitor_.SetFlushInterval(flush_interval); + monitor->msptiMonitor_.Start(); + } + + void StopMsptiMonitor() { + auto monitor = DynoLogNpuMonitor::GetInstance(); + monitor->msptiMonitor_.Stop(); + } + private: MonitorBase *monitor_ = nullptr; }; diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.cpp index 33abb8fe38..e95194eee3 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.cpp @@ -19,6 +19,8 @@ #include #include #include +#include +#include #include "DynoLogNpuMonitor.h" #include "MetricManager.h" @@ -55,7 +57,8 @@ void MsptiMonitor::Start() return; } start_.store(true); - metric::MetricManager::GetInstance()->Run(); + // metric::MetricManager::GetInstance()->Run(); + streamOutput.InitStreamOutput(); LOG(INFO) << "MsptiMonitor start successfully"; } @@ -69,6 +72,7 @@ void MsptiMonitor::Stop() if (msptiActivityFlushAll(1) != MSPTI_SUCCESS) { LOG(WARNING) << "MsptiMonitor stop msptiActivityFlushAll failed"; } + streamOutput.CloseStreamOutput(); LOG(INFO) << "MsptiMonitor stop successfully"; } @@ -77,7 +81,7 @@ void MsptiMonitor::Uninit() if (!start_.load()) { return; } - metric::MetricManager::GetInstance()->Stop(); + //metric::MetricManager::GetInstance()->Stop(); start_.store(false); cv_.notify_one(); Thread::Stop(); @@ -92,7 +96,7 @@ void MsptiMonitor::EnableActivity(msptiActivityKind kind) } else { LOG(ERROR) << "MsptiMonitor enableActivity failed, kind: " << static_cast(kind); } - metric::MetricManager::GetInstance()->EnableKindSwitch_(kind, true); + // metric::MetricManager::GetInstance()->EnableKindSwitch_(kind, true); } } @@ -105,7 +109,7 @@ void MsptiMonitor::DisableActivity(msptiActivityKind kind) } else { LOG(ERROR) << "MsptiMonitor disableActivity failed, kind: " << static_cast(kind); } - metric::MetricManager::GetInstance()->EnableKindSwitch_(kind, false); + // metric::MetricManager::GetInstance()->EnableKindSwitch_(kind, false); } } @@ -116,7 +120,7 @@ void MsptiMonitor::SetFlushInterval(uint32_t interval) if (start_.load()) { cv_.notify_one(); } - metric::MetricManager::GetInstance()->SetReportInterval(interval); + // metric::MetricManager::GetInstance()->SetReportInterval(interval); } bool MsptiMonitor::IsStarted() @@ -175,6 +179,7 @@ void MsptiMonitor::Run() } std::atomic MsptiMonitor::allocCnt{0}; +StreamOutput MsptiMonitor::streamOutput{}; void MsptiMonitor::BufferRequest(uint8_t **buffer, size_t *size, size_t *maxNumRecords) { @@ -189,6 +194,7 @@ void MsptiMonitor::BufferRequest(uint8_t **buffer, size_t *size, size_t *maxNumR return; } uint8_t *pBuffer = ReinterpretConvert(MsptiMalloc(DEFAULT_BUFFER_SIZE, ALIGN_SIZE)); + if (pBuffer == nullptr) { *buffer = nullptr; *size = 0; @@ -227,7 +233,11 @@ void MsptiMonitor::BufferConsume(msptiActivity *record) if (record == nullptr) { return; } - metric::MetricManager::GetInstance()->ConsumeMsptiData(record); + if (streamOutput.isStreamOutputMode()) { + streamOutput.StreamOutputRecord(record); + } + // metric::MetricManager::GetInstance()->ConsumeMsptiData(record); } + } // namespace ipc_monitor } // namespace dynolog_npu diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.h index d1b73e581e..f9dbfa5fa2 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.h +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/MsptiMonitor.h @@ -20,8 +20,11 @@ #include #include #include +#include +#include #include "mspti.h" #include "thread.h" +#include "StreamOutput.h" namespace dynolog_npu { @@ -44,6 +47,7 @@ private: static void BufferComplete(uint8_t *buffer, size_t size, size_t validSize); static void BufferConsume(msptiActivity *record); static std::atomic allocCnt; + static StreamOutput streamOutput; private: void Run() override; diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp new file mode 100644 index 0000000000..2555c9043e --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -0,0 +1,224 @@ +#include "StreamOutput.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include "mspti.h" + +namespace dynolog_npu { +namespace ipc_monitor { + +void StreamOutput::InitStreamOutput() { + const char* outputPathEnv = std::getenv("MSMONITOR_OUTPUT_PATH"); + if (!outputPathEnv || std::strlen(outputPathEnv) == 0) { + streamOutputMode_.store(false); + LOG(ERROR) << "Not found valid environment variable MSMONITOR_OUTPUT_PATH, " + << "Msmonitor output mode stays off. "; + return; + } + + std::string newDir = outputPathEnv; + if (!PathUtils::DirPathCheck(newDir)) { + LOG(ERROR) << "Invalid MSMONITOR_OUTPUT_PATH: " << newDir; + streamOutputMode_.store(false); + return; + } + + const char* master_ip_env = std::getenv("MASTER_IP"); + if (master_ip_env) { + master_ip_ = master_ip_env; + } else { + master_ip_ = ""; + LOG(WARNING) << "MASTER_IP environment variable not set, " + << "will use empty string for master_ip"; + } + + streamOutputDir_ = std::move(newDir); + streamOutputMode_.store(true); + streamOutputFileCreated_ = false; + streamOutputFileFd_ = -1; + kernelNameIndex_.clear(); + nextIndex_ = 0; +} + +void StreamOutput::CloseStreamOutput() { + std::lock_guard lock(streamOutputMtx_); + if (streamOutputFileCreated_) { + // 先刷新缓冲区 + if (streamOutputFile_.is_open()) { + streamOutputFile_.flush(); + } + + // 解锁文件 + if (streamOutputFileFd_ != -1) { + if (::flock(streamOutputFileFd_, LOCK_UN) == -1) { + LOG(ERROR) << "Failed to unlock file: " << strerror(errno); + } + } + + // 关闭文件流 + if (streamOutputFile_.is_open()) { + streamOutputFile_.close(); + } + + // 关闭文件描述符 + if (streamOutputFileFd_ != -1) { + if (::close(streamOutputFileFd_) == -1) { + LOG(ERROR) << "Failed to close file: " << strerror(errno); + } + streamOutputFileFd_ = -1; + } + + streamOutputFileCreated_ = false; + } +} + +std::string StreamOutput::GetRecordName(msptiActivity *record) { + if (!record) return ""; + switch (record->kind) { + case MSPTI_ACTIVITY_KIND_KERNEL: { + auto* kernel = reinterpret_cast(record); + if (!kernel->name) return ""; + + std::string name = kernel->name; + if (name.find("MoeDistributeCombine") != 0 && + name.find("MoeDistributeDispatch") != 0) { + return ""; + } + // if (name.find("Concat") != 0) return ""; + + size_t pos = name.find('_'); + if (pos != std::string::npos) { + return name.substr(0, pos); + } + return name; + } + default: + return ""; + } +} + +void StreamOutput::StreamOutputRecord(msptiActivity *record) { + if (!streamOutputMode_.load() || !record) { + return; + } + + std::string kernelName = GetRecordName(record); + if (kernelName.empty()) return; + + std::lock_guard lock(streamOutputMtx_); + + if (!streamOutputMode_.load()) return; + + if (!streamOutputFileCreated_) { + int pid = GetProcessId(); + int rankId = GetRankId(); + + char hostname[256] = {0}; + if (gethostname(hostname, sizeof(hostname)) != 0) { + strncpy(hostname, "unknown_host", sizeof(hostname)-1); + hostname[sizeof(hostname)-1] = '\0'; + LOG(ERROR) << "Failed to get hostname, using default"; + } + + char fileName[256] = {0}; + snprintf(fileName, sizeof(fileName), "%s_%d_%d.jsonl", + hostname, rankId, pid); + + std::string filePath = streamOutputDir_ + "/" + fileName; + + // 打开文件描述符 + int fd = ::open(filePath.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644); + if (fd == -1) { + LOG(ERROR) << "Failed to open file: " << filePath + << ", error: " << strerror(errno); + streamOutputMode_.store(false); + return; + } + + // 设置文件锁 + if (::flock(fd, LOCK_EX | LOCK_NB) == -1) { + LOG(ERROR) << "Failed to lock file: " << filePath + << ", error: " << strerror(errno); + ::close(fd); + streamOutputMode_.store(false); + return; + } + + // 将文件描述符关联到文件流 + streamOutputFile_.open(filePath, std::ios::out | std::ios::app); + if (!streamOutputFile_.is_open()) { + LOG(ERROR) << "Failed to open file stream: " << filePath; + ::flock(fd, LOCK_UN); + ::close(fd); + streamOutputMode_.store(false); + return; + } + + streamOutputFileFd_ = fd; + streamOutputFilePath_ = filePath; + streamOutputFileCreated_ = true; + } + // 安全构建 JSON 对象 + nlohmann::json j; + switch (record->kind) { + case MSPTI_ACTIVITY_KIND_KERNEL: { + auto kernel = reinterpret_cast(record); + j["kind"] = "COMM"; + j["start"] = static_cast(kernel->start); + j["end"] = static_cast(kernel->end); + j["name"] = kernelName; + + uint32_t index = 0; + auto it = kernelNameIndex_.find(kernelName); + if (it == kernelNameIndex_.end()) { + // 新的 kernel name,分配新索引 + index = nextIndex_; + kernelNameIndex_[kernelName] = nextIndex_; + nextIndex_++; + } else { + // 已知的 kernel name,使用已有索引 + index = it->second; + } + j["index"] = index; + j["master_ip"] = master_ip_; + + break; + } + default: { + break; + } + } + + try { + streamOutputFile_ << j.dump() << std::endl; + } catch (const std::exception& e) { + LOG(ERROR) << "JSON dump failed: " << e.what(); + } + + // 错误处理 + if (streamOutputFile_.fail()) { + LOG(ERROR) << "Write failed, disabling stream output"; + + // 清理资源 + if (streamOutputFileFd_ != -1) { + ::flock(streamOutputFileFd_, LOCK_UN); + ::close(streamOutputFileFd_); + streamOutputFileFd_ = -1; + } + + if (streamOutputFile_.is_open()) { + streamOutputFile_.close(); + } + + streamOutputMode_.store(false); + streamOutputFileCreated_ = false; + } +} +} +} diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h new file mode 100644 index 0000000000..e4d6815468 --- /dev/null +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h @@ -0,0 +1,42 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "mspti.h" +#include "thread.h" + +namespace dynolog_npu { +namespace ipc_monitor { +class StreamOutput { +public: + void InitStreamOutput(); + void CloseStreamOutput(); + void StreamOutputRecord(msptiActivity *record); + + bool isStreamOutputMode() const { + return streamOutputMode_.load(); + } + +private: + std::string GetRecordName(msptiActivity *record); + +private: + std::atomic streamOutputMode_{false}; + bool streamOutputFileCreated_ = false; + std::string streamOutputDir_; + // std::string kernelPatternStr_; + // std::regex kernelPattern_; + std::mutex streamOutputMtx_; + std::ofstream streamOutputFile_; + std::string streamOutputFilePath_; + int streamOutputFileFd_ = -1; + std::unordered_map kernelNameIndex_; + uint32_t nextIndex_ = 0; + std::string master_ip_; +}; +} +} diff --git a/msmonitor/plugin/ipc_monitor/utils.cpp b/msmonitor/plugin/ipc_monitor/utils.cpp index 772a1b5c6c..f8aebb7b2c 100644 --- a/msmonitor/plugin/ipc_monitor/utils.cpp +++ b/msmonitor/plugin/ipc_monitor/utils.cpp @@ -418,9 +418,31 @@ bool PathUtils::DirPathCheck(const std::string& path) int GetRankId() { static int rankId = []() -> int { - pybind11::gil_scoped_acquire gil; - return pybind11::module::import("IPCMonitor.utils").attr("get_rank_id")().cast(); + try { + pybind11::gil_scoped_acquire gil; + return pybind11::module::import("IPCMonitor.utils").attr("get_rank_id")().cast(); + } + catch (const pybind11::error_already_set& e) { + LOG(ERROR) << "Python get rank id failed: "; + PyErr_Print(); + + LOG(ERROR) << "C++ catched exception: " << e.what() << "\n"; + + const int DEFAULT_RANK = -1; + LOG(ERROR) << "Default to rank ID: " << DEFAULT_RANK << std::endl; + + return DEFAULT_RANK; + } + catch (const std::exception& e) { + LOG(ERROR) << e.what() << "\n"; + return -1; + } + catch (...) { + LOG(ERROR) << "Unknown Error!" << "\n"; + return -1; + } }(); + return rankId; } -- Gitee From caed7990e6072933bd75c02c0e74fe6ebc43b9ea Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Mon, 23 Jun 2025 09:23:44 +0800 Subject: [PATCH 04/16] add debug logging --- .../plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 2555c9043e..0bbf7cb51e 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -86,6 +86,7 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { if (!kernel->name) return ""; std::string name = kernel->name; + LOG(ERROR) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name; if (name.find("MoeDistributeCombine") != 0 && name.find("MoeDistributeDispatch") != 0) { return ""; @@ -107,15 +108,16 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { if (!streamOutputMode_.load() || !record) { return; } - + LOG(ERROR) << "[DEBUG-1-1][StreamOutput::StreamOutputRecord] Before GetRecordName"; std::string kernelName = GetRecordName(record); if (kernelName.empty()) return; - + LOG(ERROR) << "[DEBUG][StreamOutput::StreamOutputRecord] After GetRecordName"; std::lock_guard lock(streamOutputMtx_); if (!streamOutputMode_.load()) return; if (!streamOutputFileCreated_) { + LOG(ERROR) << "[DEBUG-1-2][StreamOutput::StreamOutputRecord] Init output file"; int pid = GetProcessId(); int rankId = GetRankId(); @@ -163,8 +165,10 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { streamOutputFileFd_ = fd; streamOutputFilePath_ = filePath; streamOutputFileCreated_ = true; + LOG(ERROR) << "[DEBUG-1-3][StreamOutput::StreamOutputRecord] output file created"; } // 安全构建 JSON 对象 + LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] Before writting into file"; nlohmann::json j; switch (record->kind) { case MSPTI_ACTIVITY_KIND_KERNEL: { @@ -200,6 +204,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { } catch (const std::exception& e) { LOG(ERROR) << "JSON dump failed: " << e.what(); } + LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] After writting into file"; // 错误处理 if (streamOutputFile_.fail()) { -- Gitee From e4f837f196da8c788ac936fba18f4e8e2d79eb45 Mon Sep 17 00:00:00 2001 From: wjchuee Date: Mon, 23 Jun 2025 15:02:13 +0800 Subject: [PATCH 05/16] msmonitor plugin cmake fix --- msmonitor/plugin/CMakeLists.txt | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/msmonitor/plugin/CMakeLists.txt b/msmonitor/plugin/CMakeLists.txt index 502abf673a..8295b38c41 100644 --- a/msmonitor/plugin/CMakeLists.txt +++ b/msmonitor/plugin/CMakeLists.txt @@ -8,7 +8,6 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_CXX_EXTENSIONS OFF) find_package(pybind11 REQUIRED) -find_package(Python REQUIRED COMPONENTS Interpreter Development) set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake") set(ENV{PROJECT_ROOT_PATH} "${CMAKE_SOURCE_DIR}") @@ -57,11 +56,16 @@ target_compile_options(IPCMonitor PRIVATE -fPIC -fstack-protector-all -ftrapv - $<$>:-O2> ) -add_compile_options(-D_FORITFY_SOURCE=2 -O2) target_link_options(IPCMonitor PRIVATE -Wl,-z,relro,-z,now,-z,noexecstack -s ) + +if(${CMAKE_BUILD_TYPE} STREQUAL "Debug") + add_compile_options(-O0 -g) + add_link_options(-O0 -g) +else() + add_compile_options(-D_FORITFY_SOURCE=2 -O2) +endif() -- Gitee From 4bc1e450940fc474d357927f21ff9c96f9ab30e9 Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Tue, 24 Jun 2025 16:24:14 +0800 Subject: [PATCH 06/16] groupedmatmul test --- .../mspti_monitor/StreamOutput.cpp | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 0bbf7cb51e..258e3eca00 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -87,17 +87,18 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { std::string name = kernel->name; LOG(ERROR) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name; - if (name.find("MoeDistributeCombine") != 0 && - name.find("MoeDistributeDispatch") != 0) { - return ""; + if (name.find("groupedmatmul") != std::string::npos || name.find("GroupedMatmul") != std::string::npos) { + return "GroupedMatmul"; } - // if (name.find("Concat") != 0) return ""; - - size_t pos = name.find('_'); - if (pos != std::string::npos) { - return name.substr(0, pos); - } - return name; + // if (name.find("MoeDistributeCombine") != 0 && + // name.find("MoeDistributeDispatch") != 0) { + // return ""; + // } + // size_t pos = name.find('_'); + // if (pos != std::string::npos) { + // return name.substr(0, pos); + // } + // return name; } default: return ""; @@ -173,7 +174,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { switch (record->kind) { case MSPTI_ACTIVITY_KIND_KERNEL: { auto kernel = reinterpret_cast(record); - j["kind"] = "COMM"; + j["kind"] = "kernel"; j["start"] = static_cast(kernel->start); j["end"] = static_cast(kernel->end); j["name"] = kernelName; @@ -190,7 +191,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { index = it->second; } j["index"] = index; - j["master_ip"] = master_ip_; + // j["master_ip"] = master_ip_; break; } -- Gitee From e78e886a815bd8ceb1b9302d9bdd3d8aa57dcde0 Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Wed, 25 Jun 2025 12:27:49 +0800 Subject: [PATCH 07/16] groupmatmul replace moedistributedispatch&combine --- .../mspti_monitor/StreamOutput.cpp | 46 ++++++++++++++----- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 258e3eca00..11412dc0c4 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -86,9 +86,32 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { if (!kernel->name) return ""; std::string name = kernel->name; - LOG(ERROR) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name; + // LOG(ERROR) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name; if (name.find("groupedmatmul") != std::string::npos || name.find("GroupedMatmul") != std::string::npos) { - return "GroupedMatmul"; + // 根据kernelNameIndex_中的数量决定返回哪个名称 + uint32_t dispatchCount = 0; + uint32_t combineCount = 0; + + auto dispatchIt = kernelNameIndex_.find("MoeDistributeDispatch"); + if (dispatchIt != kernelNameIndex_.end()) { + dispatchCount = dispatchIt->second + 1; // 因为索引从0开始,所以+1 + } + + auto combineIt = kernelNameIndex_.find("MoeDistributeCombine"); + if (combineIt != kernelNameIndex_.end()) { + combineCount = combineIt->second + 1; // 因为索引从0开始,所以+1 + } + + // 如果两个都没有,或者数量一样,返回MoeDistributeDispatch + if (dispatchCount == 0 && combineCount == 0) { + return "MoeDistributeDispatch"; + } else if (dispatchCount == combineCount) { + return "MoeDistributeDispatch"; + } else if (dispatchCount > combineCount) { + return "MoeDistributeCombine"; + } else { + return "MoeDistributeDispatch"; + } } // if (name.find("MoeDistributeCombine") != 0 && // name.find("MoeDistributeDispatch") != 0) { @@ -109,16 +132,16 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { if (!streamOutputMode_.load() || !record) { return; } - LOG(ERROR) << "[DEBUG-1-1][StreamOutput::StreamOutputRecord] Before GetRecordName"; + //LOG(ERROR) << "[DEBUG-1-1][StreamOutput::StreamOutputRecord] Before GetRecordName"; std::string kernelName = GetRecordName(record); if (kernelName.empty()) return; - LOG(ERROR) << "[DEBUG][StreamOutput::StreamOutputRecord] After GetRecordName"; + //LOG(ERROR) << "[DEBUG][StreamOutput::StreamOutputRecord] After GetRecordName"; std::lock_guard lock(streamOutputMtx_); if (!streamOutputMode_.load()) return; if (!streamOutputFileCreated_) { - LOG(ERROR) << "[DEBUG-1-2][StreamOutput::StreamOutputRecord] Init output file"; + // LOG(ERROR) << "[DEBUG-1-2][StreamOutput::StreamOutputRecord] Init output file"; int pid = GetProcessId(); int rankId = GetRankId(); @@ -166,15 +189,15 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { streamOutputFileFd_ = fd; streamOutputFilePath_ = filePath; streamOutputFileCreated_ = true; - LOG(ERROR) << "[DEBUG-1-3][StreamOutput::StreamOutputRecord] output file created"; + // LOG(ERROR) << "[DEBUG-1-3][StreamOutput::StreamOutputRecord] output file created"; } // 安全构建 JSON 对象 - LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] Before writting into file"; + // LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] Before writting into file"; nlohmann::json j; switch (record->kind) { case MSPTI_ACTIVITY_KIND_KERNEL: { auto kernel = reinterpret_cast(record); - j["kind"] = "kernel"; + j["kind"] = "comm"; j["start"] = static_cast(kernel->start); j["end"] = static_cast(kernel->end); j["name"] = kernelName; @@ -187,8 +210,9 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { kernelNameIndex_[kernelName] = nextIndex_; nextIndex_++; } else { - // 已知的 kernel name,使用已有索引 - index = it->second; + // 已知的 kernel name,索引递增 + index = it->second + 1; + kernelNameIndex_[kernelName] = index; } j["index"] = index; // j["master_ip"] = master_ip_; @@ -205,7 +229,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { } catch (const std::exception& e) { LOG(ERROR) << "JSON dump failed: " << e.what(); } - LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] After writting into file"; + //LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] After writting into file"; // 错误处理 if (streamOutputFile_.fail()) { -- Gitee From fe3c7e2c2bb17e503db48b2de9bce96cf348f2ef Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Wed, 25 Jun 2025 14:57:36 +0800 Subject: [PATCH 08/16] master ip fix --- msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 11412dc0c4..8ff66b51d2 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -215,7 +215,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { kernelNameIndex_[kernelName] = index; } j["index"] = index; - // j["master_ip"] = master_ip_; + j["master_ip"] = master_ip_; break; } -- Gitee From 3c94718012c2119b2c7dac7c5a2f57f812ef65b7 Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Wed, 25 Jun 2025 15:33:30 +0800 Subject: [PATCH 09/16] msmonitor_file_name_prefix --- msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 8ff66b51d2..72cd6ad31b 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -153,7 +153,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { } char fileName[256] = {0}; - snprintf(fileName, sizeof(fileName), "%s_%d_%d.jsonl", + snprintf(fileName, sizeof(fileName), "msmonitor_%s_%d_%d.jsonl", hostname, rankId, pid); std::string filePath = streamOutputDir_ + "/" + fileName; -- Gitee From b647cb6a4141909ba7d8ccb27d51f1517b52b607 Mon Sep 17 00:00:00 2001 From: wjchuee Date: Thu, 26 Jun 2025 19:08:47 +0800 Subject: [PATCH 10/16] moe aicpu kernel select --- .../mspti_monitor/StreamOutput.cpp | 101 +++++++++++++----- .../ipc_monitor/mspti_monitor/StreamOutput.h | 10 +- 2 files changed, 79 insertions(+), 32 deletions(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 72cd6ad31b..6ebdeea656 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -53,19 +53,19 @@ void StreamOutput::CloseStreamOutput() { if (streamOutputFile_.is_open()) { streamOutputFile_.flush(); } - + // 解锁文件 if (streamOutputFileFd_ != -1) { if (::flock(streamOutputFileFd_, LOCK_UN) == -1) { LOG(ERROR) << "Failed to unlock file: " << strerror(errno); } } - + // 关闭文件流 if (streamOutputFile_.is_open()) { streamOutputFile_.close(); } - + // 关闭文件描述符 if (streamOutputFileFd_ != -1) { if (::close(streamOutputFileFd_) == -1) { @@ -73,7 +73,7 @@ void StreamOutput::CloseStreamOutput() { } streamOutputFileFd_ = -1; } - + streamOutputFileCreated_ = false; } } @@ -84,24 +84,40 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { case MSPTI_ACTIVITY_KIND_KERNEL: { auto* kernel = reinterpret_cast(record); if (!kernel->name) return ""; - + std::string name = kernel->name; - // LOG(ERROR) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name; - if (name.find("groupedmatmul") != std::string::npos || name.find("GroupedMatmul") != std::string::npos) { - // 根据kernelNameIndex_中的数量决定返回哪个名称 + if (name.empty()) { + LOG(ERROR) << "[DEBUG][StreamOutput::GetRecordName] get empty kernel name"; + return ""; + } + std::string type = kernel->type; + if (type == "KERNEL_AICPU") { + auto it = streamTaskCache_.find(kernel->ds.streamId); + if (it == streamTaskCache_.end()) { + streamTaskCache_[kernel->ds.streamId].insert(name); + return ""; + } + if (it->second.find(name) == it->second.end()) { + it->second.insert(name); + return ""; + } + if (it->second.size() > 50) { + return ""; + } + uint32_t dispatchCount = 0; uint32_t combineCount = 0; - + auto dispatchIt = kernelNameIndex_.find("MoeDistributeDispatch"); if (dispatchIt != kernelNameIndex_.end()) { dispatchCount = dispatchIt->second + 1; // 因为索引从0开始,所以+1 } - + auto combineIt = kernelNameIndex_.find("MoeDistributeCombine"); if (combineIt != kernelNameIndex_.end()) { combineCount = combineIt->second + 1; // 因为索引从0开始,所以+1 } - + // 如果两个都没有,或者数量一样,返回MoeDistributeDispatch if (dispatchCount == 0 && combineCount == 0) { return "MoeDistributeDispatch"; @@ -113,9 +129,36 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { return "MoeDistributeDispatch"; } } - // if (name.find("MoeDistributeCombine") != 0 && + // LOG(ERROR) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name; + // if (name.find("groupedmatmul") != std::string::npos || name.find("GroupedMatmul") != std::string::npos) { + // // 根据kernelNameIndex_中的数量决定返回哪个名称 + // uint32_t dispatchCount = 0; + // uint32_t combineCount = 0; + + // auto dispatchIt = kernelNameIndex_.find("MoeDistributeDispatch"); + // if (dispatchIt != kernelNameIndex_.end()) { + // dispatchCount = dispatchIt->second + 1; // 因为索引从0开始,所以+1 + // } + + // auto combineIt = kernelNameIndex_.find("MoeDistributeCombine"); + // if (combineIt != kernelNameIndex_.end()) { + // combineCount = combineIt->second + 1; // 因为索引从0开始,所以+1 + // } + + // // 如果两个都没有,或者数量一样,返回MoeDistributeDispatch + // if (dispatchCount == 0 && combineCount == 0) { + // return "MoeDistributeDispatch"; + // } else if (dispatchCount == combineCount) { + // return "MoeDistributeDispatch"; + // } else if (dispatchCount > combineCount) { + // return "MoeDistributeCombine"; + // } else { + // return "MoeDistributeDispatch"; + // } + // } + // if (name.find("MoeDistributeCombine") != 0 && // name.find("MoeDistributeDispatch") != 0) { - // return ""; + // return ""; // } // size_t pos = name.find('_'); // if (pos != std::string::npos) { @@ -137,7 +180,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { if (kernelName.empty()) return; //LOG(ERROR) << "[DEBUG][StreamOutput::StreamOutputRecord] After GetRecordName"; std::lock_guard lock(streamOutputMtx_); - + if (!streamOutputMode_.load()) return; if (!streamOutputFileCreated_) { @@ -153,11 +196,11 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { } char fileName[256] = {0}; - snprintf(fileName, sizeof(fileName), "msmonitor_%s_%d_%d.jsonl", + snprintf(fileName, sizeof(fileName), "msmonitor_%s_%d_%d.jsonl", hostname, rankId, pid); - + std::string filePath = streamOutputDir_ + "/" + fileName; - + // 打开文件描述符 int fd = ::open(filePath.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644); if (fd == -1) { @@ -166,7 +209,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { streamOutputMode_.store(false); return; } - + // 设置文件锁 if (::flock(fd, LOCK_EX | LOCK_NB) == -1) { LOG(ERROR) << "Failed to lock file: " << filePath @@ -175,7 +218,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { streamOutputMode_.store(false); return; } - + // 将文件描述符关联到文件流 streamOutputFile_.open(filePath, std::ios::out | std::ios::app); if (!streamOutputFile_.is_open()) { @@ -185,7 +228,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { streamOutputMode_.store(false); return; } - + streamOutputFileFd_ = fd; streamOutputFilePath_ = filePath; streamOutputFileCreated_ = true; @@ -196,12 +239,12 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { nlohmann::json j; switch (record->kind) { case MSPTI_ACTIVITY_KIND_KERNEL: { - auto kernel = reinterpret_cast(record); + auto kernel = reinterpret_cast(record); j["kind"] = "comm"; j["start"] = static_cast(kernel->start); j["end"] = static_cast(kernel->end); j["name"] = kernelName; - + uint32_t index = 0; auto it = kernelNameIndex_.find(kernelName); if (it == kernelNameIndex_.end()) { @@ -214,38 +257,38 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { index = it->second + 1; kernelNameIndex_[kernelName] = index; } - j["index"] = index; + j["index"] = index; j["master_ip"] = master_ip_; - + break; } default: { break; } } - + try { streamOutputFile_ << j.dump() << std::endl; } catch (const std::exception& e) { LOG(ERROR) << "JSON dump failed: " << e.what(); } //LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] After writting into file"; - + // 错误处理 if (streamOutputFile_.fail()) { LOG(ERROR) << "Write failed, disabling stream output"; - + // 清理资源 if (streamOutputFileFd_ != -1) { ::flock(streamOutputFileFd_, LOCK_UN); ::close(streamOutputFileFd_); streamOutputFileFd_ = -1; } - + if (streamOutputFile_.is_open()) { streamOutputFile_.close(); } - + streamOutputMode_.store(false); streamOutputFileCreated_ = false; } diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h index e4d6815468..f9f6a19669 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include "mspti.h" #include "thread.h" @@ -16,14 +18,14 @@ public: void InitStreamOutput(); void CloseStreamOutput(); void StreamOutputRecord(msptiActivity *record); - + bool isStreamOutputMode() const { return streamOutputMode_.load(); } - + private: std::string GetRecordName(msptiActivity *record); - + private: std::atomic streamOutputMode_{false}; bool streamOutputFileCreated_ = false; @@ -37,6 +39,8 @@ private: std::unordered_map kernelNameIndex_; uint32_t nextIndex_ = 0; std::string master_ip_; + + std::unordered_map> streamTaskCache_; }; } } -- Gitee From 2f2b61f698a050a0839310304cb7d4737ad3036c Mon Sep 17 00:00:00 2001 From: wjchuee Date: Fri, 27 Jun 2025 11:15:15 +0800 Subject: [PATCH 11/16] moe aicpu kernel select debug --- msmonitor/plugin/CMakeLists.txt | 4 +- .../mspti_monitor/StreamOutput.cpp | 93 ++++++++++++------- .../ipc_monitor/mspti_monitor/StreamOutput.h | 2 + msmonitor/plugin/setup.py | 1 + 4 files changed, 63 insertions(+), 37 deletions(-) diff --git a/msmonitor/plugin/CMakeLists.txt b/msmonitor/plugin/CMakeLists.txt index 8295b38c41..e42222cf22 100644 --- a/msmonitor/plugin/CMakeLists.txt +++ b/msmonitor/plugin/CMakeLists.txt @@ -64,8 +64,8 @@ target_link_options(IPCMonitor PRIVATE ) if(${CMAKE_BUILD_TYPE} STREQUAL "Debug") - add_compile_options(-O0 -g) - add_link_options(-O0 -g) + add_compile_options(-O0 -g3) + add_link_options(-O0 -g3) else() add_compile_options(-D_FORITFY_SOURCE=2 -O2) endif() diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 6ebdeea656..d9e850644d 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -13,6 +13,11 @@ namespace dynolog_npu { namespace ipc_monitor { +const std::string MoeDispatchTaskId = "1"; +const std::string MoeDispatchName = "MoeDistributeDispatch"; +const std::string MoeCombineTaskId = "3"; +const std::string MoeCombineName = "MoeDistributeCombine"; + void StreamOutput::InitStreamOutput() { const char* outputPathEnv = std::getenv("MSMONITOR_OUTPUT_PATH"); if (!outputPathEnv || std::strlen(outputPathEnv) == 0) { @@ -91,43 +96,56 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { return ""; } std::string type = kernel->type; + // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name << ", type: " << type << ", stream: " << kernel->ds.streamId; if (type == "KERNEL_AICPU") { + LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name << ", type: " << type << ", stream: " << kernel->ds.streamId; auto it = streamTaskCache_.find(kernel->ds.streamId); if (it == streamTaskCache_.end()) { streamTaskCache_[kernel->ds.streamId].insert(name); + LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name; return ""; } if (it->second.find(name) == it->second.end()) { it->second.insert(name); + LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", new name: " << name; return ""; } if (it->second.size() > 50) { + LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name << " size large than 50"; return ""; } - - uint32_t dispatchCount = 0; - uint32_t combineCount = 0; - - auto dispatchIt = kernelNameIndex_.find("MoeDistributeDispatch"); - if (dispatchIt != kernelNameIndex_.end()) { - dispatchCount = dispatchIt->second + 1; // 因为索引从0开始,所以+1 - } - - auto combineIt = kernelNameIndex_.find("MoeDistributeCombine"); - if (combineIt != kernelNameIndex_.end()) { - combineCount = combineIt->second + 1; // 因为索引从0开始,所以+1 - } - - // 如果两个都没有,或者数量一样,返回MoeDistributeDispatch - if (dispatchCount == 0 && combineCount == 0) { - return "MoeDistributeDispatch"; - } else if (dispatchCount == combineCount) { - return "MoeDistributeDispatch"; - } else if (dispatchCount > combineCount) { - return "MoeDistributeCombine"; - } else { - return "MoeDistributeDispatch"; + LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name << ", size: " << it->second.size(); + if(name == MoeDispatchTaskId) { + dispatchCnt.fetch_add(1); + return MoeDispatchName; + } else if(name == MoeCombineTaskId) { + combineCnt.fetch_add(1); + return MoeCombineName; } + return ""; + // uint32_t dispatchCount = 0; + // uint32_t combineCount = 0; + + // auto dispatchIt = kernelNameIndex_.find("MoeDistributeDispatch"); + // if (dispatchIt != kernelNameIndex_.end()) { + // dispatchCount = dispatchIt->second + 1; // 因为索引从0开始,所以+1 + // } + + // auto combineIt = kernelNameIndex_.find("MoeDistributeCombine"); + // if (combineIt != kernelNameIndex_.end()) { + // combineCount = combineIt->second + 1; // 因为索引从0开始,所以+1 + // } + + // // 如果两个都没有,或者数量一样,返回MoeDistributeDispatch + // if (dispatchCount == 0 && combineCount == 0) { + // return "MoeDistributeDispatch"; + // } else if (dispatchCount == combineCount) { + // return "MoeDistributeDispatch"; + // } else if (dispatchCount > combineCount) { + // return "MoeDistributeCombine"; + // } else { + // return "MoeDistributeDispatch"; + // } } // LOG(ERROR) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name; // if (name.find("groupedmatmul") != std::string::npos || name.find("GroupedMatmul") != std::string::npos) { @@ -245,19 +263,24 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { j["end"] = static_cast(kernel->end); j["name"] = kernelName; - uint32_t index = 0; - auto it = kernelNameIndex_.find(kernelName); - if (it == kernelNameIndex_.end()) { - // 新的 kernel name,分配新索引 - index = nextIndex_; - kernelNameIndex_[kernelName] = nextIndex_; - nextIndex_++; - } else { - // 已知的 kernel name,索引递增 - index = it->second + 1; - kernelNameIndex_[kernelName] = index; + // uint32_t index = 0; + // auto it = kernelNameIndex_.find(kernelName); + // if (it == kernelNameIndex_.end()) { + // // 新的 kernel name,分配新索引 + // index = nextIndex_; + // kernelNameIndex_[kernelName] = nextIndex_; + // nextIndex_++; + // } else { + // // 已知的 kernel name,索引递增 + // index = it->second + 1; + // kernelNameIndex_[kernelName] = index; + // } + // j["index"] = index; + if (kernelName == MoeDispatchName) { + j["index"] = dispatchCnt.load(); + } else if (kernelName == MoeCombineName) { + j["index"] = combineCnt.load(); } - j["index"] = index; j["master_ip"] = master_ip_; break; diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h index f9f6a19669..3df11dad16 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h @@ -41,6 +41,8 @@ private: std::string master_ip_; std::unordered_map> streamTaskCache_; + std::atomic_uint64_t dispatchCnt{0}; + std::atomic_uint64_t combineCnt{0}; }; } } diff --git a/msmonitor/plugin/setup.py b/msmonitor/plugin/setup.py index 91144e3a3c..dec8b4170f 100644 --- a/msmonitor/plugin/setup.py +++ b/msmonitor/plugin/setup.py @@ -34,6 +34,7 @@ class CMakeBuild(build_ext): self.build_extension(ext) def build_extension(self, ext): + self.debug = True cfg = 'Debug' if self.debug else 'Release' build_args = ['--config', cfg] -- Gitee From 5a4946355c4e57caae4cff0ab82a940e7e2e1954 Mon Sep 17 00:00:00 2001 From: wjchuee Date: Fri, 27 Jun 2025 18:10:53 +0800 Subject: [PATCH 12/16] Remove debug mode log --- .../plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp | 10 +++++----- msmonitor/plugin/setup.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index d9e850644d..7ab7a010bb 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -98,23 +98,23 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { std::string type = kernel->type; // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name << ", type: " << type << ", stream: " << kernel->ds.streamId; if (type == "KERNEL_AICPU") { - LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name << ", type: " << type << ", stream: " << kernel->ds.streamId; + // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name << ", type: " << type << ", stream: " << kernel->ds.streamId; auto it = streamTaskCache_.find(kernel->ds.streamId); if (it == streamTaskCache_.end()) { streamTaskCache_[kernel->ds.streamId].insert(name); - LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name; + // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name; return ""; } if (it->second.find(name) == it->second.end()) { it->second.insert(name); - LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", new name: " << name; + // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", new name: " << name; return ""; } if (it->second.size() > 50) { - LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name << " size large than 50"; + // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name << " size large than 50"; return ""; } - LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name << ", size: " << it->second.size(); + // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name << ", size: " << it->second.size(); if(name == MoeDispatchTaskId) { dispatchCnt.fetch_add(1); return MoeDispatchName; diff --git a/msmonitor/plugin/setup.py b/msmonitor/plugin/setup.py index dec8b4170f..750879f94b 100644 --- a/msmonitor/plugin/setup.py +++ b/msmonitor/plugin/setup.py @@ -34,7 +34,7 @@ class CMakeBuild(build_ext): self.build_extension(ext) def build_extension(self, ext): - self.debug = True + # self.debug = True cfg = 'Debug' if self.debug else 'Release' build_args = ['--config', cfg] -- Gitee From d335714e3637f07d4dc44342d488dda7a29b7a9b Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Wed, 2 Jul 2025 10:22:57 +0800 Subject: [PATCH 13/16] add nranks attribute in output format --- .../plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp | 10 ++++++++++ .../plugin/ipc_monitor/mspti_monitor/StreamOutput.h | 1 + 2 files changed, 11 insertions(+) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 7ab7a010bb..2d6fb5d360 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -43,6 +43,15 @@ void StreamOutput::InitStreamOutput() { << "will use empty string for master_ip"; } + const char* ep_size_env = std::getenv("EP_WORLD_SIZE"); + if (ep_size_env) { + ep_size_ = ep_size_env; + } else { + ep_size_ = ""; + LOG(WARNING) << "EP_WORLD_SIZE environment variable not set, " + << "will use empty string for nranks"; + } + streamOutputDir_ = std::move(newDir); streamOutputMode_.store(true); streamOutputFileCreated_ = false; @@ -282,6 +291,7 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { j["index"] = combineCnt.load(); } j["master_ip"] = master_ip_; + j["nranks"] = ep_size_; break; } diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h index 3df11dad16..69015b4b3e 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h @@ -39,6 +39,7 @@ private: std::unordered_map kernelNameIndex_; uint32_t nextIndex_ = 0; std::string master_ip_; + std::string ep_size_; std::unordered_map> streamTaskCache_; std::atomic_uint64_t dispatchCnt{0}; -- Gitee From 662a944822c37abaa89a2c280c68421000564ba5 Mon Sep 17 00:00:00 2001 From: wjchuee Date: Wed, 9 Jul 2025 18:25:00 +0800 Subject: [PATCH 14/16] Moe graph comm op fix --- .../mspti_monitor/StreamOutput.cpp | 28 ++++++++++++------- .../ipc_monitor/mspti_monitor/StreamOutput.h | 2 +- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 2d6fb5d360..7bba079875 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -13,9 +13,7 @@ namespace dynolog_npu { namespace ipc_monitor { -const std::string MoeDispatchTaskId = "1"; const std::string MoeDispatchName = "MoeDistributeDispatch"; -const std::string MoeCombineTaskId = "3"; const std::string MoeCombineName = "MoeDistributeCombine"; void StreamOutput::InitStreamOutput() { @@ -108,26 +106,36 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name << ", type: " << type << ", stream: " << kernel->ds.streamId; if (type == "KERNEL_AICPU") { // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name << ", type: " << type << ", stream: " << kernel->ds.streamId; + uint32_t intName = 0; + if (!Str2Uint32(intName, name)) { + LOG(WARNING) << "[DEBUG][StreamOutput::GetRecordName] get invalid task id: " << name; + return ""; + } + // unaging flag + if ((intName >> 16) == 1) { + return ""; + } + uint32_t taskId = intName & 0xFFFF; auto it = streamTaskCache_.find(kernel->ds.streamId); if (it == streamTaskCache_.end()) { - streamTaskCache_[kernel->ds.streamId].insert(name); + streamTaskCache_[kernel->ds.streamId].insert(taskId); // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name; return ""; } - if (it->second.find(name) == it->second.end()) { - it->second.insert(name); + if (it->second.find(taskId) == it->second.end()) { + it->second.insert(taskId); // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", new name: " << name; return ""; } - if (it->second.size() > 50) { + // if (it->second.size() > 50) { // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name << " size large than 50"; - return ""; - } + // return ""; + // } // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] stream id: " << kernel->ds.streamId << ", name: " << name << ", size: " << it->second.size(); - if(name == MoeDispatchTaskId) { + if(dispatchCnt.load() <= combineCnt.load()) { dispatchCnt.fetch_add(1); return MoeDispatchName; - } else if(name == MoeCombineTaskId) { + } else { combineCnt.fetch_add(1); return MoeCombineName; } diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h index 69015b4b3e..ff14f7118d 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h @@ -41,7 +41,7 @@ private: std::string master_ip_; std::string ep_size_; - std::unordered_map> streamTaskCache_; + std::unordered_map> streamTaskCache_; std::atomic_uint64_t dispatchCnt{0}; std::atomic_uint64_t combineCnt{0}; }; -- Gitee From 172e767083f4d776984086de98c6e637b6bb52d5 Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Fri, 11 Jul 2025 17:22:40 +0800 Subject: [PATCH 15/16] add msmonitor_disable_dump --- .../ipc_monitor/mspti_monitor/StreamOutput.cpp | 14 +++++++++++++- .../ipc_monitor/mspti_monitor/StreamOutput.h | 1 + 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 7bba079875..3576364c4a 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -50,6 +50,14 @@ void StreamOutput::InitStreamOutput() { << "will use empty string for nranks"; } + const char* disableDumpEnv = std::getenv("MSMONITOR_DISABLE_DUMP"); + if (disableDumpEnv && strcasecmp(disableDumpEnv, "true") == 0) { + enable_output_ = false; + LOG(WARNING) << "MSMONITOR_DISABLE_DUMP is set to true, stream output will be disabled."; + } else { + enable_output_ = true; + } + streamOutputDir_ = std::move(newDir); streamOutputMode_.store(true); streamOutputFileCreated_ = false; @@ -108,7 +116,7 @@ std::string StreamOutput::GetRecordName(msptiActivity *record) { // LOG(INFO) << "[DEBUG][StreamOutput::GetRecordName] kernel name: " << name << ", type: " << type << ", stream: " << kernel->ds.streamId; uint32_t intName = 0; if (!Str2Uint32(intName, name)) { - LOG(WARNING) << "[DEBUG][StreamOutput::GetRecordName] get invalid task id: " << name; + // LOG(WARNING) << "[DEBUG][StreamOutput::GetRecordName] get invalid task id: " << name; return ""; } // unaging flag @@ -308,6 +316,10 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { } } + if (!enable_output_) { + return; + } + try { streamOutputFile_ << j.dump() << std::endl; } catch (const std::exception& e) { diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h index ff14f7118d..11757d77ff 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h @@ -29,6 +29,7 @@ private: private: std::atomic streamOutputMode_{false}; bool streamOutputFileCreated_ = false; + bool enable_output_ = true; std::string streamOutputDir_; // std::string kernelPatternStr_; // std::regex kernelPattern_; -- Gitee From cfa6d853d67ca403158b720eab17cc4bfbb01d0e Mon Sep 17 00:00:00 2001 From: minghangc <29514143@qq.com> Date: Mon, 14 Jul 2025 17:12:28 +0800 Subject: [PATCH 16/16] optimize write performance & support write retry --- .../mspti_monitor/StreamOutput.cpp | 169 +++++++++++------- .../ipc_monitor/mspti_monitor/StreamOutput.h | 7 + 2 files changed, 113 insertions(+), 63 deletions(-) diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp index 3576364c4a..bcd4ca6f18 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.cpp @@ -16,6 +16,52 @@ namespace ipc_monitor { const std::string MoeDispatchName = "MoeDistributeDispatch"; const std::string MoeCombineName = "MoeDistributeCombine"; +// 新增:重建输出文件的方法 +bool StreamOutput::ReopenStreamOutputFile() { + // 关闭旧文件 + if (streamOutputFile_.is_open()) streamOutputFile_.close(); + if (streamOutputFileFd_ != -1) { + ::flock(streamOutputFileFd_, LOCK_UN); + ::close(streamOutputFileFd_); + streamOutputFileFd_ = -1; + } + int pid = GetProcessId(); + int rankId = GetRankId(); + char hostname[256] = {0}; + if (gethostname(hostname, sizeof(hostname)) != 0) { + strncpy(hostname, "unknown_host", sizeof(hostname)-1); + hostname[sizeof(hostname)-1] = '\0'; + LOG(ERROR) << "Failed to get hostname, using default"; + } + char fileName[256] = {0}; + snprintf(fileName, sizeof(fileName), "msmonitor_%s_%d_%d.jsonl", + hostname, rankId, pid); + std::string filePath = streamOutputDir_ + "/" + fileName; + int fd = ::open(filePath.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644); + if (fd == -1) { + LOG(ERROR) << "Failed to open file: " << filePath + << ", error: " << strerror(errno); + return false; + } + if (::flock(fd, LOCK_EX | LOCK_NB) == -1) { + LOG(ERROR) << "Failed to lock file: " << filePath + << ", error: " << strerror(errno); + ::close(fd); + return false; + } + streamOutputFile_.open(filePath, std::ios::out | std::ios::app); + if (!streamOutputFile_.is_open()) { + LOG(ERROR) << "Failed to open file stream: " << filePath; + ::flock(fd, LOCK_UN); + ::close(fd); + return false; + } + streamOutputFileFd_ = fd; + streamOutputFilePath_ = filePath; + streamOutputFileCreated_ = true; + return true; +} + void StreamOutput::InitStreamOutput() { const char* outputPathEnv = std::getenv("MSMONITOR_OUTPUT_PATH"); if (!outputPathEnv || std::strlen(outputPathEnv) == 0) { @@ -58,6 +104,18 @@ void StreamOutput::InitStreamOutput() { enable_output_ = true; } + const char* bufferSizeEnv = std::getenv("MSMONITOR_OUTPUT_BUFFER_SIZE"); + if (bufferSizeEnv) { + try { + size_t val = std::stoul(bufferSizeEnv); + if (val > 0) { + buffer_max_size_ = val; + } + } catch (...) { + LOG(WARNING) << "Invalid MSMONITOR_OUTPUT_BUFFER_SIZE, use default: " << buffer_max_size_; + } +} + streamOutputDir_ = std::move(newDir); streamOutputMode_.store(true); streamOutputFileCreated_ = false; @@ -68,6 +126,12 @@ void StreamOutput::InitStreamOutput() { void StreamOutput::CloseStreamOutput() { std::lock_guard lock(streamOutputMtx_); + { + std::lock_guard buffer_lock(buffer_mtx_); + if (!buffer_.empty()) { + FlushBufferToFile(); + } + } if (streamOutputFileCreated_) { // 先刷新缓冲区 if (streamOutputFile_.is_open()) { @@ -227,55 +291,10 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { if (!streamOutputMode_.load()) return; if (!streamOutputFileCreated_) { - // LOG(ERROR) << "[DEBUG-1-2][StreamOutput::StreamOutputRecord] Init output file"; - int pid = GetProcessId(); - int rankId = GetRankId(); - - char hostname[256] = {0}; - if (gethostname(hostname, sizeof(hostname)) != 0) { - strncpy(hostname, "unknown_host", sizeof(hostname)-1); - hostname[sizeof(hostname)-1] = '\0'; - LOG(ERROR) << "Failed to get hostname, using default"; - } - - char fileName[256] = {0}; - snprintf(fileName, sizeof(fileName), "msmonitor_%s_%d_%d.jsonl", - hostname, rankId, pid); - - std::string filePath = streamOutputDir_ + "/" + fileName; - - // 打开文件描述符 - int fd = ::open(filePath.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0644); - if (fd == -1) { - LOG(ERROR) << "Failed to open file: " << filePath - << ", error: " << strerror(errno); - streamOutputMode_.store(false); - return; - } - - // 设置文件锁 - if (::flock(fd, LOCK_EX | LOCK_NB) == -1) { - LOG(ERROR) << "Failed to lock file: " << filePath - << ", error: " << strerror(errno); - ::close(fd); - streamOutputMode_.store(false); - return; - } - - // 将文件描述符关联到文件流 - streamOutputFile_.open(filePath, std::ios::out | std::ios::app); - if (!streamOutputFile_.is_open()) { - LOG(ERROR) << "Failed to open file stream: " << filePath; - ::flock(fd, LOCK_UN); - ::close(fd); + if (!ReopenStreamOutputFile()) { streamOutputMode_.store(false); return; } - - streamOutputFileFd_ = fd; - streamOutputFilePath_ = filePath; - streamOutputFileCreated_ = true; - // LOG(ERROR) << "[DEBUG-1-3][StreamOutput::StreamOutputRecord] output file created"; } // 安全构建 JSON 对象 // LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] Before writting into file"; @@ -320,30 +339,54 @@ void StreamOutput::StreamOutputRecord(msptiActivity *record) { return; } - try { - streamOutputFile_ << j.dump() << std::endl; - } catch (const std::exception& e) { - LOG(ERROR) << "JSON dump failed: " << e.what(); + std::string json_str = j.dump(); + + { + std::lock_guard lock(buffer_mtx_); + buffer_.push_back(std::move(json_str)); + if (buffer_.size() >= buffer_max_size_) { + FlushBufferToFile(); + } } //LOG(ERROR) << "[DEBUG-1-4][StreamOutput::StreamOutputRecord] After writting into file"; - // 错误处理 + // 错误处理:写入失败不清空buffer,只设置标志,等待下次FlushBufferToFile重试 if (streamOutputFile_.fail()) { - LOG(ERROR) << "Write failed, disabling stream output"; + LOG(ERROR) << "Write failed, will retry on next flush."; + streamOutputFileCreated_ = false; + streamOutputMode_.store(false); + } +} - // 清理资源 - if (streamOutputFileFd_ != -1) { - ::flock(streamOutputFileFd_, LOCK_UN); - ::close(streamOutputFileFd_); - streamOutputFileFd_ = -1; +void StreamOutput::FlushBufferToFile() { + if (!enable_output_) return; + if (buffer_.empty()) return; + int retry = 0, max_retry = 3; + while (retry < max_retry) { + if (!streamOutputFileCreated_) { + if (!ReopenStreamOutputFile()) { + retry++; + continue; + } } - - if (streamOutputFile_.is_open()) { - streamOutputFile_.close(); + bool all_success = true; + for (const auto& line : buffer_) { + streamOutputFile_ << line << "\n"; + if (streamOutputFile_.fail()) { + all_success = false; + break; + } + } + if (all_success) { + streamOutputFile_.flush(); + buffer_.clear(); + return; + } else { + LOG(ERROR) << "Write failed, try reopen file, retry=" << retry+1; + streamOutputFileCreated_ = false; + streamOutputMode_.store(false); + retry++; } - - streamOutputMode_.store(false); - streamOutputFileCreated_ = false; } } } diff --git a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h index 11757d77ff..6248a73266 100644 --- a/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h +++ b/msmonitor/plugin/ipc_monitor/mspti_monitor/StreamOutput.h @@ -25,12 +25,19 @@ public: private: std::string GetRecordName(msptiActivity *record); + // 新增:重建输出文件的方法 + bool ReopenStreamOutputFile(); private: std::atomic streamOutputMode_{false}; bool streamOutputFileCreated_ = false; bool enable_output_ = true; std::string streamOutputDir_; + + std::vector buffer_; + size_t buffer_max_size_ = 100; + std::mutex buffer_mtx_; + void FlushBufferToFile(); // std::string kernelPatternStr_; // std::regex kernelPattern_; std::mutex streamOutputMtx_; -- Gitee