From 9778c3659e744c62a037e4a93eb300985c8b04fc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=A3=98=E5=87=AF=E8=BE=BE?=
Date: Mon, 14 Aug 2023 17:05:39 +0800
Subject: [PATCH 1/2] Ascend PyTorch Dir PreCheck.

---
 .../pytorch_data_preprocessor.py              | 41 +++++++++++++++----
 1 file changed, 33 insertions(+), 8 deletions(-)

diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py
index 8dc47bd822..7e0487a2c3 100644
--- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py
+++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py
@@ -13,16 +13,41 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

+from collections import defaultdict
 import os
-
 class PytorchDataPreprocessor:
-    def __init__(self, collection_path: str):
-        self.collection_path = collection_path
-        self.data_map = {}
+    PROFILER_INFO_HEAD = 'profiler_info_'
+    PROFILER_INFO_EXTENSION = '.json'
+
+    def __init__(self, path: str):
+        self.path = os.path.realpath(path)
+
+    def get_data_map(self) -> dict:
+        if not os.path.exists(self.path) or not os.access(self.path, os.R_OK):
+            print('[Error]path:{} does not exist or is not accessible.'.format(self.path))
+            return dict()
+
+        collector_dirs = [dir_name for dir_name in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, dir_name))]
+        ascend_pt_dirs = [dir_name for dir_name in collector_dirs if dir_name.endswith("ascend_pt")]
+
+        rank_id_map = defaultdict(list)
+        for dir_name in ascend_pt_dirs:
+            rank_id = self.get_rank_id(dir_name)
+            if rank_id < 0:
+                print('[Error]failed to get rank id or rank id is invalid.')
+
+            rank_id_map[rank_id].append(dir_name)

-    def get_data_map(self):
-        self.input_data()
+        ret_dict = dict()
+        for (rank_id, dir_list) in rank_id_map.items():
+            dir_list.sort(key=lambda x: x.split('_')[-3])
+            ret_dict[rank_id] = os.path.join(self.path, dir_list[0])
+        return ret_dict

-    def input_data(self):
-        pass
+    def get_rank_id(self, dir_name: str) -> int:
+        files = os.listdir(os.path.join(self.path, dir_name))
+        for file_name in files:
+            if file_name.startswith(self.PROFILER_INFO_HEAD) and file_name.endswith(self.PROFILER_INFO_EXTENSION):
+                return int(file_name[len(self.PROFILER_INFO_HEAD): -1 * len(self.PROFILER_INFO_EXTENSION)])
+        return -1
--
Gitee

From 460d01b6fa5e5db4fcb6afd401ee4789e87e5693 Mon Sep 17 00:00:00 2001
From: sunboquan
Date: Tue, 15 Aug 2023 22:56:10 +0800
Subject: [PATCH 2/2] profiling analysis

---
 .../analysis/analysis_facade.py               |  19 +--
 .../analysis/communication_analysis.py        | 130 ++++++++++++++++++
 .../analysis/step_trace_time_analysis.py      |  90 ++++++++++++
 profiler/cluster_analyse/cluster_analysis.py  |  21 ++-
 .../pytorch_data_preprocessor.py              |   8 +-
 .../cluster_analyse/common_func/constant.py   |  42 +++++-
 .../common_func/file_manager.py               |  55 ++++----
 .../communication_group_generator.py          |  97 +++++++++++--
 .../cluster_analyse/prof_bean/__init__.py     |  14 ++
 .../prof_bean/step_trace_time_bean.py         |  39 ++++++
 10 files changed, 461 insertions(+), 54 deletions(-)
 create mode 100644 profiler/cluster_analyse/analysis/communication_analysis.py
 create mode 100644 profiler/cluster_analyse/analysis/step_trace_time_analysis.py
 create mode 100644 profiler/cluster_analyse/prof_bean/__init__.py
 create mode 100644 profiler/cluster_analyse/prof_bean/step_trace_time_bean.py

diff --git a/profiler/cluster_analyse/analysis/analysis_facade.py b/profiler/cluster_analyse/analysis/analysis_facade.py
index e6966d4402..ff352c94e2 100644
--- a/profiler/cluster_analyse/analysis/analysis_facade.py
+++ b/profiler/cluster_analyse/analysis/analysis_facade.py
@@ -13,21 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.

-from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor
-from communication_group.communication_group_generator import CommunicationGroupGenerator
+from analysis.communication_analysis import CommunicationAnalysis
+from analysis.step_trace_time_analysis import StepTraceTimeAnalysis


 class AnalysisFacade:
-    analysis_module = {}
+    analysis_module = {CommunicationAnalysis, StepTraceTimeAnalysis}

-    def __init__(self, collection_path: str, data_map: dict, communication_group: dict):
-        self.collection_path = collection_path
-        self.data_map = data_map
-        self.communication_group = communication_group
+    def __init__(self, param: dict):
+        self.param = param

     def cluster_analyze(self):
-        data_map = PytorchDataPreprocessor(self.collection_path).get_data_map()
-        if not data_map:
-            print("Can not get rank info or profiling data.")
-        communication_group = CommunicationGroupGenerator(self.collection_path, data_map).generate()
-
+        for analysis in self.analysis_module:
+            analysis(self.param).run()
diff --git a/profiler/cluster_analyse/analysis/communication_analysis.py b/profiler/cluster_analyse/analysis/communication_analysis.py
new file mode 100644
index 0000000000..19f2bf7d57
--- /dev/null
+++ b/profiler/cluster_analyse/analysis/communication_analysis.py
@@ -0,0 +1,130 @@
+# Copyright (c) 2023, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from common_func.constant import Constant
+from collections import defaultdict
+from common_func.file_manager import FileManager
+
+
+class CommunicationAnalysis:
+    CLUSTER_COMMUNICATION_JSON = "cluster_communication.json"
+
+    def __init__(self, param: dict):
+        self.collection_path = param.get(Constant.COLLECTION_PATH)
+        self.data_map = param.get(Constant.DATA_MAP)
+        self.collective_group_dict = param.get(Constant.COLLECTIVE_GROUP)
+        self.communication_ops = param.get(Constant.COMMUNICATION_OPS)
+        self.comm_ops_struct = {}
+
+    @staticmethod
+    def combine_size_distribution(op_dict: dict, total_dict: dict):
+        for size, size_info in op_dict.items():
+            total_dict[size][0] += size_info[0]
+            total_dict[size][1] += size_info[1]
+
+    def run(self):
+        self.split_op_by_group()
+        self.combine_ops_total_info()
+        self.dump_data()
+
+    def dump_data(self):
+        if not self.comm_ops_struct:
+            print("There is no final communication op data generated.")
+            return
+        output_comm_data = {}
+        for key in self.comm_ops_struct:
+            output_comm_data[str(key)] = self.comm_ops_struct.get(key)
+        FileManager.create_json_file(self.collection_path, output_comm_data, self.CLUSTER_COMMUNICATION_JSON)
+
+    def split_op_by_group(self):
+        for single_op in self.communication_ops:
+            if single_op.get(Constant.COMM_OP_TYPE) == Constant.P2P:
+                rank_tup = Constant.P2P
+            else:
+                rank_tup = tuple(self.collective_group_dict.get(single_op.get(Constant.GROUP_NAME), []))
+            rank_id = single_op.get(Constant.RANK_ID, 'N/A')
+            step_id = single_op.get(Constant.STEP_ID, 'N/A')
+            op_name = single_op.get(Constant.COMM_OP_NAME, 'N/A')
+            op_info = single_op.get(Constant.COMM_OP_INFO)
+            self.comm_ops_struct.setdefault(rank_tup, {}).setdefault(step_id, {}).\
+                setdefault(op_name, {}).setdefault(rank_id, op_info)
+
+    def combine_ops_total_info(self):
+        for rank_tup, group_dict in self.comm_ops_struct.items():
+            for step_id, communication_ops in group_dict.items():
+                self.compute_total_info(communication_ops)
+
+    def compute_total_info(self, comm_ops: dict):
+        if not comm_ops:
+            return
+        total_rank_dict = {}
+        for communication_op, rank_dict in comm_ops.items():
+            for rank_id, communication_op_info in rank_dict.items():
+                total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_TIME_INFO, defaultdict(float))
+                total_rank_dict.setdefault(rank_id, {}).setdefault(Constant.COMMUNICATION_BANDWIDTH_INFO, {})
+                for com_info, com_info_dict in communication_op_info.items():
+                    if com_info == Constant.COMMUNICATION_TIME_INFO:
+                        self.combine_time_info(com_info_dict, total_rank_dict[rank_id][com_info])
+                    if com_info == Constant.COMMUNICATION_BANDWIDTH_INFO:
+                        self.combine_bandwidth_info(com_info_dict, total_rank_dict[rank_id][com_info])
+        for rank_id in total_rank_dict:
+            self.compute_time_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_TIME_INFO])
+            self.compute_bandwidth_ratio(total_rank_dict[rank_id][Constant.COMMUNICATION_BANDWIDTH_INFO])
+        comm_ops[Constant.TOTAL_OP_INFO] = total_rank_dict
+
+    def combine_time_info(self, com_info_dict: dict, total_time_info_dict: dict):
+        ratio_list = [Constant.WAIT_TIME_RATIO, Constant.SYNCHRONIZATION_TIME_RATIO]
+        for time_info in com_info_dict:
+            if time_info not in ratio_list and time_info != Constant.START_TIMESTAMP:
+                total_time_info_dict[time_info] += com_info_dict.get(time_info)
+
+    def combine_bandwidth_info(self, com_info_dict: dict, total_bandwidth_info_dict: dict):
+        add_list = [Constant.TRANSIT_TIME_MS, Constant.TRANSIT_SIZE_MB]
+        dict_list = [Constant.SIZE_DISTRIBUTION]
+        for transport_type, part_transport_dict in com_info_dict.items():
+            if transport_type not in total_bandwidth_info_dict:
+                total_bandwidth_info_dict[transport_type] = {
+                    Constant.TRANSIT_TIME_MS: 0,
+                    Constant.TRANSIT_SIZE_MB: 0,
+                    Constant.SIZE_DISTRIBUTION: defaultdict(lambda: [0, 0])
+                }
+            for bandwidth_msg, value in part_transport_dict.items():
+                if bandwidth_msg in add_list:
+                    total_bandwidth_info_dict[transport_type][bandwidth_msg] += value
+                if bandwidth_msg in dict_list:
+                    self.combine_size_distribution(value, total_bandwidth_info_dict[transport_type][bandwidth_msg])
+
+    def compute_time_ratio(self, total_time_info_dict: dict):
+        if total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0:
+            total_time_info_dict[Constant.WAIT_TIME_RATIO] = 0
+        else:
+            total_time_info_dict[Constant.WAIT_TIME_RATIO] = \
+                round(total_time_info_dict[Constant.WAIT_TIME_MS] /
+                      (total_time_info_dict[Constant.WAIT_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4)
+        if total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] + total_time_info_dict[Constant.TRANSIT_TIME_MS] == 0:
+            total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = 0
+        else:
+            total_time_info_dict[Constant.SYNCHRONIZATION_TIME_RATIO] = \
+                round(total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] /
+                      (total_time_info_dict[Constant.SYNCHRONIZATION_TIME_MS] +
+                       total_time_info_dict[Constant.TRANSIT_TIME_MS]), 4)
+
+    def compute_bandwidth_ratio(self, total_bandwidth_info_dict: dict):
+        for transport_type, bandwidth_dict in total_bandwidth_info_dict.items():
+            if bandwidth_dict[Constant.TRANSIT_TIME_MS] == 0:
+                bandwidth_dict[Constant.BANDWIDTH_GB_S] = 0
+            else:
+                bandwidth_dict[Constant.BANDWIDTH_GB_S] = \
+                    round(bandwidth_dict[Constant.TRANSIT_SIZE_MB] / bandwidth_dict[Constant.TRANSIT_TIME_MS], 4)
diff --git a/profiler/cluster_analyse/analysis/step_trace_time_analysis.py b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py
new file mode 100644
index 0000000000..5f04978116
--- /dev/null
+++ b/profiler/cluster_analyse/analysis/step_trace_time_analysis.py
@@ -0,0 +1,90 @@
+# Copyright (c) 2023, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+from common_func.constant import Constant
+from collections import defaultdict
+from common_func.file_manager import FileManager
+from prof_bean.step_trace_time_bean import StepTraceTimeBean
+
+
+class StepTraceTimeAnalysis:
+    CLUSTER_TRACE_TIME_CSV = "cluster_step_trace_time.csv"
+
+    def __init__(self, param: dict):
+        self.collection_path = param.get(Constant.COLLECTION_PATH)
+        self.data_map = param.get(Constant.DATA_MAP)
+        self.communication_group = param.get(Constant.COMMUNICATION_GROUP)
+        self.step_time_dict = {}
+        self.step_data_list = []
+
+    @staticmethod
+    def get_max_data_row(data_group_list: list):
+        if not data_group_list:
+            return []
+        ret = []
+        for idx in range(len(data_group_list[0])):
+            max_val = 0
+            for idy in range(len(data_group_list)):
+                max_val = max(max_val, data_group_list[idy][idx])
+            ret.append(max_val)
+        return ret
+
+    def run(self):
+        self.load_step_trace_time_data()
+        self.analyze_step_time()
+        self.dump_data()
+
+    def dump_data(self):
+        if not self.step_data_list:
+            print("Can't get step time info!")
+        headers = self.get_headers()
+        FileManager.create_csv_file(self.collection_path, self.step_data_list, self.CLUSTER_TRACE_TIME_CSV, headers)
+
+    def load_step_trace_time_data(self):
+        for rank_id, profiling_dir_path in self.data_map.items():
+            step_time_file = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.STEP_TIME_CSV)
+            if os.path.exists(step_time_file):
+                self.step_time_dict[rank_id] = FileManager.read_csv_file(step_time_file, StepTraceTimeBean)
+            if not self.step_time_dict.get(rank_id):
+                print(f"rank {rank_id} does not have a valid step_trace_time.csv.")
+
+    def analyze_step_time(self):
+        for rank_id, data_bean_list in self.step_time_dict.items():
+            for data_bean in data_bean_list:
+                self.step_data_list.append([data_bean.step, Constant.RANK, rank_id] + data_bean.row)
+        stage_list = self.communication_group.get(Constant.P2P)
+        if not stage_list:
+            return
+        step_group_dict = {}
+        for data_list in self.step_data_list:
+            stage_group = 'None'
+            for stage in stage_list:
+                if data_list[2] in stage:
+                    stage_group = tuple(stage)
+                    break
+            key = (data_list[0], stage_group)
+            step_group_dict.setdefault(key, []).append(data_list[3:])
+
+        for key, data_group_list in step_group_dict.items():
+            self.step_data_list.append([key[0], Constant.STAGE, key[1]] + self.get_max_data_row(data_group_list))
+
+    def get_headers(self):
+        if self.step_time_dict:
+            for rank in self.step_time_dict:
+                if self.step_time_dict.get(rank):
+                    return self.step_time_dict[rank][0].all_headers
+        return []
diff --git a/profiler/cluster_analyse/cluster_analysis.py b/profiler/cluster_analyse/cluster_analysis.py
index d1daea002b..861bdeac65 100644
--- a/profiler/cluster_analyse/cluster_analysis.py
+++ b/profiler/cluster_analyse/cluster_analysis.py
@@ -16,6 +16,9 @@
 import argparse
 from cluster_data_preprocess.pytorch_data_preprocessor import PytorchDataPreprocessor
 from communication_group.communication_group_generator import CommunicationGroupGenerator
+from common_func.constant import Constant
+from common_func.file_manager import FileManager
+from analysis.analysis_facade import AnalysisFacade


 class Interface:
@@ -25,8 +28,24 @@ class Interface:
         self.communication_group = {}

     def run(self):
+        FileManager.create_output_dir(self.collection_path)
         data_map = PytorchDataPreprocessor(self.collection_path).get_data_map()
-        communication_group = CommunicationGroupGenerator(self.collection_path, data_map).generate()
+        if not data_map:
+            print("Cannot get rank info or profiling data.")
+            return
+        communication_group, collective_group_dict, communication_ops = \
+            CommunicationGroupGenerator(self.collection_path, data_map).generate()
+        if not collective_group_dict:
+            print("Cannot get communication info from ranks.")
+            return
+        params = {
+            Constant.COLLECTION_PATH: self.collection_path,
+            Constant.DATA_MAP: data_map,
+            Constant.COLLECTIVE_GROUP: collective_group_dict,
+            Constant.COMMUNICATION_OPS: communication_ops,
+            Constant.COMMUNICATION_GROUP: communication_group
+        }
+        AnalysisFacade(params).cluster_analyze()


 if __name__ == "__main__":
diff --git a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py
index 7e0487a2c3..2870048a5f 100644
--- a/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py
+++ b/profiler/cluster_analyse/cluster_data_preprocess/pytorch_data_preprocessor.py
@@ -14,8 +14,10 @@
 # limitations under the License.

 from collections import defaultdict
+from common_func.file_manager import FileManager
 import os
+
 class PytorchDataPreprocessor:
     PROFILER_INFO_HEAD = 'profiler_info_'
     PROFILER_INFO_EXTENSION = '.json'
@@ -24,9 +26,7 @@ class PytorchDataPreprocessor:
         self.path = os.path.realpath(path)

     def get_data_map(self) -> dict:
-        if not os.path.exists(self.path) or not os.access(self.path, os.R_OK):
-            print('[Error]path:{} does not exist or is not accessible.'.format(self.path))
-            return dict()
+        FileManager.check_file_or_directory_path(self.path, isdir=True)

         collector_dirs = [dir_name for dir_name in os.listdir(self.path) if os.path.isdir(os.path.join(self.path, dir_name))]
         ascend_pt_dirs = [dir_name for dir_name in collector_dirs if dir_name.endswith("ascend_pt")]
@@ -36,7 +36,7 @@ class PytorchDataPreprocessor:
             rank_id = self.get_rank_id(dir_name)
             if rank_id < 0:
                 print('[Error]failed to get rank id or rank id is invalid.')
-
+                continue
             rank_id_map[rank_id].append(dir_name)

         ret_dict = dict()
diff --git a/profiler/cluster_analyse/common_func/constant.py b/profiler/cluster_analyse/common_func/constant.py
index e6cc07eb6e..84d4a9e054 100644
--- a/profiler/cluster_analyse/common_func/constant.py
+++ b/profiler/cluster_analyse/common_func/constant.py
@@ -17,12 +17,50 @@ class Constant(object):
     # dir name
     FRAMEWORK_DIR = "FRAMEWORK"
-    OUTPUT_DIR = "ASCEND_PROFILER_OUTPUT"
+    CLUSTER_ANALYSIS_OUTPUT = "cluster_analysis_output"
+    SINGLE_OUTPUT = "ASCEND_PROFILER_OUTPUT"
     COMM_JSON = "communication.json"
-    STEP_TIME_CSV == "step_time.csv"
+    STEP_TIME_CSV = "step_trace_time.csv"

     # file authority
     FILE_AUTHORITY = 0o640
     DIR_AUTHORITY = 0o750
     MAX_JSON_SIZE = 1024 * 1024 * 1024 * 10
     MAX_CSV_SIZE = 1024 * 1024 * 1024 * 5
+    MAX_PATH_LENGTH = 4096
+
+    # communication
+    P2P = "p2p"
+    COLLECTIVE = "collective"
+    STEP_ID = "step_id"
+    RANK_ID = "rank_id"
+    GROUP_NAME = "group_name"
+    COMM_OP_TYPE = "comm_op_type"
+    COMM_OP_NAME = "comm_op_name"
+    COMM_OP_INFO = "comm_op_info"
+    TOTAL_OP_INFO = "Total Op Info"
+    COMMUNICATION_TIME_INFO = "Communication Time Info"
+    START_TIMESTAMP = "Start Timestamp(us)"
+    COMMUNICATION_BANDWIDTH_INFO = "Communication Bandwidth Info"
+    HCOM_SEND = "hcom_send"
+    HCOM_RECEIVE = "hcom_receive"
+    SYNCHRONIZATION_TIME_RATIO = "Synchronization Time Ratio"
+    SYNCHRONIZATION_TIME_MS = "Synchronization Time(ms)"
+    WAIT_TIME_RATIO = "Wait Time Ratio"
+    TRANSIT_TIME_MS = "Transit Time(ms)"
+    TRANSIT_SIZE_MB = "Transit Size(MB)"
+    SIZE_DISTRIBUTION = "Size Distribution"
+    WAIT_TIME_MS = "Wait Time(ms)"
+    BANDWIDTH_GB_S = "Bandwidth(GB/s)"
+    COMMUNICATION = "communication.json"
+
+    # params
+    DATA_MAP = "data_map"
+    COLLECTIVE_GROUP = "collective_group"
+    COMMUNICATION_OPS = "communication_ops"
+    COLLECTION_PATH = "collection_path"
+    COMMUNICATION_GROUP = "communication_group"
+
+    # step time
+    RANK = 'rank'
+    STAGE = 'stage'
diff --git a/profiler/cluster_analyse/common_func/file_manager.py b/profiler/cluster_analyse/common_func/file_manager.py
index 6f0c9f99e9..3ac6b843a7 100644
--- a/profiler/cluster_analyse/common_func/file_manager.py
+++ b/profiler/cluster_analyse/common_func/file_manager.py
@@ -33,6 +33,18 @@
         Exception Description: when invalid data throw exception
         """
+        if not os.access(path, os.R_OK):
+            raise RuntimeError(
+                'The path {} does not exist or is not readable. Please check the path and its permissions.'.format(path))
+
+        if len(path) > Constant.MAX_PATH_LENGTH:
+            msg = f"The length of file path exceeded the maximum value {Constant.MAX_PATH_LENGTH}: {path}"
+            raise RuntimeError(msg)
+
+        if os.path.islink(path):
+            msg = f"Invalid profiling path is soft link: {path}"
+            raise RuntimeError(msg)
+
         if isdir:
             if not os.path.exists(path):
                 raise RuntimeError('The path {} is not exist.'.format(path))
@@ -47,10 +59,6 @@
             if not os.path.isfile(path):
                 raise RuntimeError('{} is an invalid file or non-exist.'.format(path))

-        if not os.access(path, os.R_OK):
-            raise RuntimeError(
-                'The path {} does not have permission to read. Please check the path permission'.format(path))
-
     @classmethod
     def read_csv_file(cls, file_path: str, class_bean: any) -> list:
         cls.check_file_or_directory_path(file_path)
@@ -78,7 +86,7 @@
         if file_size <= 0:
             return {}
         if file_size > Constant.MAX_JSON_SIZE:
-            print(f"The file size exceeds the preset value {Constant.MAX_CSV_SIZE / 1024 / 1024}MB, "
+            print(f"The file size exceeds the preset value {Constant.MAX_JSON_SIZE / 1024 / 1024}MB, "
                   f"please check the file: {file_path}")
             return {}
         try:
@@ -92,49 +100,44 @@
     def create_csv_file(cls, profiler_path: str, data: list, file_name: str, headers: list = None) -> None:
         if not data:
             return
-        file_path = os.path.join(profiler_path, Constant.OUTPUT_DIR, file_name)
+        output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
+        output_file = os.path.join(output_path, file_name)
+        cls.check_file_or_directory_path(output_path, isdir=True)
         try:
-            with os.fdopen(os.open(file_path, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w",
+            with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w",
                            newline="") as file:
                 writer = csv.writer(file)
                 if headers:
                     writer.writerow(headers)
                 writer.writerows(data)
         except Exception:
-            raise RuntimeError(f"Can't create file: {file_path}")
+            raise RuntimeError(f"Can't create file: {output_file}")

     @classmethod
-    def create_json_file(cls, profiler_path: str, data: list, file_name: str) -> None:
+    def create_json_file(cls, profiler_path: str, data: dict, file_name: str) -> None:
         if not data:
             return
-        file_path = os.path.join(profiler_path, Constant.OUTPUT_DIR, file_name)
-        cls.create_json_file_by_path(file_path, data)
-
-    @classmethod
-    def create_json_file_by_path(cls, output_path: str, data: list) -> None:
-        dir_name = os.path.dirname(output_path)
-        if not os.path.exists(dir_name):
-            try:
-                os.makedirs(dir_name, mode=Constant.DIR_AUTHORITY)
-            except Exception:
-                raise RuntimeError(f"Can't create directory: {dir_name}")
+        output_path = os.path.join(profiler_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
+        output_file = os.path.join(output_path, file_name)
+        cls.check_file_or_directory_path(output_path, isdir=True)
         try:
-            with os.fdopen(os.open(output_path, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file:
+            with os.fdopen(os.open(output_file, os.O_WRONLY | os.O_CREAT, Constant.FILE_AUTHORITY), "w") as file:
                 json.dump(data, file)
         except Exception:
-            raise RuntimeError(f"Can't create file: {output_path}")
+            raise RuntimeError(f"Can't create the file: {output_file}")

     @classmethod
-    def remove_and_make_output_dir(cls, profiler_path) -> None:
-        output_path = os.path.join(profiler_path, Constant.OUTPUT_DIR)
+    def create_output_dir(cls, collection_path: str) -> None:
+        output_path = os.path.join(collection_path, Constant.CLUSTER_ANALYSIS_OUTPUT)
         if os.path.isdir(output_path):
             try:
+                cls.check_file_or_directory_path(output_path, isdir=True)
                 shutil.rmtree(output_path)
                 os.makedirs(output_path, mode=Constant.DIR_AUTHORITY)
             except Exception:
-                raise RuntimeError(f"Can't delete files in the directory: {output_path}")
+                raise RuntimeError(f"Can't delete the directory: {output_path}")
             return
         try:
             os.makedirs(output_path, mode=Constant.DIR_AUTHORITY)
         except Exception:
-            raise RuntimeError(f"Can't create directory: {output_path}")
+            raise RuntimeError(f"Can't create the directory: {output_path}")
diff --git a/profiler/cluster_analyse/communication_group/communication_group_generator.py b/profiler/cluster_analyse/communication_group/communication_group_generator.py
index e95175b0b3..0202ea0e56 100644
--- a/profiler/cluster_analyse/communication_group/communication_group_generator.py
+++ b/profiler/cluster_analyse/communication_group/communication_group_generator.py
@@ -16,31 +16,110 @@
 import os
 from common_func.constant import Constant
 from common_func.file_manager import FileManager
+from collections import defaultdict
+

 class CommunicationGroupGenerator:
+    COMMUNICATION_GROUP_JSON = "communication_group.json"
+
     def __init__(self, collection_path: str, data_map: dict):
         self.collection_path = collection_path
         self.data_map = data_map
         self.communication_group = {}
+        self.collective_group_dict = defaultdict(set)
+        self.p2p_group_dict = defaultdict(list)
         self.rank_comm_dir_dict = {}
+        self.communication_ops = []

     def generate(self):
         self.load_communication_json()
+        self.analyze_communication_ops()
+        self.generate_collective_communication_group()
+        self.generate_p2p_communication_group()
+        FileManager.create_json_file(self.collection_path, self.communication_group, self.COMMUNICATION_GROUP_JSON)
+        return self.communication_group, self.collective_group_dict, self.communication_ops
+
+    def analyze_communication_ops(self):
+        for rank_id, rank_id_dict in self.rank_comm_dir_dict.items():
+            for step_id, step_id_dict in rank_id_dict.items():
+                if not isinstance(step_id_dict, dict):
+                    print(f"rank{rank_id}'s communication.json has a wrong data struct.")
+                    continue
+                for comm_op_type, comm_op_dict in step_id_dict.items():
+                    if comm_op_type == Constant.COLLECTIVE:
+                        self.get_collective_ops_name(rank_id, comm_op_dict)
+                    elif comm_op_type == Constant.P2P:
+                        pass
+                    else:
+                        print(f"rank{rank_id}'s communication.json has an unknown op type: {comm_op_type}.")
+                        continue
+                    self.add_communication_ops(rank_id, step_id, comm_op_type, comm_op_dict)

     def load_communication_json(self):
-        for rank_id, profiling_dir_path in self.data_map:
-            comm_dir = profiling_dir_path.get(Constant.COMM_JSON)
-            if comm_dir:
+        for rank_id, profiling_dir_path in self.data_map.items():
+            comm_dir = os.path.join(profiling_dir_path, Constant.SINGLE_OUTPUT, Constant.COMM_JSON)
+            if os.path.exists(comm_dir):
                 self.rank_comm_dir_dict[rank_id] = FileManager.read_json_file(comm_dir)
             if not self.rank_comm_dir_dict.get(rank_id):
-                print(f"rank {rank_id} does not have a valid communication.json")
-
+                print(f"rank {rank_id} does not have a valid communication.json.")

     def generate_collective_communication_group(self):
-        pass
+        self.communication_group[Constant.COLLECTIVE] = [list(group) for group in self.collective_group_dict.values()]

     def generate_p2p_communication_group(self):
-        pass
+        stage_group = {}
+        for rank_set in self.collective_group_dict.values():
+            unioned_set = set()
+            remove_key = []
+            for first_rank, stage in stage_group.items():
+                if UnionFind.is_connected(rank_set, stage):
+                    unioned_set = UnionFind.union(rank_set, stage, unioned_set)
+                    remove_key.append(first_rank)
+            if unioned_set:
+                for key in remove_key:
+                    del stage_group[key]
+                stage_group[min(unioned_set)] = unioned_set
+            else:
+                stage_group[min(rank_set)] = rank_set
+        first_rank_sort_list = sorted([first_rank for first_rank in stage_group])
+        self.communication_group[Constant.P2P] = \
+            [list(stage_group.get(first_rank, {})) for first_rank in first_rank_sort_list]
+
+    def get_collective_ops_name(self, rank_id: int, comm_op_dict: dict):
+        for comm_op in comm_op_dict:
+            if comm_op.startswith('Total'):
+                continue
+            group_name = comm_op.split('@')[-1]
+            self.collective_group_dict[group_name].add(rank_id)
+
+    def add_communication_ops(self, rank_id: int, step_id: str, comm_op_type: str, comm_op_dict: dict):
+        for comm_op in comm_op_dict:
+            if comm_op.startswith('Total'):
+                continue
+            group_name = comm_op.split('@')[-1]
+            self.communication_ops.append({
+                Constant.RANK_ID: rank_id,
+                Constant.STEP_ID: step_id,
+                Constant.COMM_OP_TYPE: comm_op_type,
+                Constant.COMM_OP_NAME: comm_op,
+                Constant.GROUP_NAME: group_name,
+                Constant.COMM_OP_INFO: comm_op_dict.get(comm_op)
+            })
+
+
+class UnionFind(object):
+    """Disjoint Set Union"""
+    @classmethod
+    def union(cls, p: set, q: set, o: set):
+        """make p and q the same set"""
+        return p | q | o

-    def get_all_collective_ops_name(self):
-        pass
+    @classmethod
+    def is_connected(cls, p: set, q: set):
+        """
+        check whether set p and set q are connected
+        """
+        if p & q:
+            return True
+        else:
+            return False
diff --git a/profiler/cluster_analyse/prof_bean/__init__.py b/profiler/cluster_analyse/prof_bean/__init__.py
new file mode 100644
index 0000000000..8400fd5ecd
--- /dev/null
+++ b/profiler/cluster_analyse/prof_bean/__init__.py
@@ -0,0 +1,14 @@
+# Copyright (c) 2023, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py
new file mode 100644
index 0000000000..80f09d5615
--- /dev/null
+++ b/profiler/cluster_analyse/prof_bean/step_trace_time_bean.py
@@ -0,0 +1,39 @@
+# Copyright (c) 2023, Huawei Technologies Co., Ltd.
+# All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class StepTraceTimeBean:
+    STEP = "Step"
+    COMPLEMENT_HEADER = ["Step", "Type", "Index"]
+
+    def __init__(self, data: dict):
+        self._data = data
+
+    @property
+    def row(self) -> list:
+        row = []
+        for field_name in self._data.keys():
+            if field_name == self.STEP:
+                continue
+            row.append(float(self._data.get(field_name, 0)))
+        return row
+
+    @property
+    def step(self) -> float:
+        return float(self._data.get(self.STEP, 0))
+
+    @property
+    def all_headers(self) -> list:
+        return self.COMPLEMENT_HEADER + list(self._data.keys())[1:]
--
Gitee
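
Note on the stage-detection logic in generate_p2p_communication_group() above: it derives pipeline stages by a disjoint-set union over the collective rank sets, repeatedly merging any sets that share a rank until only disconnected stages remain. The sketch below is a minimal, self-contained illustration of that algorithm; merge_into_stages() and the sample rank sets are hypothetical stand-ins for the collective groups parsed from each rank's communication.json.

# A minimal sketch of the stage-merging algorithm used by
# generate_p2p_communication_group(). merge_into_stages() and the sample
# rank sets are hypothetical; real input comes from communication.json.


class UnionFind:
    """Disjoint Set Union over rank sets."""

    @classmethod
    def union(cls, p: set, q: set, o: set) -> set:
        """Merge p, q and the accumulated set o into one set."""
        return p | q | o

    @classmethod
    def is_connected(cls, p: set, q: set) -> bool:
        """Two rank sets belong to the same stage if they share a rank."""
        return bool(p & q)


def merge_into_stages(collective_rank_sets: list) -> list:
    """Merge overlapping collective groups into disjoint stages."""
    stage_group = {}  # minimal rank of a stage -> set of ranks in it
    for rank_set in collective_rank_sets:
        unioned_set = set()
        remove_key = []
        # Collect every existing stage that overlaps the incoming set.
        for first_rank, stage in stage_group.items():
            if UnionFind.is_connected(rank_set, stage):
                unioned_set = UnionFind.union(rank_set, stage, unioned_set)
                remove_key.append(first_rank)
        if unioned_set:
            # Replace all overlapping stages with their union.
            for key in remove_key:
                del stage_group[key]
            stage_group[min(unioned_set)] = unioned_set
        else:
            stage_group[min(rank_set)] = rank_set
    return [sorted(stage_group[first_rank]) for first_rank in sorted(stage_group)]


if __name__ == "__main__":
    # Hypothetical 8-rank job with two pipeline stages of four ranks each.
    groups = [{0, 1}, {2, 3}, {0, 2}, {4, 5}, {6, 7}, {4, 6}]
    print(merge_into_stages(groups))  # [[0, 1, 2, 3], [4, 5, 6, 7]]

Pairwise set unions keep the implementation simple; the quadratic scan over existing stages is acceptable here because a training job has only a handful of distinct communication groups.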