diff --git a/accuracy_tools/msprobe/core/components/__init__.py b/accuracy_tools/msprobe/core/components/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e895ba49535fa5f91bb916f234904d4f0890b00f
--- /dev/null
+++ b/accuracy_tools/msprobe/core/components/__init__.py
@@ -0,0 +1,26 @@
+# Copyright (c) 2025-2025 Huawei Technologies Co., Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from msprobe.core.components.dumper_acl import ACLCompatibleComp, ACLDumperComp
+from msprobe.core.components.dumper_atb import AtbActuatorComp
+from msprobe.core.components.dumper_caffe import CaffeActuatorComp, CaffeDumperComp
+from msprobe.core.components.dumper_om import OmActuatorComp
+from msprobe.core.components.dumper_onnx import OnnxActuatorComp, OnnxDumperComp
+from msprobe.core.components.dumper_tf import (
+    FrozenGraphActuatorCompCPU,
+    FrozenGraphActuatorCompNPU,
+    FrozenGraphDumperCompCPU,
+    FrozenGraphSetGECompNPU,
+)
+from msprobe.core.components.dumper_writer import DumpWriterComp
diff --git a/accuracy_tools/msprobe/core/components/dumper_acl.py b/accuracy_tools/msprobe/core/components/dumper_acl.py
new file mode 100644
index 0000000000000000000000000000000000000000..0a9f60d01afb02652668cb1adc3f1607993ec0f4
--- /dev/null
+++ b/accuracy_tools/msprobe/core/components/dumper_acl.py
@@ -0,0 +1,150 @@
+# Copyright (c) 2025-2025 Huawei Technologies Co., Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from msprobe.base import Component, ConsumerComp, ProducerComp
+from msprobe.common.dirs import DirPool
+from msprobe.core.dump import acl_device_manager
+from msprobe.utils.constants import ACLConst, CompConst, MsgConst, PathConst
+from msprobe.utils.exceptions import MsprobeException
+from msprobe.utils.io import save_json
+from msprobe.utils.log import logger
+from msprobe.utils.path import get_abs_path, get_name_and_ext, join_path
+
+
+class ACLDumpDataProcessor:
+    def __init__(self, chunk):
+        self.dump_chunk = chunk
+        self.buffer_bytes = bytes()
+        self.total_len = 0
+        self.completed = False
+
+    @property
+    def is_completed(self):
+        return self.completed
+
+    def get_data(self):
+        if not self.buffer_bytes or not self.completed:
+            return None
+        return self.buffer_bytes
+
+    def process_data(self):
+        if self.is_completed:
+            logger.error("DataProcessor received data after completion; some errors may occur.")
+            return
+        self._tackle_last_chunk()
+        self._tackle_buf_len()
+
+    def _tackle_last_chunk(self):
+        if self.dump_chunk.get(ACLConst.IS_LAST_CHUNK):
+            self.completed = True
+
+    def _tackle_buf_len(self):
+        buf_len = self.dump_chunk.get(ACLConst.BUF_LEN)
+        if buf_len == 0 or buf_len + self.total_len > PathConst.SIZE_4G:
+            raise MsprobeException(
+                MsgConst.RISK_ALERT,
+                f"Buffer overflow (cached size {self.total_len}), receiving size: {buf_len}.",
+            )
+        self.total_len += buf_len
+        self.buffer_bytes += self.dump_chunk.get(ACLConst.DATA_BUF)
+
+
+@Component.register(CompConst.ACL_DUMPER_COMP)
+class ACLDumperComp(ProducerComp):
+    def __init__(self, priority, data_mode, rank, **kwargs):
+        super().__init__(priority)
+        self.data_processor_map = {}
+        self.data_map = {}
+        self.data_mode = data_mode
+        self.model_path = kwargs.get("model_path")
+        self.acl_resource_manager = acl_device_manager.get_acl_resource_manager(rank)
+
+    @staticmethod
+    def _get_node_name(chunk):
+        dump_file_path = get_abs_path(chunk.get(ACLConst.FILE_NAME))
+        file_name = dump_file_path.split("/")[-1]
+        file_name = file_name.split(".")
+        if len(file_name) >= 2:
+            type_and_name = "-".join(file_name[:2])
+            return type_and_name
+        else:
+            raise MsprobeException(MsgConst.INVALID_ARGU, "The filename returned by ACL has no dot.")
+
+    def activate(self, *args, **kwargs):
+        self.acl_resource_manager.initialize()
+        self.acl_resource_manager.set_dump(self._get_dump_json(), self._dump_call_back)
+
+    def deactivate(self, *args, **kwargs):
+        self.acl_resource_manager.destroy_resource()
+
+    def load_data(self):
+        if not self.data_map:
+            return None
+        data_map = self.data_map
+        self.data_map = {}
+        return data_map
+
+    def _get_dump_json(self):
+        if self.model_path:
+            name, _ = get_name_and_ext(self.model_path)
+            dump_list = [{"model_name": name}]
+        else:
+            dump_list = []
+        acl_dump_json_dict = {
+            "dump": {
+                "dump_path": DirPool.get_msprobe_dir(),
+                "dump_mode": self.data_mode[0],
+                "dump_list": dump_list,
+            }
+        }
+        acl_dump_json_path = join_path(DirPool.get_msprobe_dir(), "acl_dump.json")
+        save_json(acl_dump_json_dict, acl_dump_json_path, indent=4)
+        return acl_dump_json_path
+
+    def _dump_call_back(self, chunk, length):
+        # The _dump_call_back function must take two arguments; the second one is required
+        # by the callback signature and cannot be removed even though it is unused here.
+        type_and_name = self._get_node_name(chunk)
+        self.data_map[type_and_name] = self._get_data(type_and_name, chunk)
+
+    def _get_data(self, type_and_name, chunk):
+        if type_and_name not in self.data_processor_map:
+            self.data_processor_map[type_and_name] = ACLDumpDataProcessor(chunk)
+        processor = self.data_processor_map.get(type_and_name)
+        processor.process_data()
+        if not processor.is_completed:
+            return None
+        bytes_data = processor.get_data()
+        if not bytes_data:
+            return None
+        # Store the data in a map to prevent processing errors caused by the model having
+        # already been finalized when the last chunk of data is published directly.
+        self.data_processor_map.pop(type_and_name)
+        return bytes_data
+
+
+@Component.register(CompConst.ACL_COMPATIBLE_COMP)
+class ACLCompatibleComp(ConsumerComp, ProducerComp):
+    def __init__(self, priority, **kwargs):
+        super().__init__(priority)
+
+    def load_data(self):
+        pass
+
+    def consume(self, packages):
+        data_map = packages[0][1]
+        for node_name, data in data_map.items():
+            sealed_data = [node_name, "all", "args_name", "x", data]
+            self.publish((sealed_data, None))
diff --git a/accuracy_tools/msprobe/core/components/dumper_atb.py b/accuracy_tools/msprobe/core/components/dumper_atb.py
new file mode 100644
index 0000000000000000000000000000000000000000..2da3e9fa10b1c39667401a611c17bcca7b45e301
--- /dev/null
+++ b/accuracy_tools/msprobe/core/components/dumper_atb.py
@@ -0,0 +1,120 @@
+# Copyright (c) 2025-2025 Huawei Technologies Co., Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from msprobe.base import SIZE_1M, BaseComponent, Component
+from msprobe.common.dirs import DirPool
+from msprobe.utils.constants import CfgConst, CompConst, DumpConst, MsgConst, PathConst
+from msprobe.utils.env import evars
+from msprobe.utils.exceptions import MsprobeException
+from msprobe.utils.log import logger
+from msprobe.utils.path import is_enough_disk_space
+from msprobe.utils.toolkits import run_subprocess, seed_all
+
+
+@Component.register(CompConst.ATB_ACTUATOR_COMP)
+class AtbActuatorComp(BaseComponent):
+    def __init__(self, priority, dump_path, **kwargs):
+        super().__init__(priority)
+        self.dump_path = dump_path
+        self.task = kwargs.get("task", CfgConst.TASK_STAT)
+        self.dump_level = kwargs.get("dump_level", [CfgConst.LEVEL_KERNEL])
+        self.step = kwargs.get("step", [])
+        self.rank = kwargs.get("rank", [])
+        self.seed = kwargs.get("seed")
+        self.log_level = kwargs.get("log_level")
+        self.summary_mode = kwargs.get("summary_mode", CfgConst.TASK_STAT)
+        self.buffer_size = kwargs.get("buffer_size", SIZE_1M)
+        self.data_mode = kwargs.get("data_mode", ["all"])
+        self.dump_extra = kwargs.get("dump_extra", [])
+        self.op_id = kwargs.get("op_id", [])
+        self.op_name = kwargs.get("op_name", {})
+        self.exec = kwargs.get("exec", [])
+
+    def activate(self, *args, **kwargs):
+        self.set_env_vars()
+        self.execute_dump()
+
+    def set_env_vars(self):
+        self._set_dump_path()
+        self._set_task()
+        self._set_dump_level()
+        self._set_step()
+        self._set_rank()
+        self._set_seed()
+        self._set_log_level()
+        self._set_summary_mode()
+        self._set_buffer_size()
+        self._set_data_mode()
+        self._set_dump_extra()
+        self._set_op_id()
+        self._set_op_name()
+        logger.info("The ATB dump parameters have been set.")
+
+    def execute_dump(self):
+        if not is_enough_disk_space(DirPool.get_msprobe_dir(), PathConst.SIZE_2G):
+            raise MsprobeException(
+                MsgConst.RISK_ALERT, "Please reserve at least 2GB of disk space for saving dump data."
+            )
+        run_subprocess(self.exec)
+
+    def _set_dump_path(self):
+        evars.set(DumpConst.ENVVAR_LINK_DUMP_PATH, self.dump_path)
+
+    def _set_task(self):
+        evars.set(DumpConst.ENVVAR_LINK_DUMP_TASK, self.task)
+
+    def _set_dump_level(self):
+        evars.set(DumpConst.ENVVAR_LINK_DUMP_LEVEL, ",".join(self.dump_level))
+
+    def _set_step(self):
+        evars.set(DumpConst.ENVVAR_LINK_STEP, ",".join([str(i) for i in self.step]))
+
+    def _set_rank(self):
+        evars.set(DumpConst.ENVVAR_LINK_RANK, ",".join([str(i) for i in self.rank]))
+
+    def _set_seed(self):
+        if self.seed:
+            seed_all(self.seed, mode=True, rm_dropout=False)
+
+    def _set_log_level(self):
+        evars.set(DumpConst.ENVVAR_LINK_LOG_LEVEL, logger.get_level_id(self.log_level))
+
+    def _set_summary_mode(self):
+        evars.set(DumpConst.ENVVAR_LINK_SUMMARY_MODE, self.summary_mode)
+
+    def _set_buffer_size(self):
+        evars.set(DumpConst.ENVVAR_LINK_BUFFER_SIZE, self.buffer_size)
+
+    def _set_data_mode(self):
+        evars.set(DumpConst.ENVVAR_LINK_DATA_MODE, ",".join(self.data_mode))
+
+    def _set_dump_extra(self):
+        options = {
+            "tiling": DumpConst.ENVVAR_LINK_SAVE_TILING,
+            "cpu_profiling": DumpConst.ENVVAR_LINK_SAVE_CPU_PROFILING,
+            "kernel_info": DumpConst.ENVVAR_LINK_SAVE_KERNEL_INFO,
+            "op_info": DumpConst.ENVVAR_LINK_SAVE_OP_INFO,
+            "param": DumpConst.ENVVAR_LINK_SAVE_PARAM,
+        }
+        for key, env_var in options.items():
+            evars.set(env_var, "1" if key in self.dump_extra else "0")
+
+    def _set_op_id(self):
+        evars.set(DumpConst.ENVVAR_LINK_SAVE_TENSOR_IDS, ",".join([str(i) for i in self.op_id]))
+
+    def _set_op_name(self):
+        op_name = []
+        for ll in self.dump_level:
+            op_name.extend(self.op_name.get(ll, []))
+        evars.set(DumpConst.ENVVAR_LINK_SAVE_TENSOR_RUNNER, ",".join([str(i).lower() for i in op_name]))
diff --git a/accuracy_tools/msprobe/core/components/dumper_writer.py b/accuracy_tools/msprobe/core/components/dumper_writer.py
new file mode 100644
index 0000000000000000000000000000000000000000..1b696765e5f4952c403df5463c7fd5c7e43dad73
--- /dev/null
+++ b/accuracy_tools/msprobe/core/components/dumper_writer.py
@@ -0,0 +1,149 @@
+# Copyright (c) 2025-2025 Huawei Technologies Co., Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from inspect import stack
+
+from msprobe.base import Component, ConsumerComp
+from msprobe.common.dirs import DirPool
+from msprobe.common.stat import DataStat
+from msprobe.core.base import RankDirFile, SaveTensor
+from msprobe.utils.constants import CfgConst, CompConst, DumpConst
+from msprobe.utils.io import save_json
+from msprobe.utils.log import logger
+from msprobe.utils.path import join_path
+
+_STACK_FILTER_PATH = ["msprobe/core", "msprobe/base", "msprobe/common", "msprobe/utils", "torch/nn/modules/module.py"]
+_WITHOUT_CALL_STACK = "The call stack retrieval failed."
+
+
+class DumpJson(RankDirFile):
+    def __init__(self, buffer_size, task, level, framework, summary_mode):
+        super().__init__(buffer_size)
+        self.cache_file = {}
+        self._init(task=task, level=level, framework=framework)
+        self.summary_mode = summary_mode
+
+    def update_stat(self, node_name=None, in_out=None, args_name=None, npy_data=None):
+        if node_name not in self.cache_file[DumpConst.DATA]:
+            self.cache_file[DumpConst.DATA][node_name] = {}
+        self._update_dump_json(
+            self.cache_file[DumpConst.DATA][node_name],
+            in_out,
+            {**{"data_name": args_name}, **DataStat.collect_stats_for_numpy(npy_data, self.summary_mode)},
+        )
+
+    def _save(self):
+        dump_json_path = join_path(self.rank_dir, DumpConst.DUMP_JSON)
+        save_json(self.cache_file, dump_json_path, indent=4)
+
+    def _init(self, **kwargs):
+        self.cache_file.update(
+            {
+                CfgConst.TASK: kwargs.get("task", None),
+                CfgConst.LEVEL: kwargs.get("level", None),
+                CfgConst.FRAMEWORK: kwargs.get("framework", None),
+                DumpConst.DUMP_DATA_DIR: kwargs.get(DumpConst.DUMP_DATA_DIR, None),
+                DumpConst.DATA: {},
+            }
+        )
+
+    def _update_dump_json(self, dump_dic, in_out, kwargs: dict):
+        if in_out not in dump_dic:
+            dump_dic[in_out] = []
+        dump_dic.get(in_out).append(kwargs)
+        self.cover(dump_dic)
+
+
+class StackJson(RankDirFile):
+    def __init__(self, buffer_size):
+        super().__init__(buffer_size)
+        self.cache_file = {}
+
+    @staticmethod
+    def _call_stack(name: str):
+        try:
+            _stack = stack()[:5]
+        except Exception as e:
+            logger.warning(f"Failed to retrieve the call stack of {name}: {e}.")
+            _stack = None
+        stack_str = []
+        if _stack:
+            for _, path, line, func, code, _ in _stack:
+                if not code:
+                    continue
+                if any(filter_path in path for filter_path in _STACK_FILTER_PATH):
+                    continue
+                stack_line = f"File {path}, line {str(line)}, in {func}, \n {code[0].strip()}"
+                stack_str.append(stack_line)
+        else:
+            stack_str.append(_WITHOUT_CALL_STACK)
+        stack_info = {name: stack_str}
+        return stack_info
+
+    def update_stack(self, name):
+        self.cache_file.update(self._call_stack(name))
+        self.cover(self.cache_file)
+
+    def _save(self):
+        stack_json_path = join_path(self.rank_dir, DumpConst.STACK_JSON)
+        save_json(self.cache_file, stack_json_path, indent=4)
+
+
+@Component.register(CompConst.DUMP_WRITER_COMP)
+class DumpWriterComp(ConsumerComp):
+    def __init__(self, priority, task, level, framework, summary_mode, strategy, buffer_size: int, dir_pool: DirPool):
+        super().__init__(priority)
+        self.net_output_nodes = None
+        self.task = task
+        self.dump_json = DumpJson(buffer_size, task, level, framework, summary_mode)
+        self.stack_json = StackJson(buffer_size)
+        self.strategy = strategy
+        self.save_strategy = SaveTensor.get(strategy)()
+        self.dir_pool = dir_pool
+        self.add_path()
+
+    def add_path(self):
+        self.dump_json.add_rank_dir(self.dir_pool.rank_dir)
+        self.stack_json.add_rank_dir(self.dir_pool.rank_dir)
+        if self.task == CfgConst.TASK_TENSOR:
+            self.save_strategy.add_tensor_dir(self.dir_pool.tensor_dir)
+
+    def consume(self, packages):
+        if self.task == CfgConst.TASK_TENSOR and not self.dump_json.cache_file.get(DumpConst.DUMP_DATA_DIR):
+            self.dump_json.cache_file[DumpConst.DUMP_DATA_DIR] = self.dir_pool.get_tensor_dir()
+        received_data = packages[0][1]
+        sealed_data, self.net_output_nodes = received_data
+        self.write(sealed_data)
+
+    def write(self, sealed_data):
+        if self.strategy == DumpConst.NPY_FORMAT:
+            # sealed_data is a list: [node_name, in_or_out, args_name, i, npy_data]
+            self.dump_json.update_stat(sealed_data[0], sealed_data[1], sealed_data[2], sealed_data[4])
+        if self.task == CfgConst.TASK_TENSOR:
+            if DumpConst.INPUT in sealed_data[1]:
+                self.save_strategy.save_tensor_data(sealed_data[0], sealed_data[2], sealed_data[4])
+            elif DumpConst.OUTPUT in sealed_data[1]:
+                self.save_strategy.save_tensor_data(sealed_data[0], sealed_data[2], sealed_data[4])
+            else:
+                self.save_strategy.save_tensor_data(sealed_data[0], sealed_data[2], sealed_data[4])
+
+    def finalize(self):
+        if self.net_output_nodes:
+            save_json(self.net_output_nodes, join_path(self.dir_pool.get_model_dir(), DumpConst.NET_OUTPUT_NODES_JSON))
+        self._flush_remaining_cache()
+
+    def _flush_remaining_cache(self):
+        self.dump_json.clear_cache()
+        self.stack_json.clear_cache()
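For review context, here is a minimal, standalone sketch (not part of the patch) of the chunk-accumulation flow that ACLDumpDataProcessor in dumper_acl.py implements. The dict keys "buf_len", "data_buf", and "is_last_chunk", the SIZE_4G constant, and the ChunkAccumulator name are hypothetical stand-ins for ACLConst/PathConst and the real class; the real component additionally keys accumulated buffers by node names derived from the ACL dump file name.

# Illustrative sketch only; plain dict keys stand in for ACLConst constants.
SIZE_4G = 4 * 1024 ** 3  # stand-in for PathConst.SIZE_4G


class ChunkAccumulator:
    def __init__(self):
        self.buffer = b""
        self.total_len = 0
        self.completed = False

    def feed(self, chunk: dict):
        # Ignore chunks that arrive after the node's data is already complete.
        if self.completed:
            return
        buf_len = chunk["buf_len"]
        # Reject empty chunks and anything that would exceed the 4 GB cap.
        if buf_len == 0 or self.total_len + buf_len > SIZE_4G:
            raise ValueError(f"Buffer overflow: cached {self.total_len}, incoming {buf_len}.")
        self.total_len += buf_len
        self.buffer += chunk["data_buf"]
        # The last chunk's payload is still appended before completion is flagged.
        self.completed = chunk["is_last_chunk"]


acc = ChunkAccumulator()
acc.feed({"buf_len": 3, "data_buf": b"abc", "is_last_chunk": False})
acc.feed({"buf_len": 2, "data_buf": b"de", "is_last_chunk": True})
assert acc.completed and acc.buffer == b"abcde"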