From 7ad4cfaa53d2382868a71005d6b060b96000cd3f Mon Sep 17 00:00:00 2001 From: wjchuee Date: Fri, 29 Aug 2025 14:40:13 +0800 Subject: [PATCH] msmonitor db develop part 1 --- msmonitor/plugin/CMakeLists.txt | 26 ++-- msmonitor/plugin/IPCMonitor/__init__.py | 16 ++ .../IPCMonitor/dynamic_monitor_proxy.py | 44 ++++++ msmonitor/plugin/IPCMonitor/singleton.py | 25 +++ msmonitor/plugin/IPCMonitor/utils.py | 142 ++++++++++++++++++ msmonitor/plugin/bindings.cpp | 27 +++- msmonitor/plugin/build.sh | 2 +- msmonitor/plugin/cmake/Findglog.cmake | 42 ++++++ msmonitor/plugin/cmake/Findnlohmannjson.cmake | 18 +++ msmonitor/plugin/cmake/config.ini | 5 + msmonitor/plugin/cmake/download_opensource.sh | 73 +++++++++ msmonitor/plugin/cmake/utils.cmake | 25 +++ .../ipc_monitor/PyDynamicMonitorProxy.h | 4 +- msmonitor/plugin/ipc_monitor/utils.cpp | 124 ++++++++++++++- msmonitor/plugin/ipc_monitor/utils.h | 9 +- msmonitor/plugin/setup.py | 10 +- 16 files changed, 562 insertions(+), 30 deletions(-) create mode 100644 msmonitor/plugin/IPCMonitor/__init__.py create mode 100644 msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py create mode 100644 msmonitor/plugin/IPCMonitor/singleton.py create mode 100644 msmonitor/plugin/IPCMonitor/utils.py create mode 100644 msmonitor/plugin/cmake/Findglog.cmake create mode 100644 msmonitor/plugin/cmake/Findnlohmannjson.cmake create mode 100644 msmonitor/plugin/cmake/config.ini create mode 100644 msmonitor/plugin/cmake/download_opensource.sh create mode 100644 msmonitor/plugin/cmake/utils.cmake diff --git a/msmonitor/plugin/CMakeLists.txt b/msmonitor/plugin/CMakeLists.txt index 9abfa9a951..a7d6916b14 100644 --- a/msmonitor/plugin/CMakeLists.txt +++ b/msmonitor/plugin/CMakeLists.txt @@ -10,14 +10,17 @@ set(CMAKE_CXX_EXTENSIONS OFF) find_package(pybind11 REQUIRED) find_package(Python REQUIRED COMPONENTS Interpreter Development) +set(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake") +set(ENV{PROJECT_ROOT_PATH} "${CMAKE_SOURCE_DIR}") +include(utils) +find_package(glog MODULE REQUIRED) +find_package(nlohmannjson MODULE REQUIRED) + include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/ipc_monitor ${CMAKE_CURRENT_SOURCE_DIR}/ipc_monitor/metric ${CMAKE_CURRENT_SOURCE_DIR}/ipc_monitor/mspti_monitor ${CMAKE_CURRENT_SOURCE_DIR}/third_party/securec/include - ${DYNOLOG_PATH}/third_party/glog/src - ${DYNOLOG_PATH}/build/third_party/glog - ${DYNOLOG_PATH}/third_party/json/single_include ) file(GLOB_RECURSE IPC_SOURCES @@ -38,31 +41,32 @@ add_library(IPCMonitor MODULE ${SOURCES}) set_target_properties(IPCMonitor PROPERTIES - OUTPUT_NAME IPCMonitor + OUTPUT_NAME IPCMonitor_C + LIBRARY_OUTPUT_DIRECTORY ${CMAKE_INSTALL_PREFIX}/IPCMonitor/lib64 PREFIX "" ) target_link_libraries(IPCMonitor PRIVATE pybind11::module pthread + ${glog_LIBRARIES} ${CMAKE_CURRENT_SOURCE_DIR}/stub/libmspti.so ) -target_link_libraries(IPCMonitor PRIVATE ${DYNOLOG_PATH}/build/third_party/glog/libglog.a) - target_compile_options(IPCMonitor PRIVATE -fPIC -fstack-protector-all -ftrapv - $<$>:-O2> ) -add_compile_options(-D_FORITFY_SOURCE=2 -O2) target_link_options(IPCMonitor PRIVATE -Wl,-z,relro,-z,now,-z,noexecstack -s ) -install(TARGETS IPCMonitor - DESTINATION ${CMAKE_INSTALL_PREFIX}/python-package -) +if(${CMAKE_BUILD_TYPE} STREQUAL "Debug") + add_compile_options(-O0 -g) + add_link_options(-O0 -g) +else() + add_compile_options(-D_FORITFY_SOURCE=2 -O2) +endif() diff --git a/msmonitor/plugin/IPCMonitor/__init__.py b/msmonitor/plugin/IPCMonitor/__init__.py new file mode 100644 index 0000000000..bd4cb27a88 --- /dev/null +++ b/msmonitor/plugin/IPCMonitor/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .dynamic_monitor_proxy import PyDynamicMonitorProxy diff --git a/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py new file mode 100644 index 0000000000..80be4a427c --- /dev/null +++ b/msmonitor/plugin/IPCMonitor/dynamic_monitor_proxy.py @@ -0,0 +1,44 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import os +import importlib +from .singleton import Singleton + + +so_path = os.path.join(os.path.dirname(__file__), "lib64") +sys.path.append(os.path.realpath(so_path)) +ipcMonitor_C_module = importlib.import_module("IPCMonitor_C") + + +@Singleton +class PyDynamicMonitorProxy: + + @classmethod + def init_dyno(cls, npu_id: int): + return ipcMonitor_C_module.init_dyno(npu_id) + + @classmethod + def poll_dyno(cls): + return ipcMonitor_C_module.poll_dyno() + + @classmethod + def enable_dyno_npu_monitor(cls, config_map: dict): + ipcMonitor_C_module.enable_dyno_npu_monitor(config_map) + + @classmethod + def finalize_dyno(cls): + ipcMonitor_C_module.finalize_dyno() diff --git a/msmonitor/plugin/IPCMonitor/singleton.py b/msmonitor/plugin/IPCMonitor/singleton.py new file mode 100644 index 0000000000..6386c2ab66 --- /dev/null +++ b/msmonitor/plugin/IPCMonitor/singleton.py @@ -0,0 +1,25 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class Singleton(object): + def __init__(self, cls): + self._cls = cls + self._instance = {} + + def __call__(self): + if self._cls not in self._instance: + self._instance[self._cls] = self._cls() + return self._instance[self._cls] diff --git a/msmonitor/plugin/IPCMonitor/utils.py b/msmonitor/plugin/IPCMonitor/utils.py new file mode 100644 index 0000000000..2b549c376b --- /dev/null +++ b/msmonitor/plugin/IPCMonitor/utils.py @@ -0,0 +1,142 @@ +# Copyright (c) 2025, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import json +import warnings +from typing import Optional + + +def get_pytorch_rank_id() -> Optional[int]: + """Get pytorch rank id.""" + try: + import torch + rank_id = os.environ.get("RANK") + if rank_id is None and torch.distributed.is_available() and torch.distributed.is_initialized(): + rank_id = torch.distributed.get_rank() + if rank_id is not None and not isinstance(rank_id, int): + rank_id = int(rank_id) + except Exception as ex: + raise RuntimeError(f"Get rank id failed in pytorch failed: {str(ex)}") from ex + return rank_id + + +def get_pytorch_parallel_group_info() -> str: + """Get pytorch parallel group info.""" + try: + import torch + from torch.distributed.distributed_c10d import _world as distributed_world + if torch.distributed.is_available() and torch.distributed.is_initialized(): + group_info = {} + global_rank = torch.distributed.get_rank() + for group in distributed_world.pg_map.keys(): + if torch.distributed.get_backend(group) != "hccl": + continue + hccl_group = group._get_backend(torch.device("npu")) + comm_name = hccl_group.get_hccl_comm_name(global_rank, init_comm=False) + if comm_name: + group_info[comm_name] = { + "group_name": hccl_group.options.hccl_config.get("group_name", ""), + "group_rank": torch.distributed.get_group_rank(group, global_rank), + "global_ranks": torch.distributed.get_process_group_ranks(group) + } + default_group = torch.distributed.distributed_c10d._get_default_group() + comm_name = default_group._get_backend(torch.device("npu")).get_hccl_comm_name(global_rank, init_comm=False) + if comm_name: + group_info[comm_name] = { + "group_name": "default_group", + "group_rank": torch.distributed.get_group_rank(default_group, global_rank), + "global_ranks": torch.distributed.get_process_group_ranks(default_group) + } + if group_info: + return json.dumps(group_info) + except Exception as ex: + raise RuntimeError(f"Get parallel group info in pytorch failed: {str(ex)}.") from ex + return "" + + +def get_mindspore_rank_id() -> Optional[int]: + """Get mindspore rank id.""" + try: + import mindspore.communication as comm + rank_id = os.environ.get("RANK_ID") + if rank_id is None and comm.GlobalComm.INITED: + rank_id = comm.get_rank() + if rank_id is not None and not isinstance(rank_id, int): + rank_id = int(rank_id) + except Exception as ex: + raise RuntimeError(f"Get rank id failed in mindspore failed: {str(ex)}") from ex + return rank_id + + +def get_mindspore_parallel_group_info() -> str: + """Get mindspore parallel group info.""" + try: + import mindspore.communication as comm + import mindspore.communication._comm_helper as comm_helper + if comm.GlobalComm.INITED and comm.GlobalComm.BACKEND == comm_helper.Backend.HCCL: + group_info = {} + for group_name in comm_helper._get_group_map().keys(): + comm_name = comm.get_comm_name(group_name) + if not comm_name: + continue + group_info[comm_name] = { + "group_name": group_name, + "group_rank": comm.get_local_rank(group_name), + "global_ranks": comm.get_process_group_ranks(group_name) + } + if group_info: + return json.dumps(group_info) + except Exception as ex: + raise RuntimeError(f"Get parallel group info in mindspore failed: {str(ex)}.") from ex + return "" + + +def get_rank_id() -> int: + """Get rank id.""" + rank_id = None + try: + rank_id = get_pytorch_rank_id() + except Exception as ex: + warnings.warn(f"{str(ex)}") + + if rank_id is None: + try: + rank_id = get_mindspore_rank_id() + except Exception as ex: + warnings.warn(f"{str(ex)}") + + if rank_id is None: + warnings.warn("Failed to get rank id from pytorch and mindspore, set rank id to -1.") + rank_id = -1 + + return rank_id + + +def get_parallel_group_info() -> str: + """Get parallel group info.""" + parallel_group_info = "" + try: + parallel_group_info = get_pytorch_parallel_group_info() + except Exception as ex: + warnings.warn(f"{str(ex)}") + + if not parallel_group_info: + try: + parallel_group_info = get_mindspore_parallel_group_info() + except Exception as ex: + warnings.warn(f"{str(ex)}") + + return parallel_group_info diff --git a/msmonitor/plugin/bindings.cpp b/msmonitor/plugin/bindings.cpp index 626e72157e..7b0caea529 100644 --- a/msmonitor/plugin/bindings.cpp +++ b/msmonitor/plugin/bindings.cpp @@ -16,14 +16,25 @@ #include #include #include "ipc_monitor/PyDynamicMonitorProxy.h" +#include "ipc_monitor/utils.h" namespace py = pybind11; -PYBIND11_MODULE(IPCMonitor, m) { - py::class_(m, "PyDynamicMonitorProxy") - .def(py::init<>()) - .def("init_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::InitDyno, py::arg("npuId")) - .def("poll_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::PollDyno) - .def("enable_dyno_npu_monitor", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::EnableMsptiMonitor, py::arg("cfg_map")) - .def("finalize_dyno", &dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::FinalizeDyno); -} \ No newline at end of file + +PYBIND11_MODULE(IPCMonitor_C, m) { + m.def("init_dyno", [](int npu_id) -> bool { + return dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->InitDyno(npu_id); + }, py::arg("npu_id")); + m.def("poll_dyno", []() -> std::string { + return dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->PollDyno(); + }); + m.def("enable_dyno_npu_monitor", [](std::unordered_map& config_map) -> void { + dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->EnableMsptiMonitor(config_map); + }, py::arg("config_map")); + m.def("finalize_dyno", []() -> void { + dynolog_npu::ipc_monitor::PyDynamicMonitorProxy::GetInstance()->FinalizeDyno(); + }); + m.def("set_parallel_group_info", [](std::string parallel_group_info) -> void { + dynolog_npu::ipc_monitor::SetParallelGroupInfo(parallel_group_info); + }, py::arg("parallel_group_info")); +} diff --git a/msmonitor/plugin/build.sh b/msmonitor/plugin/build.sh index ec20536715..939aaa2baf 100644 --- a/msmonitor/plugin/build.sh +++ b/msmonitor/plugin/build.sh @@ -21,4 +21,4 @@ fi # pip install whl echo "pip install ${files}" -pip install ${files} \ No newline at end of file +pip install ${files} diff --git a/msmonitor/plugin/cmake/Findglog.cmake b/msmonitor/plugin/cmake/Findglog.cmake new file mode 100644 index 0000000000..bbebee6ee2 --- /dev/null +++ b/msmonitor/plugin/cmake/Findglog.cmake @@ -0,0 +1,42 @@ +set(PACKAGE_VERSION 0.6.0) + +set(PKG_NAME glog) +set(DOWNLOAD_PATH "$ENV{PROJECT_ROOT_PATH}/third_party") +set(GIT_TAG "v0.6.0") +set(DIR_NAME "${DOWNLOAD_PATH}/glog") + +if (NOT ${PKG_NAME}_FOUND) + +download_opensource_pkg(${PKG_NAME} + GIT_TAG ${GIT_TAG} + DOWNLOAD_PATH ${DOWNLOAD_PATH} +) + +execute_process( + WORKING_DIRECTORY ${DIR_NAME} + COMMAND cmake -S . -B build -G "Unix Makefiles" -DBUILD_SHARED_LIBS=OFF -DCMAKE_INSTALL_PREFIX=${DIR_NAME}/install -DCMAKE_INSTALL_LIBDIR=${DIR_NAME}/install/lib64 -DWITH_GFLAGS=OFF -DWITH_GTEST=OFF -DWITH_SYMBOLIZE=OFF -DCMAKE_POLICY_VERSION_MINIMUM=3.5 + RESULT_VARIABLE RESULT +) +if (NOT RESULT EQUAL 0) + message(FATAL_ERROR "Failed to build glog. ${RESULT}") +endif() + +execute_process( + WORKING_DIRECTORY ${DIR_NAME} + COMMAND cmake --build build --target install + RESULT_VARIABLE RESULT +) +if (NOT RESULT EQUAL 0) + message(FATAL_ERROR "Failed to build glog. ${RESULT}") +endif() + +file(GLOB GLOG_LIB "${DIR_NAME}/install/lib64/libglog.a") +if (NOT GLOG_LIB) + message(FATAL_ERROR "Failed to build glog.") +endif() + +set(${PKG_NAME}_LIBRARIES ${GLOG_LIB}) +include_directories(${DIR_NAME}/install/include) +set(${PKG_NAME}_FOUND TRUE) + +endif() diff --git a/msmonitor/plugin/cmake/Findnlohmannjson.cmake b/msmonitor/plugin/cmake/Findnlohmannjson.cmake new file mode 100644 index 0000000000..a657cc3acc --- /dev/null +++ b/msmonitor/plugin/cmake/Findnlohmannjson.cmake @@ -0,0 +1,18 @@ +set(PACKAGE_VERSION 3.12.0) + +set(PKG_NAME nlohmannjson) +set(DOWNLOAD_PATH "$ENV{PROJECT_ROOT_PATH}/third_party") +set(GIT_TAG "v3.12.0") +set(DIR_NAME "${DOWNLOAD_PATH}/nlohmann-json") + +if (NOT ${PKG_NAME}_FOUND) + +download_opensource_pkg(${PKG_NAME} + GIT_TAG ${GIT_TAG} + DOWNLOAD_PATH ${DOWNLOAD_PATH} +) + +include_directories(${DIR_NAME}/include) +set(${PKG_NAME}_FOUND TRUE) + +endif() diff --git a/msmonitor/plugin/cmake/config.ini b/msmonitor/plugin/cmake/config.ini new file mode 100644 index 0000000000..5303523816 --- /dev/null +++ b/msmonitor/plugin/cmake/config.ini @@ -0,0 +1,5 @@ +[glog] +url = https://gitee.com/mirrors/glog.git + +[nlohmannjson] +url = https://gitee.com/mirrors/nlohmann-json.git \ No newline at end of file diff --git a/msmonitor/plugin/cmake/download_opensource.sh b/msmonitor/plugin/cmake/download_opensource.sh new file mode 100644 index 0000000000..d04e59b445 --- /dev/null +++ b/msmonitor/plugin/cmake/download_opensource.sh @@ -0,0 +1,73 @@ +#!/bin/bash + +if [ "$#" -lt 2 ]; then + echo "Usage: $0 [ ]" + exit 1 +fi + +pkg_name=$1 +path=$2 + +if [ "$#" -ge 3 ]; then + tag=$3 +fi + +url=$(awk -F " = " '/\['${pkg_name}'\]/{a=1}a==1&&$1~/url/{print $2;exit}' config.ini) +lib_path=$MSTT_LIB_PATH +if [ -n "$lib_path" ]; then + url=${lib_path}$(echo $url | awk -F '/' -v OFS='/' '{print $5,$8}') +fi +if [[ ! $url = https* ]]; then + echo "The URL of $pkg_name is illegal." + exit 1 +fi + +echo "Start to download ${url}..." + +if [ ! -d "$path" ]; then + echo "The specified path does not exist: $path" + exit 1 +fi +cd ${path} + +extension=$(echo "${url}" | awk -F'[./]' '{print $NF}') +if [[ "${extension}" == "gz" || "${extension}" == "zip" ]]; then + fullname="${path}/$(basename "${url}")" + if [[ -e ${fullname} ]]; then + echo "Source ${fullname} is exists, will not download again." + else + curl -L "${url}" -o ${fullname} -k + if [ $? -eq 0 ]; then + echo "Download successful: ${url}" + else + echo "Download failed: ${url}" + exit 1 + fi + fi + + if [[ "${extension}" == "gz" ]]; then + tar -zxvf ${fullname} -C ./ -n > /dev/null + elif [[ "${extension}" == "zip" ]]; then + unzip -n ${fullname} -d ./ > /dev/null + fi +elif [[ "${extension}" == "git" ]]; then + repository="$(basename ${url} .git)" + if [[ -e ${repository} ]]; then + echo "Source ${repository} is exists, will not clone again." + else + if [[ -z "${tag}" ]]; then + git clone ${url} + else + git clone ${url} -b "${tag}" + fi + if [ $? -eq 0 ]; then + echo "Download successful: ${url}" + else + echo "Download failed: ${url}" + exit 1 + fi + fi +else + echo "Unknow url ${url}" + exit 1 +fi diff --git a/msmonitor/plugin/cmake/utils.cmake b/msmonitor/plugin/cmake/utils.cmake new file mode 100644 index 0000000000..3d815d2685 --- /dev/null +++ b/msmonitor/plugin/cmake/utils.cmake @@ -0,0 +1,25 @@ + +function(download_opensource_pkg pkg_name) + message("start to download ${pkg_name}...") + set(options) + set(oneValueArgs GIT_TAG DOWNLOAD_PATH DIR_NAME BUILD_CMD) + set(multiValueArgs PATCHES) + cmake_parse_arguments(PKG "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + + if (NOT PKG_DOWNLOAD_PATH) + set(PKG_DOWNLOAD_PATH "${CMAKE_SOURCE_DIR}/../third_party") + endif() + file(MAKE_DIRECTORY ${PKG_DOWNLOAD_PATH}) + + execute_process( + WORKING_DIRECTORY $ENV{PROJECT_ROOT_PATH}/cmake + COMMAND bash download_opensource.sh ${pkg_name} ${PKG_DOWNLOAD_PATH} ${PKG_GIT_TAG} + RESULT_VARIABLE RESULT + ) + if (NOT RESULT EQUAL 0) + message(FATAL_ERROR "Failed to download ${pkg_name}(${RESULT}).") + endif() + if (PKG_BUILD_CMD) + execute_process(COMMAND bash -c "cd ${PKG_DOWNLOAD_PATH}/${DIR_NAME};${PKG_BUILD_CMD}") + endif() +endfunction() diff --git a/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h b/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h index 03aa1d0810..4836b29301 100644 --- a/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h +++ b/msmonitor/plugin/ipc_monitor/PyDynamicMonitorProxy.h @@ -23,7 +23,9 @@ namespace dynolog_npu { namespace ipc_monitor { -class PyDynamicMonitorProxy { +class PyDynamicMonitorProxy : public Singleton { + friend class Singleton; + public: PyDynamicMonitorProxy() = default; bool InitDyno(int npuId) diff --git a/msmonitor/plugin/ipc_monitor/utils.cpp b/msmonitor/plugin/ipc_monitor/utils.cpp index b7160aa049..d3b0ddd892 100644 --- a/msmonitor/plugin/ipc_monitor/utils.cpp +++ b/msmonitor/plugin/ipc_monitor/utils.cpp @@ -31,9 +31,26 @@ #include #include #include +#include +#include +#include +#include +#include namespace dynolog_npu { namespace ipc_monitor { +namespace { +template +std::string IntToHexStr(T number) +{ + std::stringstream strStream; + strStream << std::hex << number; + return strStream.str(); +} +} // namespace + +static std::string gParallelGroupInfo; + std::unordered_map submoduleMap = { {SubModule::IPC, "IPC"}, }; @@ -66,7 +83,10 @@ std::string getCurrentTimestamp() auto micro_time = micros.count() % 1000; std::ostringstream oss; - oss << std::put_time(timeInfo, "%Y-%m-%d-%H:%M:%S"); + oss << std::put_time(timeInfo, "%Y%m%d%H%M%S"); + constexpr int kMilliTimeWidth = 3; + oss << std::setw(kMilliTimeWidth) << std::setfill('0') << milli_time; + return oss.str(); } @@ -90,7 +110,11 @@ std::string formatErrorCode(SubModule submodule, ErrCode errorCode) int32_t GetProcessId() { - return static_cast(getpid()); + static thread_local int32_t pid = 0; + if (pid == 0) { + pid = static_cast(getpid()); + } + return pid; } bool ParseProcStat(const std::string& line, std::string& command, int& parentPid) @@ -254,6 +278,15 @@ std::vector split(const std::string& str, char delimiter) return tokens; } +std::string join(const std::vector &strs, const std::string &delimiter) +{ + std::stringstream ss; + for (size_t i = 0, len = strs.size(); i < len; ++i) { + ss << strs[i] << (i == len - 1 ? "" : delimiter); + } + return ss.str(); +} + void *MsptiMalloc(size_t size, size_t alignment) { if (alignment > 0) { @@ -413,7 +446,92 @@ bool PathUtils::DirPathCheck(const std::string& path) return true; } -bool CreateMsmonitorLogPath(std::string& path) +int GetRankId() +{ + static int rankId = []() -> int { + pybind11::gil_scoped_acquire gil; + return pybind11::module::import("IPCMonitor.utils").attr("get_rank_id")().cast(); + }(); + return rankId; +} + +void SetParallelGroupInfo(std::string parallelGroupInfo) +{ + gParallelGroupInfo = std::move(parallelGroupInfo); +} + +std::string GetParallelGroupInfo() +{ + return gParallelGroupInfo; +} + +uint64_t CalcHashId(const std::string &data) +{ + static const uint32_t UINT32_BITS = 32; + uint32_t prime[2] = {29, 131}; + uint32_t hash[2] = {0}; + for (char d : data) { + hash[0] = hash[0] * prime[0] + static_cast(d); + hash[1] = hash[1] * prime[1] + static_cast(d); + } + return (static_cast(hash[0]) << UINT32_BITS) | hash[1]; +} + +std::string GetHostName() +{ + char hostName[PATH_MAX] = {0}; + if (gethostname(hostName, PATH_MAX) != 0) { + return ""; + } + return std::string(hostName); +} + +std::string GetHostUid() +{ + static const uint8_t SECOND_LEAST_BIT = 1 << 1; + struct ifaddrs *ifaddr = nullptr; + if (getifaddrs(&ifaddr) == -1) { + if (ifaddr != nullptr) { + freeifaddrs(ifaddr); + } + return 0; + } + std::vector universalMacAddrs; + std::vector localMacAddrs; + for (struct ifaddrs *ifa = ifaddr; ifa != nullptr; ifa = ifa->ifa_next) { + if (ifa->ifa_addr == nullptr || ifa->ifa_addr->sa_family != AF_PACKET) { + continue; + } + if ((ifa->ifa_flags & IFF_LOOPBACK) != 0) { + continue; + } + struct sockaddr_ll *lladdr = ReinterpretConvert(ifa->ifa_addr); + uint32_t len = static_cast(lladdr->sll_halen); + if (len > 0) { + std::string addr; + for (uint32_t i = 0; i < len; ++i) { + std::string hexAddr = IntToHexStr(static_cast(lladdr->sll_addr[i])); + addr += (hexAddr.length() > 1) ? hexAddr : ("0" + hexAddr); + } + if ((lladdr->sll_addr[0] & SECOND_LEAST_BIT) == 0) { + universalMacAddrs.emplace_back(addr); + } else { + localMacAddrs.emplace_back(addr); + } + } + } + if (ifaddr != nullptr) { + freeifaddrs(ifaddr); + } + if (universalMacAddrs.empty() && localMacAddrs.empty()) { + return 0; + } + auto &macAddrs = universalMacAddrs.empty() ? localMacAddrs : universalMacAddrs; + std::sort(macAddrs.begin(), macAddrs.end()); + return std::to_string(CalcHashId(join(macAddrs, "-"))); +} + +bool CreateMsmonitorLogPath(std::string &path) { const char* logPathEnvVal = getenv("MSMONITOR_LOG_PATH"); std::string logPath; diff --git a/msmonitor/plugin/ipc_monitor/utils.h b/msmonitor/plugin/ipc_monitor/utils.h index e92fe9c956..d41b1b720b 100644 --- a/msmonitor/plugin/ipc_monitor/utils.h +++ b/msmonitor/plugin/ipc_monitor/utils.h @@ -39,13 +39,14 @@ bool Str2Uint32(uint32_t& dest, const std::string& str); bool Str2Bool(bool& dest, const std::string& str); std::string& trim(std::string& str); std::vector split(const std::string& str, char delimiter); +std::string join(const std::vector& strs, const std::string& delimiter); constexpr size_t ALIGN_SIZE = 8; void *MsptiMalloc(size_t size, size_t alignment); void MsptiFree(uint8_t *ptr); const mode_t DATA_FILE_AUTHORITY = 0640; const mode_t DATA_DIR_AUTHORITY = 0750; -const int DEFAULT_FLUSH_INTERVAL = 60; +const uint32_t DEFAULT_FLUSH_INTERVAL = 60; enum class SubModule { IPC = 0 @@ -102,6 +103,12 @@ auto groupby(const Container& vec, KeyFunc keyFunc) return grouped; } +int GetRankId(); +void SetParallelGroupInfo(std::string parallelGroupInfo); +uint64_t CalcHashId(const std::string &data); +std::string GetParallelGroupInfo(); +std::string GetHostName(); +std::string GetHostUid(); bool CreateMsmonitorLogPath(std::string& path); struct PathUtils { diff --git a/msmonitor/plugin/setup.py b/msmonitor/plugin/setup.py index 87c344175a..2cd1881672 100644 --- a/msmonitor/plugin/setup.py +++ b/msmonitor/plugin/setup.py @@ -18,7 +18,7 @@ import sys import subprocess import pybind11 -from setuptools import setup, Extension +from setuptools import setup, Extension, find_namespace_packages from setuptools.command.build_ext import build_ext @@ -43,7 +43,6 @@ class CMakeBuild(build_ext): '-DPYTHON_EXECUTABLE=' + sys.executable, '-DCMAKE_PREFIX_PATH=' + pybind11.get_cmake_dir(), '-DCMAKE_INSTALL_PREFIX=' + ext_dir, - '-DDYNOLOG_PATH=' + os.path.join(os.path.dirname(BASE_DIR), "third_party", "dynolog"), '-DCMAKE_BUILD_TYPE=' + cfg ] @@ -54,15 +53,16 @@ class CMakeBuild(build_ext): if not os.path.exists(self.build_temp): os.makedirs(self.build_temp) subprocess.check_call(['cmake', ext.sourcedir] + cmake_args, cwd=self.build_temp, env=env) - subprocess.check_call(['cmake', '--build', '.', '--target', 'install', '-j', '8'] + build_args, + subprocess.check_call(['cmake', '--build', '.', '-j', '8'] + build_args, cwd=self.build_temp) -BASE_DIR = os.path.dirname(os.path.realpath(__file__)) setup( name="msmonitor_plugin", version="8.1.0", - description="msMonitor plugins", + description="msMonitor plugin", + packages=find_namespace_packages(include=["IPCMonitor*"]), + include_package_data=True, ext_modules=[CMakeExtension('IPCMonitor')], cmdclass=dict(build_ext=CMakeBuild), install_requires=["pybind11"], -- Gitee