diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py index ed69b098538c2eb94731ee1ff5d09986b233c18e..50eeedd731bc5b9b30fe418b226a9703aec611fb 100644 --- a/profiler/cluster_analyse/cluster_analysis.py +++ b/profiler/cluster_analyse/cluster_analysis.py @@ -14,8 +14,10 @@ # limitations under the License. import argparse +import os from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor +from cluster_data_preprocess.mindspore_data_preprocessor import MindsporeDataPreprocessor from communication_group.communication_group_generator import CommunicationGroupGenerator from common_func.constant import Constant from common_func.file_manager import FileManager @@ -24,6 +26,9 @@ from analysis.analysis_facade import AnalysisFacade class Interface: + ASCEND_PT = "ascend_pt" + ASCEND_MS = "ascend_ms" + def __init__(self, params: dict): self.collection_path = PathManager.get_realpath(params.get(Constant.COLLECTION_PATH)) self.data_map = {} @@ -32,11 +37,27 @@ class Interface: self.communication_ops = [] self.matrix_ops = [] + def allocate_prof_data(self): + ascend_pt_dirs = [] + ascend_ms_dirs = [] + for root, dirs, files in os.walk(self.collection_path): + for dir_name in dirs: + if dir_name.endswith(self.ASCEND_PT): + ascend_pt_dirs.append(os.path.join(root, dir_name)) + if dir_name.endswith(self.ASCEND_MS): + ascend_ms_dirs.append(os.path.join(root, dir_name)) + pt_data_map = PytorchDataPreprocessor(ascend_pt_dirs).get_data_map() + ms_data_map = MindsporeDataPreprocessor(ascend_ms_dirs).get_data_map() + if pt_data_map and ms_data_map: + print("[ERROR] Can not analyze pytorch and mindspore meantime.") + return[] + return pt_data_map if pt_data_map else ms_data_map + def run(self): PathManager.check_input_directory_path(self.collection_path) PathManager.check_path_owner_consistent(self.collection_path) FileManager.create_output_dir(self.collection_path) - data_map = PytorchDataPreprocessor(self.collection_path).get_data_map() + data_map = self.allocate_prof_data() if not data_map: print("[WARNING] Can not get rank info or profiling data.") return diff --git a/profiler/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py new file mode 100644 index 0000000000000000000000000000000000000000..85debdd31bb07cf96b91c12eb731cc00b00fcaa3 --- /dev/null +++ b/profiler/cluster_analyse/cluster_data_preprocess/mindspore_data_preprocessor.py @@ -0,0 +1,57 @@ +# Copyright (c) 2023, Huawei Technologies Co., Ltd. +# All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import defaultdict +import os +from common_func.file_manager import FileManager +from common_func.path_manager import PathManager + + +class MindsporeDataPreprocessor: + PROFILER_INFO_HEAD = 'profiler_info_' + PROFILER_INFO_EXTENSION = '.json' + + def __init__(self, path_list: str): + self.path_list = path_list + + def get_data_map(self) -> dict: + rank_id_map = defaultdict(list) + for dir_name in self.path_list: + rank_id = self.get_rank_id(dir_name) + if rank_id < 0: + print('[Error]fail to get rankid or rankid invalid.') + continue + rank_id_map[rank_id].append(dir_name) + + ret_dict = dict() + try: + for (rank_id, dir_list) in rank_id_map.items(): + dir_list.sort(key=lambda x: x.split('_')[-3]) + ret_dict[rank_id] = dir_list[0] + except Exception as e: + raise RuntimeError("Found invalid directory name!") from e + return ret_dict + + def get_rank_id(self, dir_name: str) -> int: + files = os.listdir(dir_name) + for file_name in files: + if file_name.startswith(self.PROFILER_INFO_HEAD) and file_name.endswith(self.PROFILER_INFO_EXTENSION): + rank_id_str = file_name[len(self.PROFILER_INFO_HEAD): -1 * len(self.PROFILER_INFO_EXTENSION)] + try: + rank_id = int(rank_id_str) + except ValueError: + rank_id = -1 + return rank_id + return -1 diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py index 18fd6a23b74830fc3b2bcc69b6c2ad61413f0d0f..f1e4c062a7c05656980f0767a3180154e91942ae 100644 --- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py +++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py @@ -23,17 +23,12 @@ class PytorchDataPreprocessor: PROFILER_INFO_HEAD = 'profiler_info_' PROFILER_INFO_EXTENSION = '.json' - def __init__(self, path: str): - self.path = PathManager.get_realpath(path) + def __init__(self, path_list: str): + self.path_list = path_list def get_data_map(self) -> dict: - ascend_pt_dirs = [] - for root, dirs, files in os.walk(self.path): - for dir_name in dirs: - if dir_name.endswith("ascend_pt"): - ascend_pt_dirs.append(os.path.join(root, dir_name)) rank_id_map = defaultdict(list) - for dir_name in ascend_pt_dirs: + for dir_name in self.path_list: rank_id = self.get_rank_id(dir_name) if rank_id < 0: print('[Error]fail to get rankid or rankid invalid.') @@ -44,7 +39,7 @@ class PytorchDataPreprocessor: try: for (rank_id, dir_list) in rank_id_map.items(): dir_list.sort(key=lambda x: x.split('_')[-3]) - ret_dict[rank_id] = os.path.join(self.path, dir_list[0]) + ret_dict[rank_id] = dir_list[0] except Exception as e: raise RuntimeError("Found invalid directory name!") from e return ret_dict